Skip to content

cmflib.cmf.Cmf.DataSlice

A data slice represents a named subset of data. It can be used to track performance of an ML model on different slices of the training or testing dataset splits. This can be useful from different perspectives, for instance, to mitigate model bias.

Instances of data slices are not meant to be created manually by users. Instead, use the Cmf.create_dataslice method.

Source code in cmflib/cmf.py
1806
1807
1808
1809
def __init__(self, name: str, writer):
    """Set up an empty data slice.

    Args:
        name: Name of the data slice.
        writer: Object whose attributes (store, execution, contexts, ...) are
            read when the slice is committed.
    """
    self.name, self.writer = name, writer
    # Maps each added path to the dictionary of properties recorded for it.
    self.props = dict()

add_data(path, custom_properties=None)

Add data to create the dataslice. Currently supported only for file abstractions. Pre-condition: the parent folder containing the file should already be versioned. Example:

dataslice.add_data(f"data/raw_data/{j}.xml")
Args: path: Name to identify the file to be added to the dataslice. custom_properties: Properties associated with this datum.

Source code in cmflib/cmf.py
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
def add_data(
    self, path: str, custom_properties: t.Optional[t.Dict] = None
) -> None:
    """Add data to create the dataslice.

    Currently supported only for file abstractions. Pre-condition: the parent folder containing the file
    should already be versioned.

    Example:
        ```python
        dataslice.add_data(f"data/raw_data/{j}.xml")
        ```
    Args:
        path: Name to identify the file to be added to the dataslice. Components are expected to be
            separated by "/"; the name of the folder directly containing the file is stored in
            ``self.data_parent`` (empty string when the path has no folder component).
        custom_properties: Properties associated with this datum. A "hash" key, if given, overrides
            the computed hash.
    """
    # Build the complete record first so that a failure while hashing does not
    # leave a half-initialised entry behind in self.props.
    entry = {"hash": dvc_get_hash(path)}
    if custom_properties:
        entry.update(custom_properties)
    self.props[path] = entry

    # rpartition never raises: unlike rsplit(...)[1] it also copes with paths
    # that have only one folder level ("data/1.xml") or none at all ("1.xml").
    parent_path = path.rpartition("/")[0]
    self.data_parent = parent_path.rpartition("/")[2]

commit(custom_properties=None)

Commit the dataslice. The created dataslice is versioned and added to the underlying data versioning software. Example:

dataslice.commit()

Args: custom_properties: Dictionary to store key-value pairs associated with the dataslice. Example: {"mean": 2.5, "median": 2.6}

Source code in cmflib/cmf.py
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
def commit(self, custom_properties: t.Optional[t.Dict] = None) -> t.Any:
    """Commit the dataslice.

    The created dataslice is versioned and added to the underlying data versioning software.

    Example:
        ```python
        dataslice.commit()
        ```
    Args:
        custom_properties: Dictionary to store key value pairs associated with Dataslice.
            Example: {"mean":2.5, "median":2.6}
    Returns:
        The value returned by the metadata helper that recorded the dataslice
        (link_execution_to_input_artifact for an already known hash, otherwise
        create_new_artifact_event_and_attribution), or None when no DVC hash could be obtained.
    """
    # change_dir hands back the directory that is restored once the commit is over.
    logging_dir = change_dir(self.writer.cmf_init_path)
    try:
        # code for nano cmf
        # Assigning current file name as stage and execution name
        current_script = sys.argv[0]
        file_name = os.path.basename(current_script)
        name_without_extension = os.path.splitext(file_name)[0]
        # create context if not already created
        if not self.writer.child_context:
            self.writer.create_context(pipeline_stage=name_without_extension)
            assert self.writer.child_context is not None, (
                f"Failed to create context for {self.writer.parent_context.name}!!"
            )

        # create execution if not already created
        if not self.writer.execution:
            self.writer.create_execution(execution_type=name_without_extension)
            assert self.writer.execution is not None, (
                f"Failed to create execution for {self.writer.parent_context.name}!!"
            )

        # Only the first entry of the comma-separated Execution_uuid value is used
        # to name the per-execution artifact directory.
        execution_uuid = self.writer.execution.properties["Execution_uuid"].string_value.split(',')[0]
        directory_path = os.path.join(
            self.writer.ARTIFACTS_PATH, execution_uuid, self.writer.DATASLICE_PATH
        )
        os.makedirs(directory_path, exist_ok=True)
        custom_props = {} if custom_properties is None else custom_properties
        git_repo = git_get_repo()

        # One row per added path, one column per recorded property.
        dataslice_df = pd.DataFrame.from_dict(self.props, orient="index")
        dataslice_df.index.names = ["Path"]
        dataslice_path = os.path.join(directory_path, self.name)
        dataslice_df.to_parquet(dataslice_path)

        commit_output(dataslice_path, self.writer.execution.id)
        c_hash = dvc_get_hash(dataslice_path)
        if c_hash == "":
            print("Error in getting the dvc hash,return without logging")
            return None

        url = dvc_get_url(dataslice_path)
        dvc_url_with_pipeline = f"{self.writer.parent_context.name}:{url}"
        artifact_name = dataslice_path + ":" + c_hash

        # Built once, as a real dict, so that both branches below and the graph
        # node creation see the same properties.
        props = {
            "git_repo": str(git_repo),
            # passing c_hash value to commit
            "Commit": str(c_hash),
            "url": str(dvc_url_with_pipeline),
        }

        existing_artifact = []
        if c_hash and c_hash.strip():
            existing_artifact.extend(
                self.writer.store.get_artifacts_by_uri(c_hash))

        if existing_artifact:
            print("Adding to existing data slice")
            # Haven't added event type in this if cond, is it not needed??
            dataslice_artifact = link_execution_to_input_artifact(
                store=self.writer.store,
                execution_id=self.writer.execution.id,
                uri=c_hash,
                input_name=artifact_name,
            )
        else:
            dataslice_artifact = create_new_artifact_event_and_attribution(
                store=self.writer.store,
                execution_id=self.writer.execution.id,
                context_id=self.writer.child_context.id,
                uri=c_hash,
                name=artifact_name,
                type_name="Dataslice",
                event_type=mlpb.Event.Type.OUTPUT,
                properties=props,
                artifact_type_properties={
                    "git_repo": mlpb.STRING,
                    "Commit": mlpb.STRING,
                    "url": mlpb.STRING,
                },
                custom_properties=custom_props,
                milliseconds_since_epoch=int(time.time() * 1000),
            )

        if self.writer.graph:
            self.writer.driver.create_dataslice_node(
                self.name, artifact_name, c_hash, self.data_parent, props
            )
        return dataslice_artifact
    finally:
        # Restore the caller's working directory on every exit path, including
        # the early return above and any exception.
        os.chdir(logging_dir)