cmflib¶
The cmflib package is the foundational Python library that provides the core metadata tracking capabilities for the Common Metadata Framework (CMF). It offers a unified API for logging metadata across distributed AI/ML pipelines, integrating with multiple storage backends and versioning systems.
This document covers the core library components, main API classes, and integration mechanisms. For CLI usage patterns, see Quick Start with cmf-client. For server-side components, see cmf-server.
Core Architecture¶
Complex ML projects rely on ML pipelines to train and test ML models. An ML pipeline is a sequence of stages where each stage performs a particular task, such as data loading, pre-processing, ML model training, or testing.
Each stage can have multiple Executions which:
- consume inputs and produce outputs.
- are parameterized by parameters that guide the process of producing outputs.
CMF uses the abstractions of Pipeline, Context, and Executions to store the metadata of complex ML pipelines.
Each pipeline has a name, which users provide when they initialize CMF. Each stage is represented by a Context object.
Metadata associated with each run of a stage is captured in an Execution object.
Inputs and outputs of Executions can be logged as datasets, models, or metrics, while parameters of Executions are recorded as properties of those Executions.
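The relationships described above can be pictured with a small in-memory model. This is only an illustrative sketch: the dataclasses below (Pipeline, Context, Execution, Artifact) are hypothetical stand-ins written for this page, not cmflib classes, and the field names are assumptions. cmflib records this metadata in its backend rather than in plain Python objects.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Union

PropertyValue = Union[str, int, float, bool]


@dataclass
class Artifact:
    """A dataset, model, or metrics object consumed or produced by an execution."""
    path: str
    kind: str   # "dataset", "model", or "metrics"
    event: str  # "input" or "output"


@dataclass
class Execution:
    """One run of a stage, with its parameters and logged artifacts."""
    name: str
    parameters: Dict[str, PropertyValue] = field(default_factory=dict)
    artifacts: List[Artifact] = field(default_factory=list)

    def inputs(self) -> List[Artifact]:
        return [a for a in self.artifacts if a.event == "input"]

    def outputs(self) -> List[Artifact]:
        return [a for a in self.artifacts if a.event == "output"]


@dataclass
class Context:
    """A pipeline stage; it groups all executions of that stage."""
    stage: str
    executions: List[Execution] = field(default_factory=list)


@dataclass
class Pipeline:
    """A named pipeline; it groups stages (contexts)."""
    name: str
    contexts: List[Context] = field(default_factory=list)


# Example: one "train" stage with a single execution.
run = Execution(
    name="train-run-1",
    parameters={"epochs": 10, "learning_rate": 0.01},
    artifacts=[
        Artifact("data/train.csv", kind="dataset", event="input"),
        Artifact("model/rf.pkl", kind="model", event="output"),
    ],
)
pipeline = Pipeline(name="mnist", contexts=[Context(stage="train", executions=[run])])
```

Here the pipeline owns contexts, each context owns executions, and each execution carries both its parameters and the artifacts it consumed or produced.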
Main API (cmf.Cmf)¶
The Cmf class is the primary interface for metadata tracking in CMF. It provides methods for creating pipelines, contexts, executions, and logging artifacts.
Key Methods¶
Method | Purpose | Usage |
---|---|---|
__init__(filename, pipeline_name) | Initialize CMF instance | cmf = Cmf(filename="mlmd", pipeline_name="my_pipeline") |
create_context(pipeline_stage) | Create a pipeline stage context | context = cmf.create_context(pipeline_stage="train") |
create_execution(execution_type) | Create an execution within a context | execution = cmf.create_execution(execution_type="training_run") |
log_dataset(url, event, custom_properties) | Log dataset artifacts | cmf.log_dataset(url="data.csv", event="input") |
log_model(path, event, model_framework) | Log model artifacts | cmf.log_model(path="model.pkl", event="output") |
log_execution_metrics(metrics_name, custom_properties) | Log final stage metrics | cmf.log_execution_metrics(metrics_name="accuracy", custom_properties={"value": 0.95}) |
Start tracking the pipeline metadata by initializing the CMF runtime. The metadata will be associated with the pipeline name given at initialization.
Before we can start tracking metadata, we need to let CMF know about the stage type. At this point the stage is not yet associated with any particular execution.
Now we can create a new stage execution associated with the context created in the previous step.
Finally, we can log an input artifact (the training dataset) and, once training completes, an output artifact (the ML model).
Quick Example¶
Go through the Getting Started page to learn more about CMF API usage.
API Overview¶
Import CMF.
from cmflib import cmf
Initialize CMF. The Cmf object is responsible for managing a CMF backend to record the pipeline metadata. Internally, it creates a pipeline abstraction that groups individual stages and their executions. All stages, their executions, and produced artifacts will be associated with a pipeline with the given name.
cmf = cmf.Cmf(
    filename="mlmd",       # Path to ML Metadata file.
    pipeline_name="mnist"  # Name of an ML pipeline.
)
Define a stage. An ML pipeline can have multiple stages, and each stage can be associated with multiple executions. A stage is described by a context, which specifies its name and optional properties. You can create a context using the create_context method:
context = cmf.create_context(
    pipeline_stage="download",   # Stage name
    custom_properties={          # Optional properties
        "uses_network": True,    # Downloads from the Internet
        "disk_space": "10GB"     # Needs this much space
    }
)
Create a stage execution. A stage in an ML pipeline can have multiple executions. Every run is marked as an execution. This API helps to track the metadata associated with the execution, like stage parameters (e.g., number of epochs and learning rate for train stages). The stage execution name does not need to be the same as the name of its context. Moreover, the CMF will adjust this name to ensure every execution has a unique name. The CMF will internally associate this execution with the context created previously. Stage executions are created by calling the create_execution method.
execution = cmf.create_execution(
    execution_type="download",               # Execution name.
    custom_properties={                      # Execution parameters
        "url": "https://a.com/mnist.gz"      # Data URL.
    }
)
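Stage parameters often live in nested configuration dictionaries, while the examples on this page pass flat key/value pairs as custom_properties. The following is a minimal sketch of one way to flatten a configuration before passing it on. The helper flatten_params and the sample config are hypothetical and not part of cmflib, and the assumption that flat scalar values are preferable is ours.

```python
from typing import Any, Dict


def flatten_params(params: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys, e.g. {"opt": {"lr": 0.1}} -> {"opt.lr": 0.1}."""
    flat: Dict[str, Any] = {}
    for key, value in params.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections, extending the dotted prefix.
            flat.update(flatten_params(value, name))
        elif isinstance(value, (str, int, float, bool)):
            flat[name] = value
        else:
            # Fall back to a string representation for lists and other types.
            flat[name] = str(value)
    return flat


# Made-up configuration for illustration only.
config = {"epochs": 10, "optimizer": {"name": "sgd", "lr": 0.01}, "layers": [64, 32]}
custom_properties = flatten_params(config)
```

The resulting dictionary could then be supplied as the custom_properties argument when creating an execution.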
Log artifacts. A stage execution can consume (inputs) and produce (outputs) multiple artifacts (datasets, models, and performance metrics). The path of each artifact must be relative to the project (repository) root path. Artifacts can have optional metadata associated with them, such as feature statistics for ML datasets or useful parameters for ML models (for instance, the number of trees in a random forest classifier).
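Because artifact paths are expected to be relative to the project root, it can help to normalize paths before logging them. The sketch below uses only the standard library; the helper name to_project_relative is hypothetical and not a cmflib function, and it assumes the caller knows the project root.

```python
from pathlib import Path


def to_project_relative(artifact_path: str, project_root: str) -> str:
    """Return artifact_path relative to project_root, using forward slashes.

    Raises ValueError if the artifact lies outside the project root.
    """
    root = Path(project_root).resolve()
    candidate = Path(artifact_path)
    # Interpret relative inputs as relative to the project root.
    absolute = candidate if candidate.is_absolute() else root / candidate
    return absolute.resolve().relative_to(root).as_posix()
```

A path that resolves outside the project root raises ValueError, which surfaces the problem before anything is logged.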
- Datasets are logged with the log_dataset method.
    cmf.log_dataset("data/mnist.gz", "input", custom_properties={"name": "mnist", "type": "raw"})
    cmf.log_dataset("data/train.csv", "output", custom_properties={"name": "mnist", "type": "train_split"})
    cmf.log_dataset("data/test.csv", "output", custom_properties={"name": "mnist", "type": "test_split"})
- ML models produced by training stages are logged using the log_model API. ML models can be both input and output artifacts. The metadata associated with the artifact can be logged as an optional argument.
    # In train stage
    cmf.log_model(
        path="model/rf.pkl", event="output", model_framework="scikit-learn",
        model_type="RandomForestClassifier", model_name="RandomForestClassifier:default"
    )
    # In test stage
    cmf.log_model(
        path="model/rf.pkl", event="input"
    )
- Metrics of every optimization step (one epoch of Stochastic Gradient Descent, or one boosting round in Gradient Boosting Trees) are logged using the log_metric API.
    # Can be called at every epoch or every step in the training. This is logged to a parquet file and committed at the
    # commit stage.
    # Inside training loop
    while True:
        cmf.log_metric("training_metrics", {"loss": loss})
    # After the training loop ends, commit the accumulated metrics
    cmf.commit_metrics("training_metrics")
- Stage metrics, or final metrics, are logged with the log_execution_metrics method. These are final metrics of a stage, such as final train or test accuracy.
    cmf.log_execution_metrics("metrics", {"avg_prec": avg_prec, "roc_auc": roc_auc})
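The difference between per-step metrics (logged repeatedly, committed once) and final stage metrics (logged once) can be illustrated with a toy buffer. This is a sketch of the log-then-commit pattern only: StepMetricsBuffer is a hypothetical in-memory class, not how cmflib implements log_metric or commit_metrics, and the loss values are made up.

```python
from typing import Dict, List


class StepMetricsBuffer:
    """Hypothetical in-memory stand-in for the log-then-commit pattern."""

    def __init__(self) -> None:
        self._pending: Dict[str, List[Dict[str, float]]] = {}
        self.committed: Dict[str, List[Dict[str, float]]] = {}

    def log(self, name: str, values: Dict[str, float]) -> None:
        # Cheap per-step call: only appends a row to an in-memory list.
        self._pending.setdefault(name, []).append(dict(values))

    def commit(self, name: str) -> int:
        # One-off call after the loop: moves all buffered rows to "storage".
        rows = self._pending.pop(name, [])
        self.committed.setdefault(name, []).extend(rows)
        return len(rows)


buffer = StepMetricsBuffer()
losses = [0.9, 0.5, 0.3]  # made-up per-epoch losses
for loss in losses:       # training loop
    buffer.log("training_metrics", {"loss": loss})
rows_written = buffer.commit("training_metrics")

# Final (stage-level) metrics are a single summary, computed once.
final_metrics = {"final_loss": losses[-1], "best_loss": min(losses)}
```

Keeping the per-step call cheap and deferring the write to a single commit is the design choice this pattern illustrates; the summary dictionary plays the role of the final stage metrics.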
Dataslices are intended to be used to track subsets of the data. For instance, they can be used to track and compare accuracies of ML models on these subsets to identify model bias. Dataslices are created with the create_dataslice method.
import random

dataslice = cmf.create_dataslice("slice-a")
for i in range(1, 20, 1):
    j = random.randrange(100)
    dataslice.add_data("data/raw_data/" + str(j) + ".xml")
dataslice.commit()
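To make the bias-detection use case concrete, the sketch below compares model accuracy across named subsets of a dataset. It is plain Python and independent of cmflib: the function accuracy_per_slice, the slice names, and the sample labels and predictions are all illustrative assumptions.

```python
from typing import Dict, List, Sequence


def accuracy_per_slice(
    labels: Sequence[int],
    predictions: Sequence[int],
    slices: Dict[str, List[int]],
) -> Dict[str, float]:
    """Return accuracy for each named slice; slices map a name to sample indices."""
    result: Dict[str, float] = {}
    for name, indices in slices.items():
        if not indices:
            continue  # skip empty slices to avoid dividing by zero
        correct = sum(labels[i] == predictions[i] for i in indices)
        result[name] = correct / len(indices)
    return result


# Made-up data: six samples split into two slices of three.
labels = [1, 0, 1, 1, 0, 0]
predictions = [1, 0, 0, 1, 1, 1]
slices = {"slice-a": [0, 1, 2], "slice-b": [3, 4, 5]}
scores = accuracy_per_slice(labels, predictions, slices)
```

A large gap between the per-slice scores is the kind of signal that tracking slices is meant to expose.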
Graph Layer Overview¶
The CMF library has an optional graph layer which stores the relationships in a Neo4J graph database. To use the graph layer, the graph parameter in the library init call must be set to True (it is False by default). The library reads the configuration parameters of the graph database from the cmf config generated by the cmf init command.
cmf init minioS3 --url s3://dvc-art \
    --endpoint-url http://x.x.x.x:9000 \
    --access-key-id minioadmin \
    --secret-key minioadmin \
    --git-remote-url https://github.com/user/experiment-repo.git \
    --cmf-server-url http://x.x.x.x:8080 \
    --neo4j-user neo4j \
    --neo4j-password password \
    --neo4j-uri bolt://localhost:7687
Here, "dvc-art" is provided as an example bucket name. However, users can change it as needed. If the user chooses to change it, they will need to update the Dockerfile for minioS3 accordingly.
To use the graph layer, instantiate the CMF with the graph=True parameter:
from cmflib import cmf
cmf = cmf.Cmf(
    filename="mlmd",
    pipeline_name="anomaly_detection_pipeline",
    graph=True
)