
cmflib.cmf

This class provides methods to log metadata for distributed AI pipelines. A class instance creates an ML Metadata (MLMD) store to record the metadata and, optionally, a driver that stores nodes and their relationships in Neo4j. The user must provide the name of the pipeline to be recorded with CMF.

cmflib.cmf.Cmf(
    filename="mlmd",
    pipeline_name="test_pipeline",
    custom_properties={"owner": "user_a"},
    graph=False
)
Args:
    filename: Path to the sqlite file to store the metadata.
    pipeline_name: Name to uniquely identify the pipeline. Note that the name is the unique identifier for a pipeline. If a pipeline already exists with the same name, the existing pipeline object is reused.
    custom_properties: Additional properties of the pipeline that need to be stored.
    graph: If set to true, the library also stores the relationships in the provided graph database. The following variables should be set: neo4j_uri (graph server URI), neo4j_user (user name) and neo4j_password (user password), e.g.:
cmf init local --path /home/user/local-storage --git-remote-url https://github.com/XXX/exprepo.git --neo4j-user neo4j --neo4j-password neo4j
                      --neo4j-uri bolt://localhost:7687
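
For example, once `cmf init` has recorded the Neo4j settings, a logger that also writes to the graph database can be created as follows (a minimal sketch; the property values are illustrative):

```python
# Minimal sketch: create a CMF logger that also writes to Neo4j.
# Assumes `cmf init` (as shown above) has already recorded the Neo4j
# URI, user and password; the property values below are illustrative.
from cmflib.cmf import Cmf

cmf = Cmf(
    filename="mlmd",                      # sqlite metadata store
    pipeline_name="test_pipeline",
    custom_properties={"owner": "user_a"},
    graph=True,                           # also store nodes/relationships in Neo4j
)
```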

Source code in cmflib/cmf.py
def __init__(
    self,
    filename: str = "mlmd",
    pipeline_name: str = "",
    custom_properties: t.Optional[t.Dict] = None,
    graph: bool = False,
    is_server: bool = False,
):
    if is_server is False:
        Cmf.__prechecks()
    if custom_properties is None:
        custom_properties = {}
    config = mlpb.ConnectionConfig()
    config.sqlite.filename_uri = filename
    self.store = metadata_store.MetadataStore(config)
    self.filename = filename
    self.child_context = None
    self.execution = None
    self.execution_name = ""
    self.execution_command = ""
    self.metrics = {}
    self.input_artifacts = []
    self.execution_label_props = {}
    self.graph = graph
    self.branch_name = filename.rsplit("/", 1)[-1]

    if is_server is False:
        git_checkout_new_branch(self.branch_name)
    self.parent_context = get_or_create_parent_context(
        store=self.store,
        pipeline=pipeline_name,
        custom_properties=custom_properties,
    )
    if is_server:
        Cmf.__get_neo4j_server_config()
    if graph is True:
        Cmf.__load_neo4j_params()
        self.driver = graph_wrapper.GraphDriver(
            Cmf.__neo4j_uri, Cmf.__neo4j_user, Cmf.__neo4j_password
        )
        self.driver.create_pipeline_node(
            pipeline_name, self.parent_context.id, custom_properties
        )

create_context(pipeline_stage, custom_properties=None)

Creates a context (stage). Every call creates a unique pipeline stage. The stage name is prefixed with the pipeline name. Example:

#Create context
# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
# Create context
context: mlmd.proto.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties ={"user-metadata1": "metadata_value"}
)
Args:
    pipeline_stage: Name of the stage.
    custom_properties: Developers can provide key value pairs with additional properties of the execution that need to be stored.
Returns:
    Context object from ML Metadata library associated with the new context for this stage.

Source code in cmflib/cmf.py
def create_context(
    self, pipeline_stage: str, custom_properties: t.Optional[t.Dict] = None
) -> mlpb.Context:
    """Create's a  context(stage).
    Every call creates a unique pipeline stage.
    Updates Pipeline_stage name.
    Example:
        ```python
        #Create context
        # Import CMF
        from cmflib.cmf import Cmf
        from ml_metadata.proto import metadata_store_pb2 as mlpb
        # Create CMF logger
        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
        # Create context
        context: mlmd.proto.Context = cmf.create_context(
            pipeline_stage="prepare",
            custom_properties ={"user-metadata1": "metadata_value"}
        )

        ```
        Args:
            Pipeline_stage: Name of the Stage.
            custom_properties: Developers can provide key value pairs with additional properties of the execution that
                need to be stored.
        Returns:
            Context object from ML Metadata library associated with the new context for this stage.
    """
    custom_props = {} if custom_properties is None else custom_properties
    pipeline_stage = self.parent_context.name + "/" + pipeline_stage
    ctx = get_or_create_run_context(
        self.store, pipeline_stage, custom_props)
    self.child_context = ctx
    associate_child_to_parent_context(
        store=self.store, parent_context=self.parent_context, child_context=ctx
    )
    if self.graph:
        self.driver.create_stage_node(
            pipeline_stage, self.parent_context, ctx.id, custom_props
        )
    return ctx

merge_created_context(pipeline_stage, custom_properties=None)

Merges a created context. Every call creates a unique pipeline stage. Created for the metadata push use case. Example:

```python
#Create context
# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
# Merge created context
context: mlmd.proto.Context = cmf.merge_created_context(
    pipeline_stage="Test-env/prepare",
    custom_properties={"user-metadata1": "metadata_value"}
)
```
Args:
    pipeline_stage: Pipeline_Name/Stage_name.
    custom_properties: Developers can provide key value pairs with additional properties of the execution that
        need to be stored.
Returns:
    Context object from ML Metadata library associated with the new context for this stage.
Source code in cmflib/cmf.py
def merge_created_context(
    self, pipeline_stage: str, custom_properties: t.Optional[t.Dict] = None
) -> mlpb.Context:
    """Merge created context.
    Every call creates a unique pipeline stage.
    Created for metadata push purpose.
    Example:

        ```python
        #Create context
        # Import CMF
        from cmflib.cmf import Cmf
        from ml_metadata.proto import metadata_store_pb2 as mlpb
        # Create CMF logger
        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
        # Create context
        context: mlmd.proto.Context = cmf.merge_created_context(
            pipeline_stage="Test-env/prepare",
            custom_properties={"user-metadata1": "metadata_value"}
        )
        ```
        Args:
            Pipeline_stage: Pipeline_Name/Stage_name.
            custom_properties: Developers can provide key value pairs with additional properties of the execution that
                need to be stored.
        Returns:
            Context object from ML Metadata library associated with the new context for this stage.
    """

    custom_props = {} if custom_properties is None else custom_properties
    ctx = get_or_create_run_context(
        self.store, pipeline_stage, custom_props)
    self.child_context = ctx
    associate_child_to_parent_context(
        store=self.store, parent_context=self.parent_context, child_context=ctx
    )
    if self.graph:
        self.driver.create_stage_node(
            pipeline_stage, self.parent_context, ctx.id, custom_props
        )
    return ctx

create_execution(execution_type, custom_properties=None, cmd=None, create_new_execution=True)

Creates an execution. Every call creates a unique execution. An execution can only be created within a context, so create_context must be called first. Example:

# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
# Create or reuse context for this stage
context: mlmd.proto.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties ={"user-metadata1": "metadata_value"}
)
# Create a new execution for this stage run
execution: mlmd.proto.Execution = cmf.create_execution(
    execution_type="Prepare",
    custom_properties = {"split": split, "seed": seed}
)
Args:
    execution_type: Type of the execution. (When create_new_execution is False, this is the name of the execution.)
    custom_properties: Developers can provide key value pairs with additional properties of the execution that need to be stored.

    cmd: Command used to run this execution.

    create_new_execution: Defaults to True. Advanced users can set this to False to re-use executions. This is applicable when working with framework code such as mmdet or PyTorch Lightning, where custom callbacks are used to log metrics. If create_new_execution is True (the default), the execution_type parameter is used as the name of the execution type. If create_new_execution is False and an execution already exists with the same name as execution_type, that execution is reused. Only executions created with create_new_execution set to False have "name" as a property. (See the sketch after the Returns block.)

Returns:
    Execution object from ML Metadata library associated with the new execution for this stage.
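
The sketch below illustrates re-using an execution under the behaviour described above; the execution name is hypothetical:

```python
# Hedged sketch: re-use an execution, e.g. from a framework callback
# (mmdet, PyTorch Lightning, ...) that logs metrics several times.
# With create_new_execution=False, execution_type acts as the execution
# name; if an execution with that name already exists in the current
# context, it is reused instead of a new one being created.
execution = cmf.create_execution(
    execution_type="train",        # hypothetical execution name
    create_new_execution=False,
)
```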

Source code in cmflib/cmf.py
def create_execution(
    self,
    execution_type: str,
    custom_properties: t.Optional[t.Dict] = None,
    cmd: str = None,
    create_new_execution: bool = True,
) -> mlpb.Execution:
    """Create execution.
    Every call creates a unique execution. Execution can only be created within a context, so
    [create_context][cmflib.cmf.Cmf.create_context] must be called first.
    Example:
        ```python
        # Import CMF
        from cmflib.cmf import Cmf
        from ml_metadata.proto import metadata_store_pb2 as mlpb
        # Create CMF logger
        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
        # Create or reuse context for this stage
        context: mlmd.proto.Context = cmf.create_context(
            pipeline_stage="prepare",
            custom_properties ={"user-metadata1": "metadata_value"}
        )
        # Create a new execution for this stage run
        execution: mlmd.proto.Execution = cmf.create_execution(
            execution_type="Prepare",
            custom_properties = {"split": split, "seed": seed}
        )
        ```
    Args:
        execution_type: Type of the execution.(when create_new_execution is False, this is the name of execution)
        custom_properties: Developers can provide key value pairs with additional properties of the execution that
            need to be stored.

        cmd: command used to run this execution.

        create_new_execution:bool = True, This can be used by advanced users to re-use executions
            This is applicable, when working with framework code like mmdet, pytorch lightning etc, where the
            custom call-backs are used to log metrics.
            if create_new_execution is True(Default), execution_type parameter will be used as the name of the execution type.
            if create_new_execution is False, if existing execution exist with the same name as execution_type.
            it will be reused.
            Only executions created with  create_new_execution as False will have "name" as a property.


    Returns:
        Execution object from ML Metadata library associated with the new execution for this stage.
    """
    # Initializing the execution related fields
    self.metrics = {}
    self.input_artifacts = []
    self.execution_label_props = {}
    custom_props = {} if custom_properties is None else custom_properties
    git_repo = git_get_repo()
    git_start_commit = git_get_commit()
    cmd = str(sys.argv) if cmd is None else cmd
    python_env=get_python_env()
    self.execution = create_new_execution_in_existing_run_context(
        store=self.store,
        # Type field when re-using executions
        execution_type_name=self.child_context.name,
        execution_name=execution_type, 
        #Name field if we are re-using executions
        #Type field , if creating new executions always 
        context_id=self.child_context.id,
        execution=cmd,
        pipeline_id=self.parent_context.id,
        pipeline_type=self.parent_context.name,
        git_repo=git_repo,
        git_start_commit=git_start_commit,
        python_env=python_env,
        custom_properties=custom_props,
        create_new_execution=create_new_execution,
    )
    uuids = self.execution.properties["Execution_uuid"].string_value
    if uuids:
        self.execution.properties["Execution_uuid"].string_value = uuids+","+str(uuid.uuid1())
    else:
        self.execution.properties["Execution_uuid"].string_value = str(uuid.uuid1())            
    self.store.put_executions([self.execution])
    self.execution_name = str(self.execution.id) + "," + execution_type
    self.execution_command = cmd
    for k, v in custom_props.items():
        k = re.sub("-", "_", k)
        self.execution_label_props[k] = v
    self.execution_label_props["Execution_Name"] = (
        execution_type + ":" + str(self.execution.id)
    )

    self.execution_label_props["execution_command"] = cmd
    if self.graph:
        self.driver.create_execution_node(
        self.execution_name,
        self.child_context.id,
        self.parent_context,
        cmd,
        self.execution.id,
        custom_props,
    )

    return self.execution

update_execution(execution_id, custom_properties=None)

Updates an existing execution. The custom properties can be updated after creation of the execution. The new custom properties are merged with the earlier custom properties. Example:

# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
# Update an execution
execution: mlmd.proto.Execution = cmf.update_execution(
    execution_id=8,
    custom_properties = {"split": split, "seed": seed}
)
Args:
    execution_id: ID of the execution.
    custom_properties: Developers can provide key value pairs with additional properties of the execution that need to be updated.
Returns:
    Execution object from ML Metadata library associated with the updated execution for this stage.
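
A minimal sketch of the merge behaviour, assuming an execution with id 8 already exists:

```python
# Hedged sketch: successive updates merge into the same execution.
cmf.update_execution(execution_id=8, custom_properties={"seed": 42})
cmf.update_execution(execution_id=8, custom_properties={"split": 0.8})
# The execution now carries both "seed" and "split" as custom properties.
```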

Source code in cmflib/cmf.py
def update_execution(
    self, execution_id: int, custom_properties: t.Optional[t.Dict] = None
):
    """Updates an existing execution.
    The custom properties can be updated after creation of the execution.
    The new custom properties is merged with earlier custom properties.
    Example
        ```python
        # Import CMF
        from cmflib.cmf import Cmf
        from ml_metadata.proto import metadata_store_pb2 as mlpb
        # Create CMF logger
        cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline")
        # Update a execution
        execution: mlmd.proto.Execution = cmf.update_execution(
            execution_id=8,
            custom_properties = {"split": split, "seed": seed}
        )
        ```
        Args:
            execution_id: id of the execution.
            custom_properties: Developers can provide key value pairs with additional properties of the execution that
            need to be updated.
        Returns:
            Execution object from ML Metadata library associated with the updated execution for this stage.
    """
    self.execution = self.store.get_executions_by_id([execution_id])[0]
    if self.execution is None:
        print("Error - no execution id")
        return
    execution_type = self.store.get_execution_types_by_id([self.execution.type_id])[
        0
    ]

    if custom_properties:
        for key, value in custom_properties.items():
            if isinstance(value, int):
                self.execution.custom_properties[key].int_value = value
            else:
                self.execution.custom_properties[key].string_value = str(
                    value)
    self.store.put_executions([self.execution])
    c_props = {}
    for k, v in self.execution.custom_properties.items():
        key = re.sub("-", "_", k)
        val_type = str(v).split(":", maxsplit=1)[0]
        if val_type == "string_value":
            val = self.execution.custom_properties[k].string_value
        else:
            val = str(v).split(":")[1]
        # The properties value are stored in the format type:value hence,
        # taking only value
        self.execution_label_props[key] = val
        c_props[key] = val
    self.execution_name = str(self.execution.id) + \
        "," + execution_type.name
    self.execution_command = self.execution.properties["Execution"]
    self.execution_label_props["Execution_Name"] = (
        execution_type.name + ":" + str(self.execution.id)
    )
    self.execution_label_props["execution_command"] = self.execution.properties[
        "Execution"
    ].string_value
    if self.graph:
        self.driver.create_execution_node(
            self.execution_name,
            self.child_context.id,
            self.parent_context,
            self.execution.properties["Execution"].string_value,
            self.execution.id,
            c_props,
        )
    return self.execution

log_dataset(url, event, custom_properties=None, external=False)

Logs a dataset as an artifact. This call adds the dataset to DVC. The DVC metadata file (.dvc) created is added to git and committed. The version of the dataset is automatically obtained from the versioning software (DVC) and tracked as metadata. Example:

artifact: mlmd.proto.Artifact = cmf.log_dataset(
    url="/repo/data.xml",
    event="input",
    custom_properties={"source":"kaggle"}
)
Args:
    url: The path to the dataset.
    event: Takes arguments INPUT or OUTPUT.
    custom_properties: Dataset properties (key/value pairs).
Returns:
    Artifact object from ML Metadata library associated with the new dataset artifact.
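
To capture lineage between the artifacts of a stage, inputs and outputs can be logged against the same execution. A minimal sketch with illustrative paths and properties:

```python
# Hedged sketch: log an input and an output dataset for one execution.
# Paths and the custom property are illustrative only.
cmf.log_dataset(url="/repo/raw.xml", event="input")
# ... transform the data ...
cmf.log_dataset(
    url="/repo/clean.xml",
    event="output",
    custom_properties={"source": "prepared"},
)
```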

Source code in cmflib/cmf.py
def log_dataset(
    self,
    url: str,
    event: str,
    custom_properties: t.Optional[t.Dict] = None,
    external: bool = False,
) -> mlpb.Artifact:
    """Logs a dataset as artifact.
    This call adds the dataset to dvc. The dvc metadata file created (.dvc) will be added to git and committed. The
    version of the  dataset is automatically obtained from the versioning software(DVC) and tracked as a metadata.
    Example:
        ```python
        artifact: mlmd.proto.Artifact = cmf.log_dataset(
            url="/repo/data.xml",
            event="input",
            custom_properties={"source":"kaggle"}
        )
        ```
    Args:
         url: The path to the dataset.
         event: Takes arguments `INPUT` OR `OUTPUT`.
         custom_properties: Dataset properties (key/value pairs).
    Returns:
        Artifact object from ML Metadata library associated with the new dataset artifact.
    """
            ### To Do : Technical Debt. 
    # If the dataset already exist , then we just link the existing dataset to the execution
    # We do not update the dataset properties . 
    # We need to append the new properties to the existing dataset properties

    custom_props = {} if custom_properties is None else custom_properties
    git_repo = git_get_repo()
    name = re.split("/", url)[-1]
    event_type = mlpb.Event.Type.OUTPUT
    existing_artifact = []
    if event.lower() == "input":
        event_type = mlpb.Event.Type.INPUT

    commit_output(url, self.execution.id)
    c_hash = dvc_get_hash(url)
    dataset_commit = c_hash
    dvc_url = dvc_get_url(url)
    dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
    url = url + ":" + c_hash
    if c_hash and c_hash.strip:
        existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash))

    # To Do - What happens when uri is the same but names are different
    if existing_artifact and len(existing_artifact) != 0:
        existing_artifact = existing_artifact[0]

        # Quick fix- Updating only the name
        if custom_properties is not None:
            self.update_existing_artifact(
                existing_artifact, custom_properties)
        uri = c_hash
        # update url for existing artifact
        self.update_dataset_url(existing_artifact, dvc_url_with_pipeline)
        artifact = link_execution_to_artifact(
            store=self.store,
            execution_id=self.execution.id,
            uri=uri,
            input_name=url,
            event_type=event_type,
        )
    else:
        # if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""):
        #   url = url + ":" + str(self.execution.id)
        uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
        artifact = create_new_artifact_event_and_attribution(
            store=self.store,
            execution_id=self.execution.id,
            context_id=self.child_context.id,
            uri=uri,
            name=url,
            type_name="Dataset",
            event_type=event_type,
            properties={
                "git_repo": str(git_repo),
                # passing c_hash value to commit
                "Commit": str(dataset_commit),
                "url": str(dvc_url_with_pipeline),
            },
            artifact_type_properties={
                "git_repo": mlpb.STRING,
                "Commit": mlpb.STRING,
                "url": mlpb.STRING,
            },
            custom_properties=custom_props,
            milliseconds_since_epoch=int(time.time() * 1000),
        )
    custom_props["git_repo"] = git_repo
    custom_props["Commit"] = dataset_commit
    self.execution_label_props["git_repo"] = git_repo
    self.execution_label_props["Commit"] = dataset_commit

    if self.graph:
        self.driver.create_dataset_node(
            name,
            url,
            uri,
            event,
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        if event.lower() == "input":
            self.input_artifacts.append(
                {
                    "Name": name,
                    "Path": url,
                    "URI": uri,
                    "Event": event.lower(),
                    "Execution_Name": self.execution_name,
                    "Type": "Dataset",
                    "Execution_Command": self.execution_command,
                    "Pipeline_Id": self.parent_context.id,
                    "Pipeline_Name": self.parent_context.name,
                }
            )
            self.driver.create_execution_links(uri, name, "Dataset")
        else:
            child_artifact = {
                "Name": name,
                "Path": url,
                "URI": uri,
                "Event": event.lower(),
                "Execution_Name": self.execution_name,
                "Type": "Dataset",
                "Execution_Command": self.execution_command,
                "Pipeline_Id": self.parent_context.id,
                "Pipeline_Name": self.parent_context.name,
            }
            self.driver.create_artifact_relationships(
                self.input_artifacts, child_artifact, self.execution_label_props
            )
    return artifact

log_dataset_with_version(url, version, event, props=None, custom_properties=None)

Logs a dataset when the version (hash) is known. Example:

artifact: mlpb.Artifact = cmf.log_dataset_with_version(
    url="path/to/dataset",
    version="abcdef",
    event="output",
    props={"git_repo": "https://github.com/example/repo",
           "url": "/path/in/repo"},
    custom_properties={"custom_key": "custom_value"},
)
Args:
    url: Path to the dataset.
    version: Hash or version identifier for the dataset.
    event: Takes arguments INPUT or OUTPUT.
    props: Optional properties for the dataset (e.g., git_repo, url).
    custom_properties: Optional custom properties for the dataset.
Returns:
    Artifact object from the ML Protocol Buffers library associated with the new dataset artifact.

Source code in cmflib/cmf.py
def log_dataset_with_version(
    self,
    url: str,
    version: str,
    event: str,
    props: t.Optional[t.Dict] = None,
    custom_properties: t.Optional[t.Dict] = None,
) -> mlpb.Artifact:
    """Logs a dataset when the version (hash) is known.
       Example: 
         ```python 
         artifact: mlpb.Artifact = cmf.log_dataset_with_version( 
             url="path/to/dataset", 
             version="abcdef",
             event="output",
             props={ "git_repo": "https://github.com/example/repo",
                     "url": "/path/in/repo", },
             custom_properties={ "custom_key": "custom_value", }, 
             ) 
         ```
         Args: 
            url: Path to the dataset. 
            version: Hash or version identifier for the dataset. 
            event: Takes arguments `INPUT` or `OUTPUT`. 
            props: Optional properties for the dataset (e.g., git_repo, url). 
            custom_properties: Optional custom properties for the dataset.
         Returns:
            Artifact object from the ML Protocol Buffers library associated with the new dataset artifact. 
    """

    props = {} if props is None else props
    custom_props = {} if custom_properties is None else custom_properties
    git_repo = props.get("git_repo", "")
    name = url
    event_type = mlpb.Event.Type.OUTPUT
    existing_artifact = []
    c_hash = version
    if event.lower() == "input":
        event_type = mlpb.Event.Type.INPUT

    # dataset_commit = commit_output(url, self.execution.id)

    dataset_commit = version
    url = url + ":" + c_hash
    if c_hash and c_hash.strip:
        existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash))

    # To Do - What happens when uri is the same but names are different
    if existing_artifact and len(existing_artifact) != 0:
        existing_artifact = existing_artifact[0]

        # Quick fix- Updating only the name
        if custom_properties is not None:
            self.update_existing_artifact(
                existing_artifact, custom_properties)
        uri = c_hash
        # update url for existing artifact
        self.update_dataset_url(existing_artifact, props.get("url", ""))
        artifact = link_execution_to_artifact(
            store=self.store,
            execution_id=self.execution.id,
            uri=uri,
            input_name=url,
            event_type=event_type,
        )
    else:
        # if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""):
        #   url = url + ":" + str(self.execution.id)
        uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
        artifact = create_new_artifact_event_and_attribution(
            store=self.store,
            execution_id=self.execution.id,
            context_id=self.child_context.id,
            uri=uri,
            name=url,
            type_name="Dataset",
            event_type=event_type,
            properties={
                "git_repo": str(git_repo),
                "Commit": str(dataset_commit),
                "url": props.get("url", " "),
            },
            artifact_type_properties={
                "git_repo": mlpb.STRING,
                "Commit": mlpb.STRING,
                "url": mlpb.STRING,
            },
            custom_properties=custom_props,
            milliseconds_since_epoch=int(time.time() * 1000),
        )
    custom_props["git_repo"] = git_repo
    custom_props["Commit"] = dataset_commit
    self.execution_label_props["git_repo"] = git_repo
    self.execution_label_props["Commit"] = dataset_commit

    if self.graph:
        self.driver.create_dataset_node(
            name,
            url,
            uri,
            event,
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        if event.lower() == "input":
            self.input_artifacts.append(
                {
                    "Name": name,
                    "Path": url,
                    "URI": uri,
                    "Event": event.lower(),
                    "Execution_Name": self.execution_name,
                    "Type": "Dataset",
                    "Execution_Command": self.execution_command,
                    "Pipeline_Id": self.parent_context.id,
                    "Pipeline_Name": self.parent_context.name,
                }
            )
            self.driver.create_execution_links(uri, name, "Dataset")
        else:
            child_artifact = {
                "Name": name,
                "Path": url,
                "URI": uri,
                "Event": event.lower(),
                "Execution_Name": self.execution_name,
                "Type": "Dataset",
                "Execution_Command": self.execution_command,
                "Pipeline_Id": self.parent_context.id,
                "Pipeline_Name": self.parent_context.name,
            }
            self.driver.create_artifact_relationships(
                self.input_artifacts, child_artifact, self.execution_label_props
            )
    return artifact

log_model(path, event, model_framework='Default', model_type='Default', model_name='Default', custom_properties=None)

Logs a model. The model is added to dvc and the metadata file (.dvc) gets committed to git. Example:

artifact: mlmd.proto.Artifact= cmf.log_model(
    path="path/to/model.pkl",
    event="output",
    model_framework="SKlearn",
    model_type="RandomForestClassifier",
    model_name="RandomForestClassifier:default"
)
Args:
    path: Path to the model file.
    event: Takes arguments INPUT or OUTPUT.
    model_framework: Framework used to create the model.
    model_type: Type of model algorithm used.
    model_name: Name of the algorithm used.
    custom_properties: The model properties.
Returns:
    Artifact object from ML Metadata library associated with the new model artifact.

Source code in cmflib/cmf.py
def log_model(
    self,
    path: str,
    event: str,
    model_framework: str = "Default",
    model_type: str = "Default",
    model_name: str = "Default",
    custom_properties: t.Optional[t.Dict] = None,
) -> mlpb.Artifact:
    """Logs a model.
    The model is added to dvc and the metadata file (.dvc) gets committed to git.
    Example:
        ```python
        artifact: mlmd.proto.Artifact= cmf.log_model(
            path="path/to/model.pkl",
            event="output",
            model_framework="SKlearn",
            model_type="RandomForestClassifier",
            model_name="RandomForestClassifier:default"
        )
        ```
    Args:
        path: Path to the model file.
        event: Takes arguments `INPUT` OR `OUTPUT`.
        model_framework: Framework used to create the model.
        model_type: Type of model algorithm used.
        model_name: Name of the algorithm used.
        custom_properties: The model properties.
    Returns:
        Artifact object from ML Metadata library associated with the new model artifact.
    """
    # To Do : Technical Debt. 
    # If the model already exist , then we just link the existing model to the execution
    # We do not update the model properties . 
    # We need to append the new properties to the existing model properties

    if custom_properties is None:
        custom_properties = {}
    custom_props = {} if custom_properties is None else custom_properties
    # name = re.split('/', path)[-1]
    event_type = mlpb.Event.Type.OUTPUT
    existing_artifact = []
    if event.lower() == "input":
        event_type = mlpb.Event.Type.INPUT

    commit_output(path, self.execution.id)
    c_hash = dvc_get_hash(path)
    model_commit = c_hash

    # If connecting to an existing artifact - The name of the artifact is
    # used as path/steps/key
    model_uri = path + ":" + c_hash
    dvc_url = dvc_get_url(path, False)
    url = dvc_url
    url_with_pipeline = f"{self.parent_context.name}:{url}"
    uri = ""
    if c_hash and c_hash.strip():
        uri = c_hash.strip()
        existing_artifact.extend(self.store.get_artifacts_by_uri(uri))
    else:
        raise RuntimeError("Model commit failed, Model uri empty")

    if (
        existing_artifact
        and len(existing_artifact) != 0
    ):
        # update url for existing artifact
        existing_artifact = self.update_model_url(
            existing_artifact, url_with_pipeline
        )
        artifact = link_execution_to_artifact(
            store=self.store,
            execution_id=self.execution.id,
            uri=c_hash,
            input_name=model_uri,
            event_type=event_type,
        )
        model_uri = artifact.name
    else:
        uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
        model_uri = model_uri + ":" + str(self.execution.id)
        artifact = create_new_artifact_event_and_attribution(
            store=self.store,
            execution_id=self.execution.id,
            context_id=self.child_context.id,
            uri=uri,
            name=model_uri,
            type_name="Model",
            event_type=event_type,
            properties={
                "model_framework": str(model_framework),
                "model_type": str(model_type),
                "model_name": str(model_name),
                # passing c_hash value to commit
                "Commit": str(model_commit),
                "url": str(url_with_pipeline),
            },
            artifact_type_properties={
                "model_framework": mlpb.STRING,
                "model_type": mlpb.STRING,
                "model_name": mlpb.STRING,
                "Commit": mlpb.STRING,
                "url": mlpb.STRING,
            },
            custom_properties=custom_props,
            milliseconds_since_epoch=int(time.time() * 1000),
        )
    # custom_properties["Commit"] = model_commit
    self.execution_label_props["Commit"] = model_commit
    #To DO model nodes should be similar to dataset nodes when we create neo4j
    if self.graph:
        self.driver.create_model_node(
            model_uri,
            uri,
            event,
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        if event.lower() == "input":
            self.input_artifacts.append(
                {
                    "Name": model_uri,
                    "URI": uri,
                    "Event": event.lower(),
                    "Execution_Name": self.execution_name,
                    "Type": "Model",
                    "Execution_Command": self.execution_command,
                    "Pipeline_Id": self.parent_context.id,
                    "Pipeline_Name": self.parent_context.name,
                }
            )
            self.driver.create_execution_links(uri, model_uri, "Model")
        else:
            child_artifact = {
                "Name": model_uri,
                "URI": uri,
                "Event": event.lower(),
                "Execution_Name": self.execution_name,
                "Type": "Model",
                "Execution_Command": self.execution_command,
                "Pipeline_Id": self.parent_context.id,
                "Pipeline_Name": self.parent_context.name,
            }

            self.driver.create_artifact_relationships(
                self.input_artifacts, child_artifact, self.execution_label_props
            )

    return artifact

log_model_with_version(path, event, props=None, custom_properties=None)

Logs a model when the version (hash) is known. The model is added to dvc and the metadata file (.dvc) gets committed to git. Example:

artifact: mlmd.proto.Artifact= cmf.log_model_with_version(
    path="path/to/model.pkl",
    event="output",
    props={
            "url": "/home/user/local-storage/bf/629ccd5cd008066b72c04f9a918737",
            "model_type": "RandomForestClassifier",
            "model_name": "RandomForestClassifier:default",
            "Commit": "commit 1146dad8b74cae205db6a3132ea403db1e4032e5",
            "model_framework": "SKlearn",
           },
    custom_properties={
            "uri": "bf629ccd5cd008066b72c04f9a918737",
    },

)
Args:
    path: Path to the model file.
    event: Takes arguments INPUT or OUTPUT.
    props: Model artifact properties.
    custom_properties: The model properties.
Returns:
    Artifact object from ML Metadata library associated with the new model artifact.

Source code in cmflib/cmf.py
def log_model_with_version(
    self,
    path: str,
    event: str,
    props=None,
    custom_properties: t.Optional[t.Dict] = None,
) -> object:
    """Logs a model when the version(hash) is known
     The model is added to dvc and the metadata file (.dvc) gets committed to git.
    Example:
        ```python
        artifact: mlmd.proto.Artifact= cmf.log_model_with_version(
            path="path/to/model.pkl",
            event="output",
            props={
                    "url": "/home/user/local-storage/bf/629ccd5cd008066b72c04f9a918737",
                    "model_type": "RandomForestClassifier",
                    "model_name": "RandomForestClassifier:default",
                    "Commit": "commit 1146dad8b74cae205db6a3132ea403db1e4032e5",
                    "model_framework": "SKlearn",
                   },
            custom_properties={
                    "uri": "bf629ccd5cd008066b72c04f9a918737",
            },

        )
        ```
    Args:
        path: Path to the model file.
        event: Takes arguments `INPUT` OR `OUTPUT`.
        props: Model artifact properties.
        custom_properties: The model properties.
    Returns:
        Artifact object from ML Metadata library associated with the new model artifact.
    """

    if custom_properties is None:
        custom_properties = {}
    custom_props = {} if custom_properties is None else custom_properties
    name = re.split("/", path)[-1]
    event_type = mlpb.Event.Type.OUTPUT
    existing_artifact = []
    if event.lower() == "input":
        event_type = mlpb.Event.Type.INPUT

    # props["commit"] = "" # To do get from incoming data
    c_hash = props.get("uri", " ")
    # If connecting to an existing artifact - The name of the artifact is used as path/steps/key
    model_uri = path + ":" + c_hash
    # dvc_url = dvc_get_url(path, False)
    url = props.get("url", "")
    # uri = ""
    if c_hash and c_hash.strip():
        uri = c_hash.strip()
        existing_artifact.extend(self.store.get_artifacts_by_uri(uri))
    else:
        raise RuntimeError("Model commit failed, Model uri empty")

    if (
        existing_artifact
        and len(existing_artifact) != 0
    ):
        # update url for existing artifact
        existing_artifact = self.update_model_url(existing_artifact, url)
        artifact = link_execution_to_artifact(
            store=self.store,
            execution_id=self.execution.id,
            uri=c_hash,
            input_name=model_uri,
            event_type=event_type,
        )
        model_uri = artifact.name
    else:
        uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
        model_uri = model_uri + ":" + str(self.execution.id)
        artifact = create_new_artifact_event_and_attribution(
            store=self.store,
            execution_id=self.execution.id,
            context_id=self.child_context.id,
            uri=uri,
            name=model_uri,
            type_name="Model",
            event_type=event_type,
            properties={
                "model_framework": props.get("model_framework", ""),
                "model_type": props.get("model_type", ""),
                "model_name": props.get("model_name", ""),
                "Commit": props.get("Commit", ""),
                "url": str(url),
            },
            artifact_type_properties={
                "model_framework": mlpb.STRING,
                "model_type": mlpb.STRING,
                "model_name": mlpb.STRING,
                "Commit": mlpb.STRING,
                "url": mlpb.STRING,
            },
            custom_properties=custom_props,
            milliseconds_since_epoch=int(time.time() * 1000),
        )
    # custom_properties["Commit"] = model_commit
    # custom_props["url"] = url
    self.execution_label_props["Commit"] = props.get("Commit", "")
    if self.graph:
        self.driver.create_model_node(
            model_uri,
            uri,
            event,
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        if event.lower() == "input":
            self.input_artifacts.append(
                {
                    "Name": model_uri,
                    "URI": uri,
                    "Event": event.lower(),
                    "Execution_Name": self.execution_name,
                    "Type": "Model",
                    "Execution_Command": self.execution_command,
                    "Pipeline_Id": self.parent_context.id,
                    "Pipeline_Name": self.parent_context.name,
                }
            )
            self.driver.create_execution_links(uri, model_uri, "Model")
        else:
            child_artifact = {
                "Name": model_uri,
                "URI": uri,
                "Event": event.lower(),
                "Execution_Name": self.execution_name,
                "Type": "Model",
                "Execution_Command": self.execution_command,
                "Pipeline_Id": self.parent_context.id,
                "Pipeline_Name": self.parent_context.name,
            }
            self.driver.create_artifact_relationships(
                self.input_artifacts, child_artifact, self.execution_label_props
            )

    return artifact

log_execution_metrics_from_client(metrics_name, custom_properties=None)

Logs execution metrics from a client. Pre-existing metrics from the client side are used to create identical metrics on the server side. Example:

artifact: mlpb.Artifact = cmf.log_execution_metrics_from_client(
    metrics_name="example_metrics:uri:123",
    custom_properties={"custom_key": "custom_value"},
)
Args:
    metrics_name: Name of the metrics in the format "name:uri:execution_id".
    custom_properties: Optional custom properties for the metrics.
Returns:
    Artifact object from the ML Protocol Buffers library associated with the metrics artifact.

Source code in cmflib/cmf.py
def log_execution_metrics_from_client(self, metrics_name: str,
                                     custom_properties: t.Optional[t.Dict] = None) -> mlpb.Artifact:
    """ Logs execution metrics from a client.
        Data from pre-existing metrics from client side is used to create identical metrics on server side. 
        Example: 
          ```python 
          artifact: mlpb.Artifact = cmf.log_execution_metrics_from_client( 
                  metrics_name="example_metrics:uri:123", 
                  custom_properties={"custom_key": "custom_value"}, 
                  )
          ``` 
          Args: 
             metrics_name: Name of the metrics in the format "name:uri:execution_id". 
             custom_properties: Optional custom properties for the metrics. 
          Returns: 
             Artifact object from the ML Protocol Buffers library associated with the metrics artifact.
    """
    metrics = None
    custom_props = {} if custom_properties is None else custom_properties
    existing_artifact = []
    name_tokens = metrics_name.split(":")
    if name_tokens and len(name_tokens) > 2:
        name = name_tokens[0]
        uri = name_tokens[1]
        execution_id = name_tokens[2]
    else:
        print(f"Error : metrics name {metrics_name} is not in the correct format")
        return 

    #we need to add the execution id to the metrics name
    new_metrics_name = f"{name}:{uri}:{str(self.execution.id)}"
    existing_artifacts = self.store.get_artifacts_by_uri(uri)

    existing_artifact = existing_artifacts[0] if existing_artifacts else None
    if not existing_artifact or \
       ((existing_artifact) and not
        (existing_artifact.name == new_metrics_name)):  #we need to add the artifact otherwise its already there 
        metrics = create_new_artifact_event_and_attribution(
        store=self.store,
        execution_id=self.execution.id,
        context_id=self.child_context.id,
        uri=uri,
        name=new_metrics_name,
        type_name="Metrics",
        event_type=mlpb.Event.Type.OUTPUT,
        properties={"metrics_name": metrics_name},
        artifact_type_properties={"metrics_name": mlpb.STRING},
        custom_properties=custom_props,
        milliseconds_since_epoch=int(time.time() * 1000),
    )
        if self.graph:
            # To do create execution_links
            self.driver.create_metrics_node(
                metrics_name,
                uri,
                "output",
                self.execution.id,
                self.parent_context,
                custom_props,
            )
            child_artifact = {
                "Name": metrics_name,
                "URI": uri,
                "Event": "output",
                "Execution_Name": self.execution_name,
                "Type": "Metrics",
                "Execution_Command": self.execution_command,
                "Pipeline_Id": self.parent_context.id,
                "Pipeline_Name": self.parent_context.name,
            }
            self.driver.create_artifact_relationships(
                self.input_artifacts, child_artifact, self.execution_label_props
            )
    return metrics

log_execution_metrics(metrics_name, custom_properties=None)

Logs the metadata associated with the execution (coarse-grained tracking). It is stored as a metrics artifact. Unlike other artifacts, it does not have a backing physical file. Example:

exec_metrics: mlpb.Artifact = cmf.log_execution_metrics(
    metrics_name="Training_Metrics",
    {"auc": auc, "loss": loss}
)
Args:
    metrics_name: Name to identify the metrics.
    custom_properties: Dictionary with metric values.
Returns:
    Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.

Source code in cmflib/cmf.py
def log_execution_metrics(
    self, metrics_name: str, custom_properties: t.Optional[t.Dict] = None
) -> mlpb.Artifact:
    """Log the metadata associated with the execution (coarse-grained tracking).
    It is stored as a metrics artifact. This does not have a backing physical file, unlike other artifacts that we
    have.
    Example:
        ```python
        exec_metrics: mlpb.Artifact = cmf.log_execution_metrics(
            metrics_name="Training_Metrics",
            {"auc": auc, "loss": loss}
        )
        ```
    Args:
        metrics_name: Name to identify the metrics.
        custom_properties: Dictionary with metric values.
    Returns:
          Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.
    """
    custom_props = {} if custom_properties is None else custom_properties
    uri = str(uuid.uuid1())
    metrics_name = metrics_name + ":" + uri + ":" + str(self.execution.id)
    metrics = create_new_artifact_event_and_attribution(
        store=self.store,
        execution_id=self.execution.id,
        context_id=self.child_context.id,
        uri=uri,
        name=metrics_name,
        type_name="Metrics",
        event_type=mlpb.Event.Type.OUTPUT,
        properties={"metrics_name": metrics_name},
        artifact_type_properties={"metrics_name": mlpb.STRING},
        custom_properties=custom_props,
        milliseconds_since_epoch=int(time.time() * 1000),
    )
    if self.graph:
        # To do create execution_links
        self.driver.create_metrics_node(
            metrics_name,
            uri,
            "output",
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        child_artifact = {
            "Name": metrics_name,
            "URI": uri,
            "Event": "output",
            "Execution_Name": self.execution_name,
            "Type": "Metrics",
            "Execution_Command": self.execution_command,
            "Pipeline_Id": self.parent_context.id,
            "Pipeline_Name": self.parent_context.name,
        }
        self.driver.create_artifact_relationships(
            self.input_artifacts, child_artifact, self.execution_label_props
        )
    return metrics

log_metric(metrics_name, custom_properties=None)

Stores fine-grained (per step or per epoch) metrics in memory. The metrics provided are stored in a parquet file. The commit_metrics call adds the parquet file to the version control framework. The metrics written to the parquet file can be retrieved using the read_metrics call. Example:

# Can be called at every epoch or every step in the training. This is logged to a parquet file and committed
# at the commit stage.
# Inside training loop
while True:
     cmf.log_metric("training_metrics", {"train_loss": train_loss})
cmf.commit_metrics("training_metrics")
Args:
    metrics_name: Name to identify the metrics.
    custom_properties: Dictionary with metrics.

Source code in cmflib/cmf.py
def log_metric(
    self, metrics_name: str, custom_properties: t.Optional[t.Dict] = None
) -> None:
    """Stores the fine-grained (per step or per epoch) metrics to memory.
    The metrics provided are stored in a parquet file. The `commit_metrics` call add the parquet file in the version
    control framework. The metrics written in the parquet file can be retrieved using the `read_metrics` call.
    Example:
        ```python
        # Can be called at every epoch or every step in the training. This is logged to a parquet file and committed
        # at the commit stage.
        # Inside training loop
        while True:
             cmf.log_metric("training_metrics", {"train_loss": train_loss})
        cmf.commit_metrics("training_metrics")
        ```
    Args:
        metrics_name: Name to identify the metrics.
        custom_properties: Dictionary with metrics.
    """
    if metrics_name in self.metrics:
        key = max((self.metrics[metrics_name]).keys()) + 1
        self.metrics[metrics_name][key] = custom_properties
    else:
        self.metrics[metrics_name] = {}
        self.metrics[metrics_name][1] = custom_properties

commit_existing_metrics(metrics_name, uri, custom_properties=None)

Commits existing metrics associated with the given URI to MLMD. Example:

artifact: mlpb.Artifact = cmf.commit_existing_metrics(
    "existing_metrics", "abc123", {"custom_key": "custom_value"}
)
Args:
    metrics_name: Name of the metrics.
    uri: Unique identifier associated with the metrics.
    custom_properties: Optional custom properties for the metrics.
Returns:
    Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.

Source code in cmflib/cmf.py
def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
    """ 
    Commits existing metrics associated with the given URI to MLMD. 
    Example: 
    ```python 
       artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123", 
       {"custom_key": "custom_value"}) 
    ``` 
    Args: 
       metrics_name: Name of the metrics. 
       uri: Unique identifier associated with the metrics. 
       custom_properties: Optional custom properties for the metrics. 
    Returns:
       Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact. 
    """

    custom_props =  {} if custom_properties is None else custom_properties
    c_hash = uri.strip()
    existing_artifact = []
    existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash))
    if (existing_artifact
        and len(existing_artifact) != 0 ):
        metrics = link_execution_to_artifact(
            store=self.store,
            execution_id=self.execution.id,
            uri=c_hash,
            input_name=metrics_name,
            event_type=mlpb.Event.Type.OUTPUT,
        )
    else:
        metrics = create_new_artifact_event_and_attribution(
            store=self.store,
            execution_id=self.execution.id,
            context_id=self.child_context.id,
            uri=uri,
            name=metrics_name,
            type_name="Step_Metrics",
            event_type=mlpb.Event.Type.OUTPUT,
            custom_properties=custom_props,
            milliseconds_since_epoch=int(time.time() * 1000),
        )
    if self.graph:
        self.driver.create_metrics_node(
            metrics_name,
            uri,
            "output",
            self.execution.id,
            self.parent_context,
            custom_props,
        )
        child_artifact = {
            "Name": metrics_name,
            "URI": uri,
            "Event": "output",
            "Execution_Name": self.execution_name,
            "Type": "Metrics",
            "Execution_Command": self.execution_command,
            "Pipeline_Id": self.parent_context.id,
        }
        self.driver.create_artifact_relationships(
            self.input_artifacts, child_artifact, self.execution_label_props
        )
    return metrics

create_dataslice(name)

Creates a dataslice object. Once created, users can add data instances to this data slice with the add_data method. Users are also responsible for committing data slices by calling the commit method. Example:

dataslice = cmf.create_dataslice("slice-a")
Args:
    name: Name to identify the dataslice.

Returns:
    DataSlice: Instance of a newly created [DataSlice][cmflib.cmf.Cmf.DataSlice].

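A short sketch of the create / add_data / commit flow, assuming a `cmf = Cmf(...)` logger exists; the file paths and properties are illustrative, and the add_data and commit calls are documented on [DataSlice][cmflib.cmf.Cmf.DataSlice]:

```python
# Illustrative file paths and labels; nothing is written until commit() is called.
dataslice = cmf.create_dataslice("slice-a")
for path in ["data/raw/img_001.png", "data/raw/img_002.png"]:
    dataslice.add_data(path, {"label": "cat"})
dataslice.commit()
```
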
Source code in cmflib/cmf.py
def create_dataslice(self, name: str) -> "Cmf.DataSlice":
    """Creates a dataslice object.
    Once created, users can add data instances to this data slice with [add_data][cmflib.cmf.Cmf.DataSlice.add_data]
    method. Users are also responsible for committing data slices by calling the
    [commit][cmflib.cmf.Cmf.DataSlice.commit] method.
    Example:
        ```python
        dataslice = cmf.create_dataslice("slice-a")
        ```
    Args:
        name: Name to identify the dataslice.

    Returns:
        Instance of a newly created [DataSlice][cmflib.cmf.Cmf.DataSlice].
    """
    return Cmf.DataSlice(name, self)

update_dataslice(name, record, custom_properties)

Updates a dataslice record in a Parquet file with the provided custom properties. Example:

   dataslice = cmf.update_dataslice(
       "dataslice_file.parquet", "record_id", {"key1": "updated_value"}
   )

Args:
    name: Name of the Parquet file.
    record: Identifier of the dataslice record to be updated.
    custom_properties: Dictionary containing custom properties to update.

Returns:
    None

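For example, to patch a single record of a previously committed dataslice file (both the parquet file name and the record key are placeholders produced by an earlier dataslice commit):

```python
# The record key is the "Path" index value of the row to update;
# both values below are placeholders.
cmf.update_dataslice(
    "slice-a.parquet",
    "data/raw/img_001.png",
    {"reviewed": "true"},
)
```
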
Source code in cmflib/cmf.py
def update_dataslice(self, name: str, record: str, custom_properties: t.Dict):
    """Updates a dataslice record in a Parquet file with the provided custom properties.
    Example:
    ```python
       dataslice=cmf.update_dataslice("dataslice_file.parquet", "record_id", 
       {"key1": "updated_value"})
    ```
    Args:
       name: Name of the Parquet file.
       record: Identifier of the dataslice record to be updated.
       custom_properties: Dictionary containing custom properties to update.

    Returns:
       None
    """
    df = pd.read_parquet(name)
    temp_dict = df.to_dict("index")
    temp_dict[record].update(custom_properties)
    dataslice_df = pd.DataFrame.from_dict(temp_dict, orient="index")
    dataslice_df.index.names = ["Path"]
    dataslice_df.to_parquet(name)

This module contains all the public API for CMF

cmf_init_show()

Initializes and shows details of the CMF command. Example:

     result = cmf_init_show() 
Returns: Output from the _cmf_cmd_init function.

Source code in cmflib/cmf.py
def cmf_init_show():
    """ Initializes and shows details of the CMF command. 
    Example: 
    ```python 
         result = cmf_init_show() 
    ``` 
    Returns: 
       Output from the _cmf_cmd_init function. 
    """

    output=_cmf_cmd_init()
    return output

cmf_init(type='', path='', git_remote_url='', cmf_server_url='', neo4j_user='', neo4j_password='', neo4j_uri='', url='', endpoint_url='', access_key_id='', secret_key='', user='', password='', port=0)

Initializes the CMF configuration based on the provided parameters. Example:

   cmf_init( type="local",
             path="/path/to/re",
             git_remote_url="git@github.com:user/repo.git",
             cmf_server_url="http://cmf-server",
             neo4j_user="neo4j",
             neo4j_password="password",
             neo4j_uri="bolt://localhost:7687"
           )

Args:
    type: Type of repository ("local", "minioS3", "amazonS3", "sshremote").
    path: Path for the local repository.
    git_remote_url: Git remote URL for version control.
    cmf_server_url: CMF server URL.
    neo4j_user: Neo4j database username.
    neo4j_password: Neo4j database password.
    neo4j_uri: Neo4j database URI.
    url: URL for MinioS3 or AmazonS3.
    endpoint_url: Endpoint URL for MinioS3.
    access_key_id: Access key ID for MinioS3 or AmazonS3.
    secret_key: Secret key for MinioS3 or AmazonS3.
    user: SSH remote username.
    password: SSH remote password.
    port: SSH remote port.

Returns:
    Output based on the initialized repository type.

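A comparable sketch for a MinIO-backed repository; every value below is a placeholder rather than a working endpoint or credential:

```python
# Placeholder bucket, endpoint and credentials; substitute your own.
result = cmf_init(
    type="minioS3",
    url="s3://dvc-artifacts",
    endpoint_url="http://localhost:9000",
    access_key_id="minioadmin",
    secret_key="minioadmin",
    git_remote_url="git@github.com:user/repo.git",
    cmf_server_url="http://127.0.0.1:8080",
)
```
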
Source code in cmflib/cmf.py
def cmf_init(type: str="",
        path: str="",
        git_remote_url: str="",
        cmf_server_url: str = "",
        neo4j_user: str = "",
        neo4j_password: str = "",
        neo4j_uri: str = "",
        url: str="",
        endpoint_url: str="",
        access_key_id: str="",
        secret_key: str="",
        user: str="",
        password: str="",
        port: int=0
         ):

    """ Initializes the CMF configuration based on the provided parameters. 
    Example:
    ```python
       cmf_init( type="local",
                 path="/path/to/re",
                 git_remote_url="git@github.com:user/repo.git",
                 cmf_server_url="http://cmf-server",
                 neo4j_user="neo4j",
                 neo4j_password="password",
                 neo4j_uri="bolt://localhost:7687"
               )
    ```
    Args: 
       type: Type of repository ("local", "minioS3", "amazonS3", "sshremote")
       path: Path for the local repository. 
       git_remote_url: Git remote URL for version control.
       cmf_server_url: CMF server URL.
       neo4j_user: Neo4j database username.
       neo4j_password: Neo4j database password.
       neo4j_uri: Neo4j database URI.
       url: URL for MinioS3 or AmazonS3.
       endpoint_url: Endpoint URL for MinioS3.
       access_key_id: Access key ID for MinioS3 or AmazonS3.
       secret_key: Secret key for MinioS3 or AmazonS3. 
       user: SSH remote username.
       password: SSH remote password. 
       port: SSH remote port
    Returns:
       Output based on the initialized repository type.
    """

    if type=="":
        return print("Error: Type is not provided")
    if type not in ["local","minioS3","amazonS3","sshremote"]:
        return print("Error: Type value is undefined"+ " "+type+".Expected: "+",".join(["local","minioS3","amazonS3","sshremote"]))

    if neo4j_user!="" and  neo4j_password != "" and neo4j_uri != "":
        pass
    elif neo4j_user == "" and  neo4j_password == "" and neo4j_uri == "":
        pass
    else:
        return print("Error: Enter all neo4j parameters.") 

    args={'path':path,
        'git_remote_url':git_remote_url,
       'url':url,
        'endpoint_url':endpoint_url,
        'access_key_id':access_key_id,
        'secret_key':secret_key,
        'user':user,
        'password':password,
        }

    status_args=non_related_args(type,args)

    if type=="local" and path!= "" and  git_remote_url!= "" :
        """Initialize local repository"""
        output = _init_local(
            path, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri
        )
        if status_args != []:
            print("There are non-related arguments: "+",".join(status_args)+".Please remove them.")
        return output

    elif type=="minioS3" and url!= "" and endpoint_url!= "" and access_key_id!= "" and secret_key!= "" and git_remote_url!= "":
        """Initialize minioS3 repository"""
        output = _init_minioS3(
            url,
            endpoint_url,
            access_key_id,
            secret_key,
            git_remote_url,
            cmf_server_url,
            neo4j_user,
            neo4j_password,
            neo4j_uri,
        )
        if status_args != []:
            print("There are non-related arguments: "+",".join(status_args)+".Please remove them.")
        return output

    elif type=="amazonS3" and url!= "" and access_key_id!= "" and secret_key!= "" and git_remote_url!= "":
        """Initialize amazonS3 repository"""
        output = _init_amazonS3(
            url,
            access_key_id,
            secret_key,
            git_remote_url,
            cmf_server_url,
            neo4j_user,
            neo4j_password,
            neo4j_uri,
        )
        if status_args != []:
            print("There are non-related arguments: "+",".join(status_args)+".Please remove them.")

        return output

    elif type=="sshremote" and path !="" and user!="" and port!=0 and password!="" and git_remote_url!="":
        """Initialize sshremote repository"""
        output = _init_sshremote(
            path,
            user,
            port,
            password,
            git_remote_url,
            cmf_server_url,
            neo4j_user,
            neo4j_password,
            neo4j_uri,
        )
        if status_args != []:
            print("There are non-related arguments: "+",".join(status_args)+".Please remove them.")

        return output

    else:
        print("Error: Enter all arguments")

metadata_push(pipeline_name, filename, execution_id='')

Pushes MLMD file to CMF-server. Example:

     result = metadata_push("example_pipeline", "mlmd_file", "3")
Args:
    pipeline_name: Name of the pipeline.
    filename: Path to the MLMD file.
    execution_id: Optional execution ID.

Returns:
    Response output from the _metadata_push function.

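For instance, to push only the metadata belonging to one execution (the execution id is a placeholder taken from an earlier run):

```python
# "2" is a placeholder execution id; omit it to push the whole pipeline's metadata.
result = metadata_push("test_pipeline", "mlmd", execution_id="2")
print(result)
```
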
Source code in cmflib/cmf.py
def metadata_push(pipeline_name,filename,execution_id: str = ""):
    """ Pushes MLMD file to CMF-server.
    Example:
    ```python
         result = metadata_push("example_pipeline", "mlmd_file", "3")
    ```
    Args:
        pipeline_name: Name of the pipeline.
        filename: Path to the MLMD file.
        execution_id: Optional execution ID.

    Returns:
        Response output from the _metadata_push function.
    """
    # Required arguments:  pipeline_name, filename (mlmd file path) 
    #Optional arguments: Execution_ID
    output = _metadata_push(pipeline_name,filename, execution_id)
    return output

metadata_pull(pipeline_name, filename='./mlmd', execution_id='')

Pulls MLMD file from CMF-server. Example:

     result = metadata_pull("example_pipeline", "./mlmd_directory", "execution_123") 
Args:
    pipeline_name: Name of the pipeline.
    filename: File path to store the MLMD file.
    execution_id: Optional execution ID.

Returns:
    Message from the _metadata_pull function.

Source code in cmflib/cmf.py
def metadata_pull(pipeline_name,filename ="./mlmd", execution_id: str = ""):
    """ Pulls MLMD file from CMF-server. 
     Example: 
     ```python 
          result = metadata_pull("example_pipeline", "./mlmd_directory", "execution_123") 
     ``` 
     Args: 
        pipeline_name: Name of the pipeline. 
        filename: File path to store the MLMD file. 
        execution_id: Optional execution ID. 
     Returns: 
        Message from the _metadata_pull function. 
     """
    # Required arguments:  pipeline_name, filename(file path to store mlmd file) 
    #Optional arguments: Execution_ID
    output = _metadata_pull(pipeline_name,filename, execution_id)
    return output

artifact_pull(pipeline_name, filename='./mlmd')

Pulls artifacts from the initialized repository.

Example:

     result = artifact_pull("example_pipeline", "./mlmd_directory")

Args:
    pipeline_name: Name of the pipeline (required).
    filename: Path to store artifacts. Defaults to './mlmd'.

Returns:
    Output from the _artifact_pull function.

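Artifacts are normally pulled after the corresponding metadata file is available locally, for example (pipeline name and path are placeholders):

```python
# Placeholder names; the mlmd file is fetched first, then the artifacts it references.
metadata_pull("example_pipeline", "./mlmd")
result = artifact_pull("example_pipeline", "./mlmd")
```
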
Source code in cmflib/cmf.py
def artifact_pull(pipeline_name,filename="./mlmd"):
    """ Pulls artifacts from the initialized repository.

    Example:
    ```python
         result = artifact_pull("example_pipeline", "./mlmd_directory")
    ```

    Args:
        pipeline_name: Name of the pipeline.
        filename: Path to store artifacts.

    Returns:
        Output from the _artifact_pull function.
    """

    # Required arguments: Pipeline_name
    # Optional arguments: filename( path to store artifacts)
    output = _artifact_pull(pipeline_name,filename)
    return output

artifact_pull_single(pipeline_name, filename, artifact_name)

Pulls a single artifact from the initialized repository. Example:

    result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact") 
Args:
    pipeline_name: Name of the pipeline.
    filename: Path to store the artifact.
    artifact_name: Name of the artifact.

Returns:
    Output from the _artifact_pull_single function.

Source code in cmflib/cmf.py
def artifact_pull_single(pipeline_name,filename,artifact_name):
    """ Pulls a single artifact from the initialized repository. 
    Example: 
    ```python 
        result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact") 
    ```
    Args: 
       pipeline_name: Name of the pipeline. 
       filename: Path to store the artifact. 
       artifact_name: Name of the artifact. 
    Returns:
       Output from the _artifact_pull_single function. 
    """

    # Required arguments: Pipeline_name
    # Optional arguments: filename( path to store artifacts)
    output = _artifact_pull_single(pipeline_name,filename,artifact_name)
    return output

artifact_push()

Pushes artifacts to the initialized repository.

Example:

     result = artifact_push()

Returns:
    Output from the _artifact_push function.

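A typical end-of-run sequence, sketched under the assumption that the repository was initialized with `cmf_init` and metadata was logged during the run, is to push the metadata first and then the artifacts it references:

```python
# Placeholder pipeline name and mlmd path; both calls assume the git remote
# and artifact repository were configured via cmf_init.
metadata_push("test_pipeline", "mlmd")
artifact_push()
```
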
Source code in cmflib/cmf.py
def artifact_push():
    """ Pushes artifacts to the initialized repository.

    Example:
    ```python
         result = artifact_push()
    ```

    Returns:
        Output from the _artifact_push function.
    """
    output = _artifact_push()
    return output