
Workflows

A workflow is a DAG-based data pipeline where you define a series of tasks with dependencies between them. Datatailr automatically determines which tasks can run in parallel based on their dependencies, making your pipelines faster without extra effort.

Workflows are the most common type of compute process. They are ideal for batch data processing — ETL pipelines, ML model training, report generation, and any workload that has a defined start and end.

How It Works

You define a workflow using the @workflow and @task decorators. Each function decorated with @task becomes a node in the pipeline graph. When one task's output is passed as input to another, Datatailr creates a dependency edge between them. Independent tasks run in parallel automatically.

from datatailr import workflow, task

@task()
def fetch_data():
    return {"sales": [100, 200, 150]}

@task()
def calculate_total(data):
    return sum(data["sales"])

@workflow(name="daily_sales_report")
def sales_pipeline():
    data = fetch_data()
    total = calculate_total(data)

When deployed, this creates the following execution graph:

graph LR
    A[fetch_data] --> B[calculate_total]

fetch_data runs first, and once it completes, its result is automatically passed to calculate_total.
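To build intuition for how dependency-based parallelism works, here is a minimal plain-Python sketch (illustrative only, not Datatailr's scheduler) that groups a DAG's tasks into batches, where every task in a batch has all of its dependencies satisfied and could run in parallel:

```python
# Group DAG tasks into parallel batches by repeatedly extracting the
# "ready set" (a level-by-level variant of Kahn's algorithm).
# Illustrative sketch only — not Datatailr internals.
def parallel_batches(deps):
    """deps maps each task name to the set of tasks it depends on."""
    deps = {t: set(d) for t, d in deps.items()}
    batches = []
    while deps:
        # Every task with no unmet dependencies can run right now.
        ready = {t for t, d in deps.items() if not d}
        if not ready:
            raise ValueError("cycle detected in task graph")
        batches.append(sorted(ready))
        deps = {t: d - ready for t, d in deps.items() if t not in ready}
    return batches

# The first example's graph: calculate_total depends on fetch_data.
print(parallel_batches({
    "fetch_data": set(),
    "calculate_total": {"fetch_data"},
}))
# → [['fetch_data'], ['calculate_total']]
```

Tasks that share no dependency edge land in the same batch, which is how independent tasks end up running simultaneously.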

Key Features

  • Automatic parallelism — Independent tasks run simultaneously without any configuration.
  • Scheduling — Workflows can run on a schedule (e.g., daily, hourly) or be triggered manually.
  • Local testing — Run workflows locally before deploying by passing local_run=True.
  • Fault tolerance — Failed tasks report detailed logs. You can fix and re-run without restarting the entire pipeline.
  • Task aliasing — Use .alias() to give distinct names when the same task function is invoked multiple times.

Building Complex Pipelines

More complex workflows are built using simple function composition:

@task()
def get_number():
    return 42

@task()
def add(a, b):
    return a + b

@task()
def reduce_list(*numbers):
    return sum(numbers)

@workflow("Complex Pipeline")
def complex_pipeline():
    total_1 = add(get_number().alias("my_number"), 13).alias("first_addition")
    total_2 = reduce_list(*[get_number().alias(f"num_{i}") for i in range(5)])
    add(total_2, 1).alias("add_one")
    add(total_2, 2).alias("add_two")
    add(total_2, 3).alias("add_three")

This generates the following graph, where num_0 through num_4 all run in parallel:

graph LR
    A[my_number] --> B[first_addition]
    13((13)) --> B
    D[num_0] --> E[reduce_list]
    D1[num_1] --> E
    D2[num_2] --> E
    D3[num_3] --> E
    D4[num_4] --> E
    E --> G1[add_one]
    E --> G2[add_two]
    E --> G3[add_three]
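As a sanity check, the same computation can be run as plain Python (no Datatailr) to confirm the values each task should produce:

```python
# Plain-Python equivalent of the "Complex Pipeline" above, useful for
# verifying expected task outputs before deploying.
def get_number():
    return 42

def add(a, b):
    return a + b

def reduce_list(*numbers):
    return sum(numbers)

total_1 = add(get_number(), 13)                           # first_addition → 55
total_2 = reduce_list(*[get_number() for _ in range(5)])  # reduce_list → 210
results = [add(total_2, i) for i in (1, 2, 3)]            # → [211, 212, 213]
```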

Deploying a Workflow

Run your workflow script to deploy it to Datatailr:

python my_workflow.py

Datatailr packages your code into a container image, builds it, and schedules it for execution. You can monitor the workflow from the dashboard or the CLI:

dt job ls               # List all compute processes
dt job get my_workflow  # View workflow details
dt log read my_workflow # Read workflow logs

Managing in the Dashboard

Navigate to Compute Processes and select the Workflows tab to see all your workflows. The view shows the workflow name, number of tasks, current state, schedule, and a visual history of the last 10 runs along with a weekly activity heatmap.

Workflows tab

Click on any workflow to view its DAG graph, run history, logs, and definition details.