
cmflib.cmf

cmflib.cmf.Cmf(filepath='mlmd', pipeline_name='', custom_properties=None, graph=False, is_server=False)

This class provides methods to log metadata for distributed AI pipelines. A class instance creates an ML Metadata store to hold the metadata and a driver that stores nodes and their relationships in Neo4j. The user must provide the name of the pipeline to be recorded with CMF.

cmflib.cmf.Cmf(
    filepath="mlmd",
    pipeline_name="test_pipeline",
    custom_properties={"owner": "user_a"},
    graph=False
)

Parameters:

- filepath (str, default 'mlmd'): Path to the SQLite file that stores the metadata.
- pipeline_name (str, default ''): Name that uniquely identifies the pipeline. Note that the name is the unique identifier for a pipeline; if a pipeline already exists with the same name, the existing pipeline object is reused.
- custom_properties (Optional[Dict], default None): Additional properties of the pipeline that need to be stored.
- graph (bool, default False): If set to True, the library also stores the relationships in the provided graph database.

The following variables should be set: neo4j_uri (graph server URI), neo4j_user (user name) and neo4j_password (user password), e.g.:

cmf init local --path /home/user/local-storage --git-remote-url https://github.com/XXX/exprepo.git \
    --neo4j-user neo4j --neo4j-password neo4j --neo4j-uri bolt://localhost:7687
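
Once those settings are recorded, graph tracking can be enabled when constructing the logger. A minimal sketch (the pipeline name is illustrative; it assumes cmf init has already stored the Neo4j URI, user, and password):

from cmflib.cmf import Cmf

# Assumes `cmf init` has already recorded neo4j_uri, neo4j_user, and neo4j_password.
cmf = Cmf(
    filepath="mlmd",
    pipeline_name="test_pipeline",
    graph=True  # also store nodes and relationships in Neo4j
)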

create_context(pipeline_stage, custom_properties=None)

Creates a context (stage). Every call creates a unique pipeline stage and updates the pipeline stage name.

# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create context
context: mlpb.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties={"user-metadata1": "metadata_value"}
)

Parameters:

- pipeline_stage (str, required): Name of the stage.
- custom_properties (Optional[Dict], default None): Developers can provide key-value pairs with additional properties of the execution that need to be stored.

Returns:

- Context: Context object from the ML Metadata library associated with the new context for this stage.

create_execution(execution_type, custom_properties=None, cmd=None, create_new_execution=True)

Creates an execution. Every call creates a unique execution. An execution can only be created within a context, so create_context must be called first.

# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create or reuse context for this stage
context: mlpb.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties={"user-metadata1": "metadata_value"}
)
# Create a new execution for this stage run
split, seed = 0.2, 42  # example values for illustration
execution: mlpb.Execution = cmf.create_execution(
    execution_type="Prepare",
    custom_properties={"split": split, "seed": seed}
)

Parameters:

- execution_type (str, required): Type of the execution. (When create_new_execution is False, this is the name of the execution.)
- custom_properties (Optional[Dict], default None): Developers can provide key-value pairs with additional properties of the execution that need to be stored.
- cmd (Optional[str], default None): Command used to run this execution.
- create_new_execution (bool, default True): Advanced users can use this to reuse executions. This is applicable when working with framework code like mmdet or PyTorch Lightning, where custom callbacks are used to log metrics. If create_new_execution is True (the default), the execution_type parameter is used as the name of the execution type. If create_new_execution is False and an execution already exists with the same name as execution_type, it is reused. Only executions created with create_new_execution set to False have "name" as a property. See the sketch after this list.
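
A minimal sketch of execution reuse (the execution type is illustrative; it assumes a context has already been created with create_context):

# Reuse an existing execution named "Train" if one exists; otherwise create it.
execution: mlpb.Execution = cmf.create_execution(
    execution_type="Train",
    create_new_execution=False
)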

Returns:

- Execution: Execution object from the ML Metadata library associated with the new execution for this stage.

update_execution(execution_id, custom_properties=None)

Updates an existing execution. The custom properties can be updated after the execution has been created; the new custom properties are merged with the earlier ones.

# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Update an execution
split, seed = 0.2, 42  # example values for illustration
execution: mlpb.Execution = cmf.update_execution(
    execution_id=8,
    custom_properties={"split": split, "seed": seed}
)

Parameters:

- execution_id (int, required): ID of the execution.
- custom_properties (Optional[Dict], default None): Developers can provide key-value pairs with additional properties of the execution that need to be updated.

Returns:

- Execution: Execution object from the ML Metadata library associated with the updated execution for this stage.

log_dataset(url, event, custom_properties=None, label=None, label_properties=None, external=False)

Logs a dataset as an artifact. This call adds the dataset to DVC. The .dvc metadata file created by DVC is added to git and committed. The version of the dataset is automatically obtained from the versioning software (DVC) and tracked as metadata.

artifact: mlpb.Artifact = cmf.log_dataset(
    url="/repo/data.xml",
    event="input",
    custom_properties={"source": "kaggle"},
    label="artifacts/labels.csv",
    label_properties={"user": "Ron"}
)

Parameters:

- url (str, required): The path to the dataset.
- event (str, required): Takes arguments "input" or "output".
- custom_properties (Optional[Dict], default None): Dataset properties (key/value pairs).
- label (Optional[str], default None): Labels are usually .csv files containing information regarding the dataset.
- label_properties (Optional[Dict], default None): Custom properties for a label.

Returns:

- Artifact: Artifact object from the ML Metadata library associated with the new dataset artifact.

log_model(path, event, model_framework='Default', model_type='Default', model_name='Default', custom_properties=None)

Logs a model. The model is added to DVC, and the .dvc metadata file is committed to git.

artifact: mlpb.Artifact = cmf.log_model(
    path="path/to/model.pkl",
    event="output",
    model_framework="SKlearn",
    model_type="RandomForestClassifier",
    model_name="RandomForestClassifier:default"
)

Parameters:

- path (str, required): Path to the model file.
- event (str, required): Takes arguments "input" or "output".
- model_framework (str, default 'Default'): Framework used to create the model.
- model_type (str, default 'Default'): Type of model algorithm used.
- model_name (str, default 'Default'): Name of the algorithm used.
- custom_properties (Optional[Dict], default None): The model properties.

Returns:

- Artifact: Artifact object from the ML Metadata library associated with the new model artifact.

log_execution_metrics(metrics_name, custom_properties=None)

Logs the metadata associated with the execution (coarse-grained tracking). It is stored as a metrics artifact. Unlike other artifacts, this artifact has no backing physical file.

auc, loss = 0.95, 0.1  # example metric values for illustration
exec_metrics: mlpb.Artifact = cmf.log_execution_metrics(
    metrics_name="Training_Metrics",
    custom_properties={"auc": auc, "loss": loss}
)

Parameters:

- metrics_name (str, required): Name to identify the metrics.
- custom_properties (Optional[Dict], default None): Dictionary with metric values.

Returns:

- Artifact: Artifact object from the ML Metadata library associated with the new coarse-grained metrics artifact.

log_metric(metrics_name, custom_properties=None)

Stores fine-grained (per-step or per-epoch) metrics in memory. The metrics provided are stored in a parquet file. The commit_metrics call adds the parquet file to the version control framework. The metrics written to the parquet file can be retrieved using the read_metrics call.

# Can be called at every epoch or every step in the training. The metrics are
# logged to a parquet file and committed at the commit stage.
# Inside the training loop (train_one_epoch is a placeholder for the real step)
num_epochs = 10
for epoch in range(num_epochs):
    train_loss = train_one_epoch()
    cmf.log_metric("training_metrics", {"train_loss": train_loss})
cmf.commit_metrics("training_metrics")

Parameters:

- metrics_name (str, required): Name to identify the metrics.
- custom_properties (Optional[Dict], default None): Dictionary with metrics.

create_dataslice(name)

Creates a dataslice object. Once created, users can add data instances to this dataslice with the add_data method. Users are also responsible for committing dataslices by calling the commit method.

dataslice = cmf.create_dataslice("slice-a")
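
A minimal sketch of the full flow described above, using the add_data and commit methods (the file path and properties are illustrative):

dataslice = cmf.create_dataslice("slice-a")
# Add individual data instances to the slice
dataslice.add_data("data/raw/file1.csv", {"key1": "value1"})
# Commit the slice so it is versioned and recorded as metadata
dataslice.commit()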

Parameters:

- name (str, required): Name to identify the dataslice.

Returns:

- DataSlice: Instance of a newly created DataSlice.

update_dataslice(name, record, custom_properties)

Updates a dataslice record in a Parquet file with the provided custom properties.

dataslice = cmf.update_dataslice(
    "dataslice_file.parquet", "record_id", {"key1": "updated_value"}
)

Parameters:

- name (str, required): Name of the Parquet file.
- record (str, required): Identifier of the dataslice record to be updated.
- custom_properties (Dict, required): Dictionary containing custom properties to update.

Returns:

- None

cmflib.cmf

This module contains the public API for CMF.

cmf_init_show()

Initializes and shows details of the CMF command.

result = cmf_init_show() 

Returns:

- Output from the _cmf_cmd_init function.

cmf_init(type='', path='', git_remote_url='', cmf_server_url='', neo4j_user='', neo4j_password='', neo4j_uri='', url='', endpoint_url='', access_key_id='', secret_key='', session_token='', user='', password='', port=0, osdf_path='', osdf_cache='', key_id='', key_path='', key_issuer='')

Initializes the CMF configuration based on the provided parameters.

cmf_init(
    type="local",
    path="/path/to/repo",
    git_remote_url="git@github.com:user/repo.git",
    cmf_server_url="http://cmf-server",
    neo4j_user="neo4j",
    neo4j_password="password",
    neo4j_uri="bolt://localhost:7687"
)

Parameters:

- type (str, default ''): Type of repository ("local", "minioS3", "amazonS3", "sshremote", "osdfremote").
- path (str, default ''): Path for the local repository.
- git_remote_url (str, default ''): Git remote URL for version control.
- cmf_server_url (str, default ''): CMF server URL.
- neo4j_user (str, default ''): Neo4j database username.
- neo4j_password (str, default ''): Neo4j database password.
- neo4j_uri (str, default ''): Neo4j database URI.
- url (str, default ''): URL for MinioS3 or AmazonS3.
- endpoint_url (str, default ''): Endpoint URL for MinioS3.
- access_key_id (str, default ''): Access key ID for MinioS3 or AmazonS3.
- secret_key (str, default ''): Secret key for MinioS3 or AmazonS3.
- session_token (str, default ''): Session token for AmazonS3.
- user (str, default ''): SSH remote username.
- password (str, default ''): SSH remote password.
- port (int, default 0): SSH remote port.
- osdf_path (str, default ''): OSDF origin path.
- osdf_cache (str, default ''): OSDF cache path (optional).
- key_id (str, default ''): OSDF key ID.
- key_path (str, default ''): OSDF private key path.
- key_issuer (str, default ''): OSDF key issuer URL.

Returns:

- Output based on the initialized repository type.

metadata_push(pipeline_name, file_name='./mlmd', tensorboard_path='', execution_uuid='')

Pushes metadata file to CMF-server.

result = metadata_push("example_pipeline", "mlmd_file", tensorboard_path="tensorboard_log", execution_uuid="eg_execution_uuid")

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify input metadata file name.
- tensorboard_path (str, default ''): Path to tensorboard logs.
- execution_uuid (str, default ''): Optional execution UUID.

Returns:

- Response output from the _metadata_push function.

metadata_pull(pipeline_name, file_name='./mlmd', execution_uuid='')

Pulls metadata file from CMF-server.

result = metadata_pull("example_pipeline", "./mlmd_directory", "eg_execution_uuid") 

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify output metadata file name.
- execution_uuid (str, default ''): Optional execution UUID.

Returns:

- Message from the _metadata_pull function.

metadata_export(pipeline_name, json_file_name='', file_name='./mlmd')

Exports the metadata from the local mlmd file to a JSON file.

result = metadata_export("example_pipeline", "./jsonfile", "./mlmd_directory") 

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- json_file_name (str, default ''): File path of the JSON file.
- file_name (str, default './mlmd'): Specify input metadata file name.

Returns:

- Message from the _metadata_export function.

artifact_pull(pipeline_name, file_name='./mlmd')

Pulls artifacts from the initialized repository.

result = artifact_pull("example_pipeline", "./mlmd_directory")

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify input metadata file name.

Returns:

- Output from the _artifact_pull function.

artifact_pull_single(pipeline_name, file_name, artifact_name)

Pulls a single artifact from the initialized repository.

result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact") 

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, required): Specify input metadata file name.
- artifact_name (str, required): Name of the artifact.

Returns:

- Output from the _artifact_pull_single function.

artifact_push(pipeline_name, filepath='./mlmd', jobs=32)

Pushes artifacts to the initialized repository.

result = artifact_push("example_pipeline", "./mlmd_directory", 32)

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- filepath (str, default './mlmd'): Path to store the artifact.
- jobs (int, default 32): Number of jobs to use for pushing artifacts.

Returns:

- Output from the _artifact_push function.

artifact_list(pipeline_name, file_name='./mlmd', artifact_name='')

Displays artifacts from the input metadata file with a few properties in a 7-column table, limited to 20 records per page.

result = artifact_list("example_pipeline", "./mlmd_directory", "example_artifact_name")

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify input metadata file name.
- artifact_name (str, default ''): Artifacts for a particular artifact name.

Returns:

- Output from the _artifact_list function.

pipeline_list(file_name='./mlmd')

Displays a list of pipeline name(s) from the available input metadata file.

result = pipeline_list("./mlmd_directory")

Parameters:

- file_name (str, default './mlmd'): Specify input metadata file name.

Returns:

- Output from the _pipeline_list function.

execution_list(pipeline_name, file_name='./mlmd', execution_uuid='')

Displays executions from the input metadata file with a few properties in a 7-column table, limited to 20 records per page.

result = execution_list("example_pipeline", "./mlmd_directory", "example_execution_uuid")

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify input metadata file name.
- execution_uuid (str, default ''): Specify the execution UUID to retrieve the execution.

Returns:

- Output from the _execution_list function.

repo_push(pipeline_name, filepath='./mlmd', tensorboard_path='', execution_uuid='', jobs=32)

Push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively.

result = repo_push("example_pipeline", "./mlmd_directory", execution_uuid="example_execution_uuid", tensorboard_path="./tensorboard_path", jobs=32)

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- filepath (str, default './mlmd'): Specify input metadata file name.
- tensorboard_path (str, default ''): Path to tensorboard logs.
- execution_uuid (str, default ''): Specify execution UUID.
- jobs (int, default 32): Number of jobs to use for pushing artifacts.

Returns:

- Output from the _repo_push function.

repo_pull(pipeline_name, file_name='./mlmd', execution_uuid='')

Pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively.

result = repo_pull("example_pipeline", "./mlmd_directory", "example_execution_uuid")

Parameters:

- pipeline_name (str, required): Name of the pipeline.
- file_name (str, default './mlmd'): Specify output metadata file name.
- execution_uuid (str, default ''): Specify execution UUID.

Returns:

- Output from the _repo_pull function.

dvc_ingest(file_name='./mlmd')

Ingests metadata from the dvc.lock file into CMF. If an existing MLMD file is provided, it merges and updates execution metadata based on matching commands, or creates new executions if none exist.

result = dvc_ingest("./mlmd_directory")

Parameters:

- file_name (str, default './mlmd'): Specify input metadata file name.

Returns:

- Output from the _dvc_ingest function.