Skip to content

GettingStarted

Ricard Lado edited this page Mar 9, 2025 · 4 revisions

To start a new project use the canonada new command to create the project's directory structure.

canonada new my_project # Replace 'my_project' with a name of your choosing

Project Structure

Before we start it is important to understand the project's directory structure. This is an example of how a Canonada project should be structured:


  • canonada.toml : The project's configuration file; contains basic information about the project and determines the logging level.
  • config/ : Contains key configuration files for the project. The names of these files are standardized and must not be changed.
    • catalog.toml : Describes the catalog of data sources and data sinks that can be used in the pipelines.
    • parameters.toml : Centralizes all of the project's parameters.
    • credentials.toml : Contains the credentials for the project and is excluded from version control by default.
  • data/ : Optionally used to store the project's data files in one place.
    • ...
  • datahandlers/ : Datahandlers are classes that function as streaming data sources and sinks. Canonada provides several built-in datahandlers but this directory can optionally be used to define your own.
    • __init__.py
    • custom_datahandler_1.py
    • custom_datahandler_2.py
    • ...
  • notebooks/ : Optionally used to store project related notebooks. Check out the notebook usage section for more information.
    • ...
  • pipelines/ : One of the most important directories in the project. Contains the project's pipelines and nodes.
    • __init__.py
    • pipeline_1.py
    • pipeline_2.py
    • nodes_1/
      • __init__.py
      • node_1.py
      • node_2.py
      • ...
    • nodes_2/
      • __init__.py
      • node_3.py
      • node_4.py
      • ...
    • ...
  • systems/ : Pipeline systems are objects that manage the sequential execution of pipelines. This directory contains their definitions.
    • __init__.py
    • system_1.py
    • system_2.py
    • ...
  • tests/ : Optionally contains the project's tests.
    • test_node_group_1.py
    • test_node_group_2.py
    • ...

Let's make an example

You can get the full code for this example here.

In this example we will create a simple pipeline that reads from a directory full of timeseries json files, modify the signals and write a summary of the data to a csv file. Let's start.

First we need to define some catalog entries for our data sources and sinks. Go to config/catalog.toml and add the following:

Add an entry for the raw signals:

[raw_signals]
type = "canonada.json_multi" # This refers to the built-in json_multi datahandler
keys = [] # Keys are used to coordinate multiple sources, in this case we don't need them
path = "data/raw_signals" # The path to the directory containing the json files

Our test files are structured like this:

{
    "id": "UUID-OF-THE-SIGNAL",
    "time": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...],
    "signal": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]
}

The built-in json_multi datahandler will read all the files in the directory and return them as a dictionary.

Also we want to add a few entries for the output files:

[stats]
type = "canonada.csv_rows"
keys = []
headers = ["id", "maximum", "mean"]
path = "data/stats.csv"

[offset_signals_catalog]
type = "canonada.json_multi"
keys = []
path = "data/offset_signals"

Now let's create some example functions for the nodes. Nodes can be created in any directory that you can import from, but in this example we chose pipelines/nodes/example_nodes.py. Add the following code to the file:

def add_offset(signal: dict, offset: float) -> dict:
    """
    Adds a given offset to a signal

    Args:
        signal (dict): A dictionary with the signal values
        offset (float): The offset to add to the signal

    Returns:
        dict: A dictionary with the signal values plus the offset
    """

    signal["signal"] = [value + offset for value in signal["signal"]]
    return signal


def get_signal_max(signal: dict) -> float:
    """
    Returns the maximum value of a signal

    Args:
        signal (dict): A dictionary with the signal values

    Returns:
        float: The maximum value of the signal
    """

    return max(signal["signal"])


def calculate_mean(signal: dict) -> float:
    """
    Returns the mean value of a signal

    Args:
        signal (dict): A dictionary with the signal values

    Returns:
        float: The mean value of the signal
    """

    return sum(signal["signal"]) / len(signal["signal"])


def list_stats(signals: dict, maximum: float, mean: float) -> dict:
    """
    Returns a dictionary with the stats of the signals

    Args:
        signals (dict): A dictionary with the signals
        maximum (float): The maximum value of the signals
        mean (float): The mean value of the signals

    Returns:
        dict: A dictionary with the stats of the signals
    """

    return {
        "id": signals["id"],
        "maximum": maximum,
        "mean": mean,
    }

Finally, it is time to create a pipeline! Create a new file in pipelines/ to contain one or more pipelines (you may create as many files as you want). For this example, create a file called example_pipelines.py and add the following code to it:

# Import the Canonada Classes
from canonada.pipeline import Node, Pipeline

# Import the nodes from pipelines/nodes/example_nodes.py 
from .nodes import example_nodes


# Simple pipeline
streaming_pipe = Pipeline("streaming_pipe", [
        # Read each signal from the catalog and add an offset defined in the parameters
        Node(
            func=example_nodes.add_offset, 
            input=["raw_signals", "params:section_1.offset"], # Load raw_signals from the catalog and the offset from the parameters
            output=["offset_signals"], # If not declared in the catalog, the output is saved in memory and passed onwards
            name="create_offsets",
            description="Adds parametrized offset to the signals"
            ),
        # Save the previous output to disk with a dummy module
        Node(
            func=lambda x: x, # Just pass the input to the output
            input=["offset_signals"],
            output=["offset_signals_catalog"],
            name="save_offsets",
            description="Saves the offset signals using the datahandler specified in the catalog"
        ),
        # Calculate the maximum value of each signal
        Node(
            func=example_nodes.get_signal_max,
            input=["offset_signals"],
            output=["max_values"],
            name="get_signal_max",
            description="Calculates the maximum value of the signals"
        ),
        # Calculate the mean value of each signal
        Node(
            func=example_nodes.calculate_mean,
            input=["offset_signals"],
            output=["mean_values"],
            name="calculate_mean",
            description="Calculates the mean value of the signals"
        ),
        # Save the stats of the signals in a CSV file
        Node(
            func=example_nodes.list_stats,
            input=["offset_signals", "max_values", "mean_values"],
            output=["stats"], # It will be saved in the defined file in the catalog
            name="list_stats",
            description="Returns the stats of the signals"
        )
    ],
    description="This pipeline reads signals from the catalog, adds an offset, calculates the maximum and mean values, and saves the stats to disk"
)

As you can see, creating a pipeline is quite simple. You just need to define the nodes and the connections between them. The input and output parameters of the nodes are used to define the data flow between them. The input parameter is a list of the data sources that the node will use, and the output parameter is a list of the data sinks that the node will write to. By default, if a data sink is not declared in the catalog, the data will be saved in memory and passed to the next node that requires it; otherwise it will be saved in the file/directory specified in the catalog.

Note that the params:section_1.offset is a reference to the offset parameter in the parameters.toml file. This is how you can pass static parameters to the pipeline. Let's add this parameter to the parameters.toml file:

[section_1]
offset = 5

You are ready to run your first pipeline. Just run the following command in the terminal:

canonada run pipelines streaming_pipe # If you want to run more than one pipeline, just append them to the command

This will run the streaming_pipe pipeline and save the results in the files specified in the catalog. You can check the results in the data/ directory.

And that's it! You have created your first Canonada pipeline and ran it concurrently without any extra effort. You can now start creating more complex pipelines and pipeline systems to run pipelines sequentially. Check the full example of this code here.

Clone this wiki locally