Custom Transform

    Creating a custom transform

    In FactoryTX, a Transform manipulates data extracted by a Receiver. FactoryTX offers a few built-in transform components. A list of them and their settings can be found in the Transforms section of the Configuration Reference. If none of the provided FactoryTX transforms fulfill your needs, you’ll have to create a custom one.

    Let’s revisit our imaginary manufacturing startup and see how it’s doing. While mining in Mauritius for rare-earth metals to use in the head-up displays of our vehicles, we unearthed a large amount of amber. One of our top engineers, Dr. Wu, noticed that there were some mosquitoes preserved in the amber. He had a great idea of extracting the blood ingested by these mosquitoes and sequencing the DNA. Excited by the prospect of resurrecting a dodo, we decided to expand from automotive manufacturing and into bioengineering. We built a new facility devoted to genome sequencing and now want to link it up to FactoryTX, so we can do some data modeling and analysis in the Sight Machine platform.

    The data created by the DNA sequencers in our bioengineering facility is stored in CSV files. For example:

    timestamp            counter  accession number  completion  genus          species
    2018-09-13 12:01:00  1        12345             88          Anopheles      gambiae
    2018-09-13 12:11:00  2        23456             75          Stegosaurus    stenops
    2018-09-13 12:21:00  3        34567             68          Ankylosaurus   magniventris
    2018-09-13 12:31:00  4        45678             90          Raphus         cucullatus
    2018-09-13 12:41:00  5        56789             100         Deinonychus    antirrhopus
    2018-09-13 12:51:00  6        67890             99          Achillobator   giganticus
    2018-09-13 13:01:00  7        98778             76          Brontosaurus   excelsus
    2018-09-13 13:11:00  8        12346             84          Aedes          aegypti
    2018-09-13 13:21:00  9        88888             89          Tyrannosaurus  rex

    We want to be able to group entries with similar sequences based on their locus name, which is a unique combination of an entry’s accession number and the first letters of its genus and species. For example, the locus name for the first entry should be: 12345AG. Since the data doesn’t have a locus name column, we’ll need to create a transform that concatenates the accession number, genus, and species, and saves the value as locus_name.
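
    The rule is easier to see on a single row. The following is a minimal sketch in plain Python; the helper name locus_name is our own, chosen for illustration, and is not part of FactoryTX.

```python
def locus_name(accession_number: int, genus: str, species: str) -> str:
    """Build a locus name: accession number + first letters of genus and species."""
    return f"{accession_number}{genus[0].upper()}{species[0].upper()}"


# First and fourth rows of the sample data above.
print(locus_name(12345, "Anopheles", "gambiae"))   # 12345AG
print(locus_name(45678, "Raphus", "cucullatus"))   # 45678RC
```

    The transform we build below applies the same rule to a whole column at a time instead of one row at a time.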

    Setting up with the Developer Console

    From the web UI, navigate to the Developer Console.

    The Developer Console is a JupyterLab environment.

    In the left sidebar, select the file browser and navigate to the /opt/sightmachine/factorytx2/ directory. This directory contains FactoryTX’s source code, and it’s where we will be saving our transform. Create a directory for our custom FactoryTX components (e.g. factorytx_tutorial).

    Click the + button in the upper left corner to open a new Launcher, start a Terminal from it, and navigate to the directory we just created.

    # ls
    LIMITATIONS.md    docs                   factorytx_heineken          nginx                  scripts           test-data
    README.md         entrypoint.sh          factorytx_nissanpowertrain  postman                setup.py          tests
    UPGRADING.md      factorytx              factorytx_orora             requirements-test.txt  ssh-server
    config-templates  factorytx_asianpaints  factorytx_tutorial          requirements.txt       stubs
    dist              factorytx_film3m       ftxui                       screenshots            supervisord.conf
    # cd factorytx_tutorial

    Inside the factorytx_tutorial directory, create a blank __init__.py file and a transforms folder (which also contains a blank __init__.py file).

    touch __init__.py && mkdir transforms && (cd transforms && touch __init__.py)
    

    With the file browser, enter the transforms directory, open a new Launcher instance, and start a new Python 3 Notebook instance. This is where we will draft our custom transform.

    Defining a Transform

    Our custom transform should inherit from the base Transform class and define the following methods:

    • __init__(config, root_config): This method initializes the transform instance. It accepts two parameters:

      • config: The configuration dictionary of the transform from the FactoryTX config. The clean method is used to validate and normalize this configuration, and the transform will only be initialized if there is no ValidationError.

      • root_config: The root configuration dictionary, which can be used to access global settings (e.g. Sight Machine cloud credentials). This is the FactoryTX config.

    • clean(config, root_config): This method validates and normalizes a configuration dictionary. It validates that the configuration it receives is well-formed and will be accepted by the constructor. The method may also modify the configuration, such as inserting default values, and changes made to the config will persist in the configuration passed to the constructor. This method returns a list of ValidationMessage objects.

    • process(record): This method implements the transformation logic that converts a record into a new record.
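
    The order in which these three methods are used can be sketched in plain Python. This is an illustration only: UppercaseTransform and build are hypothetical stand-ins, validation messages are plain strings rather than ValidationMessage objects, and the real loading logic inside FactoryTX may differ.

```python
from typing import List


class UppercaseTransform:
    """Hypothetical stand-in; a real transform inherits from the base Transform class."""

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[str]:
        messages = []
        if "field" not in config:
            messages.append("'field' is required")
        # Insert a default value. The change is visible to __init__ because
        # the same dictionary object is passed along afterwards.
        config.setdefault("suffix", "")
        return messages

    def __init__(self, config: dict, root_config: dict) -> None:
        self.field = config["field"]
        self.suffix = config["suffix"]

    def process(self, record: dict) -> dict:
        new_record = dict(record)  # work on a copy of the record
        new_record[self.field] = str(record[self.field]).upper() + self.suffix
        return new_record


def build(transform_cls, config: dict, root_config: dict):
    """Hypothetical loader: validate first, construct only if nothing was reported."""
    messages = transform_cls.clean(config, root_config)
    if messages:
        raise ValueError("; ".join(messages))
    return transform_cls(config, root_config)


transform = build(UppercaseTransform, {"field": "genus"}, {})
print(transform.process({"genus": "raphus"}))  # {'genus': 'RAPHUS'}
```

    Note that the constructor can read config["suffix"] even though the caller never supplied it, because clean inserted the default first.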

    In the Python Notebook, import the base Transform class and the other components that we’ll need (covered in more detail below), and create the skeleton of our custom transform.

    from typing import List
    
    import pandas as pd
    
    from factorytx import markers
    from factorytx.base import Transform, transforms
    from factorytx.exceptions import Failure
    from factorytx.validation import (
        ValidationError, ValidationMessage, clean_with_json_schema, has_errors
    )
    
    
    class ConcatTransform(Transform):
        @classmethod
        def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
            pass
    
        def __init__(self, config: dict, root_config: dict) -> None:
            pass
    
        def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
            pass
    
    

    Processing a record

    The process method of our ConcatTransform should contain the logic to transform the record. When a Receiver gets data, the receiver converts the data into a Pandas DataFrame with labeled columns of asset data. We refer to the DataFrame as “record” and it is sent through the transform components and ultimately packaged up and transmitted by the transmit component.

    For our transform, process will perform the concatenation that creates the locus_name. We want to combine the accession number, the first letter of the genus, and the first letter of the species in order. Let’s also make sure that the letters are uppercase.

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        input_frame["locus_name"] = input_frame["accession number"].astype(str) \
                                    + input_frame["genus"].str[0].str.upper() \
                                    + input_frame["species"].str[0].str.upper()

    When we transform a record (DataFrame), we should make our changes to a copy of the record and return that copy, so that chained transformations do not produce unpredictable results.

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        df = input_frame.copy(deep=False)
        df["locus_name"] = df["accession number"].astype(str) \
                           + df["genus"].str[0].str.upper() \
                           + df["species"].str[0].str.upper()
        return df
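
    To see what the copy buys us, the same logic can be run outside FactoryTX on a small hand-built DataFrame. This is a sketch that assumes only pandas is installed; add_locus_name is a standalone function written for this example, not a FactoryTX API.

```python
import pandas as pd


def add_locus_name(input_frame: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the process method above, as a standalone function.
    df = input_frame.copy(deep=False)
    df["locus_name"] = (
        df["accession number"].astype(str)
        + df["genus"].str[0].str.upper()
        + df["species"].str[0].str.upper()
    )
    return df


original = pd.DataFrame({
    "accession number": [12345, 23456],
    "genus": ["Anopheles", "Stegosaurus"],
    "species": ["gambiae", "stenops"],
})
result = add_locus_name(original)

print(list(result["locus_name"]))        # ['12345AG', '23456SS']
print("locus_name" in original.columns)  # False
```

    A shallow copy is cheap because the existing column data is shared rather than duplicated, and adding a column to the copy leaves the original frame without it. In-place edits to existing values are a different matter: depending on the pandas version and its Copy-on-Write setting, they may still be visible in the original, so a transform that overwrites existing columns should take that into account.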

    Validating and cleaning configurations

    Currently, our transform is hard-coded to concatenate a record’s accession number, the first letter of its genus, and the first letter of its species, and to save the result as locus_name. That is enough for creating the locus name, but what if we want to concatenate different values, such as completion instead of accession number? We could create a separate transform for that, or we could make our current transform configurable.

    FactoryTX components use JSON Schema to describe configurable settings and validate input. For our ConcatTransform, let’s create a basic schema that describes the desired behavior: takes in 3 input fields to concatenate and returns the output with a configurable title.

    schema = {
        "properties": {
            "input_field1": {},
            "input_field2": {},
            "input_field3": {},
            "output_title": {}
        }
    }
    
    

    Since a component’s schema is also used for validating its configuration, we can use some of the built-in constraints that JSON Schema has available. For example, let’s make sure the types of the input received for the configuration match our expectations. Let’s also make sure that all the fields are specified in the configuration.

    schema = {
        "type": "object",
        "properties": {
            "input_field1": {"type": "string"},
            "input_field2": {"type": "string"},
            "input_field3": {"type": "string"},
            "output_title": {"type": "string"}
        },
        "required": ["input_field1", "input_field2", "input_field3", "output_title"]
    }

    For reference on the available types, please refer to the Instance Data Model section of the JSON Schema documentation.
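
    To make the two constraints concrete, here is a small hand-rolled checker that understands only "required" and the "string" type. It is a teaching sketch, not how FactoryTX validates configurations (that is the job of clean_with_json_schema, shown next), and check_config is a hypothetical name.

```python
schema = {
    "type": "object",
    "properties": {
        "input_field1": {"type": "string"},
        "input_field2": {"type": "string"},
        "input_field3": {"type": "string"},
        "output_title": {"type": "string"},
    },
    "required": ["input_field1", "input_field2", "input_field3", "output_title"],
}


def check_config(schema: dict, config: dict) -> list:
    """Tiny, hypothetical checker covering only 'required' and string 'type'."""
    problems = []
    for key in schema.get("required", []):
        if key not in config:
            problems.append(f"{key}: missing required property")
    for key, rules in schema.get("properties", {}).items():
        if key in config and rules.get("type") == "string" and not isinstance(config[key], str):
            problems.append(f"{key}: expected a string")
    return problems


good = {
    "input_field1": "accession number",
    "input_field2": "genus",
    "input_field3": "species",
    "output_title": "locus_name",
}
bad = {"input_field1": 12345, "input_field2": "genus", "input_field3": "species"}

print(check_config(schema, good))  # []
print(check_config(schema, bad))
# ['output_title: missing required property', 'input_field1: expected a string']
```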

    Our transform’s clean method can use this schema to check for configuration issues as well as inject default values with the clean_with_json_schema function.

    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        validation_results = clean_with_json_schema(cls.schema, config)
        return validation_results

    The clean method is also where we can add custom validation logic that JSON schema constraints cannot handle. For instance, let’s make sure that the input fields are unique and report a ValidationError for each input field that duplicates another.

    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        validation_results = clean_with_json_schema(cls.schema, config)
        if has_errors(validation_results):
            return validation_results
    
        input1 = config['input_field1']
        input2 = config['input_field2']
        input3 = config['input_field3']
        input_fields = [input1, input2, input3]
        if len(set(input_fields)) != len(input_fields):
            if input1 in (input2, input3):
                error = ValidationError(('input_field1',), 'Duplicate input field')
                validation_results.append(error)
            if input2 in (input1, input3):
                error = ValidationError(('input_field2',), 'Duplicate input field')
                validation_results.append(error)
            if input3 in (input1, input2):
                error = ValidationError(('input_field3',), 'Duplicate input field')
                validation_results.append(error)
    
        return validation_results

    The first parameter of a ValidationError is a JSON path to indicate which part of the configuration is incorrect. The Configuration Editor uses this information to highlight which line is incorrect. We also added a has_errors check to return early if any JSON schema constraints are violated.
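
    The pairwise comparisons are easy to get wrong as the number of fields grows. One alternative, sketched here in plain Python under the assumption that the configuration has already passed schema validation, is to count the values once and report every key whose value occurs more than once. The name duplicated_input_keys is our own.

```python
from collections import Counter
from typing import Dict, List

INPUT_KEYS = ("input_field1", "input_field2", "input_field3")


def duplicated_input_keys(config: Dict[str, str]) -> List[str]:
    """Return the config keys whose value is shared with another input field."""
    counts = Counter(config[key] for key in INPUT_KEYS)
    return [key for key in INPUT_KEYS if counts[config[key]] > 1]


print(duplicated_input_keys(
    {"input_field1": "genus", "input_field2": "genus", "input_field3": "species"}
))  # ['input_field1', 'input_field2']
print(duplicated_input_keys(
    {"input_field1": "accession number", "input_field2": "genus", "input_field3": "species"}
))  # []
```

    Inside clean, each returned key could then be wrapped in a ValidationError with that key as its path, in the same way as the snippet above.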

    Initializing the Transform

    The __init__ method sets up the Transform instance based on the configuration block in the FactoryTX config. We can use this opportunity to copy the necessary fields that the transform needs.

    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)
        self.input_field1 = config['input_field1']
        self.input_field2 = config['input_field2']
        self.input_field3 = config['input_field3']
        self.output_title = config['output_title']

    Since our transform now uses configurable settings, the process method will need to be modified slightly to accommodate the change.

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        df = input_frame.copy(deep=False)
        df[self.output_title] = df[self.input_field1].astype(str) \
                                + df[self.input_field2].astype(str).str[0].str.upper() \
                                + df[self.input_field3].astype(str).str[0].str.upper()
        return df
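
    The extra astype(str) calls on the second and third fields matter once the fields are configurable, because a numeric column has no .str accessor. The sketch below reproduces the logic as a standalone function (concat_fields is a name made up for this example) and assumes only pandas is installed.

```python
import pandas as pd


def concat_fields(frame: pd.DataFrame, field1: str, field2: str,
                  field3: str, output_title: str) -> pd.DataFrame:
    # Mirrors the configurable process method above.
    df = frame.copy(deep=False)
    df[output_title] = (
        df[field1].astype(str)
        + df[field2].astype(str).str[0].str.upper()
        + df[field3].astype(str).str[0].str.upper()
    )
    return df


frame = pd.DataFrame({
    "accession number": [12345],
    "completion": [88],
    "genus": ["Anopheles"],
    "species": ["gambiae"],
})

locus = concat_fields(frame, "accession number", "genus", "species", "locus_name")
print(locus["locus_name"].iloc[0])  # 12345AG

# A numeric column in the second position works because it is cast to str first.
mixed = concat_fields(frame, "genus", "completion", "species", "mixed")
print(mixed["mixed"].iloc[0])  # Anopheles8G
```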

    Surfacing transform issues

    To report issues encountered while transforming the record, we should use the factorytx.markers module. Marked warnings and errors will be logged, displayed in the Streams UI for increased visibility, and persist between FactoryTX restarts. For example, if an input field does not exist in the record, we should raise an error.

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        df = input_frame.copy(deep=False)
    
        if self.input_field1 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field1}, does not exist in the record")
            raise Failure
    
        if self.input_field2 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field2}, does not exist in the record")
            raise Failure
    
        if self.input_field3 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field3}, does not exist in the record")
            raise Failure
    
        markers.clear('transforms.data')
        df[self.output_title] = df[self.input_field1].astype(str) \
                                + df[self.input_field2].astype(str).str[0].str.upper() \
                                + df[self.input_field3].astype(str).str[0].str.upper()
        return df

    We clear the markers (markers.clear) when the input field checks pass, so if we fixed an incorrect configuration, the outdated markers do not persist.
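
    The three checks differ only in the field name, so they could be collapsed into a single helper that reports every missing field at once. The following is a plain-Python sketch; missing_fields is a hypothetical helper, and it takes column names rather than a DataFrame so that it can be tried without FactoryTX or pandas.

```python
from typing import Iterable, List


def missing_fields(columns: Iterable[str], required: Iterable[str]) -> List[str]:
    """Return the required field names that are absent from the record's columns."""
    present = set(columns)
    return [name for name in required if name not in present]


columns = ["timestamp", "counter", "accession number", "completion", "genus", "species"]

print(missing_fields(columns, ["accession number", "genus", "species"]))   # []
print(missing_fields(columns, ["accession number", "family", "species"]))  # ['family']
```

    Inside process, a non-empty result would lead to the same markers.error call and raise Failure as above, and an empty result to markers.clear.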

    Registering and using the Transform

    Before we can use our ConcatTransform, we’ll need to register it as a transform that FactoryTX can use. Whenever FactoryTX is started, it’ll activate all of the components in its Registry, so they are readily available. Our transform file should now look like this:

    from typing import List
    
    import pandas as pd
    
    from factorytx import markers
    from factorytx.base import Transform, transforms
    from factorytx.exceptions import Failure
    from factorytx.validation import (
        ValidationError, ValidationMessage, clean_with_json_schema, has_errors
    )
    
    @transforms.register('locus_name_concat')
    class ConcatTransform(Transform):
    
        schema = {
            "type": "object",
            "properties": {
                "input_field1": {"type": "string"},
                "input_field2": {"type": "string"},
                "input_field3": {"type": "string"},
                "output_title": {"type": "string"}
            },
            "required": ["input_field1", "input_field2", "input_field3", "output_title"]
        }
    
        @classmethod
        def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
            validation_results = clean_with_json_schema(cls.schema, config)
            if has_errors(validation_results):
                return validation_results
    
            input1 = config['input_field1']
            input2 = config['input_field2']
            input3 = config['input_field3']
            input_fields = [input1, input2, input3]
            if len(set(input_fields)) != len(input_fields):
                if input1 in (input2, input3):
                    error = ValidationError(('input_field1',), 'Duplicate input field')
                    validation_results.append(error)
                if input2 in (input1, input3):
                    error = ValidationError(('input_field2',), 'Duplicate input field')
                    validation_results.append(error)
                if input3 in (input1, input2):
                    error = ValidationError(('input_field3',), 'Duplicate input field')
                    validation_results.append(error)
    
            return validation_results
    
        def __init__(self, config: dict, root_config: dict) -> None:
            super().__init__(config, root_config)
            self.input_field1 = config['input_field1']
            self.input_field2 = config['input_field2']
            self.input_field3 = config['input_field3']
            self.output_title = config['output_title']
    
        def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
            df = input_frame.copy(deep=False)
    
            if self.input_field1 not in df.columns:
                markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field1}, does not exist in the record")
                raise Failure
    
            if self.input_field2 not in df.columns:
                markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field2}, does not exist in the record")
                raise Failure
    
            if self.input_field3 not in df.columns:
                markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field3}, does not exist in the record")
                raise Failure
    
            markers.clear('transforms.data')
            df[self.output_title] = df[self.input_field1].astype(str) \
                                    + df[self.input_field2].astype(str).str[0].str.upper() \
                                    + df[self.input_field3].astype(str).str[0].str.upper()
            return df

    Before enabling our transform to process real data, let’s check that it works as expected with simulated data in the Python Notebook. Run the cell containing our custom transform so that it is defined and registered. Then, in a second cell, write a quick test using some of the helper functions that FactoryTX uses in its unit tests. It should look something like this:

    from factorytx.test_utils import (csv_string_to_dataframe, load_transform)
    
    CONFIG = {
        'transform_type': "locus_name_concat",
        'transform_name': "Locus Namer",
        'filter_stream': ['*'],
        "input_field1": "accession number",
        "input_field2": "genus",
        "input_field3": "species",
        "output_title": "locus_name"
    }
    
    INPUT_DATA = """\
    timestamp,counter,accession number,completion,genus,species
    2018-09-13 12:01:00,1,12345,88,Anopheles,gambiae
    2018-09-13 12:11:00,2,23456,75,Stegosaurus,stenops
    2018-09-13 12:21:00,3,34567,68,Ankylosaurus,magniventris
    2018-09-13 12:31:00,4,45678,90,Raphus,cucullatus
    2018-09-13 12:41:00,5,56789,100,Deinonychus,antirrhopus
    2018-09-13 12:51:00,6,67890,99,Achillobator,giganticus
    2018-09-13 13:01:00,7,98778,76,Brontosaurus,excelsus
    2018-09-13 13:11:00,8,12346,84,Aedes,aegypti
    2018-09-13 13:21:00,9,88888,89,Tyrannosaurus,rex
    """
    
    input_df = csv_string_to_dataframe(INPUT_DATA)
    transform = load_transform(CONFIG)
    transformed_df = transform.process(input_df)
    transformed_df

    Run the selected cell and the output should print the transformed DataFrame.

                 timestamp  counter  accession number  completion          genus       species locus_name
    0  2018-09-13 12:01:00        1             12345          88      Anopheles       gambiae    12345AG
    1  2018-09-13 12:11:00        2             23456          75    Stegosaurus       stenops    23456SS
    2  2018-09-13 12:21:00        3             34567          68   Ankylosaurus  magniventris    34567AM
    3  2018-09-13 12:31:00        4             45678          90         Raphus    cucullatus    45678RC
    4  2018-09-13 12:41:00        5             56789         100    Deinonychus   antirrhopus    56789DA
    5  2018-09-13 12:51:00        6             67890          99   Achillobator    giganticus    67890AG
    6  2018-09-13 13:01:00        7             98778          76   Brontosaurus      excelsus    98778BE
    7  2018-09-13 13:11:00        8             12346          84          Aedes       aegypti    12346AA
    8  2018-09-13 13:21:00        9             88888          89  Tyrannosaurus           rex    88888TR

    Delete the cell with the test code, and convert the Python Notebook into a Python file that FactoryTX can use. You can copy the code and save it to a new .py file or run the conversion command in the Terminal.

    jupyter nbconvert --to=python locus_name_concat.ipynb
    

    Now restart FactoryTX with the FactoryTX Controller. Once FactoryTX has been restarted, we can use our custom transform by setting it in our FactoryTX configuration file.
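
    As a rough sketch, the configuration block for the transform would carry the four settings required by our schema, plus the transform_type, transform_name, and filter_stream keys used in the quick test above. The snippet below only renders that block as JSON; where the block sits within the overall FactoryTX configuration file is covered by the Configuration Reference and is not shown here.

```python
import json

# Settings for one instance of our transform. The values match the sample data.
transform_block = {
    "transform_type": "locus_name_concat",   # the name passed to transforms.register
    "transform_name": "Locus Namer",
    "filter_stream": ["*"],
    "input_field1": "accession number",
    "input_field2": "genus",
    "input_field3": "species",
    "output_title": "locus_name",
}

rendered = json.dumps(transform_block, indent=2)
print(rendered)
```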

    Snapshot testing the Transform

    We should write a unit test for our custom transform before sharing it, so any changes that break the expected behavior will be flagged. FactoryTX has the capability to perform gold master (or snapshot) testing. A snapshot test is similar to what we did for the quick test in the Python Notebook. We’ll take CSV data and process it with the specified transform. A snapshot test takes a further step by comparing the output with the result from a previous test run.

    To add a snapshot test to FactoryTX unit tests, create a Python directory named after the FactoryTX tenant (e.g. factorytx_tutorial) in the tests directory. Then, create a transforms Python directory and create the Python unit test file (e.g. test_snapshot_locus_name_concat.py). We’ll import helper functions from the factorytx.snapshot_test_utils module and factorytx.test_utils module. The contents of your test file should look something like this:

    from factorytx.snapshot_test_utils import compare_with_snapshot
    from factorytx.test_utils import (csv_string_to_dataframe, load_transform)
    
    CONFIG = {
        'transform_type': "locus_name_concat",
        'transform_name': "Locus Namer",
        'filter_stream': ['*'],
        "input_field1": "accession number",
        "input_field2": "genus",
        "input_field3": "species",
        "output_title": "locus_name"
    }
    
    INPUT_DATA = """\
    timestamp,counter,accession number,completion,genus,species
    2018-09-13 12:01:00,1,12345,88,Anopheles,gambiae
    2018-09-13 12:11:00,2,23456,75,Stegosaurus,stenops
    2018-09-13 12:21:00,3,34567,68,Ankylosaurus,magniventris
    2018-09-13 12:31:00,4,45678,90,Raphus,cucullatus
    2018-09-13 12:41:00,5,56789,100,Deinonychus,antirrhopus
    2018-09-13 12:51:00,6,67890,99,Achillobator,giganticus
    2018-09-13 13:01:00,7,98778,76,Brontosaurus,excelsus
    2018-09-13 13:11:00,8,12346,84,Aedes,aegypti
    2018-09-13 13:21:00,9,88888,89,Tyrannosaurus,rex
    """
    
    def test_locus_name_concat_transform(capsys, snapshot):
        input_df = csv_string_to_dataframe(INPUT_DATA)
        transform = load_transform(CONFIG)
        transformed_df = transform.process(input_df)
    
        with capsys.disabled():
            compare_with_snapshot(transformed_df, snapshot, [])

    Anatomy of a snapshot test:

    • CONFIG: a dictionary-type variable that has key-value pairs necessary for configuring the transform. The configuration is based on the transform’s schema.

    • INPUT_DATA: CSV data that will be processed by the transform. The data can be a multiline Python string or a CSV file saved in a directory. We suggest saving the data as a CSV file only if the transform needs to process a large amount of data. For an example of a snapshot test using a CSV file, please refer to the snapshot test for the Rename transform.

    • The test function should pass in capsys and snapshot as parameters.

      • snapshot for reading and writing snapshot data

      • capsys needs to be disabled so differences between the current output and snapshot are formatted correctly in the Terminal

    • csv_string_to_dataframe: Converts the CSV string buffer into a Pandas DataFrame for the transform to process.

    • load_transform: Loads the FactoryTX transform based on the provided configuration.

    • compare_with_snapshot: Compares the output from the transform processing the data with the saved snapshot.

    To run snapshot tests, please use Pytest:

    # Change to ONE of the following directories, depending on your environment.
    # If you are running within the Docker container or Developer Console (recommended):
    cd /opt/sightmachine/factorytx2
    
    # If you are running from the factorytx-core directory:
    cd /opt/sightmachine/factorytx-core
    
    # If you are running from the factorytx-<customer> directory:
    # Remember to replace "<customer>" with the customer name
    cd /opt/sightmachine/factorytx-<customer>
    
    pytest tests/factorytx_tutorial/transforms/test_snapshot_locus_name_concat.py
    
    # To update a snapshot, add `--snapshot-update` to the command
    pytest tests/factorytx_tutorial/transforms/test_snapshot_locus_name_concat.py --snapshot-update

    NOTE: When a test is run for the first time, the snapshot will be automatically created. Snapshots will be saved in the tests/.../transforms/snapshots directory.
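
    The create-on-first-run and update behavior can be illustrated with a conceptual sketch that uses only the standard library. It is not how compare_with_snapshot is implemented; check_against_snapshot and the .snap file name are invented for this example.

```python
import tempfile
from pathlib import Path


def check_against_snapshot(output: str, snapshot_path: Path, update: bool = False) -> bool:
    """Hypothetical helper: return True when output matches the stored snapshot."""
    if update or not snapshot_path.exists():
        # First run or explicit update: record the output as the new gold master.
        snapshot_path.write_text(output)
        return True
    return snapshot_path.read_text() == output


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "locus_name_concat.snap"
    print(check_against_snapshot("12345AG\n23456SS\n", path))               # True (snapshot created)
    print(check_against_snapshot("12345AG\n23456SS\n", path))               # True (matches)
    print(check_against_snapshot("12345AG\n23456XX\n", path))               # False (behavior changed)
    print(check_against_snapshot("12345AG\n23456XX\n", path, update=True))  # True (snapshot replaced)
```

    A failing comparison means either that the transform regressed or that the change was intentional, in which case the snapshot should be updated and reviewed along with the code.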