cmflib.cmf¶
cmflib.cmf.Cmf(filepath='mlmd', pipeline_name='', custom_properties=None, graph=False, is_server=False)
¶
This class provides methods to log metadata for distributed AI pipelines. The class instance creates an ML metadata store to hold the metadata. It also creates a driver to store nodes and their relationships in Neo4j. The user has to provide the name of the pipeline that needs to be recorded with CMF.
cmflib.cmf.Cmf(
    filepath="mlmd",
    pipeline_name="test_pipeline",
    custom_properties={"owner": "user_a"},
    graph=False
)
Parameters:

Name | Type | Description | Default
---|---|---|---
filepath | str | Path to the sqlite file to store the metadata. | 'mlmd'
pipeline_name | str | Name to uniquely identify the pipeline. The name is the unique identifier for a pipeline; if a pipeline already exists with the same name, the existing pipeline object is reused. | ''
custom_properties | Optional[Dict] | Additional properties of the pipeline that need to be stored. | None
graph | bool | If set to true, the library also stores the relationships in the provided graph database. | False
The following variables should be set: neo4j_uri (graph server URI), neo4j_user (user name) and neo4j_password (user password), e.g.:

cmf init local --path /home/user/local-storage --git-remote-url https://github.com/XXX/exprepo.git --neo4j-user neo4j --neo4j-password neo4j --neo4j-uri bolt://localhost:7687
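Once these Neo4j settings are in place (for example via cmf init as above), graph logging can be enabled from Python by constructing the logger with graph=True. A minimal sketch, assuming the Neo4j server configured above is reachable:

from cmflib.cmf import Cmf

# Store lineage in Neo4j in addition to the mlmd metadata store.
# Assumes neo4j_uri, neo4j_user and neo4j_password were configured
# with `cmf init` as shown above.
cmf = Cmf(
    filepath="mlmd",
    pipeline_name="test_pipeline",
    graph=True
)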
create_context(pipeline_stage, custom_properties=None)
¶
Creates a context (stage). Every call creates a unique pipeline stage. Updates the pipeline stage name.
# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create context
context: mlpb.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties={"user-metadata1": "metadata_value"}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_stage | str | Name of the stage. | required
custom_properties | Optional[Dict] | Developers can provide key value pairs with additional properties of the execution that need to be stored. | None
Returns:

Type | Description
---|---
Context | Context object from ML Metadata library associated with the new context for this stage.
create_execution(execution_type, custom_properties=None, cmd=None, create_new_execution=True)
¶
Create execution. Every call creates a unique execution. Execution can only be created within a context, so create_context must be called first.
# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Create or reuse context for this stage
context: mlpb.Context = cmf.create_context(
    pipeline_stage="prepare",
    custom_properties={"user-metadata1": "metadata_value"}
)
# Create a new execution for this stage run
split, seed = 0.8, 42  # example values
execution: mlpb.Execution = cmf.create_execution(
    execution_type="Prepare",
    custom_properties={"split": split, "seed": seed}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
execution_type | str | Type of the execution. (When create_new_execution is False, this is the name of the execution.) | required
custom_properties | Optional[Dict] | Developers can provide key value pairs with additional properties of the execution that need to be stored. | None
cmd | Optional[str] | Command used to run this execution. | None
create_new_execution | bool | Advanced users can set this to False to reuse executions. This is applicable when working with framework code like mmdet or PyTorch Lightning, where custom callbacks are used to log metrics. If create_new_execution is True (default), the execution_type parameter is used as the name of the execution type. If create_new_execution is False and an existing execution exists with the same name as execution_type, it is reused (see the reuse sketch below). Only executions created with create_new_execution set to False have "name" as a property. | True
Returns:

Type | Description
---|---
Execution | Execution object from ML Metadata library associated with the new execution for this stage.
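When create_new_execution is False, repeated calls with the same execution_type resolve to the same execution. A minimal reuse sketch, assuming the cmf logger from the example above; the stage and type names are placeholders:

# Reuse flow: both calls below resolve to the same "Train" execution,
# because create_new_execution=False reuses an existing execution whose
# name matches execution_type (per the parameter description above).
cmf.create_context(pipeline_stage="train")
execution = cmf.create_execution(
    execution_type="Train",
    create_new_execution=False
)
# Later, e.g. inside a framework callback, the same execution is reused:
same_execution = cmf.create_execution(
    execution_type="Train",
    create_new_execution=False
)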
update_execution(execution_id, custom_properties=None)
¶
Updates an existing execution. The custom properties can be updated after creation of the execution. The new custom properties are merged with the earlier custom properties.
# Import CMF
from cmflib.cmf import Cmf
from ml_metadata.proto import metadata_store_pb2 as mlpb
# Create CMF logger
cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline")
# Update an execution
split, seed = 0.8, 42  # example values
execution: mlpb.Execution = cmf.update_execution(
    execution_id=8,
    custom_properties={"split": split, "seed": seed}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
execution_id | int | ID of the execution. | required
custom_properties | Optional[Dict] | Developers can provide key value pairs with additional properties of the execution that need to be updated. | None
Returns:

Type | Description
---|---
Execution | Execution object from ML Metadata library associated with the updated execution for this stage.
log_dataset(url, event, custom_properties=None, label=None, label_properties=None, external=False)
¶
Logs a dataset as an artifact. This call adds the dataset to dvc. The dvc metadata file created (.dvc) will be added to git and committed. The version of the dataset is automatically obtained from the versioning software (DVC) and tracked as metadata.
artifact: mlmd.proto.Artifact = cmf.log_dataset(
    url="/repo/data.xml",
    event="input",
    custom_properties={"source": "kaggle"},
    label="artifacts/labels.csv",
    label_properties={"user": "Ron"}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
url | str | The path to the dataset. | required
event | str | Takes arguments "input" or "output". | required
custom_properties | Optional[Dict] | Dataset properties (key/value pairs). | None
label | Optional[str] | Labels are usually .csv files containing information regarding the dataset. | None
label_properties | Optional[Dict] | Custom properties for a label. | None
Returns:

Type | Description
---|---
Artifact | Artifact object from ML Metadata library associated with the new dataset artifact.
log_model(path, event, model_framework='Default', model_type='Default', model_name='Default', custom_properties=None)
¶
Logs a model. The model is added to dvc and the metadata file (.dvc) gets committed to git.
artifact: mlmd.proto.Artifact = cmf.log_model(
    path="path/to/model.pkl",
    event="output",
    model_framework="SKlearn",
    model_type="RandomForestClassifier",
    model_name="RandomForestClassifier:default"
)
Parameters:

Name | Type | Description | Default
---|---|---|---
path | str | Path to the model file. | required
event | str | Takes arguments "input" or "output". | required
model_framework | str | Framework used to create the model. | 'Default'
model_type | str | Type of model algorithm used. | 'Default'
model_name | str | Name of the algorithm used. | 'Default'
custom_properties | Optional[Dict] | The model properties. | None
Returns:

Type | Description
---|---
Artifact | Artifact object from ML Metadata library associated with the new model artifact.
log_execution_metrics(metrics_name, custom_properties=None)
¶
Log the metadata associated with the execution (coarse-grained tracking). It is stored as a metrics artifact. Unlike other artifacts, this one does not have a backing physical file.
auc, loss = 0.92, 0.31  # example values
exec_metrics: mlpb.Artifact = cmf.log_execution_metrics(
    metrics_name="Training_Metrics",
    custom_properties={"auc": auc, "loss": loss}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
metrics_name | str | Name to identify the metrics. | required
custom_properties | Optional[Dict] | Dictionary with metric values. | None
Returns:

Type | Description
---|---
Artifact | Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.
log_metric(metrics_name, custom_properties=None)
¶
Stores the fine-grained (per step or per epoch) metrics to memory. The metrics provided are stored in a parquet file. The commit_metrics call adds the parquet file to the version control framework. The metrics written in the parquet file can be retrieved using the read_metrics call.
# Can be called at every epoch or every step in the training. This is logged to a parquet file
# and committed at the commit stage.
# Inside training loop
for epoch in range(num_epochs):  # num_epochs: total training epochs
    train_loss = train_one_epoch()  # placeholder training step
    cmf.log_metric("training_metrics", {"train_loss": train_loss})
# Commit the logged metrics to the parquet file after the loop
cmf.commit_metrics("training_metrics")
Parameters:

Name | Type | Description | Default
---|---|---|---
metrics_name | str | Name to identify the metrics. | required
custom_properties | Optional[Dict] | Dictionary with metrics. | None
create_dataslice(name)
¶
Creates a dataslice object. Once created, users can add data instances to this data slice with the add_data method, as shown in the sketch below. Users are also responsible for committing data slices by calling the commit method.
dataslice = cmf.create_dataslice("slice-a")
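A minimal sketch of the full flow described above, assuming add_data takes a file path plus optional custom properties and commit takes no required arguments; the path and properties are placeholder values:

# Create a slice, add data instances to it, then commit it.
dataslice = cmf.create_dataslice("slice-a")
dataslice.add_data("data/raw_data/file_1.txt", {"custom_key": "custom_value"})
dataslice.commit()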
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | Name to identify the dataslice. | required
Returns:

Type | Description
---|---
DataSlice | Instance of a newly created DataSlice.
update_dataslice(name, record, custom_properties)
¶
Updates a dataslice record in a Parquet file with the provided custom properties.
dataslice = cmf.update_dataslice(
    "dataslice_file.parquet",
    "record_id",
    {"key1": "updated_value"}
)
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | Name of the Parquet file. | required
record | str | Identifier of the dataslice record to be updated. | required
custom_properties | Dict | Dictionary containing custom properties to update. | required
Returns:

Type | Description
---|---
None |
cmflib.cmf
¶
This module contains all the public API for CMF.
cmf_init_show()
¶
Initializes and shows details of the CMF command.
result = cmf_init_show()
Returns:

Type | Description
---|---
 | Output from the _cmf_cmd_init function.
cmf_init(type='', path='', git_remote_url='', cmf_server_url='', neo4j_user='', neo4j_password='', neo4j_uri='', url='', endpoint_url='', access_key_id='', secret_key='', session_token='', user='', password='', port=0, osdf_path='', osdf_cache='', key_id='', key_path='', key_issuer='')
¶
Initializes the CMF configuration based on the provided parameters.
cmf_init(
    type="local",
    path="/path/to/re",
    git_remote_url="git@github.com:user/repo.git",
    cmf_server_url="http://cmf-server",
    neo4j_user="neo4j",
    neo4j_password="password",
    neo4j_uri="bolt://localhost:7687"
)
Parameters:

Name | Type | Description | Default
---|---|---|---
type | str | Type of repository ("local", "minioS3", "amazonS3", "sshremote", "osdfremote"). | ''
path | str | Path for the local repository. | ''
git_remote_url | str | Git remote URL for version control. | ''
cmf_server_url | str | CMF server URL. | ''
neo4j_user | str | Neo4j database username. | ''
neo4j_password | str | Neo4j database password. | ''
neo4j_uri | str | Neo4j database URI. | ''
url | str | URL for MinioS3 or AmazonS3. | ''
endpoint_url | str | Endpoint URL for MinioS3. | ''
access_key_id | str | Access key ID for MinioS3 or AmazonS3. | ''
secret_key | str | Secret key for MinioS3 or AmazonS3. | ''
session_token | str | Session token for AmazonS3. | ''
user | str | SSH remote username. | ''
password | str | SSH remote password. | ''
port | int | SSH remote port. | 0
osdf_path | str | OSDF Origin Path. | ''
osdf_cache | str | OSDF Cache Path (optional). | ''
key_id | str | OSDF Key ID. | ''
key_path | str | OSDF Private Key Path. | ''
key_issuer | str | OSDF Key Issuer URL. | ''
Returns:

Type | Description
---|---
 | Output based on the initialized repository type.
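As a second example, a hedged sketch of initializing a MinioS3-backed repository using only the parameters documented above; the bucket URL, endpoint, and credentials are placeholder values:

from cmflib.cmf import cmf_init

# Hypothetical MinIO S3 setup; replace the placeholder values with
# your bucket URL, endpoint, and credentials.
cmf_init(
    type="minioS3",
    url="s3://bucket-name",
    endpoint_url="http://localhost:9000",
    access_key_id="minioadmin",
    secret_key="minioadmin",
    git_remote_url="git@github.com:user/repo.git"
)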
metadata_push(pipeline_name, file_name='./mlmd', tensorboard_path='', execution_uuid='')
¶
Pushes the metadata file to the CMF server.
result = metadata_push("example_pipeline", "mlmd_file",
                       tensorboard_path="tensorboard_log",
                       execution_uuid="eg_execution_uuid")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify input metadata file name. | './mlmd'
tensorboard_path | str | Path to tensorboard logs. | ''
execution_uuid | str | Optional execution UUID. | ''
Returns:

Type | Description
---|---
 | Response output from the _metadata_push function.
metadata_pull(pipeline_name, file_name='./mlmd', execution_uuid='')
¶
Pulls the metadata file from the CMF server.
result = metadata_pull("example_pipeline", "./mlmd_directory", "eg_execution_uuid")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify output metadata file name. | './mlmd'
execution_uuid | str | Optional execution UUID. | ''
Returns:

Type | Description
---|---
 | Message from the _metadata_pull function.
metadata_export(pipeline_name, json_file_name='', file_name='./mlmd')
¶
Exports the local mlmd's metadata in JSON format to a JSON file.
result = metadata_export("example_pipeline", "./jsonfile", "./mlmd_directory")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
json_file_name | str | File path of the json file. | ''
file_name | str | Specify input metadata file name. | './mlmd'
Returns:

Type | Description
---|---
 | Message from the _metadata_export function.
artifact_pull(pipeline_name, file_name='./mlmd')
¶
Pulls artifacts from the initialized repository.
result = artifact_pull("example_pipeline", "./mlmd_directory")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify input metadata file name. | './mlmd'
Returns:

Type | Description
---|---
 | Output from the _artifact_pull function.
artifact_pull_single(pipeline_name, file_name, artifact_name)
¶
Pulls a single artifact from the initialized repository.
result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify input metadata file name. | required
artifact_name | str | Name of the artifact. | required
Returns:

Type | Description
---|---
 | Output from the _artifact_pull_single function.
artifact_push(pipeline_name, filepath='./mlmd', jobs=32)
¶
Pushes artifacts to the initialized repository.
result = artifact_push("example_pipeline", "./mlmd_directory", 32)
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
filepath | str | Path to store the artifact. | './mlmd'
jobs | int | Number of jobs to use for pushing artifacts. | 32
Returns:

Type | Description
---|---
 | Output from the _artifact_push function.
artifact_list(pipeline_name, file_name='./mlmd', artifact_name='')
¶
Displays artifacts from the input metadata file with a few properties in a 7-column table, limited to 20 records per page.
result = artifact_list("example_pipeline", "./mlmd_directory", "example_artifact_name")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify input metadata file name. | './mlmd'
artifact_name | str | Display artifacts for a particular artifact name. | ''
Returns:

Type | Description
---|---
 | Output from the _artifact_list function.
pipeline_list(file_name='./mlmd')
¶
Displays a list of pipeline name(s) from the available input metadata file.
result = pipeline_list("./mlmd_directory")
Parameters:

Name | Type | Description | Default
---|---|---|---
file_name | str | Specify input metadata file name. | './mlmd'
Returns:

Type | Description
---|---
 | Output from the _pipeline_list function.
execution_list(pipeline_name, file_name='./mlmd', execution_uuid='')
¶
Displays executions from the input metadata file with a few properties in a 7-column table, limited to 20 records per page.
result = execution_list("example_pipeline", "./mlmd_directory", "example_execution_uuid")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify input metadata file name. | './mlmd'
execution_uuid | str | Specify the execution uuid to retrieve the execution. | ''
Returns:

Type | Description
---|---
 | Output from the _execution_list function.
repo_push(pipeline_name, filepath='./mlmd', tensorboard_path='', execution_uuid='', jobs=32)
¶
Push artifacts, metadata files, and source code to the user's artifact repository, cmf-server, and git respectively.
result = repo_push("example_pipeline", "./mlmd_directory",
                   tensorboard_path="./tensorboard_path",
                   execution_uuid="example_execution_uuid", jobs=32)
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
filepath | str | Specify input metadata file name. | './mlmd'
tensorboard_path | str | Path to tensorboard logs. | ''
execution_uuid | str | Specify execution uuid. | ''
jobs | int | Number of jobs to use for pushing artifacts. | 32
Returns:

Type | Description
---|---
 | Output from the _repo_push function.
repo_pull(pipeline_name, file_name='./mlmd', execution_uuid='')
¶
Pull artifacts, metadata files, and source code from the user's artifact repository, cmf-server, and git respectively.
result = repo_pull("example_pipeline", "./mlmd_directory", "example_execution_uuid")
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline_name | str | Name of the pipeline. | required
file_name | str | Specify output metadata file name. | './mlmd'
execution_uuid | str | Specify execution uuid. | ''
Returns:

Type | Description
---|---
 | Output from the _repo_pull function.
dvc_ingest(file_name='./mlmd')
¶
Ingests metadata from the dvc.lock file into CMF. If an existing MLMD file is provided, it merges and updates execution metadata based on matching commands, or creates new executions if none exist.
result = dvc_ingest("./mlmd_directory")
Parameters:

Name | Type | Description | Default
---|---|---|---
file_name | str | Specify input metadata file name. | './mlmd'
Returns:

Type | Description
---|---
 | Output from the _dvc_ingest function.