Datatailr Scheduler Module

The datatailr.scheduler module provides a framework for scheduling and managing batch jobs.

The main job types are:


  • Batch: Represents a batch job that can be scheduled and executed. The job can include multiple tasks which can be run in parallel or sequentially.
  • Service: Represents a service job that runs continuously.
  • App: Represents a web app or a dashboard, which can be built using one of the supported frameworks, such as Streamlit (https://streamlit.io/), Dash (https://dash.plotly.com/), or Panel (https://panel.holoviz.org/).
  • Excel: Represents an Excel add-in.

App

Deploy a web application or dashboard to Datatailr.

This supports Streamlit, Dash, or any framework that can be started via a standard entrypoint (for example, streamlit run app.py).

Parameters:

  • name (str, required): App display name.
  • entrypoint (Callable | str, required): Callable that starts the app server.
  • environment (Optional[Environment], default DEV): Target environment for the job.
  • image (Optional[Image], default None): Container image to use for execution.
  • run_as (Optional[Union[str, User]], default None): User to execute the job as.
  • resources (Resources, default Resources()): Resource requirements for the job.
  • acl (Optional[ACL], default None): Access control settings for the job.
  • framework (Optional[str], default None): Framework to use for the app.
  • python_version (str, default '3.12'): Python version to use.
  • python_requirements (str | List[str], default ''): Requirements to install (string or list).
  • build_script_pre (str, default ''): Shell script to run before build.
  • build_script_post (str, default ''): Shell script to run after build.
  • env_vars (Optional[Dict[str, str | int | float | bool]], default None): Environment variables to set.
  • update_existing (bool, default False): Update an existing job with the same name.

Examples:

Minimal Streamlit app.

# app.py
import streamlit as st

def main():
    st.title("Hello Datatailr App")

if __name__ == "__main__":
    main()

Test locally.

streamlit run app.py

Deploy to Datatailr.

from app import main
from datatailr import App

app = App(
    name="Simple Dashboard App",
    entrypoint=main,
    python_requirements="streamlit")

app.run()

__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, framework=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars=None, update_existing=False, app_section='')

Initialize an App deployment.

Parameters:

  • name (str, required): Display name for the app.
  • entrypoint (Callable | str, required): The callable (function) that starts the application.
  • environment (Optional[Environment], default DEV): Target environment for the deployment.
  • image (Optional[Image], default None): Pre-configured container Image. When None, an image is built from python_version, python_requirements, and the build scripts.
  • run_as (Optional[Union[str, User]], default None): User or username to run the app as. Defaults to the currently signed-in user.
  • resources (Resources, default Resources()): CPU and memory resources for the container.
  • acl (Optional[ACL], default None): Access control list. Defaults to standard permissions for the current user.
  • framework (Optional[str], default None): Optional framework name (e.g., 'streamlit', 'dash', 'flask'). If not provided, the framework is inferred from the entrypoint.
  • python_version (str, default '3.12'): Python version for the container image.
  • python_requirements (str | List[str], default ''): Python dependencies (see Image).
  • build_script_pre (str, default ''): Dockerfile commands to run before installing requirements.
  • build_script_post (str, default ''): Dockerfile commands to run after installing requirements.
  • env_vars (Optional[Dict[str, str | int | float | bool]], default None): Environment variables passed to the running container.
  • update_existing (bool, default False): If True, load and update an existing job definition with the same name instead of creating a new one.
  • app_section (str, default ''): The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

DuplicateJobNameError

Exception raised when a job with a duplicate name is added to the batch.
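
The check that this exception implies can be sketched as follows. This is a minimal, self-contained illustration and not the library's code: the local DuplicateJobNameError class, the JobRegistry container, and its add method are hypothetical stand-ins.

```python
class DuplicateJobNameError(Exception):
    """Local stand-in for the exception described above (hypothetical)."""


class JobRegistry:
    """Hypothetical container that rejects two jobs with the same name."""

    def __init__(self):
        self._names = set()

    def add(self, name):
        # Refuse a name that has already been registered in this batch.
        if name in self._names:
            raise DuplicateJobNameError(f"A job named {name!r} already exists in the batch")
        self._names.add(name)


registry = JobRegistry()
registry.add("load_data")
try:
    registry.add("load_data")
    duplicate_rejected = False
except DuplicateJobNameError:
    duplicate_rejected = True
```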

EntryPoint

Represents an entry point for a Datatailr job. This can be a function or a callable object.

Environment

Enum representing different environments for Datatailr jobs.

ExcelAddin

Represents an Excel add-in deployment on Datatailr.

An Excel add-in exposes Python functions as Excel worksheet functions, allowing users to call server-side computations directly from Excel spreadsheets.

Example
from datatailr import ExcelAddin
from datatailr.excel import Addin

addin_def = Addin("Options Pricer", "Option pricing functions")

@addin_def.expose(description="Black-Scholes price")
def price_option(spot, strike, vol, rate, expiry):
    ...

def __excel_main__(port=8080, ws_port=8000):
    addin_def.run(port, ws_port)

addin = ExcelAddin(
    name="Options Pricer",
    entrypoint=__excel_main__,
    python_requirements=["numpy", "scipy"],
)
addin.run()

__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars=None, update_existing=False, app_section='')

Initialize an Excel add-in deployment.

Parameters:

  • name (str, required): Display name for the add-in.
  • entrypoint (Callable, required): The callable (function) that starts the add-in server.
  • environment (Optional[Environment], default DEV): Target environment for the deployment.
  • image (Optional[Image], default None): Pre-configured container Image.
  • run_as (Optional[Union[str, User]], default None): User or username to run the add-in as.
  • resources (Resources, default Resources()): CPU and memory resources for the container.
  • acl (Optional[ACL], default None): Access control list.
  • python_version (str, default '3.12'): Python version for the container image.
  • python_requirements (str | List[str], default ''): Python dependencies (see Image).
  • build_script_pre (str, default ''): Dockerfile commands to run before installing requirements.
  • build_script_post (str, default ''): Dockerfile commands to run after installing requirements.
  • env_vars (Optional[Dict[str, str | int | float | bool]], default None): Environment variables passed to the running container.
  • update_existing (bool, default False): If True, update an existing job definition.
  • app_section (str, default ''): The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

Job

Base class for all Datatailr compute processes (App, Service, ExcelAddin, and workflows).

A Job encapsulates the container image configuration, resource requirements, access control, and deployment metadata needed to schedule and run work on the Datatailr platform. In most cases you should use the concrete subclasses (App, Service, ExcelAddin) or the @workflow decorator rather than instantiating Job directly.

id property

Unique identifier for the job.

__find_missing_python_packages()

Compare the imports detected in the entry point code with the packages listed in python_requirements. Raise a warning if there are any imported packages that are not listed in python_requirements.

__get_existing__(job_name, environment, get_tasks=False) classmethod

Retrieve an existing job instance from the Datatailr platform, based on the job name and environment.

__init__(name, environment=Environment.DEV, image=None, run_as=None, resources=None, acl=None, python_version='auto', python_requirements='', build_script_pre='', build_script_post='', env_vars=None, type=JobType.UNKNOWN, entrypoint=None, get_existing=False, app_section='')

Initialize a Job.

Parameters:

  • name (str, required): Display name for the job.
  • environment (Optional[Environment], default DEV): Target environment (dev / pre / prod).
  • image (Optional[Image], default None): Pre-configured container Image. When None, an image is built from python_version, python_requirements, and the build scripts.
  • run_as (Optional[Union[str, User]], default None): User or username under which the job runs. Defaults to the currently signed-in user.
  • resources (Resources | None, default None): CPU and memory resources for the container.
  • acl (Optional[ACL], default None): Access control list. Defaults to standard permissions for the current user.
  • python_version (str, default 'auto'): Python version for the container image.
  • python_requirements (str | List[str], default ''): Python dependencies (see Image).
  • build_script_pre (str, default ''): Dockerfile commands before pip install.
  • build_script_post (str, default ''): Dockerfile commands after pip install.
  • env_vars (Dict[str, str | int | float | bool] | None, default None): Environment variables passed to the container.
  • type (Optional[JobType], default UNKNOWN): The kind of job (workflow, service, app, excel).
  • entrypoint (Optional[EntryPoint], default None): The entry point that starts the job.
  • get_existing (bool, default False): If True, load an existing job with the same name and update it instead of creating a new definition.
  • app_section (str, default ''): The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

__run_command__(command, *args, **kwargs)

Run a command in the context of the job. This is used to execute the job's entry point.

from_dict(job_dict)

Populate this Job instance from a dictionary (e.g. a platform API response).

Parameters:

  • job_dict (dict, required): A dictionary containing the serialized job data.

get_schedule_args()

Returns additional arguments for scheduling the job. Override or extend this method as needed.

promote(from_environment=None, version=None)

Promote the job to the next environment. This method promotes a version of the job from one environment to the next one. If from_environment is not specified, the job is promoted from all environments.

:param from_environment: The environment to promote from. If None, it will promote from all environments.
:param version: The version to promote. If None, it will promote the latest version.
:return: A tuple of (success: bool, message: str).

from datatailr import Environment
from datatailr.scheduler import Job

job = Job(name="my_job", environment=Environment.DEV)
job.promote(from_environment=Environment.DEV, version=3)

This will promote version 3 of the job from the DEV environment to the next environment (PRE).
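
The idea of a "next environment" can be illustrated with a small standalone sketch. It assumes the order dev, pre, prod suggested by the environment parameter description; the Env enum and next_environment helper are hypothetical stand-ins and are not part of datatailr, and the step from PRE to PROD is an inference rather than something stated above.

```python
from enum import Enum
from typing import Optional


class Env(Enum):
    """Hypothetical stand-in for datatailr's Environment enum."""
    DEV = "dev"
    PRE = "pre"
    PROD = "prod"


# Assumed promotion order; only DEV -> PRE is confirmed by the text above.
PROMOTION_ORDER = [Env.DEV, Env.PRE, Env.PROD]


def next_environment(current: Env) -> Optional[Env]:
    """Return the environment that follows `current`, or None for the last one."""
    index = PROMOTION_ORDER.index(current)
    if index + 1 < len(PROMOTION_ORDER):
        return PROMOTION_ORDER[index + 1]
    return None


target = next_environment(Env.DEV)
```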

run()

Run the job. This is equivalent to running job.save() and then job.start().

runs(refresh=False)

List all runs of the job.

Returns a list of mother-job dictionaries. Each dictionary contains the top-level run metadata and a 'tasks' key holding a list of child task run dictionaries. See _process_run_data for the full structure.

:param refresh: If True, it will refresh the cached runs.
:return: A list of mother-job run dictionaries.
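
A returned list of this shape could be walked as in the sketch below. Only the 'tasks' key comes from the description above; the 'run_id' and 'name' keys and the sample data are hypothetical, chosen purely for illustration.

```python
def count_tasks_per_run(runs):
    """Return the number of child task runs for each mother-job run."""
    return [len(run.get("tasks", [])) for run in runs]


# Hypothetical sample data; real run dictionaries carry more metadata.
sample_runs = [
    {"run_id": 1, "tasks": [{"name": "extract"}, {"name": "load"}]},
    {"run_id": 2, "tasks": []},
]

task_counts = count_tasks_per_run(sample_runs)
```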

save()

Save the job to the Datatailr platform. If the job already exists, it will be updated. The repository state is verified and an image is prepared for execution.

start()

Start the job. A workflow with a schedule starts running on that schedule. Other job types, and workflows without a schedule, run immediately.

to_dict()

Convert the Job instance to a dictionary representation.

to_json()

Convert the Job instance to a JSON string representation.

verify_repo_is_ready()

Verify that the repository is ready for job execution. The check consists of two parts:

  1. Check whether there are uncommitted changes in the repository.
  2. Check whether the local commit matches the remote HEAD (the repo is synced with the remote).

Returns a tuple of (branch: str, commit_hash: str).
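
The two checks can be sketched with plain git commands. This is a rough illustration under stated assumptions, not the library's implementation: the helper names git and verify_repo are hypothetical, the remote is assumed to be called origin, and its refs are assumed to have been fetched already.

```python
import subprocess


def git(*args):
    """Run a git command in the current directory and return its trimmed output."""
    result = subprocess.run(["git", *args], check=True, capture_output=True, text=True)
    return result.stdout.strip()


def verify_repo(run_git=git):
    """Return (branch, commit_hash), or raise RuntimeError if the repo is not ready."""
    # Part 1: "git status --porcelain" prints nothing when the work tree is clean.
    if run_git("status", "--porcelain"):
        raise RuntimeError("Repository has uncommitted changes")
    branch = run_git("rev-parse", "--abbrev-ref", "HEAD")
    local_commit = run_git("rev-parse", "HEAD")
    # Part 2: compare with the remote-tracking branch (assumes a remote named "origin").
    remote_commit = run_git("rev-parse", f"origin/{branch}")
    if local_commit != remote_commit:
        raise RuntimeError("Local commit does not match the remote HEAD")
    return branch, local_commit
```

Passing the command runner as a parameter keeps the logic testable without a real repository; calling verify_repo() with no arguments runs the actual git commands.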

versions(environment=None)

List all versions of the job in the specified environment. If no environment is specified, it lists versions across all environments.

JobType

Enum representing different types of Datatailr jobs.

from_str(value) classmethod

Create a JobType from a string while normalizing legacy aliases.
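
Alias normalization of this kind might look like the sketch below. The JobKind enum is a hypothetical stand-in for JobType: its member values follow the kinds listed for the type parameter (workflow, service, app, excel) plus UNKNOWN, while the "batch" alias and the fallback to UNKNOWN for unrecognized strings are assumptions not confirmed by the source.

```python
from enum import Enum


class JobKind(Enum):
    """Hypothetical stand-in for JobType; members are illustrative."""
    WORKFLOW = "workflow"
    SERVICE = "service"
    APP = "app"
    EXCEL = "excel"
    UNKNOWN = "unknown"

    @classmethod
    def from_str(cls, value):
        """Map a string to a member, translating legacy aliases first."""
        aliases = {"batch": "workflow"}  # assumed legacy alias
        normalized = value.strip().lower()
        normalized = aliases.get(normalized, normalized)
        try:
            return cls(normalized)
        except ValueError:
            # Assumed behavior: unrecognized strings map to UNKNOWN.
            return cls.UNKNOWN


kind = JobKind.from_str("Batch")
```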

Resources dataclass

Represents the compute resources allocated to a job container.

Attributes:

  • memory (str): Memory limit as a string (e.g. "256m", "1g").
  • cpu (float): Number of CPU cores to allocate (e.g. 1, 0.5).
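
To make the memory string format concrete, the sketch below converts such a string into a byte count. The parse_memory helper is hypothetical and not part of datatailr, and it assumes binary multiples (1 "m" = 1024 * 1024 bytes), as is common for container memory limits; the platform may interpret the suffixes differently.

```python
def parse_memory(value):
    """Convert a string such as "256m" or "1g" to a number of bytes."""
    # Assumption: suffixes denote binary multiples (k = 1024 bytes, and so on).
    units = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}
    text = value.strip().lower()
    if text and text[-1] in units:
        return int(float(text[:-1]) * units[text[-1]])
    return int(text)  # no suffix: treat the value as plain bytes


small = parse_memory("256m")
large = parse_memory("1g")
```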

Schedule

Build a schedule object for batch/workflow jobs using friendly fields.

Parameters:

  • cron_expression (str, default ''): Raw cron expression to use (string).
  • at_minutes (list[int] | None, default None): Specific minutes within the hour, e.g. [0, 30].
  • every_minute (int | None, default None): Run every N minutes.
  • at_hours (list[int] | None, default None): Specific hours within the day, e.g. [0, 12].
  • every_hour (int | None, default None): Run every N hours.
  • weekdays (list[str] | None, default None): Weekdays by name, e.g. ["Mon", "Wed", "Fri"].
  • day_of_month (int | None, default None): Day of the month (1-31).
  • in_month (list[str] | None, default None): Months by name, e.g. ["Jan", "Jul"].
  • every_month (int | None, default None): Run every N months.
  • timezone (str | None, default None): Time zone name, e.g. "UTC".
  • run_after_job_uuid (str | None, default None): Job UUID to run after.
  • run_after_job_name (str | None, default None): Job name to run after.
  • run_after_job_condition (str | None, default None): Condition for dependency, e.g. "on failure".

Examples:

schedule = Schedule(at_hours=[0])
schedule = Schedule(at_minutes=[0, 30], weekdays=["Mon", "Wed", "Fri"])

__init__(cron_expression='', at_minutes=None, every_minute=None, at_hours=None, every_hour=None, weekdays=None, day_of_month=None, in_month=None, every_month=None, timezone=None, run_after_job_uuid=None, run_after_job_name=None, run_after_job_condition=None)

Initialize a Schedule.

You can either provide a raw cron_expression or use the human-readable helper parameters (which are compiled into cron syntax automatically).

Parameters:

  • cron_expression (str, default ''): A raw cron string (e.g. "0 */2 * * *"). If provided alongside helper parameters, the helpers take precedence.
  • at_minutes (list[int] | None, default None): Specific minutes past the hour to run (e.g. [0, 30] for ":00" and ":30").
  • every_minute (int | None, default None): Run every N minutes (e.g. 5 for every 5 minutes).
  • at_hours (list[int] | None, default None): Specific hours of the day to run (0-23).
  • every_hour (int | None, default None): Run every N hours.
  • weekdays (list[str] | None, default None): Days of the week to run (e.g. ["mon", "wed", "fri"]).
  • day_of_month (int | None, default None): Day of the month to run (1-31).
  • in_month (list[str] | None, default None): Months to run in (e.g. ["jan", "apr", "jul", "oct"]).
  • every_month (int | None, default None): Run every N months.
  • timezone (str | None, default None): IANA timezone name (e.g. "America/New_York").
  • run_after_job_uuid (str | None, default None): UUID of a job that must complete before this schedule triggers.
  • run_after_job_name (str | None, default None): Name of a job that must complete before this schedule triggers.
  • run_after_job_condition (str | None, default None): Required completion status of the predecessor job (e.g. "success").
Example
from datatailr import Schedule
# Every weekday at 08:00 and 16:00 UTC
s = Schedule(at_hours=[8, 16], weekdays=["mon","tue","wed","thu","fri"])

get_cron_string()

Return the compiled cron string.

Returns:

  • str: The compiled cron string.

Examples:

>>> Schedule(
...     at_minutes=[0, 15, 30, 45],
...     at_hours=[0, 12],
...     weekdays=["Mon", "Wed", "Fri"],
...     day_of_month=15,
...     in_month=["Jan", "Jul"],
... ).get_cron_string()
'0 0,15,30,45 0,12 15 1,7 1,3,5'
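
How helper fields might be compiled into a cron string like the one above is sketched below. This is a standalone illustration, not the library's implementation: compile_cron and its lookup tables are hypothetical, the leading "0" is assumed to be a seconds field (inferred from the six-field output in the example), and unspecified fields are assumed to default to "*", which may differ from what Schedule actually does.

```python
WEEKDAYS = {"sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6}
MONTH_NAMES = ["jan", "feb", "mar", "apr", "may", "jun",
               "jul", "aug", "sep", "oct", "nov", "dec"]
MONTHS = {name: number for number, name in enumerate(MONTH_NAMES, start=1)}


def _field(at=None, every=None):
    """Render one cron field from an explicit list or an "every N" step."""
    if at:
        return ",".join(str(value) for value in at)
    if every:
        return f"*/{every}"
    return "*"  # assumed default for unspecified fields


def compile_cron(at_minutes=None, every_minute=None, at_hours=None, every_hour=None,
                 weekdays=None, day_of_month=None, in_month=None, every_month=None):
    """Build a six-field cron string (seconds first) from the helper fields."""
    minutes = _field(at_minutes, every_minute)
    hours = _field(at_hours, every_hour)
    day = str(day_of_month) if day_of_month else "*"
    # Names are matched on their first three letters, case-insensitively.
    month_numbers = [MONTHS[m.lower()[:3]] for m in in_month] if in_month else None
    weekday_numbers = [WEEKDAYS[d.lower()[:3]] for d in weekdays] if weekdays else None
    months = _field(month_numbers, every_month)
    days_of_week = _field(weekday_numbers)
    return " ".join(["0", minutes, hours, day, months, days_of_week])


cron = compile_cron(
    at_minutes=[0, 15, 30, 45],
    at_hours=[0, 12],
    weekdays=["Mon", "Wed", "Fri"],
    day_of_month=15,
    in_month=["Jan", "Jul"],
)
```

With the same inputs as the documented example, this sketch yields the same string as the one shown above.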

Service

Represents a long-running background service deployment on Datatailr.

A service runs continuously (e.g., an API server, a message consumer, or any always-on process). It is restarted automatically if it exits.

Example
from datatailr import Service

# service.py
from flask import Flask

app = Flask(__name__)

@app.route("/health")
def health_check():
    return "OK"

# Service entrypoints receive the port from Datatailr.
def run_server(port):
    app.run("0.0.0.0", port=int(port), debug=False)

svc = Service(
    name="Simple Service",
    entrypoint=run_server,
    python_requirements=["flask"],
)
svc.run()

__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars=None, update_existing=False)

Initialize a Service deployment.

Parameters:

  • name (str, required): Display name for the service.
  • entrypoint (Callable, required): The callable (function) that starts the service.
  • environment (Optional[Environment], default DEV): Target environment for the deployment.
  • image (Optional[Image], default None): Pre-configured container Image.
  • run_as (Optional[Union[str, User]], default None): User or username to run the service as.
  • resources (Resources, default Resources()): CPU and memory resources for the container.
  • acl (Optional[ACL], default None): Access control list.
  • python_version (str, default '3.12'): Python version for the container image.
  • python_requirements (str | List[str], default ''): Python dependencies (see Image).
  • build_script_pre (str, default ''): Dockerfile commands to run before installing requirements.
  • build_script_post (str, default ''): Dockerfile commands to run after installing requirements.
  • env_vars (Optional[Dict[str, str | int | float | bool]], default None): Environment variables passed to the running container.
  • update_existing (bool, default False): If True, update an existing job definition.

Task

Represents a job within a batch job.

This class can be extended to define specific configurations for each job in the batch.

args property writable

Returns the arguments for the Task instance.

id property

Returns the unique identifier of the Task instance.

__call__(*args, **kwds)

Allows the Task instance to be called like a function, returning itself. This is useful for chaining or functional-style programming.

alias(name)

Set an alias for the Task instance.

:param name: The alias name to set.

run()

Execute the job's entrypoint.

set_resources(resources=None, memory=None, cpu=None)

Set the resources for the Task instance.

:param resources: The Resources instance to set.

to_dict()

Convert the Task instance to a dictionary representation.

to_json()

Convert the Task instance to a JSON string representation.

translate_dependencies()

Translate the dependencies of the Task instance into a format suitable for the batch job.

update_env_vars(env_vars)

Update the environment variables for the Task instance.

:param env_vars: A dictionary of environment variables to update.

TaskError

Exception raised for errors related to workflow tasks.

Workflow

Represents a workflow in the scheduler.

Inherits from Job and is used to define workflows with specific configurations.

next_job_id property

Returns a generator for the next task ID in the workflow.

add_job(job)

Adds a task to the workflow.

:param job: The Task instance to add.

get_env_vars_copy()

Returns a copy of the environment variables for the Workflow instance.

rerun(run_id, version=None, full=False)

Rerun the run with the given run ID, using a specific version of the workflow (the latest version by default). If full is True, all tasks are rerun, including those that already ran successfully.

:param run_id: The ID of the run to rerun.
:param version: The version to rerun. If None, it will rerun the latest version.
:param full: If True, it will rerun the full workflow, including the tasks that already ran successfully.
:return: A tuple of (success: bool, message: str).
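
The effect of the full flag can be pictured with the sketch below. It is an illustration only: the tasks_to_rerun helper and the status strings ("success", "failed", "pending") are hypothetical and do not reflect how the platform stores task states.

```python
def tasks_to_rerun(task_statuses, full=False):
    """Pick the task names that a rerun would execute again."""
    if full:
        # A full rerun repeats every task, successful or not.
        return list(task_statuses)
    return [name for name, status in task_statuses.items() if status != "success"]


# Hypothetical task states from a previous run.
previous_run = {"extract": "success", "transform": "failed", "load": "pending"}

partial = tasks_to_rerun(previous_run)
everything = tasks_to_rerun(previous_run, full=True)
```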

reset()

Reset the Workflow instance to its initial state.

set_local_run(local_run)

Set the local run flag for the Workflow instance.

:param local_run: A boolean indicating whether to run locally.

to_dict()

Convert the Workflow instance to a dictionary representation.

to_json()

Convert the Workflow instance to a JSON string representation.

task(memory=DEFAULT_TASK_MEMORY, cpu=DEFAULT_TASK_CPU)

Decorator that marks a function as a workflow task (batch job).

Use this via the public alias @task() to declare individual computational steps inside a @workflow-decorated function.

Parameters:

  • memory (str, default DEFAULT_TASK_MEMORY): Memory limit for this task's container (e.g. "256m", "1g").
  • cpu (float, default DEFAULT_TASK_CPU): Number of CPU cores to allocate for this task (e.g. 1, 0.5).

Returns:

A decorator that wraps the target function so it can participate in DAG-based workflow orchestration.

Example
from datatailr import task

@task(memory="512m", cpu=2)
def heavy_computation(x, y):
    return x ** y
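
To give a feel for what "participating in DAG-based orchestration" can mean, the sketch below shows a decorator that records calls as graph nodes instead of running them immediately. It is a conceptual illustration, not datatailr's implementation: the Node class and the local task function are hypothetical stand-ins, and the default values "256m" and 1 are placeholders because the actual DEFAULT_TASK_MEMORY and DEFAULT_TASK_CPU values are not given here.

```python
import functools


class Node:
    """Hypothetical DAG node: a deferred call plus the nodes it depends on."""

    def __init__(self, func, args, memory, cpu):
        self.func = func
        self.args = args
        self.memory = memory
        self.cpu = cpu
        # Any argument that is itself a Node is an upstream dependency.
        self.dependencies = [arg for arg in args if isinstance(arg, Node)]

    def evaluate(self):
        """Evaluate upstream nodes first, then call this node's function."""
        resolved = [arg.evaluate() if isinstance(arg, Node) else arg for arg in self.args]
        return self.func(*resolved)


def task(memory="256m", cpu=1):
    """Local stand-in for a task decorator; the defaults are placeholders."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            # Calling the decorated function builds a node rather than running it.
            return Node(func, args, memory, cpu)
        return wrapper
    return decorator


@task(memory="512m", cpu=2)
def power(x, y):
    return x ** y


@task()
def add_one(value):
    return value + 1


# Passing one task's result to another creates an edge in the graph.
final_node = add_one(power(2, 3))
result = final_node.evaluate()
```

Deferring execution this way lets an orchestrator inspect the dependencies and per-task resources before anything runs.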