Scheduling API Reference
This page provides a detailed description of the Datatailr API that you can use to interact with the Scheduler. The API allows you to schedule jobs, issue and manage pre-warming requests, and retrieve Batch Runs reports.
Scheduling
The Scheduling API provides a way to schedule Batch Jobs programmatically using Python scripts. It can also be used to create Batch Jobs dynamically, for example through a Streamlit App that generates Batch Job suites based on user-provided parameters.
dt.scheduler.batch
The batch decorator allows you to define any existing function as a batch entrypoint. Any file in a package can include functions with this decorator. When a package is built, functions decorated with @dt.scheduler.batch are registered as batch entrypoints included in the package. These functions become available to be scheduled as batch jobs once an image with the package is built.
The function signature can have any number of arguments, but the following arguments are reserved for the Datatailr batch framework:
sub_job_name: The name of the sub job - this is a unique identifier used by the execution engine.
scheduled_time: The time the job was scheduled to run.
runtime: The time the job actually started running.
part_num: The part number of the job (see ParallelTask).
num_parts: The total number of parts the job is split into (see ParallelTask).
job_config: The job configuration, which is a dictionary defined when a job is scheduled. This dictionary of user-defined parameters is passed to the job at runtime.
rundate: The date for which the job is being run. By default it is derived from runtime, except in two cases:
- if the job is explicitly set to run with a custom rundate when being scheduled;
- if it is a rerun of a job which ran with a custom rundate, in which case the same custom rundate will be used.
Examples:
Including the following function definitions in a file /my_package/my_file.py
and building it into a package –
import dt.scheduler

@dt.scheduler.batch()
def foo(rundate, job_config):
    print(f'{job_config=}')
    print(f'{rundate=}')
    return 'This is the return value of function foo'

@dt.scheduler.batch()
def bar(foo):
    print(f'{foo=}')
    return 'This is the return value of function bar'

@dt.scheduler.batch(translation_table={'running_date': 'rundate'})
def baz(foo, bar, running_date):
    print(f'{foo=}')
    print(f'{bar=}')
    print(f'I am running with a custom rundate: {running_date}')
    return 'This is the return value of function baz'
will make functions my_package.my_file.foo, my_package.my_file.bar and my_package.my_file.baz available as batch entrypoints in any image which includes my_package.
These entrypoints can then be scheduled as a sample batch job using the following code –
from dt.scheduler import DAG, Schedule, Task

IMAGE = 'My image'  # placeholder: the name of an image which includes my_package

with DAG(Name='Test batch with decorators', Tag='dev', Schedule=Schedule()) as dag:
    foo = Task(
        Name='foo task',
        Image=IMAGE,
        Entrypoint='my_package.my_file.foo',
        dag=dag,
        Description='This is for testing'
    )
    bar = Task(
        Name='bar task',
        Image=IMAGE,
        Entrypoint='my_package.my_file.bar',
        dag=dag,
        ConfigurationJson={'arg_translation_table': {'foo task': 'foo'}},
        Description='This is for testing'
    )
    baz = Task(
        Name='baz task',
        Image=IMAGE,
        Entrypoint='my_package.my_file.baz',
        dag=dag,
        ConfigurationJson={'arg_translation_table': {'foo task': 'foo', 'bar task': 'bar'}},
        Description='This is for testing'
    )
    foo >> bar
    [foo, bar] >> baz

dag.save()
Executing the above code will create a batch job Test batch with decorators with an empty schedule and three jobs: foo, bar and baz. Dependencies between the jobs are defined by the last two statements in the with block: foo >> bar makes bar depend on foo, and [foo, bar] >> baz makes baz depend on both foo and bar.
Note – The return values of the decorated functions are associated with the function names, and are passed as arguments to dependent functions.
Note – The parameter arg_translation_table can be provided within ConfigurationJson in Task definitions to map a specific Task in the DAG to the input arguments which the underlying function expects.
The resulting DAG in the above example is equivalent to the following function composition –
rundate = '2021-01-01'
job_config = {'key': 'value'}
foo_return = foo(rundate=rundate, job_config=job_config)
bar_return = bar(foo=foo_return)
baz_return = baz(foo=foo_return, bar=bar_return, running_date=rundate)
The user does not need to worry about explicitly passing the return values of the functions to the next function. Moreover, the returned values are cached and stored for future use in (partial) reruns of the DAG, which can save time and resources by avoiding unnecessary re-computation.
Decorator Parameters:
translation_table (dict, optional): A translation table can be provided to map the names of the arguments in the function signature to the reserved batch arguments (see function baz in the first example above).
dt.scheduler.DAG
The DAG class defines a Directed Acyclic Graph (DAG) of Tasks, which can be managed using a context manager. Tasks defined within this context are automatically added to the DAG.
Note – Dependencies between Tasks in a DAG can be established using the >> operator. For instance, task1 >> task2 indicates that task2 will only be triggered after task1 completes.
Examples:
The following script would schedule a Test Batch comprised of two jobs, where task2 depends on task1, to run at 18:00 UTC –
from dt.scheduler import DAG, Schedule, Task

with DAG(Name='Test Batch', Tag='dev', Schedule=Schedule(at_hours=[18], at_minutes=[0])) as dag:
    task1 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 1', dag=dag)
    task2 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 2', dag=dag)
    task1 >> task2

dag.save()
Dev run example:
from my_test_batch.entrypoints import *
from dt.scheduler import DAG

with DAG(Name='Test Batch DevRun', DevRun=True, WorkDir='~/example'):
    value = load()
    compute(input=value)
Here, the file structure would be ~/example/my_test_batch/entrypoints.py. In this file, two functions decorated with @dt.scheduler.batch() are defined: load() and compute(), where compute() expects the output of load(). The decorator allows "calling" the functions inside the with DAG context to add them as jobs to the DAG; this is syntactic sugar for defining Task() objects. Also, since this is a dev run, specifying Image is optional.
Constructor Parameters:
Name (str): The name of the Batch Job being created.
Tag (str, optional): The environment tag (dev, pre or prod) where the DAG will be executed. Note that only images matching this environment tag can be used.
Schedule (dt.scheduler.api.Schedule, optional): An instance of the Schedule class that defines when the DAG should run (e.g., specific hours and minutes).
DevRun (bool, optional): Executes the DAG as a dev run using the code from your IDE. Use this only for development. If True, WorkDir is required and Image is optional (in both Task and DAG definitions).
WorkDir (str, optional): For dev runs only. Path to the directory with your Python packages or files. This argument is required if DevRun is True.
Image (str, optional): The image for tasks which do not have an image specified in their definition, e.g. when jobs are scheduled via function calls rather than with Task() objects. This is optional only if all the Task() definitions have Image specified.
Labels (List[str], optional): Custom labels for the Batch Job. Useful for categorizing jobs - the Batch Runs and Batch Jobs tabs of the Job Scheduler can be filtered based on labels.
FailAfter (str, optional): Defines the maximum run duration for the job before it is transitioned to the "failure" status.
FailAfterRunChildren (str, optional): Specifies the behavior of subsequent jobs scheduled as Run-After if this job fails due to the FailAfter constraint. The default is 'on success'; other possible conditions are 'on failure' or 'always'.
RunAs (str, optional): Specifies the user under which the job will run. The job's environment will have the home directory of this user mounted. Default is the user who runs the scheduling script.
Owner (str, optional): The user who owns the job. The owner of the job need not be a member of the Group to which the job belongs. Default is the user who runs the scheduling script.
Group (str, optional): The group that owns the job, which defines access (read/write) permissions. Non-admin users outside of the specified group will not see the job on the Batch Jobs tab, its runs on the Batch Runs tab, or instances of the job on the Active Jobs tab. Default is the primary group of the user who runs the scheduling script.
Permissions (str, optional): Read/write permissions set for the job, categorized by User, Group, and Others. Default is "rwrwr-", meaning anyone has read access, while write access is restricted to the owner and members of the specified group.
Priority (dt.scheduler.api.Priority, optional): Determines the priority of the job. Can be set to LOW, NORMAL, or HIGH. Default is NORMAL.
AlertEmail (str, optional): An email address where notifications about job runs will be sent.
AlertOnlyFailures (bool, optional): If set to True, alerts are only sent in case of job failures.
AlertSMS (str, optional): A phone number for SMS notifications about job runs.
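To illustrate how a few of the optional parameters fit together, here is a minimal sketch; the label values, email address, image name and entrypoint are placeholders, not part of the API –

from dt.scheduler import DAG, Schedule, Task

with DAG(
    Name='Nightly report',
    Tag='prod',
    Schedule=Schedule(at_hours=[2], at_minutes=[0]),  # run daily at 02:00 UTC
    Labels=['reports', 'nightly'],                    # filterable on the Batch Runs / Batch Jobs tabs
    AlertEmail='team@example.com',                    # placeholder address, notified about job runs...
    AlertOnlyFailures=True                            # ...but only when a run fails
) as dag:
    report = Task(Image='My image', Entrypoint='my_package.report', Name='Build report', dag=dag)

dag.save()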
Methods:
save() – schedule the DAG to run based on the provided Schedule object.
run() – trigger a run of the DAG immediately without scheduling it.
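For example, a one-off execution can be triggered with run() instead of save(). A minimal sketch, assuming run() is called on the DAG object the same way save() is in the examples above (image and entrypoint names are placeholders) –

from dt.scheduler import DAG, Task

with DAG(Name='Ad-hoc run', Tag='dev') as dag:
    task = Task(Image='My image', Entrypoint='my_package.main', Name='Task', dag=dag)

dag.run()  # executes the DAG right away, without waiting for a Schedule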
dt.scheduler.Schedule
The Schedule class allows you to define when and how often a Batch Job should run. This class offers flexibility in specifying the exact timing of job execution, whether on specific minutes, hours, days, or months, as well as event-triggered execution based on runs of specified jobs and their statuses.
Note – The Schedule can be either time-based or event-based, but not both.
Examples:
The following would create a Schedule where the job is triggered at 10:30 and 16:30 Paris time on Mondays, Wednesdays, and the last Friday of the month, with a rundate offset of -1 day (i.e., the rundate is yesterday) –
from dt.scheduler import Schedule

schedule = Schedule(
    at_minutes=[30],                                 # Run at 30 minutes past the hour
    at_hours=[10, 16],                               # Run at 10 AM and 4 PM
    weekdays=['Mon', 'Wed'],                         # Mondays and Wednesdays only
    first_last={'Weekday': 'Fri', 'Which': 'Last'},  # And on the last Friday of the month
    timezone='Europe/Paris',                         # Set the timezone to Paris
    rundate_offset=-1                                # Add a -1 day offset
)
If you want to give your job an event-based Schedule, e.g. to run only after "My other Batch Job" succeeds –
from dt.scheduler import Schedule

schedule = Schedule(
    run_after_job_name='My other Batch Job',  # Preceding job Name
    run_after_job_condition='on success'      # Run only if the preceding job succeeds
)
Constructor Parameters:
at_minutes (List[int]): A list of specific minutes during each hour when the job should run. If empty, the job is not constrained to specific minutes.
every_minute (int): If set to a value (e.g., 5), the job will run every X minutes, where X is the value specified.
at_hours (List[int]): A list of specific hours during the day when the job should run. If empty, the job is not constrained to specific hours.
every_hour (int): If set to a value (e.g., 2), the job will run every X hours, where X is the value specified.
weekdays (List[str]): A list of weekdays (['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) on which the job should run. If empty, the job is not constrained to specific days of the week.
day_of_month (int): Specifies a particular day of the month when the job should run (e.g., 15 for the 15th of the month).
first_last (dict): A dictionary defining a specific weekday within the month when the job should run, where 'Which' is one of ['First', 'Second', 'Third', 'Fourth', 'Fifth', 'Last']. For example, setting {'Weekday': 'Mon', 'Which': 'First'} schedules the job for the first Monday of each month.
in_months (List[int]): A list of months (e.g., [1, 6, 12]) during which the job should run. If empty, the job is not constrained to specific months.
every_month (int): If set to a value (e.g., 3), the job will run every X months, where X is the value specified.
timezone (str): Specifies the timezone identifier (e.g. Europe/London) in which the schedule times should be interpreted. If not set, the UTC timezone is used.
rundate_offset (int): An integer representing the offset in days from the specified schedule date when the job should actually run. Negative integers offset the rundate to the past, while positive integers offset it to the future.
run_after_job_name (str): The Name of another job that must complete before this job is run. This allows for chaining jobs based on their names.
run_after_job_uuid (str): The UUID of another job that must complete before this job is run. This is an alternative to using the job's Name.
run_after_job_condition (str): Specifies the condition under which this job should run after the Run-After job. The default is 'on success', meaning the job will run only if the preceding job completes successfully. Other possible conditions are 'on failure' or 'always'.
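The examples above cover the list-style parameters; the interval- and calendar-style parameters are used in the same way. A minimal sketch (how interval and calendar constraints interact when combined is not specified here, so treat the second schedule as illustrative) –

from dt.scheduler import Schedule

# Run every 2 hours:
every_two_hours = Schedule(every_hour=2)

# Run at 08:00 on the 1st of January, June and December:
on_the_first = Schedule(at_hours=[8], at_minutes=[0], day_of_month=1, in_months=[1, 6, 12])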
dt.scheduler.Task
The Task class is used to define individual tasks within a Directed Acyclic Graph (DAG). A task represents a specific job that will be executed within the DAG, and it includes details necessary to run the job such as the image to run, the entrypoint for the task, resource requirements, etc. Dependencies between tasks are established using the right shift operator (>>), allowing for the creation of complex workflows where the execution order of tasks is controlled.
Examples:
The following script would create a Batch Job that includes a group of 5 tasks, tasks_per_country, and a task, final_task, where final_task will only run after all tasks in tasks_per_country complete.
Note how f-strings are used to dynamically populate the group of tasks based on the content of list_of_countries.
Note also that since CPU and Memory are not provided for the final_task, the default values will be used.
from dt.scheduler import DAG, Schedule, Task

list_of_countries = ['FR', 'DE', 'GB', 'IT', 'NL']

with DAG(Name='My Batch Job', Tag='dev', Schedule=Schedule(at_hours=[10], at_minutes=[0])) as dag:
    tasks_per_country = [Task(
        Name=f'Task #{i} - {country_code}',
        Image='My image',
        Description=f'This job calculates something for {country_code}',
        dag=dag,
        CPU=1000,
        Memory=1500,
        Entrypoint='my_package.file_with_entrypoint',
        CountForPrewarming=True,
        ConfigurationJson={'country': country_code}
    ) for i, country_code in enumerate(list_of_countries)]
    final_task = Task(
        Name='Final Task',
        Image='My image',
        Description='The final job which aggregates results',
        dag=dag,
        Entrypoint='my_package.other_file_with_entrypoint',
        CountForPrewarming=False,
        ConfigurationJson={'all_countries': list_of_countries}
    )
    tasks_per_country >> final_task

dag.save()
Constructor Parameters:
Image (str): The Name of a container image to run for this task. This defines the Image with which the task will be executed.
Entrypoint (str): The entrypoint for the task in 'package_name.file_name' format, where package_name is the name of the Dt package that contains the entrypoint, and file_name is the name of the file where __batch_main__ is defined.
Name (str): A unique name for the task, used to identify it within the DAG.
Description (str): A textual description of the task, providing additional context or notes.
dag (dt.scheduler.api.DAG): The DAG object that this task is part of. This links the task to the overall workflow structure.
ConfigurationJson (dict, optional): A dictionary containing configuration parameters specific to this task, which will be passed to the task at runtime. This allows for customizable and complex setups.
Arch (str, optional): The architecture for the task's execution environment - amd64 or arm64. Default is 'amd64'.
Os (str, optional): The operating system for the task's execution environment - fedora or alpine. Default is 'fedora'.
CPU (int, optional): The amount of CPU resources allocated to the task, specified in MHz (1000 MHz = 1 GHz ~ 0.25 CPU core). Default is 400.
Memory (int, optional): The amount of memory allocated to the task, specified in megabytes. Default is 400.
Disk (int, optional): The amount of disk space allocated to the task, specified in megabytes. Default is 300.
Retries (int, optional): The number of times to retry the task in case of failure. Default is 0, meaning no retries.
CountForPrewarming (bool, optional): Indicates whether this task should be considered for pre-warming, a process that brings up compute machines in advance to reduce job startup time. CountForPrewarming is only taken into account if the corresponding setting is enabled in global Settings. Default is False.
Priority (dt.scheduler.api.Priority, optional): The priority level for this task. Can be set to LOW, NORMAL, or HIGH. Default is NORMAL.
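As an illustration of the resource-related parameters, the sketch below requests a larger execution environment for a single task; the image name, entrypoint and values are placeholders –

from dt.scheduler import DAG, Schedule, Task

with DAG(Name='Resource demo', Tag='dev', Schedule=Schedule()) as dag:
    heavy = Task(
        Name='Heavy task',
        Image='My image',
        Entrypoint='my_package.heavy_entrypoint',
        dag=dag,
        Arch='arm64',   # run on arm64 instead of the default amd64
        Os='alpine',    # alpine-based environment instead of the default fedora
        CPU=2000,       # 2000 MHz, i.e. roughly half a CPU core
        Memory=4096,    # 4 GB of RAM (default is 400 MB)
        Disk=1024,      # 1 GB of disk space (default is 300 MB)
        Retries=2       # retry up to twice on failure (default is 0)
    )

dag.save()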
dt.scheduler.ParallelTask
The ParallelTask class allows creating and running groups of Tasks in a more optimal way when all tasks in a group use the same image and entrypoint.
Examples:
Let's take the previous example and replace the group of tasks with a single ParallelTask, tasks_per_country. The ParallelTask in this case will consist of 5 parts, one part per country in the list. The dependencies can be defined in the same manner, so final_task will only run after all parts of tasks_per_country complete.
Note that if you use ParallelTask, you need to construct ConfigurationJson in advance, and it needs to be a list of dicts rather than a dict. In addition, each dict in the list has to have 'Name' and 'Description' keys, which will be used as the name and description of each part of the ParallelTask.
Note also that each part of the ParallelTask will get the amount of CPU and Memory specified in the ParallelTask definition.
from dt.scheduler import DAG, ParallelTask, Schedule, Task

list_of_countries = ['FR', 'DE', 'GB', 'IT', 'NL']

config_parallel = [{
    'country': country_code,
    'Name': f'Task #{i} - {country_code}',
    'Description': f'This job calculates something for {country_code}'
} for i, country_code in enumerate(list_of_countries)]

with DAG(Name='My Batch Job', Tag='dev', Schedule=Schedule(at_hours=[10], at_minutes=[0])) as dag:
    tasks_per_country = ParallelTask(
        Name='Parallel Task for countries',
        Image='My image',
        Description='This job calculates something for specified countries',
        dag=dag,
        CPU=1000,
        Memory=1500,
        Entrypoint='my_package.file_with_entrypoint',
        CountForPrewarming=True,
        ConfigurationJson=config_parallel
    )
    final_task = Task(
        Name='Final Task',
        Image='My image',
        Description='The final job which aggregates results',
        dag=dag,
        Entrypoint='my_package.other_file_with_entrypoint',
        CountForPrewarming=False,
        ConfigurationJson={'all_countries': list_of_countries}
    )
    tasks_per_country >> final_task

dag.save()
Constructor Parameters:
ParallelTask has the same constructor parameters as the Task class, with the only difference that ConfigurationJson is a list of dicts rather than a dict.
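On the entrypoint side, each part of a ParallelTask receives the reserved part_num and num_parts arguments described earlier. A hypothetical entrypoint sketch, assuming each part also receives its corresponding dict from the ConfigurationJson list as its job_config –

import dt.scheduler

@dt.scheduler.batch()
def per_country(part_num, num_parts, job_config):
    # part_num identifies this part; num_parts is the total number of parts.
    country = job_config['country']  # assumed: the per-part dict from ConfigurationJson
    print(f'Part {part_num} of {num_parts}: calculating something for {country}')
    return country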