Custom Receiver

    Creating a custom receiver

    In FactoryTX, a Receiver collects data from a source and passes it to the data pipeline for Transform and Transmit components to process. FactoryTX includes a few built-in receivers; the available receivers and their configuration options are listed in the data_receiver section of the Configuration Reference. If none of the available data receivers meets your needs, you’ll need to create a custom one.

    Defining a Receiver

    Your custom data receiver should inherit from the base Receiver class and define the following methods:

    • __init__(dataflow, config, root_config): This method initializes the receiver instance. It accepts three parameters:

      • dataflow: The object that receives the collected data. Calling dataflow.process(df) routes the DataFrame df to all applicable transforms and transmits.

      • config: The configuration dictionary for the receiver from the FactoryTX config. The clean method is used to validate and normalize this configuration, and the receiver will only be initialized if validation produces no errors.

      • root_config: The root configuration dictionary, which can be used to access global settings (e.g. Sight Machine cloud credentials). This is the FactoryTX config.

    • clean(config, root_config): This method validates and normalizes a configuration dictionary, checking that the configuration it receives is well-formed and will be accepted by the constructor. The method may also modify the configuration, e.g. by inserting default values; changes made to the config persist in the configuration passed to the constructor. It returns a list of ValidationMessage objects.

    • purge(streams): Removes all receiver state associated with the specified streams. It accepts a list of InputStreamId objects identifying the streams to purge.

    • run(): Performs the work of extracting data from the data source.

    As mentioned in the Creating a custom transform tutorial, you’ll need to include a schema for your receiver. This schema defines which properties can be configured for your receiver and is what the clean method should use for validation.

    Note that when a Receiver object is initialized, FactoryTX runs it in a separate process. The run method should not return, but it doesn’t necessarily have to be an infinite loop itself: it could call another method or function that keeps the process alive. The sketch below shows how these pieces fit together.
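
    The following is a minimal sketch of a custom receiver, assuming the factorytx.base and factorytx.validation import paths; the clean_with_json_schema helper and the _poll_source method are illustrative, so check your FactoryTX version for the exact API.

    import time
    from typing import List

    from factorytx.base import Receiver  # assumed import path
    from factorytx.validation import ValidationMessage, clean_with_json_schema  # assumed

    class MyReceiver(Receiver):

        # Schema describing the configurable properties of this receiver;
        # clean() applies it to validate the config dictionary.
        schema = {
            "type": "object",
            "properties": {
                "poll_interval": {"type": "number", "default": 5},
            },
        }

        def __init__(self, dataflow, config, root_config):
            self.dataflow = dataflow
            self.poll_interval = config["poll_interval"]

        @classmethod
        def clean(cls, config, root_config) -> List[ValidationMessage]:
            # Validate against the schema and insert defaults; changes made
            # here persist into the config passed to __init__.
            return clean_with_json_schema(cls.schema, config)

        def purge(self, streams) -> None:
            # Remove all persisted state for the given InputStreamIds.
            pass

        def run(self) -> None:
            # Runs in a separate process and should not return.
            while True:
                df = self._poll_source()  # hypothetical: fetch new data as a DataFrame
                if df is not None:
                    self.dataflow.process(df)
                time.sleep(self.poll_interval)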

    Passing records to the data pipeline

    When the receiver collects data from a source, the data is converted into a record: a Pandas DataFrame with labeled columns of asset data. Records from the receiver should be passed to the DataflowGraph object, which serves as a wrapper for a collection of Transform and Transmit components. When the DataflowGraph processes a record, the record is routed through the applicable transform(s) and uploaded to the Sight Machine cloud by the transmit.
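
    For example, inside run your receiver might build a DataFrame from rows it has collected and hand it to the dataflow graph; the column names and values here are purely illustrative.

    import pandas as pd

    # Inside run(): rows collected from the data source (illustrative values).
    rows = [
        {"timestamp": "2024-01-01T00:00:00Z", "counter": 1, "temperature": 72.5},
        {"timestamp": "2024-01-01T00:00:05Z", "counter": 2, "temperature": 72.7},
    ]
    df = pd.DataFrame(rows)

    # Route the record through all applicable transforms and transmits.
    self.dataflow.process(df)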

    Loading and storing state

    A receiver is stateful: it keeps track of the data that has already been processed and transmitted for each input stream, so it can differentiate between new and old data. For example, the FileReceiver stores the size and last-modified time of each file processed in the data pipeline. If a file is later modified, the receiver re-parses it and sends the new data to the data pipeline.

    The __init__ method of your custom receiver should initialize the connection to the StateStore. While your receiver is running, it should load and update the StateStore with information about the processed and transmitted data.

    The purge method of your custom receiver should completely clear its StateStore. Restreaming relies on this method: once the state is purged, data that was previously processed will be processed and transmitted again.
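
    The sketch below, modeled loosely on the FileReceiver behavior described above, shows the pattern. The StateStore methods (load, save, purge) and the _list_source_files/_parse_file helpers are hypothetical; consult your FactoryTX version for the actual interface.

    # Methods of MyReceiver (see the skeleton above).
    def run(self) -> None:
        # Load previously persisted state, e.g. {filename: (size, mtime)}.
        seen = self.state_store.load() or {}  # hypothetical API
        while True:
            for name, size, mtime in self._list_source_files():  # hypothetical helper
                if seen.get(name) != (size, mtime):
                    # New or modified file: re-parse it and send it downstream.
                    self.dataflow.process(self._parse_file(name))  # hypothetical helper
                    seen[name] = (size, mtime)
                    self.state_store.save(seen)  # persist progress incrementally
            time.sleep(self.poll_interval)

    def purge(self, streams) -> None:
        # Clear all persisted state for these streams so a restream
        # re-processes and re-transmits their data from scratch.
        self.state_store.purge(streams)  # hypothetical API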

    Handling transient problems

    Sometimes a receiver will encounter temporary issues collecting data, such as connectivity problems with a remote database. You can use the factorytx.markers module to surface a warning or error message in the Streams UI.

    When the problem arises, your custom receiver should create a marker with markers.warning or markers.error, depending on the issue’s severity. Markers are stored in a SQLite database, so they persist if FactoryTX is stopped or restarted. When the receiver successfully collects data again, it should clear the warning/error marker with markers.clear.
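
    For example (a sketch only; the exact signatures of the markers functions may differ in your FactoryTX version):

    from factorytx import markers

    # A revised run() for MyReceiver (see the skeleton above).
    def run(self) -> None:
        while True:
            try:
                df = self._poll_source()  # hypothetical: fetch from a remote database
            except ConnectionError as exc:
                # Surface the problem in the Streams UI; the marker is stored
                # in SQLite, so it survives FactoryTX restarts.
                markers.error(f"Unable to reach the database: {exc}")
                time.sleep(self.poll_interval)
                continue
            # Data collection succeeded again, so clear any stale marker.
            markers.clear()
            if df is not None:
                self.dataflow.process(df)
            time.sleep(self.poll_interval)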

    Spawning receiver threads

    A receiver process can spawn threads. We strongly recommend using the factorytx.supervisor module to create and manage threads. The Supervisor class will automatically log errors and restart threads that have unexpectedly terminated. Threads that crash frequently will wait longer to be restarted, preventing system overload.
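
    A sketch of the pattern follows; the Supervisor constructor arguments and the _poll_endpoint_* targets are assumptions, so check the factorytx.supervisor module for the actual interface.

    from factorytx.supervisor import Supervisor

    # A run() for a receiver that polls several endpoints concurrently.
    def run(self) -> None:
        # Run one polling loop per endpoint in supervised threads. The
        # Supervisor logs errors, restarts threads that terminate
        # unexpectedly, and backs off on threads that crash repeatedly.
        supervisor = Supervisor([self._poll_endpoint_a, self._poll_endpoint_b])
        supervisor.run()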

    Creating an integration test

    To verify that your custom receiver works as expected, create an integration test. Unlike the unit tests used for transform components, an integration test creates and runs a FactoryTX instance that collects data, then processes and transmits records to a local instance of the Sight Machine platform.

    Create a new Python file for your test in the integration-tests/test-runner directory (e.g. test_custom_receiver1.py). In your test file, you’ll want to import some helper methods from integration-tests/test-runner/ftx_test_utils.py.

    clear_sslogs()

    Removes all sslogs (records) from the tenant’s database.

    fetch_all_sslogs(sleep_seconds)

    Waits until the sslog count stops changing, then returns all sslogs.

    get_rdpv2_credentials()

    Returns RDPv2 credentials for the running ma instance as an (id, key) tuple.

    get_tenant()

    Returns the name of the tenant being used for the test suite.

    setup_config_file(config)

    Creates a configuration file in a temporary directory for the FactoryTX instance to use and returns its path.

    setup_data_dir()

    Creates a temporary data directory that FactoryTX can use and returns its path.

    spawn_ftx_container(config_dir, data_dir)

    Spawns an instance of FactoryTX based on the provided configuration and data directory.

    You’ll also want to import some helper functions from the factorytx.snapshot_test_utils module if you want to compare the observed records with a snapshot.
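
    For example, the top of your test file might contain imports like these (assuming ftx_test_utils is importable from the test-runner directory):

    from ftx_test_utils import (
        clear_sslogs,
        fetch_all_sslogs,
        get_rdpv2_credentials,
        get_tenant,
        setup_config_file,
        setup_data_dir,
        spawn_ftx_container,
    )
    from factorytx.snapshot_test_utils import (
        compare_with_snapshot,
        convert_sslogs_to_dataframe,
    )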

    Create a FactoryTX configuration that uses your custom receiver, including any necessary parsers and transforms. The data_transmit section should be directed to the local instance of the Sight Machine platform, using the get_tenant and get_rdpv2_credentials helper functions.

    rdpv2_api_id, rdpv2_api_key = get_rdpv2_credentials()
    tenant = get_tenant()
    config = {
        "data_receiver": [...],
        "transforms": [...],
        "data_transmit": [
            {
                "transmit_name": "My RDP Transmit",
                "transmit_type": "remotedatapost",
                "base_url": f"http://{tenant}.ma",
                "API_key_ID": rdpv2_api_id,
                "API_key": rdpv2_api_key,
                "filter_stream": ["*"],
                "poll_interval": 1
            }
        ]
    }

    Your test function should look something like this:

    def test_my_custom_receiver(snapshot):
        # Clears the records database, so only records generated in the test
        # are used for comparison
        clear_sslogs()
    
        # Creates a FactoryTX configuration file in a temporary directory
        ftx_config_dir = setup_config_file(config)
    
        # Sets up a data directory for FactoryTX to use
        data_dir = setup_data_dir()
    
        # Spawn the FactoryTX instance
        ftx_container = spawn_ftx_container(ftx_config_dir, data_dir)
    
        # On an interval, check the number of records in the Sight Machine
        # platform database. When the count of records stops changing, all the
        # records will be returned.
        sslogs = fetch_all_sslogs(sleep_seconds=10)
    
        # Stop the FactoryTX instance
        ftx_container.stop()
    
        # Sort the results so the comparison does not need to worry
        # about the order of records
        sslogs = sorted(sslogs, key=lambda s: (s['data']['source'], s['data']['timestamp']))
    
        # Convert the records into a Pandas DataFrame for comparison
        df = convert_sslogs_to_dataframe(sslogs)
    
        # Compare the observed results with a snapshot (gold master)
        compare_with_snapshot(df, snapshot, index=['data.source', 'data.timestamp'])

    To run the test, follow the instructions in the README file of the integration-tests directory.