# Datatailr Scheduler Module
The datatailr.scheduler module provides a framework for scheduling and managing batch jobs.
The main job types are:
- Batch: Represents a batch job that can be scheduled and executed. The job can include multiple tasks which can be run in parallel or sequentially.
- Service: Represents a service job that runs continuously.
- App: Represents a web app or a dashboard, which can be built using one of the supported frameworks, such as [Streamlit](https://streamlit.io/), [Dash](https://dash.plotly.com/), or [Panel](https://panel.holoviz.org/).
- Excel: Represents an Excel add-in.
## App
Represents a web application or a dashboard deployment on Datatailr.
This could be a Streamlit app, Dash app, or any other web-based data application.
The implementation of the app does not need to follow any specific framework, as long as it can be started in the standard way (e.g., `streamlit run app.py`).
Example:

```python
# app.py
import streamlit as st

def main():
    st.title("Hello Datatailr App")

if __name__ == "__main__":
    main()
```

Run locally with:

```shell
streamlit run app.py
```
To deploy the app to Datatailr, you would create an App job in your Python code:
```python
from app import main
from datatailr import App

app = App(
    name="Simple Dashboard App",
    entrypoint=main,
    python_requirements="streamlit",
)
app.run()
```
Once deployed, the job definition can be viewed at a URL of the form:

https://YOUR-DOMAIN.datatailr.com/job-details/simple-dashboard-app/1/dev/definition
The deployed app can be accessed by all users with the appropriate permissions from the app launcher page or directly via its URL:
https://YOUR-DOMAIN.datatailr.com/job/dev/simple-dashboard-app/
`__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, framework=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars={}, update_existing=False)`
Initialize an App deployment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Display name for the app. | *required* |
| `entrypoint` | `Callable` | The callable (function) that starts the application. | *required* |
| `environment` | `Optional[Environment]` | Target environment for the deployment. | `DEV` |
| `image` | `Optional[Image]` | Pre-configured container Image. When … | `None` |
| `run_as` | `Optional[Union[str, User]]` | User or username to run the app as. Defaults to the currently signed-in user. | `None` |
| `resources` | `Resources` | CPU and memory resources for the container. | `Resources()` |
| `acl` | `Optional[ACL]` | Access control list. Defaults to standard permissions for the current user. | `None` |
| `framework` | `Optional[str]` | Optional framework name (e.g., `'streamlit'`, `'dash'`, `'flask'`). If not provided, the framework is inferred from the entrypoint. | `None` |
| `python_version` | `str` | Python version for the container image. | `'3.12'` |
| `python_requirements` | `str \| List[str]` | Python dependencies (see …) | `''` |
| `build_script_pre` | `str` | Dockerfile commands to run before installing requirements. | `''` |
| `build_script_post` | `str` | Dockerfile commands to run after installing requirements. | `''` |
| `env_vars` | `Dict[str, str \| int \| float \| bool]` | Environment variables passed to the running container. | `{}` |
| `update_existing` | `bool` | If … | `False` |
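As a configuration sketch only (not taken from the official docs), the build and runtime parameters above might be combined like this; the package names, the apt package, and the environment variable are illustrative assumptions:

```python
from app import main  # the entrypoint from the example above
from datatailr import App, Resources

# Hypothetical configuration sketch; dependency names, the Dockerfile
# command, and the env var are illustrative, not from Datatailr docs.
app = App(
    name="Simple Dashboard App",
    entrypoint=main,
    python_requirements=["streamlit", "pandas"],  # list form of dependencies
    build_script_pre="RUN apt-get update && apt-get install -y libgomp1",
    resources=Resources(memory="512m", cpu=1.0),
    env_vars={"LOG_LEVEL": "info"},
    update_existing=True,
)
app.run()
```

Note that `update_existing=True` is used here so re-running the script updates the deployment instead of creating a new one (assuming that is the flag's semantics; the description is truncated in this reference).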
## Batch

Represents a batch job in the scheduler. Inherits from `Job` and is used to define batch jobs with specific configurations.

- `next_job_id` *(property)*: Returns a generator for the next job ID in the batch.
- `add_job(job)`: Adds a job to the batch. `job` is the `BatchJob` instance to add.
- `get_env_vars_copy()`: Returns a copy of the environment variables for the Batch instance.
- `reset()`: Resets the Batch instance to its initial state.
- `set_local_run(local_run)`: Sets the local-run flag. `local_run` is a boolean indicating whether to run locally.
- `to_dict()`: Converts the Batch instance to a dictionary representation.
- `to_json()`: Converts the Batch instance to a JSON string representation.
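The overview notes that a batch's tasks can run in parallel or sequentially. As an illustration of that idea only (not Datatailr's actual scheduler code), dependency-ordered execution can be sketched with the standard library's `graphlib`: jobs whose dependencies are all satisfied form a "wave" that could run in parallel, and each wave must finish before the next starts.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each key depends on the jobs in its set.
deps = {"load": set(), "clean": {"load"}, "train": {"clean"}, "report": {"clean"}}

ts = TopologicalSorter(deps)
ts.prepare()
waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # all jobs runnable in parallel right now
    waves.append(ready)
    ts.done(*ready)

print(waves)  # [['load'], ['clean'], ['report', 'train']]
```

Here `train` and `report` land in the same wave because both depend only on `clean`, matching the "parallel or sequential" behavior described above.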
## BatchJob

Represents a single job within a batch. This class can be extended to define specific configurations for each job in the batch.

- `args` *(writable property)*: Returns the arguments for the BatchJob instance.
- `id` *(property)*: Returns the unique identifier of the BatchJob instance.
- `__call__(*args, **kwds)`: Allows the BatchJob instance to be called like a function, returning itself. This is useful for chaining or functional-style programming.
- `alias(name)`: Sets an alias for the BatchJob instance. `name` is the alias to set.
- `run()`: Executes the job's entrypoint.
- `set_resources(resources=None, memory=None, cpu=None)`: Sets the resources for the BatchJob instance. `resources` is the `Resources` instance to set.
- `to_dict()`: Converts the BatchJob instance to a dictionary representation.
- `to_json()`: Converts the BatchJob instance to a JSON string representation.
- `translate_dependencies()`: Translates the BatchJob's dependencies into a format suitable for the batch job.
- `update_env_vars(env_vars)`: Updates the environment variables for the BatchJob instance. `env_vars` is a dictionary of environment variables to update.
## BatchJobError

Exception raised for errors related to batch jobs.

## DuplicateJobNameError

Exception raised when a job with a duplicate name is added to the batch.

## EntryPoint

Represents an entry point for a Datatailr job. This can be a function or a callable object.

## Environment

Enum representing the different environments for Datatailr jobs.
## ExcelAddin
Represents an Excel add-in deployment on Datatailr.
An Excel add-in exposes Python functions as Excel worksheet functions, allowing users to call server-side computations directly from Excel spreadsheets.
Example:

```python
from datatailr import ExcelAddin

def price_option(spot, strike, vol, rate, expiry):
    # Black-Scholes pricing logic
    ...

addin = ExcelAddin(
    name="Options Pricer",
    entrypoint=price_option,
    python_requirements="numpy,scipy",
)
addin.run()
```
`__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars={}, update_existing=False)`
Initialize an Excel add-in deployment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Display name for the add-in. | *required* |
| `entrypoint` | `Callable` | The callable (function) to expose as an Excel function. | *required* |
| `environment` | `Optional[Environment]` | Target environment for the deployment. | `DEV` |
| `image` | `Optional[Image]` | Pre-configured container Image. | `None` |
| `run_as` | `Optional[Union[str, User]]` | User or username to run the add-in as. | `None` |
| `resources` | `Resources` | CPU and memory resources for the container. | `Resources()` |
| `acl` | `Optional[ACL]` | Access control list. | `None` |
| `python_version` | `str` | Python version for the container image. | `'3.12'` |
| `python_requirements` | `str \| List[str]` | Python dependencies (see …) | `''` |
| `build_script_pre` | `str` | Dockerfile commands to run before installing requirements. | `''` |
| `build_script_post` | `str` | Dockerfile commands to run after installing requirements. | `''` |
| `env_vars` | `Dict[str, str \| int \| float \| bool]` | Environment variables passed to the running container. | `{}` |
| `update_existing` | `bool` | If … | `False` |
## Job

Base class for all Datatailr compute processes (App, Service, ExcelAddin, and batch workflows).

A Job encapsulates the container image configuration, resource requirements, access control, and deployment metadata needed to schedule and run work on the Datatailr platform. In most cases you should use the concrete subclasses (App, Service, ExcelAddin) or the `@workflow` decorator rather than instantiating Job directly.
- `id` *(property)*: Unique identifier for the job.
- `__get_existing__(job_name, environment)` *(classmethod)*: Retrieves an existing job instance from the Datatailr platform, based on the job name and environment.
`__init__(name, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, python_version='auto', python_requirements='', build_script_pre='', build_script_post='', env_vars={}, type=JobType.UNKNOWN, entrypoint=None, update_existing=False)`
Initialize a Job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Display name for the job. | *required* |
| `environment` | `Optional[Environment]` | Target environment (dev / pre / prod). | `DEV` |
| `image` | `Optional[Image]` | Pre-configured container Image. When … | `None` |
| `run_as` | `Optional[Union[str, User]]` | User or username under which the job runs. Defaults to the currently signed-in user. | `None` |
| `resources` | `Resources` | CPU and memory resources for the container. | `Resources()` |
| `acl` | `Optional[ACL]` | Access control list. Defaults to standard permissions for the current user. | `None` |
| `python_version` | `str` | Python version for the container image. | `'auto'` |
| `python_requirements` | `str \| List[str]` | Python dependencies (see …) | `''` |
| `build_script_pre` | `str` | Dockerfile commands before pip install. | `''` |
| `build_script_post` | `str` | Dockerfile commands after pip install. | `''` |
| `env_vars` | `Dict[str, str \| int \| float \| bool]` | Environment variables passed to the container. | `{}` |
| `type` | `Optional[JobType]` | The kind of job (batch, service, app, excel). | `UNKNOWN` |
| `entrypoint` | `Optional[EntryPoint]` | The entry point that starts the job. | `None` |
| `update_existing` | `bool` | If … | `False` |
- `__run_command__(command)`: Runs a command in the context of the job. This is used to execute the job's entry point.
- `from_dict(job_dict)`: Populates this Job instance from a dictionary (e.g. a platform API response). `job_dict` is a `dict` containing the serialized job data.
- `get_schedule_args()`: Returns additional arguments for scheduling the job. Override or extend this method as needed.
- `promote(from_environment=None, version=None)`: Promotes a version of the job from one environment to the next. If `from_environment` is `None`, the job is promoted from all environments; if `version` is `None`, the latest version is promoted. Returns a tuple of `(success: bool, message: str)`.

```python
from datatailr import Environment
from datatailr.scheduler import Job

job = Job(name="my_job", environment=Environment.DEV)
job.promote(from_environment=Environment.DEV, version=3)
```

This promotes version 3 of the job from the DEV environment to the next environment (PRE).
- `run()`: Runs the job. Equivalent to calling `job.save()` and then `job.start()`.
- `save()`: Saves the job to the Datatailr platform. If the job already exists, it is updated. The repository state is verified and an image is prepared for execution.
- `start()`: Starts the job. Batches with a schedule are started on that schedule; other job types, and batches without a schedule, are run immediately.
- `to_dict()`: Converts the Job instance to a dictionary representation.
- `to_json()`: Converts the Job instance to a JSON string representation.
- `verify_repo_is_ready()`: Verifies that the repository is ready for job execution. The check has two parts: (1) there are no uncommitted changes in the repository, and (2) the local commit matches the remote HEAD (i.e., the repo is synced with the remote). Returns a tuple of `(branch: str, commit_hash: str)`.
- `versions(environment=None)`: Lists all versions of the job in the specified environment. If no environment is specified, versions are listed across all environments.
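The two-part repository check performed by `verify_repo_is_ready()` can be sketched with plain `git` commands. This is an illustrative reimplementation, not Datatailr's actual code; the helper name is hypothetical, and the remote comparison is skipped here when no remote is configured:

```python
import subprocess

def repo_ready(repo_path="."):
    """Sketch of the readiness check described above (hypothetical helper)."""
    def git(*args):
        return subprocess.run(
            ["git", *args], cwd=repo_path,
            capture_output=True, text=True, check=True,
        ).stdout.strip()

    # 1. Fail if there are uncommitted changes.
    if git("status", "--porcelain"):
        raise RuntimeError("repository has uncommitted changes")

    branch = git("rev-parse", "--abbrev-ref", "HEAD")
    commit = git("rev-parse", "HEAD")

    # 2. Fail if the local commit differs from the remote HEAD
    #    (skipped in this sketch when no remote is configured).
    remotes = git("remote").splitlines()
    if remotes:
        remote_head = git("ls-remote", remotes[0], branch).split()[0]
        if remote_head != commit:
            raise RuntimeError("local commit is not synced with the remote")

    return branch, commit
```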
## JobType

Enum representing the different types of Datatailr jobs.
## Resources

*dataclass*

Represents the compute resources allocated to a job container.

Attributes:

| Name | Type | Description |
|---|---|---|
| `memory` | `str` | Memory limit as a string (e.g. …) |
| `cpu` | `float` | Number of CPU cores to allocate (e.g. …) |
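As a minimal sketch of what such a dataclass might look like (the field names and types come from the table above; the defaults and the `memory_bytes` helper are illustrative assumptions, not part of the documented API):

```python
from dataclasses import dataclass

@dataclass
class Resources:
    # Field names match the attribute table; defaults are illustrative.
    memory: str = "256m"   # memory limit string, e.g. "512m" or "2g"
    cpu: float = 1.0       # number of CPU cores, e.g. 0.5 or 2

    def memory_bytes(self) -> int:
        """Convert the memory string to bytes (hypothetical helper)."""
        units = {"k": 1024, "m": 1024**2, "g": 1024**3}
        return int(self.memory[:-1]) * units[self.memory[-1].lower()]

print(Resources(memory="512m", cpu=2).memory_bytes())  # 536870912
```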
## Schedule

Represents a schedule for batch jobs.

`__init__(cron_expression='', at_minutes=None, every_minute=None, at_hours=None, every_hour=None, weekdays=None, day_of_month=None, in_month=None, every_month=None, timezone=None, run_after_job_uuid=None, run_after_job_name=None, run_after_job_condition=None)`
Initialize a Schedule.
You can either provide a raw `cron_expression` or use the human-readable helper parameters (which are compiled into cron syntax automatically).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cron_expression` | `str` | A raw cron string (e.g. …) | `''` |
| `at_minutes` | `list[int] \| None` | Specific minutes past the hour to run (e.g. …) | `None` |
| `every_minute` | `int \| None` | Run every N minutes (e.g. …) | `None` |
| `at_hours` | `list[int] \| None` | Specific hours of the day to run (0–23). | `None` |
| `every_hour` | `int \| None` | Run every N hours. | `None` |
| `weekdays` | `list[str] \| None` | Days of the week to run (e.g. …) | `None` |
| `day_of_month` | `int \| None` | Day of the month to run (1–31). | `None` |
| `in_month` | `list[str] \| None` | Months to run in (e.g. …) | `None` |
| `every_month` | `int \| None` | Run every N months. | `None` |
| `timezone` | `str \| None` | IANA timezone name (e.g. …) | `None` |
| `run_after_job_uuid` | `str \| None` | UUID of a job that must complete before this schedule triggers. | `None` |
| `run_after_job_name` | `str \| None` | Name of a job that must complete before this schedule triggers. | `None` |
| `run_after_job_condition` | `str \| None` | Required completion status of the predecessor job (e.g. …) | `None` |
Example:

```python
from datatailr import Schedule

# Every weekday at 08:00 and 16:00 UTC
s = Schedule(at_hours=[8, 16], weekdays=["mon", "tue", "wed", "thu", "fri"])
```

- `get_cron_string()`: Returns the compiled cron string.
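To illustrate how the helper parameters could compile into five-field cron syntax (minute, hour, day-of-month, month, day-of-week), here is a simplified sketch. It covers only a subset of the parameters and is not Datatailr's actual implementation:

```python
def compile_cron(at_minutes=None, every_minute=None, at_hours=None,
                 every_hour=None, weekdays=None, day_of_month=None):
    """Simplified, hypothetical version of Schedule's cron compilation."""
    minute = (f"*/{every_minute}" if every_minute
              else ",".join(map(str, at_minutes)) if at_minutes else "0")
    hour = (f"*/{every_hour}" if every_hour
            else ",".join(map(str, at_hours)) if at_hours else "*")
    dom = str(day_of_month) if day_of_month else "*"
    dow = ",".join(weekdays) if weekdays else "*"
    return " ".join([minute, hour, dom, "*", dow])

# The Schedule example above (08:00 and 16:00 on weekdays) would compile to:
print(compile_cron(at_hours=[8, 16],
                   weekdays=["mon", "tue", "wed", "thu", "fri"]))
# 0 8,16 * * mon,tue,wed,thu,fri
```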
## Service

Represents a long-running background service deployment on Datatailr. A service runs continuously (e.g., an API server, a message consumer, or any always-on process) and is restarted automatically if it exits.
Example:

```python
from datatailr import Service

def run_server():
    import uvicorn
    uvicorn.run("myapi:app", host="0.0.0.0", port=8000)

svc = Service(
    name="My API Service",
    entrypoint=run_server,
    python_requirements="fastapi,uvicorn",
)
svc.run()
```
`__init__(name, entrypoint, environment=Environment.DEV, image=None, run_as=None, resources=Resources(), acl=None, python_version='3.12', python_requirements='', build_script_pre='', build_script_post='', env_vars={}, update_existing=False)`
Initialize a Service deployment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Display name for the service. | *required* |
| `entrypoint` | `Callable` | The callable (function) that starts the service. | *required* |
| `environment` | `Optional[Environment]` | Target environment for the deployment. | `DEV` |
| `image` | `Optional[Image]` | Pre-configured container Image. | `None` |
| `run_as` | `Optional[Union[str, User]]` | User or username to run the service as. | `None` |
| `resources` | `Resources` | CPU and memory resources for the container. | `Resources()` |
| `acl` | `Optional[ACL]` | Access control list. | `None` |
| `python_version` | `str` | Python version for the container image. | `'3.12'` |
| `python_requirements` | `str \| List[str]` | Python dependencies (see …) | `''` |
| `build_script_pre` | `str` | Dockerfile commands to run before installing requirements. | `''` |
| `build_script_post` | `str` | Dockerfile commands to run after installing requirements. | `''` |
| `env_vars` | `Dict[str, str \| int \| float \| bool]` | Environment variables passed to the running container. | `{}` |
| `update_existing` | `bool` | If … | `False` |
## task

`task(memory=DEFAULT_TASK_MEMORY, cpu=DEFAULT_TASK_CPU)`

Decorator that marks a function as a workflow task (batch job). Use it via the public alias `@task()` to declare individual computational steps inside a `@workflow`-decorated function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `memory` | `str` | Memory limit for this task's container (e.g. …) | `DEFAULT_TASK_MEMORY` |
| `cpu` | `float` | Number of CPU cores to allocate for this task (e.g. …) | `DEFAULT_TASK_CPU` |

Returns:

| Type | Description |
|---|---|
| | A decorator that wraps the target function so it can participate in DAG-based workflow orchestration. |
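Mechanically, such a decorator can be sketched as a wrapper that records the requested resources on the function object. This illustrates only the decorator pattern; the real `@task` also wires the function into Datatailr's DAG machinery, and the default values below are assumptions:

```python
import functools

DEFAULT_TASK_MEMORY = "256m"  # illustrative defaults, not the real values
DEFAULT_TASK_CPU = 1.0

def task(memory=DEFAULT_TASK_MEMORY, cpu=DEFAULT_TASK_CPU):
    """Hypothetical sketch of a resource-annotating task decorator."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)
        # Record the requested resources for an orchestrator to read later.
        wrapper.resources = {"memory": memory, "cpu": cpu}
        return wrapper
    return decorator

@task(memory="512m", cpu=2)
def heavy_computation(x, y):
    return x ** y

print(heavy_computation(2, 10), heavy_computation.resources)
# 1024 {'memory': '512m', 'cpu': 2}
```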
Example:

```python
from datatailr import task

@task(memory="512m", cpu=2)
def heavy_computation(x, y):
    return x ** y
```