Custom Transform
Creating a custom transform
In FactoryTX, a Transform manipulates data extracted by a Receiver. FactoryTX offers a few built-in transform components. A list of them and their settings can be found in the Transforms section of the Configuration Reference. If none of the provided FactoryTX transforms fulfill your needs, you’ll have to create a custom one.
Let’s revisit our imaginary manufacturing startup and see how it’s doing. While mining in Mauritius for rare-earth metals to use in the head-up displays of our vehicles, we unearthed a large amount of amber. One of our top engineers, Dr. Wu, noticed that some mosquitoes were preserved in the amber, and he came up with the idea of extracting the blood these mosquitoes had ingested and sequencing its DNA. Excited by the prospect of resurrecting a dodo, we decided to expand from automotive manufacturing into bioengineering. We built a new facility devoted to genome sequencing and now want to connect it to FactoryTX, so we can do data modeling and analysis in the Sight Machine platform.
The data created by the DNA sequencers in our bioengineering facility is stored in CSV files. For example:
timestamp | counter | accession number | completion | genus | species |
---|---|---|---|---|---|
2018-09-13 12:01:00 | 1 | 12345 | 88 | Anopheles | gambiae |
2018-09-13 12:11:00 | 2 | 23456 | 75 | Stegosaurus | stenops |
2018-09-13 12:21:00 | 3 | 34567 | 68 | Ankylosaurus | magniventris |
2018-09-13 12:31:00 | 4 | 45678 | 90 | Raphus | cucullatus |
2018-09-13 12:41:00 | 5 | 56789 | 100 | Deinonychus | antirrhopus |
2018-09-13 12:51:00 | 6 | 67890 | 99 | Achillobator | giganticus |
2018-09-13 13:01:00 | 7 | 98778 | 76 | Brontosaurus | excelsus |
2018-09-13 13:11:00 | 8 | 12346 | 84 | Aedes | aegypti |
2018-09-13 13:21:00 | 9 | 88888 | 89 | Tyrannosaurus | rex |
We want to group entries with similar sequences by their locus name, which is a unique combination of an entry’s accession number and the first letters of its genus and species. For example, the locus name for the first entry should be 12345AG. Since the data doesn’t have a locus name column, we’ll create a transform that concatenates the accession number with the uppercase first letters of the genus and species, and saves the result as locus_name.
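Before involving FactoryTX at all, the naming rule can be stated in a few lines of plain Python. This is only a sketch of the rule applied to single values; the function name locus_name is hypothetical and is not part of FactoryTX.

```python
def locus_name(accession_number, genus: str, species: str) -> str:
    """Accession number followed by the uppercase first letters of genus and species."""
    return f"{accession_number}{genus[0].upper()}{species[0].upper()}"


rows = [
    (12345, "Anopheles", "gambiae"),
    (23456, "Stegosaurus", "stenops"),
    (88888, "Tyrannosaurus", "rex"),
]

for accession, genus, species in rows:
    print(locus_name(accession, genus, species))
# 12345AG
# 23456SS
# 88888TR
```

The transform we build below applies the same rule to whole DataFrame columns at once instead of looping over rows.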
Setting up with the Developer Console
From the web UI, navigate to the Developer Console.
The Developer Console is a JupyterLab environment.
In the left sidebar, select the file browser and navigate to the /opt/sightmachine/factorytx2/ directory. This directory contains FactoryTX’s source code, and it’s where we will be saving our transform. Create a directory for our custom FactoryTX components (e.g. factorytx_tutorial).
Open a new Terminal window (click the + button in the upper left corner to open the Launcher, then select Terminal) and navigate to the FactoryTX directory.
```
# ls
LIMITATIONS.md docs factorytx_heineken nginx scripts test-data
README.md entrypoint.sh factorytx_nissanpowertrain postman setup.py tests
UPGRADING.md factorytx factorytx_orora requirements-test.txt ssh-server
config-templates factorytx_asianpaints factorytx_tutorial requirements.txt stubs
dist factorytx_film3m ftxui screenshots supervisord.conf
# cd factorytx_tutorial
```
Inside the factorytx_tutorial directory, create a blank __init__.py file and a transforms folder that also contains a blank __init__.py file, so that both directories are importable Python packages.

```
touch __init__.py && mkdir transforms && (cd transforms && touch __init__.py)
```
With the file browser, enter the transforms directory, open a new Launcher instance, and start a new Python 3 Notebook instance. This is where we will draft our custom transform.
Defining a Transform
Our custom transform should inherit from the base Transform class and define the following methods:
__init__(config, root_config): This method initializes the transform instance. It accepts two parameters:
config: The configuration dictionary of the transform from the FactoryTX config. The clean method is used to validate and normalize this configuration, and the transform will only be initialized if there is no ValidationError.
root_config: The root configuration dictionary, which can be used to access global settings (e.g. Sight Machine cloud credentials). This is the FactoryTX config.
clean(config, root_config): This class method validates and normalizes a configuration dictionary, checking that it is well-formed and will be accepted by the constructor. It may also modify the configuration, for example by inserting default values; any such changes persist in the configuration passed to the constructor. It returns a list of ValidationMessage objects.
process(record): This method implements the transformation logic that converts a record into a new record.
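The order in which these three methods are used can be illustrated with a toy class. This is a hypothetical sketch, not the FactoryTX base class or loader: ToyTransform, load, and the (path, message) tuples are stand-ins, a plain dict plays the role of the record, and every message is treated as an error. It shows clean running first (and normalizing the config), __init__ running only when there are no errors, and process returning a new record.

```python
from typing import List, Tuple

# Hypothetical stand-in for a validation message: (json_path, text).
Message = Tuple[Tuple[str, ...], str]


class ToyTransform:
    """Toy class with the same three-method shape; NOT the FactoryTX base class."""

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[Message]:
        messages: List[Message] = []
        if "output_title" not in config:
            messages.append((("output_title",), "Missing required field"))
        config.setdefault("prefix", "")  # normalization persists into __init__
        return messages

    def __init__(self, config: dict, root_config: dict) -> None:
        self.output_title = config["output_title"]
        self.prefix = config["prefix"]

    def process(self, record: dict) -> dict:
        new_record = dict(record)  # work on a copy, leave the input untouched
        new_record[self.output_title] = self.prefix + str(record.get("counter"))
        return new_record


def load(cls, config: dict, root_config: dict):
    """Hypothetical loader: clean first, construct only if there are no errors."""
    messages = cls.clean(config, root_config)
    if messages:
        raise ValueError(messages)
    return cls(config, root_config)


transform = load(ToyTransform, {"output_title": "label"}, {})
print(transform.process({"counter": 1}))  # {'counter': 1, 'label': '1'}
```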
In the Python Notebook, import the base Transform class and the other components we’ll need (covered in more detail below), and create a skeleton of our custom transform.
```python
from typing import List

import pandas as pd

from factorytx import markers
from factorytx.base import Transform, transforms
from factorytx.exceptions import Failure
from factorytx.validation import (
    ValidationError, ValidationMessage, clean_with_json_schema, has_errors
)


class ConcatTransform(Transform):
    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        pass

    def __init__(self, config: dict, root_config: dict) -> None:
        pass

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        pass
```
Processing a record
The process method of our ConcatTransform contains the logic that transforms the record. When a Receiver gets data, it converts the data into a Pandas DataFrame with labeled columns of asset data. We refer to this DataFrame as a “record”. The record passes through the transform components and is ultimately packaged and transmitted by the transmit component.
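To make the idea of a record concrete, the sketch below builds one from a few rows of the sequencer CSV with pandas. It does not use FactoryTX: pandas.read_csv is an assumed stand-in for however a real receiver parses its input, so the column dtypes FactoryTX produces may differ.

```python
import io

import pandas as pd

# A few rows from the sequencer CSV.
CSV_TEXT = """\
timestamp,counter,accession number,completion,genus,species
2018-09-13 12:01:00,1,12345,88,Anopheles,gambiae
2018-09-13 12:11:00,2,23456,75,Stegosaurus,stenops
"""

# Assumption: read_csv stands in for the receiver's own parsing.
record = pd.read_csv(io.StringIO(CSV_TEXT))

print(record.columns.tolist())
# ['timestamp', 'counter', 'accession number', 'completion', 'genus', 'species']
print(record["genus"].tolist())  # ['Anopheles', 'Stegosaurus']
```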
For our transform, process will perform the concatenation that creates the locus_name. We want to combine the accession number, the first letter of the genus, and the first letter of the species in order. Let’s also make sure that the letters are uppercase.
```python
def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
    # First attempt: this modifies the incoming record in place.
    input_frame["locus_name"] = input_frame["accession number"].astype(str) \
        + input_frame["genus"].str[0].str.upper() \
        + input_frame["species"].str[0].str.upper()
    return input_frame
```
When we transform a record (DataFrame), we should make our changes to a copy of the record and return the copy, so that chained transforms do not produce unpredictable results.
```python
def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
    df = input_frame.copy(deep=False)
    df["locus_name"] = df["accession number"].astype(str) \
        + df["genus"].str[0].str.upper() \
        + df["species"].str[0].str.upper()
    return df
```
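To check that the copying version leaves the incoming record alone, the same logic can be run as a standalone function outside FactoryTX. The function name add_locus_name is hypothetical. A shallow copy (deep=False) shares the underlying column data with the original, which is fine here because we only add a whole new column; if a transform edited existing values in place, a deep copy would be the safer choice.

```python
import pandas as pd


def add_locus_name(input_frame: pd.DataFrame) -> pd.DataFrame:
    df = input_frame.copy(deep=False)  # new column is added to the copy only
    df["locus_name"] = (
        df["accession number"].astype(str)
        + df["genus"].str[0].str.upper()
        + df["species"].str[0].str.upper()
    )
    return df


record = pd.DataFrame({
    "accession number": [12345, 23456],
    "genus": ["Anopheles", "Stegosaurus"],
    "species": ["gambiae", "stenops"],
})

result = add_locus_name(record)
print(list(record.columns))  # ['accession number', 'genus', 'species']
print(result["locus_name"].tolist())  # ['12345AG', '23456SS']
```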
Validating and cleaning configurations
Currently, our transform is hard-coded to concatenate a record’s accession number with the first letters of its genus and species, and to save the result as locus_name. That is enough for creating the locus name, but what if we want to concatenate different values, such as completion instead of accession number? We could create a separate transform for that, or we could make our current transform configurable.
FactoryTX components use JSON Schema to describe configurable settings and validate input. For our ConcatTransform, let’s create a basic schema that describes the desired behavior: it takes three input fields to concatenate and writes the result to an output column with a configurable name (output_title).
```python
schema = {
    "properties": {
        "input_field1": {},
        "input_field2": {},
        "input_field3": {},
        "output_title": {}
    }
}
```
Since a component’s schema is also used to validate its configuration, we can use some of JSON Schema’s built-in constraints. For example, let’s make sure the configuration values have the types we expect and that all of the fields are specified.
```python
schema = {
    "type": "object",
    "properties": {
        "input_field1": {"type": "string"},
        "input_field2": {"type": "string"},
        "input_field3": {"type": "string"},
        "output_title": {"type": "string"}
    },
    "required": ["input_field1", "input_field2", "input_field3", "output_title"]
}
```
For the available types, see the Instance Data Model section of the JSON Schema documentation.
With the schema stored as a class attribute named schema, our transform’s clean method can pass it to the clean_with_json_schema function, which checks the configuration for issues and injects any default values defined in the schema.
```python
@classmethod
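To see what these two constraints catch, here is a tiny hand-rolled checker that understands only "type" and "required". It is a hypothetical stand-in for illustration (check_config is not a FactoryTX function, and clean_with_json_schema does the real work); a complete implementation would use a JSON Schema library such as jsonschema. The second config uses output_field instead of output_title, a mistake the required list flags.

```python
from typing import List, Tuple

SCHEMA = {
    "type": "object",
    "properties": {
        "input_field1": {"type": "string"},
        "input_field2": {"type": "string"},
        "input_field3": {"type": "string"},
        "output_title": {"type": "string"}
    },
    "required": ["input_field1", "input_field2", "input_field3", "output_title"]
}

# Only the two JSON types this schema uses.
TYPE_CHECKS = {"object": dict, "string": str}


def check_config(schema: dict, config) -> List[Tuple[Tuple[str, ...], str]]:
    """Tiny stand-in for a JSON Schema validator: checks only 'type' and 'required'."""
    if not isinstance(config, TYPE_CHECKS[schema["type"]]):
        return [((), "Config must be an object")]
    errors = []
    for key in schema.get("required", []):
        if key not in config:
            errors.append(((key,), "Missing required property"))
    for key, subschema in schema.get("properties", {}).items():
        expected = subschema.get("type")
        if key in config and expected and not isinstance(config[key], TYPE_CHECKS[expected]):
            errors.append(((key,), f"Expected type {expected}"))
    return errors


good = {"input_field1": "accession number", "input_field2": "genus",
        "input_field3": "species", "output_title": "locus_name"}
bad = {"input_field1": 42, "input_field2": "genus",
       "input_field3": "species", "output_field": "locus_name"}

print(check_config(SCHEMA, good))  # []
print(check_config(SCHEMA, bad))
# [(('output_title',), 'Missing required property'), (('input_field1',), 'Expected type string')]
```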
def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
    validation_results = clean_with_json_schema(cls.schema, config)
    return validation_results
```
The clean method is also where we can add custom validation logic that JSON Schema constraints cannot express. For instance, let’s make sure that the input fields are unique, and add a ValidationError to the results for each duplicated field.
```python
@classmethod
def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
    validation_results = clean_with_json_schema(cls.schema, config)
    # Stop early: the checks below assume the fields exist and are strings.
    if has_errors(validation_results):
        return validation_results

    input1 = config['input_field1']
    input2 = config['input_field2']
    input3 = config['input_field3']
    input_fields = [input1, input2, input3]
    if len(set(input_fields)) != len(input_fields):
        # Compare each field with the other two and flag every duplicate.
        if input1 in (input2, input3):
            error = ValidationError(('input_field1',), 'Duplicate input field')
            validation_results.append(error)
        if input2 in (input1, input3):
            error = ValidationError(('input_field2',), 'Duplicate input field')
            validation_results.append(error)
        if input3 in (input1, input2):
            error = ValidationError(('input_field3',), 'Duplicate input field')
            validation_results.append(error)
    return validation_results
```
The first parameter of a ValidationError is a JSON path indicating which part of the configuration is incorrect; the Configuration Editor uses it to highlight the offending line. The has_errors check returns early if any JSON Schema constraints are violated, so the duplicate check only runs on a configuration in which all three input fields are present and are strings.
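The duplicate check can also be written as a loop over the field names, which compares each field with the other two without spelling out every pairing by hand. In this sketch, duplicate_field_errors is a hypothetical helper and the (path, message) tuples stand in for ValidationError, so it runs without FactoryTX.

```python
from typing import Dict, List, Tuple

FIELDS = ("input_field1", "input_field2", "input_field3")


def duplicate_field_errors(config: Dict[str, str]) -> List[Tuple[Tuple[str, ...], str]]:
    """Flag every input field whose value also appears in another input field."""
    errors = []
    for name in FIELDS:
        others = [config[other] for other in FIELDS if other != name]
        if config[name] in others:
            errors.append(((name,), "Duplicate input field"))
    return errors


print(duplicate_field_errors({"input_field1": "genus", "input_field2": "species",
                              "input_field3": "genus"}))
# [(('input_field1',), 'Duplicate input field'), (('input_field3',), 'Duplicate input field')]
```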
Initializing the Transform
The __init__ method sets up the Transform instance based on its configuration block in the FactoryTX config. This is a good place to copy the fields the transform needs into instance attributes.
```python
def __init__(self, config: dict, root_config: dict) -> None:
    super().__init__(config, root_config)
    self.input_field1 = config['input_field1']
    self.input_field2 = config['input_field2']
    self.input_field3 = config['input_field3']
    self.output_title = config['output_title']
```
Since our transform now uses configurable settings, the process method will need to be modified slightly to accommodate the change.
```python
def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
    df = input_frame.copy(deep=False)
    df[self.output_title] = df[self.input_field1].astype(str) \
        + df[self.input_field2].astype(str).str[0].str.upper() \
        + df[self.input_field3].astype(str).str[0].str.upper()
    return df
```
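With the field names coming from the configuration, the motivating case from earlier (completion instead of accession number) needs no code change. The sketch below mirrors the configurable process logic as a standalone function; concat_fields and the output column name completion_code are hypothetical.

```python
import pandas as pd


def concat_fields(input_frame, field1, field2, field3, output_title):
    """Standalone mirror of the configurable process logic (hypothetical helper)."""
    df = input_frame.copy(deep=False)
    df[output_title] = (
        df[field1].astype(str)
        + df[field2].astype(str).str[0].str.upper()
        + df[field3].astype(str).str[0].str.upper()
    )
    return df


record = pd.DataFrame({
    "completion": [88, 75],
    "genus": ["Anopheles", "Stegosaurus"],
    "species": ["gambiae", "stenops"],
})

out = concat_fields(record, "completion", "genus", "species", "completion_code")
print(out["completion_code"].tolist())  # ['88AG', '75SS']
```

The .astype(str) calls on the second and third fields mean numeric columns can be used in those positions too, at the cost of taking only their first digit.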
Surfacing transform issues
To report issues encountered while transforming the record, we should use the factorytx.markers module. Marked warnings and errors are logged, displayed in the Streams UI for increased visibility, and persist between FactoryTX restarts. For example, if an input field does not exist in the record, we should mark an error and raise a Failure.
```python
def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
    df = input_frame.copy(deep=False)
    if self.input_field1 not in df.columns:
        markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field1}, does not exist in the record")
        raise Failure
    if self.input_field2 not in df.columns:
        markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field2}, does not exist in the record")
        raise Failure
    if self.input_field3 not in df.columns:
        markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field3}, does not exist in the record")
        raise Failure
    # All input fields are present: clear any markers left by earlier failures.
    markers.clear('transforms.data')
    df[self.output_title] = df[self.input_field1].astype(str) \
        + df[self.input_field2].astype(str).str[0].str.upper() \
        + df[self.input_field3].astype(str).str[0].str.upper()
    return df
```
We clear the markers (markers.clear) once the input field checks pass, so that after an incorrect configuration is fixed, outdated markers do not persist.
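One possible variation, sketched here as a design alternative rather than FactoryTX guidance, is to collect every missing field before failing, so a single message lists all of them. To keep the sketch runnable on its own, Failure is a hypothetical local exception and the marked list is a hypothetical stand-in for markers.error and markers.clear.

```python
from typing import List


class Failure(Exception):
    """Hypothetical stand-in for factorytx.exceptions.Failure."""


marked: List[str] = []  # hypothetical stand-in for markers.error / markers.clear


def check_columns(columns, required_fields) -> None:
    missing = [field for field in required_fields if field not in columns]
    if missing:
        marked.append(f"Unable to concatenate: input field(s) {', '.join(missing)} "
                      "do not exist in the record")
        raise Failure(missing)
    marked.clear()  # every field is present: drop stale messages


try:
    check_columns(["genus", "species"], ["accession number", "genus", "species"])
except Failure as exc:
    print(exc.args[0])  # ['accession number']
print(marked)
```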
Registering and using the Transform
Before we can use our ConcatTransform, we need to register it as a transform that FactoryTX can use, which we do by decorating the class with @transforms.register('locus_name_concat'). Whenever FactoryTX starts, it activates all of the components in its Registry, so they are readily available. Our transform file should now look like this:
```python
from typing import List

import pandas as pd

from factorytx import markers
from factorytx.base import Transform, transforms
from factorytx.exceptions import Failure
from factorytx.validation import (
    ValidationError, ValidationMessage, clean_with_json_schema, has_errors
)


# Register under the name used as transform_type in the FactoryTX config.
@transforms.register('locus_name_concat')
class ConcatTransform(Transform):
    schema = {
        "type": "object",
        "properties": {
            "input_field1": {"type": "string"},
            "input_field2": {"type": "string"},
            "input_field3": {"type": "string"},
            "output_title": {"type": "string"}
        },
        "required": ["input_field1", "input_field2", "input_field3", "output_title"]
    }

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        validation_results = clean_with_json_schema(cls.schema, config)
        if has_errors(validation_results):
            return validation_results

        input1 = config['input_field1']
        input2 = config['input_field2']
        input3 = config['input_field3']
        input_fields = [input1, input2, input3]
        if len(set(input_fields)) != len(input_fields):
            if input1 in (input2, input3):
                error = ValidationError(('input_field1',), 'Duplicate input field')
                validation_results.append(error)
            if input2 in (input1, input3):
                error = ValidationError(('input_field2',), 'Duplicate input field')
                validation_results.append(error)
            if input3 in (input1, input2):
                error = ValidationError(('input_field3',), 'Duplicate input field')
                validation_results.append(error)
        return validation_results

    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)
        self.input_field1 = config['input_field1']
        self.input_field2 = config['input_field2']
        self.input_field3 = config['input_field3']
        self.output_title = config['output_title']

    def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
        df = input_frame.copy(deep=False)
        if self.input_field1 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field1}, does not exist in the record")
            raise Failure
        if self.input_field2 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field2}, does not exist in the record")
            raise Failure
        if self.input_field3 not in df.columns:
            markers.error('transforms.data', f"Unable to concatenate: input field, {self.input_field3}, does not exist in the record")
            raise Failure
        markers.clear('transforms.data')
        df[self.output_title] = df[self.input_field1].astype(str) \
            + df[self.input_field2].astype(str).str[0].str.upper() \
            + df[self.input_field3].astype(str).str[0].str.upper()
        return df
```
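The idea behind a decorator like @transforms.register('locus_name_concat') is a mapping from the transform_type string in the config to a class. The toy registry below is a hypothetical illustration of that pattern only; FactoryTX’s actual Registry may work differently.

```python
from typing import Callable, Dict


class Registry:
    """Toy name-to-class registry; NOT FactoryTX's implementation."""

    def __init__(self) -> None:
        self._classes: Dict[str, type] = {}

    def register(self, name: str) -> Callable[[type], type]:
        def decorator(cls: type) -> type:
            if name in self._classes:
                raise ValueError(f"{name!r} is already registered")
            self._classes[name] = cls
            return cls  # return the class unchanged so it stays usable
        return decorator

    def get(self, name: str) -> type:
        return self._classes[name]


transforms = Registry()


@transforms.register("locus_name_concat")
class ConcatTransform:
    pass


print(transforms.get("locus_name_concat") is ConcatTransform)  # True
```

Because the decorator returns the class unchanged, registering a transform has no effect on how the class itself behaves.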
Before enabling our transform to process real data, let’s check that it works as expected with simulated data in the Python Notebook. Run the cell containing our custom transform. In a second cell, we’ll use two functions that FactoryTX uses in its unit tests:
csv_string_to_dataframe: changes a comma-delimited multiline string into a DataFrame
load_transform: loads a FactoryTX transform based on the provided configuration
Our quick test should look something like this:
```python
from factorytx.test_utils import (csv_string_to_dataframe, load_transform)

CONFIG = {
    'transform_type': "locus_name_concat",
    'transform_name': "Locus Namer",
    'filter_stream': ['*'],
    "input_field1": "accession number",
    "input_field2": "genus",
    "input_field3": "species",
    "output_title": "locus_name"  # must match the schema's required key
}

INPUT_DATA = """\
timestamp,counter,accession number,completion,genus,species
2018-09-13 12:01:00,1,12345,88,Anopheles,gambiae
2018-09-13 12:11:00,2,23456,75,Stegosaurus,stenops
2018-09-13 12:21:00,3,34567,68,Ankylosaurus,magniventris
2018-09-13 12:31:00,4,45678,90,Raphus,cucullatus
2018-09-13 12:41:00,5,56789,100,Deinonychus,antirrhopus
2018-09-13 12:51:00,6,67890,99,Achillobator,giganticus
2018-09-13 13:01:00,7,98778,76,Brontosaurus,excelsus
2018-09-13 13:11:00,8,12346,84,Aedes,aegypti
2018-09-13 13:21:00,9,88888,89,Tyrannosaurus,rex
"""

input_df = csv_string_to_dataframe(INPUT_DATA)
transform = load_transform(CONFIG)
transformed_df = transform.process(input_df)
transformed_df
```
Run the cell, and it should display the transformed DataFrame:

```
   timestamp            counter  accession number  completion  genus          species       locus_name
0  2018-09-13 12:01:00        1             12345          88  Anopheles      gambiae       12345AG
1  2018-09-13 12:11:00        2             23456          75  Stegosaurus    stenops       23456SS
2  2018-09-13 12:21:00        3             34567          68  Ankylosaurus   magniventris  34567AM
3  2018-09-13 12:31:00        4             45678          90  Raphus         cucullatus    45678RC
4  2018-09-13 12:41:00        5             56789         100  Deinonychus    antirrhopus   56789DA
5  2018-09-13 12:51:00        6             67890          99  Achillobator   giganticus    67890AG
6  2018-09-13 13:01:00        7             98778          76  Brontosaurus   excelsus      98778BE
7  2018-09-13 13:11:00        8             12346          84  Aedes          aegypti       12346AA
8  2018-09-13 13:21:00        9             88888          89  Tyrannosaurus  rex           88888TR
```
Delete the cell with the test code, and convert the Python Notebook into a Python file that FactoryTX can use. You can either copy the code into a new .py file or run the conversion command in the Terminal (the command below assumes the notebook is named locus_name_concat.ipynb):

```
jupyter nbconvert --to=python locus_name_concat.ipynb
```
Now restart FactoryTX with the FactoryTX Controller. Once FactoryTX has been restarted, we can use our custom transform by setting it in our FactoryTX configuration file.
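As a sketch of what to add to the configuration, the snippet below builds the transform block from the same keys as the CONFIG dictionary in the quick test and prints it as JSON, here configured for the locus name. Wrapping it in a top-level "transforms" list is an assumption about the configuration file layout; check the Transforms section of the Configuration Reference for the exact structure. filter_stream set to ["*"] presumably applies the transform to every stream.

```python
import json

# Keys mirror the CONFIG dictionary used in the quick test.
locus_namer = {
    "transform_type": "locus_name_concat",  # name passed to @transforms.register
    "transform_name": "Locus Namer",
    "filter_stream": ["*"],
    "input_field1": "accession number",
    "input_field2": "genus",
    "input_field3": "species",
    "output_title": "locus_name",
}

# Assumption: transform blocks are listed under a top-level "transforms" key.
config_fragment = {"transforms": [locus_namer]}
print(json.dumps(config_fragment, indent=2))
```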
Snapshot testing the Transform
We should write a unit test for our custom transform before sharing it, so that any change that breaks the expected behavior is flagged. FactoryTX supports gold master (or snapshot) testing. A snapshot test is similar to the quick test we ran in the Python Notebook: we take CSV data and process it with the specified transform. A snapshot test then goes one step further by comparing the output with the result saved from a previous test run.
To add a snapshot test to the FactoryTX unit tests, create a Python package (a directory containing an __init__.py file) named after the FactoryTX tenant (e.g. factorytx_tutorial) in the tests directory. Inside it, create a transforms package and the unit test file (e.g. test_snapshot_locus_name_concat.py). We’ll import helper functions from the factorytx.snapshot_test_utils and factorytx.test_utils modules. The contents of your test file should look something like this:
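The gold master idea itself fits in a few lines. The sketch below is a hypothetical illustration using only the standard library (compare_or_create_snapshot is not FactoryTX’s compare_with_snapshot): on the first run, or when an update is requested, the output is written as the snapshot; on later runs it is compared with the saved copy.

```python
import tempfile
from pathlib import Path


def compare_or_create_snapshot(output: str, snapshot_path: Path, update: bool = False) -> bool:
    """Hypothetical gold-master check: write on first run (or on update), else compare."""
    if update or not snapshot_path.exists():
        snapshot_path.write_text(output)
        return True
    return snapshot_path.read_text() == output


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "locus_name_concat.txt"
    print(compare_or_create_snapshot("12345AG\n23456SS\n", path))  # True (snapshot created)
    print(compare_or_create_snapshot("12345AG\n23456SS\n", path))  # True (matches)
    print(compare_or_create_snapshot("12345AG\n23456XX\n", path))  # False (behavior changed)
```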
```python
from factorytx.snapshot_test_utils import compare_with_snapshot
from factorytx.test_utils import (csv_string_to_dataframe, load_transform)

CONFIG = {
    'transform_type': "locus_name_concat",
    'transform_name': "Locus Namer",
    'filter_stream': ['*'],
    "input_field1": "accession number",
    "input_field2": "genus",
    "input_field3": "species",
    "output_title": "locus_name"
}

INPUT_DATA = """\
timestamp,counter,accession number,completion,genus,species
2018-09-13 12:01:00,1,12345,88,Anopheles,gambiae
2018-09-13 12:11:00,2,23456,75,Stegosaurus,stenops
2018-09-13 12:21:00,3,34567,68,Ankylosaurus,magniventris
2018-09-13 12:31:00,4,45678,90,Raphus,cucullatus
2018-09-13 12:41:00,5,56789,100,Deinonychus,antirrhopus
2018-09-13 12:51:00,6,67890,99,Achillobator,giganticus
2018-09-13 13:01:00,7,98778,76,Brontosaurus,excelsus
2018-09-13 13:11:00,8,12346,84,Aedes,aegypti
2018-09-13 13:21:00,9,88888,89,Tyrannosaurus,rex
"""


def test_locus_name_concat_transform(capsys, snapshot):
    input_df = csv_string_to_dataframe(INPUT_DATA)
    transform = load_transform(CONFIG)
    transformed_df = transform.process(input_df)
    # Disable output capturing so snapshot differences print readably.
    with capsys.disabled():
        compare_with_snapshot(transformed_df, snapshot, [])
```
Anatomy of a snapshot test:
CONFIG: a dictionary-type variable that has key-value pairs necessary for configuring the transform. The configuration is based on the transform’s schema.
INPUT_DATA: CSV data that will be processed by the transform. The data can be a multiline Python string or a CSV file saved in a directory. Saving the data as a CSV file is recommended only when the transform needs to process a large amount of data. For an example of a snapshot test using a CSV file, please refer to the snapshot test for the Rename transform.
The test function should accept capsys and snapshot as parameters (both are pytest fixtures):
snapshot: reads and writes snapshot data.
capsys: must be disabled so that differences between the current output and the snapshot are formatted correctly in the Terminal.
csv_string_to_dataframe: Converts the CSV string buffer into a Pandas DataFrame for the transform to process.
load_transform: Loads the FactoryTX transform based on the provided configuration.
compare_with_snapshot: Compares the output from the transform processing the data with the saved snapshot.
To run snapshot tests, use pytest. First change to the directory that matches your setup, then run the test file:

```
# If you are running within the Docker container or Developer Console (recommended):
cd /opt/sightmachine/factorytx2
# If you are running from the factorytx-core directory:
cd /opt/sightmachine/factorytx-core
# If you are running from the factorytx-<customer> directory:
# Remember to replace "<customer>" with the customer name
cd /opt/sightmachine/factorytx-<customer>

pytest tests/factorytx_tutorial/transforms/test_snapshot_locus_name_concat.py

# To update a snapshot, add `--snapshot-update` to the command
pytest tests/factorytx_tutorial/transforms/test_snapshot_locus_name_concat.py --snapshot-update
```
NOTE: When a test is run for the first time, the snapshot will be automatically created. Snapshots will be saved in the tests/.../transforms/snapshots directory.