API Reference

This page provides a detailed description of the Datatailr API that you can use to interact with the Scheduler. The API lets you schedule jobs, issue and manage pre-warming requests, and retrieve Batch Runs reports.

Scheduling

The Scheduling API provides a way to schedule Batch Jobs programmatically from Python scripts. It can also be used to create Batch Jobs dynamically, for example from a Streamlit App that generates Batch Job suites based on user-provided parameters.


dt.scheduler.batch

The batch decorator lets you mark any existing function as a possible batch entrypoint. Any file in a package can include functions with this decorator. When the package is built, decorated functions are registered as batch entrypoints of that package, and they become available for scheduling as batch jobs once an image containing the package is built.

The function signature can have any number of arguments, but the following arguments are reserved for the Datatailr batch framework (see the sketch after this list):

  • sub_job_name: The name of the sub job - this is a unique identifier used by the execution engine.
  • scheduled_time: The time the job was scheduled to run.
  • runtime: The time the job actually started running.
  • part_num: The part number of the job (see ParallelTask).
  • num_parts: The total number of parts the job is split into (see ParallelTask).
  • job_config: The job configuration, which is a dictionary defined when a job is scheduled. This dictionary with user-defined parameters will be passed to the job at runtime.
  • rundate: The date for which the job is being run - by default it is derived from runtime, except in two cases:
    • if the job is explicitly set to run with a custom rundate when being scheduled;
    • if it is a rerun of a job which ran with a custom rundate, in which case the same custom rundate will be used.
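
For illustration, an entrypoint can accept any subset of these reserved arguments by name, and the framework populates them at runtime. A minimal sketch (the function name is a placeholder) –

import dt.scheduler

@dt.scheduler.batch()
def report(sub_job_name, scheduled_time, runtime, rundate, job_config):
    # All of these arguments are injected by the batch framework at runtime.
    print(f'{sub_job_name=} {scheduled_time=} {runtime=} {rundate=}')
    # job_config carries the user-defined parameters supplied when the job was scheduled.
    return job_config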

Examples:

Including the following function definitions in a file /my_package/my_file.py and building it into a package

import dt.scheduler

@dt.scheduler.batch()
def foo(rundate, job_config):
    print(f'{job_config=}')
    print(f'{rundate=}')
    return 'This is the return value of function foo'

@dt.scheduler.batch()
def bar(foo):
    print(f'{foo=}')
    return 'This is the return value of function bar'

@dt.scheduler.batch(translation_table={'running_date': 'rundate'})
def baz(foo, bar, running_date):
    print(f'{foo=}')
    print(f'{bar=}')
    print(f'I am running with a custom rundate: {running_date}')
    return 'This is the return value of function baz'

will make functions my_package.my_file.foo, my_package.my_file.bar and my_package.my_file.baz available as batch entrypoints in any image which includes my_package.

These entrypoints can then be scheduled as a sample batch job using the following code –

from dt.scheduler import DAG, Schedule, Task

IMAGE = 'My image'  # placeholder: the name of an image which includes my_package

with DAG(Name='Test batch with decorators', Tag='dev', Schedule=Schedule()) as dag:
    foo = Task(
      Name='foo task',
      Image=IMAGE,
      Entrypoint='my_package.my_file.foo',
      dag=dag,
      Description='This is for testing'
    )

    bar = Task(
      Name='bar task',
      Image=IMAGE,
      Entrypoint='my_package.my_file.bar',
      dag=dag,
      ConfigurationJson={'arg_translation_table': {'foo task': 'foo'}},
      Description='This is for testing'
    )

    baz = Task(
      Name='baz task',
      Image=IMAGE,
      Entrypoint='my_package.my_file.baz',
      dag=dag,
      ConfigurationJson={'arg_translation_table': {'foo task': 'foo', 'bar task': 'bar'}},
      Description='This is for testing'
    )

    foo >> bar
    [foo, bar] >> baz

    dag.save()

Executing the above code will create a batch job Test batch with decorators with an empty schedule and three jobs: foo, bar and baz. Dependencies between the jobs are defined with the >> operator: bar depends on foo, and baz depends on both foo and bar.

Note – The return values of the decorated functions are associated with the function names, and are passed as arguments to dependent functions.

Note – The arg_translation_table parameter can be provided within ConfigurationJson in Task definitions to map the names of specific Tasks in the DAG to the argument names that the underlying function expects.

The resulting DAG in the above example is equivalent to the following function composition –

rundate = '2021-01-01'
job_config = {'key': 'value'}
foo_return = foo(rundate=rundate, job_config=job_config)
bar_return = bar(foo=foo_return)
baz_return = baz(foo=foo_return, bar=bar_return, running_date=rundate)

The user does not need to pass the return values of the functions to the next function explicitly. Moreover, the returned values are cached and stored for future use in (partial) reruns of the DAG, which saves time and resources by avoiding unnecessary re-computation.

Decorator Parameters:

  • translation_table (dict, optional) – a translation table can be provided to map the names of the arguments in the function signature to the reserved batch arguments (see function baz in the first example above).

dt.scheduler.DAG

The DAG class defines a Directed Acyclic Graph (DAG) of Tasks, which can be managed using a context manager. Tasks defined within this context are automatically added to the DAG.

Note – Dependencies between Tasks in a DAG can be established using the >> operator.

For instance, task1 >> task2 indicates that task2 will only be triggered after task1 completes.

Examples:

The following script would schedule a Test Batch comprised of two jobs, where task2 depends on task1, to run at 18:00 UTC –

from dt.scheduler import DAG, Schedule, Task

with DAG(Name='Test Batch', Tag='dev', Schedule=Schedule(at_hours=[18], at_minutes=[0])) as dag:
    task1 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 1', dag=dag)
    task2 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 2', dag=dag)
    task1 >> task2
    dag.save()

Dev run example:

from my_test_batch.entrypoints import *
from dt.scheduler import DAG

with DAG(Name='Test Batch DevRun', DevRun=True, WorkDir='~/example'):
    value = load()
    compute(input=value)

Here, the file structure would be ~/example/my_test_batch/entrypoints.py, and this file defines two functions decorated with @dt.scheduler.batch(): load() and compute(), where compute() expects the output of load(). The decorator allows "calling" the functions inside the with DAG context to add them as jobs to the DAG; this is syntactic sugar for defining Task() objects. Also, since this is a dev run, specifying Image is optional.

Constructor Parameters:

  • Name (str): The name of the Batch Job being created.
  • Tag (str, optional): The environment tag (dev, pre or prod) where the DAG will be executed. Note that only images matching this environment tag can be used.
  • Schedule (dt.scheduler.api.Schedule, optional): An instance of the Schedule class that defines when the DAG should run (e.g., specific hours and minutes).
  • DevRun (bool, optional): Will execute the DAG as a dev run using the code from your IDE. Use this only for development. If True, WorkDir is required and Image is optional (in both Task and DAG definitions).
  • WorkDir (str, optional): For dev runs only. Path to the directory with your python packages or files. This argument is required if DevRun is True.
  • Image (str, optional): The image for tasks which do not have an image specified in their definition, e.g. when jobs are scheduled via function calls rather than with Task() objects. This is optional only if all Task() definitions have Image specified.
  • Labels (List[str], optional): A list of custom labels for the Batch Job. Useful for categorizing jobs - the Batch Runs and Batch Jobs tabs of the Job Scheduler can be filtered by label.
  • FailAfter (str, optional): Defines the maximum run duration for the job before it is transitioned to the "failure" status.
  • FailAfterRunChildren (str, optional): Specifies the behavior of subsequent jobs scheduled as Run-After if this job fails due to the FailAfter constraint. The default is 'on success', other possible conditions are 'on failure' or 'always'.
  • RunAs (str, optional): Specifies the user under which the job will run. The job's environment will have mounted the home directory of this user. Default is the user who runs the scheduling script.
  • Owner (str, optional): The user who owns the job. The owner does not have to be a member of the Group to which the job belongs. Default is the user who runs the scheduling script.
  • Group (str, optional): The group that owns the job, which defines access (read/write) permissions. Non-admin users outside of the specified group will not see the job on the Batch Jobs tab, its runs on the Batch Runs tab, or its instances on the Active Jobs tab. Default is the primary group of the user who runs the scheduling script.
  • Permissions (str, optional): Read/write permissions set for the job, categorized by User, Group, and Others. Default is "rwrwr-", meaning anyone has read access, while write access is restricted to the owner and members of the specified group.
  • Priority (dt.scheduler.api.Priority, optional): Determines the priority of the job. Can be set to LOW, NORMAL, or HIGH. Default is NORMAL.
  • AlertEmail (str, optional): An email address where notifications about job runs will be sent.
  • AlertOnlyFailures (bool, optional): If set to True, alerts are only sent in case of job failures.
  • AlertSMS (str, optional): A phone number for SMS notifications about job runs.

Methods:

  • save() – schedule the DAG to run based on the provided Schedule object.
  • run() – trigger a run of the DAG immediately without scheduling it (see the sketch below).
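
For example, the DAG from the previous example could be triggered immediately with run() instead of being scheduled with save(); a minimal sketch –

from dt.scheduler import DAG, Schedule, Task

with DAG(Name='Test Batch', Tag='dev', Schedule=Schedule()) as dag:
    task1 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 1', dag=dag)
    task2 = Task(Image='Test Image', Entrypoint='test_entrypoint.main', Name='Task 2', dag=dag)
    task1 >> task2
    dag.run()  # trigger a run of the DAG immediately instead of waiting for a Schedule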

dt.scheduler.Schedule

The Schedule class allows you to define when and how often a Batch Job should run. This class offers flexibility in specifying the exact timing of job execution, whether on specific minutes, hours, days, or months, as well as event-triggered execution based on runs of specified jobs and their statuses.

Note – The Schedule can be either time-based or event-based, but not both.

Examples:

The following would create a Schedule where the job is triggered at 10:30 and 16:30 Paris time on Mondays, Wednesdays, and the last Friday of the month, with a -1 rundate offset (i.e. the rundate is the previous day) –

from dt.scheduler import Schedule

schedule = Schedule(
    at_minutes=[30],                                 # Run at 30 minutes past the hour
    at_hours=[10, 16],                               # Run at 10 AM and 4 PM
    weekdays=['Mon', 'Wed'],                         # Mondays and Wednesdays only
    first_last={'Weekday': 'Fri', 'Which': 'Last'},  # And on Last Friday of the month
    timezone='Europe/Paris',                         # Set the timezone to Paris
    rundate_offset=-1                                # Add a -1 day offset
)

If you want to give your job an event-based Schedule, e.g. to run only after "My other Batch Job" succeeds –

from dt.scheduler import Schedule

schedule = Schedule(
    run_after_job_name='My other Batch Job',         # Preceding job Name
    run_after_job_condition='on success'             # Run only if the preceding job succeeds
)

Constructor Parameters:

  • at_minutes (List[int]): A list of specific minutes during each hour when the job should run. If empty, the job is not constrained to specific minutes.
  • every_minute (int): If set to a value (e.g., 5), the job will run every X minutes, where X is the value specified.
  • at_hours (List[int]): A list of specific hours during the day when the job should run. If empty, the job is not constrained to specific hours.
  • every_hour (int): If set to a value (e.g., 2), the job will run every X hours, where X is the value specified.
  • weekdays (List[str]): A list of weekdays (['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) on which the job should run. If empty, the job is not constrained to specific days of the week.
  • day_of_month (int): Specifies a particular day of the month when the job should run (e.g., 15 for the 15th of the month).
  • first_last (dict): A dictionary selecting a particular occurrence of a weekday within the month ('First', 'Second', 'Third', 'Fourth', 'Fifth' or 'Last'). For example, setting {'Weekday': 'Mon', 'Which': 'First'} schedules the job for the first Monday of each month.
  • in_months (List[int]): A list of months (e.g., [1, 6, 12]) during which the job should run. If empty, the job is not constrained to specific months.
  • every_month (int): If set to a value (e.g., 3), the job will run every X months, where X is the value specified.
  • timezone (str): Specifies the timezone identifier (e.g. Europe/London) in which the schedule times should be interpreted. If not set, the UTC timezone is used.
  • rundate_offset (int): An integer representing the offset in days from the specified schedule date when the job should actually run. Negative integers offset the rundate to the past, while positive integers offset it to the future.
  • run_after_job_name (str): The Name of another job that must complete before this job is run. This allows for chaining jobs based on their names.
  • run_after_job_uuid (str): The UUID of another job that must complete before this job is run. This is an alternative to using the job's Name.
  • run_after_job_condition (str): Specifies the condition under which this job should run after the Run-After job. The default is 'on success', meaning the job will run only if the preceding job completes successfully. Other possible conditions are 'on failure' or 'always'.
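
Parameters not shown in the examples above, such as day_of_month and in_months, are used in the same way. A minimal sketch of a quarterly schedule (assuming these calendar parameters combine like the ones demonstrated above) –

from dt.scheduler import Schedule

schedule = Schedule(
    at_hours=[9],            # Run at 09:00...
    at_minutes=[0],
    day_of_month=1,          # ...on the 1st day of the month...
    in_months=[1, 4, 7, 10]  # ...in January, April, July and October
)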

dt.scheduler.Task

The Task class is used to define individual tasks within a Directed Acyclic Graph (DAG). A task represents a specific job that will be executed within the DAG, and it includes details necessary to run the job such as the image to run, the entrypoint for the task, resource requirements, etc. Dependencies between tasks are established using the right shift operator (>>), allowing for the creation of complex workflows where the execution order of tasks is controlled.

Examples:

The following script would create a Batch Job that includes a group of 5 tasks tasks_per_country, and a task final_task, where final_task will only run after all tasks in tasks_per_country complete.

Note how f-strings are used to dynamically populate the group of tasks based on the content of list_of_countries.

Note also that since CPU and Memory are not provided for the final_task, the default values will be used.

from dt.scheduler import DAG, Schedule, Task

list_of_countries = ['FR', 'DE', 'GB', 'IT', 'NL']

with DAG(Name='My Batch Job', Tag='dev', Schedule=Schedule(at_hours=[10], at_minutes=[0])) as dag:
    tasks_per_country = [Task(
        Name=f'Task #{i} - {country_code}',
        Image='My image',
        Description=f'This job calculates something for {country_code}',
        dag=dag,
        CPU=1000,
        Memory=1500,
        Entrypoint='my_package.file_with_entrypoint',
        CountForPrewarming=True,
        ConfigurationJson={'country': country_code}
    ) for i, country_code in enumerate(list_of_countries)]

    final_task = Task(
        Name='Final Task',
        Image='My image',
        Description='The final job which aggregates results',
        dag=dag,
        Entrypoint='my_package.other_file_with_entrypoint',
        CountForPrewarming=False,
        ConfigurationJson={'all_countries': list_of_countries}
    )

    tasks_per_country >> final_task
    dag.save()

Constructor Parameters:

  • Image (str): The name of the container image with which the task will be executed.
  • Entrypoint (str): The entrypoint for the task in 'package_name.file_name' format, where package_name is the name of the Dt package that contains the entrypoint, and file_name is the name of the file where __batch_main__ is defined.
  • Name (str): A unique name for the task, used to identify it within the DAG.
  • Description (str): A textual description of the task, providing additional context or notes.
  • dag (dt.scheduler.api.DAG): The DAG object that this task is part of. This links the task to the overall workflow structure.
  • ConfigurationJson (dict, optional): A dictionary containing configuration parameters specific to this task, which will be passed to the task at runtime. This allows for customizable and complex setups.
  • Arch (str, optional): The architecture for the task's execution environment - amd64 or arm64. Default is 'amd64'.
  • Os (str, optional): The operating system for the task's execution environment - fedora or alpine. Default is 'fedora'.
  • CPU (int, optional): The amount of CPU resources allocated to the task, specified in MHz (1000 MHz = 1 GHz ~ 0.25 CPU core). Default is 400.
  • Memory (int, optional): The amount of memory allocated to the task, specified in megabytes. Default is 400.
  • Disk (int, optional): The amount of disk space allocated to the task, specified in megabytes. Default is 300.
  • Retries (int, optional): The number of times to retry the task in case of failure. Default is 0, meaning no retries.
  • CountForPrewarming (bool, optional): Indicates whether this task should be considered for pre-warming, a process that brings up compute machines in advance to reduce job startup time. CountForPrewarming is accounted for if the corresponding setting is enabled in global Settings. Default is False.
  • Priority (dt.scheduler.api.Priority, optional): The priority level for this task. This can be set to LOW, NORMAL, or HIGH. Default is NORMAL.
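
The optional execution-environment parameters are passed in the same way as CPU and Memory in the example above. A minimal sketch (the Priority import path and enum member are assumptions based on the documented type dt.scheduler.api.Priority and its LOW/NORMAL/HIGH levels) –

from dt.scheduler import DAG, Schedule, Task
from dt.scheduler.api import Priority  # assumed import path, based on the documented type name

with DAG(Name='Retry example', Tag='dev', Schedule=Schedule()) as dag:
    flaky = Task(
        Name='Flaky task',
        Image='My image',
        Entrypoint='my_package.file_with_entrypoint',
        dag=dag,
        Arch='arm64',           # instead of the default amd64
        Os='alpine',            # instead of the default fedora
        Retries=2,              # retry up to 2 times on failure
        Priority=Priority.HIGH  # assumed enum member; LOW, NORMAL and HIGH are the documented levels
    )
    dag.save()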

dt.scheduler.ParallelTask

The ParallelTask class allows you to create and run groups of Tasks more efficiently when all tasks in the group use the same image and entrypoint.

Examples:

Let's take the previous example and replace the group of tasks with a single ParallelTask tasks_per_country. The ParallelTask in this case will consist of 5 parts, one part per country in the list. The dependencies can be defined in the same manner, so final_task will only run after all parts of tasks_per_country complete.

Note that if you use ParallelTask, you need to construct ConfigurationJson in advance, and it must be a list of dicts rather than a dict. In addition, each dict in the list must have 'Name' and 'Description' keys, which are used as the name and description of the corresponding part of the ParallelTask.

Note also that each part of the ParallelTask will get the amount of CPU and Memory specified in the ParallelTask definition.

from dt.scheduler import DAG, ParallelTask, Schedule, Task

list_of_countries = ['FR', 'DE', 'GB', 'IT', 'NL']
config_parallel = [{
    'country': country_code,
    'Name': f'Task #{i} - {country_code}',
    'Description': f'This job calculates something for {country_code}'
} for i, country_code in enumerate(list_of_countries)]

with DAG(Name='My Batch Job', Tag='dev', Schedule=Schedule(at_hours=[10], at_minutes=[0])) as dag:
    tasks_per_country = ParallelTask(
        Name='Parallel Task for countries',
        Image='My image',
        Description='This job calculates something for specified countries',
        dag=dag,
        CPU=1000,
        Memory=1500,
        Entrypoint='my_package.file_with_entrypoint',
        CountForPrewarming=True,
        ConfigurationJson=config_parallel
    )

    final_task = Task(
        Name='Final Task',
        Image='My image',
        Description='The final job which aggregates results',
        dag=dag,
        Entrypoint='my_package.other_file_with_entrypoint',
        CountForPrewarming=False,
        ConfigurationJson={'all_countries': list_of_countries}
    )

    tasks_per_country >> final_task
    dag.save()

Constructor Parameters:

ParallelTask has the same constructor parameters as the Task class, with the only difference being that ConfigurationJson is a list of dicts rather than a dict.
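
For the entrypoint itself, the reserved part_num and num_parts arguments of the batch decorator identify which part is running. A sketch, assuming each part receives its own dict from the ConfigurationJson list as job_config (this assumption is not stated above) –

import dt.scheduler

@dt.scheduler.batch()
def calculate_country(part_num, num_parts, job_config):
    # part_num / num_parts are the reserved batch arguments identifying this part.
    print(f'Running part {part_num} of {num_parts}')
    # Assumption: this part's entry from the ConfigurationJson list is passed as job_config.
    country = job_config['country']
    return f'Result for {country}'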


Pre-warming


dt.scheduler.estimate_required_machines()

The estimate_required_machines function calculates the number of machines needed to run a specified number of jobs based on the CPU and Memory requirements of each job. Either the type of machine or its exact CPU and Memory can be specified. If the available resources on the machine are insufficient to run even a single job, the function returns -1.

Examples:

from dt.scheduler import estimate_required_machines

machines_needed = estimate_required_machines(
    job_cpu=4 * 3.5,                 # Each job requires 14 GHz of CPU (4 cores at 3.5 GHz each)
    job_memory=16,                   # Each job requires 16 GB of memory
    num_jobs=1000,                   # Total number of jobs to run
    instance_type='c6i.32xlarge',    # AWS instance type specifying machine's CPU and memory
    # machine_cpu=64 * 3.5,          # Ignored since instance_type is specified
    # machine_memory=128             # Ignored since instance_type is specified
)

print(f"Number of machines required: {machines_needed}")

Parameters:

  • job_cpu (float): The amount of CPU required by a single job, specified in GHz.
  • job_memory (float): The amount of Memory required by a single job, specified in GB.
  • num_jobs (int): The total number of jobs that need to be run.
  • machine_reserve_cpu (float, optional): The amount of CPU reserved by Datatailr, specified in GHz. This reserved amount is subtracted from the total CPU available on the machine for running jobs. The default value is 1.5 GHz.
  • machine_reserve_memory (float, optional): The amount of Memory reserved by Datatailr, specified in GB. This reserved amount is subtracted from the total Memory available on the machine for running jobs. The default value is 4.5 GB.
  • instance_type (str, optional): The AWS instance type that defines the machine's CPU and Memory specifications. If instance_type is provided, the machine_cpu and machine_memory parameters are ignored.
  • machine_cpu (float, optional): The total CPU available on a machine, specified in GHz. This parameter is required if instance_type is not provided.
  • machine_memory (float, optional): The total Memory available on a machine, specified in GB. This parameter is required if instance_type is not provided.

Returns:

  • (int): The number of machines required to run the specified number of jobs. If the available resources on a machine (after accounting for reserved CPU and Memory) are insufficient to run even one job, the function returns -1.
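
If you prefer to describe the machine explicitly rather than by AWS instance type, pass machine_cpu and machine_memory instead of instance_type –

from dt.scheduler import estimate_required_machines

machines_needed = estimate_required_machines(
    job_cpu=2 * 3.0,       # Each job requires 6 GHz of CPU (2 cores at 3.0 GHz each)
    job_memory=8,          # Each job requires 8 GB of memory
    num_jobs=200,          # Total number of jobs to run
    machine_cpu=64 * 3.0,  # Total CPU available on a machine, in GHz
    machine_memory=256     # Total Memory available on a machine, in GB
)

print(f"Number of machines required: {machines_needed}")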

dt.scheduler.prewarming.get_num_instances()

The get_num_instances function calculates the number of machines required to run specified jobs. Unlike estimate_required_machines, it takes a list of Task/ParallelTask objects from dt.scheduler as an input argument, and it uses a default instance family specified in the Settings app for calculations.

Parameters:

  • jobs (List): a list of Task/ParallelTask objects from dt.scheduler.api, for which the number of instances has to be calculated

Returns:

  • (str): Message containing the number of instances along with the instance type.
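
A minimal sketch of calling get_num_instances with Task objects defined in a DAG (the image and entrypoint names are placeholders) –

from dt.scheduler import DAG, Schedule, Task
from dt.scheduler.prewarming import get_num_instances

with DAG(Name='Prewarm estimate', Tag='dev', Schedule=Schedule()) as dag:
    tasks = [Task(
        Name=f'Task {i}',
        Image='My image',
        Entrypoint='my_package.file_with_entrypoint',
        dag=dag,
        CPU=1000,
        Memory=1500
    ) for i in range(10)]

    print(get_num_instances(tasks))  # prints a message with the number and type of instances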

dt.scheduler.schedule_prewarm()

The schedule_prewarm function schedules a pre-warm operation for the specified Batch Job in the corresponding environment at a given time. This function should be used when a Batch Job doesn't run on a Schedule (e.g. event-based Run-After jobs), but you want the required compute to be waiting for the job at its runtime, so the job can start running immediately.

Note – The pre-warm request should be issued at least 5-10 minutes before the runtime to ensure that required compute resources can be prepared in a timely manner.

Note – After the pre-warm request is fulfilled, you will get a notification in the Notifications center.

Examples:

The following issues a pre-warming request for the "My Batch" job in dev environment 10 minutes from now –

from dt.scheduler import schedule_prewarm

schedule_prewarm('My Batch', 'dev', 10)

Parameters:

  • batch_name (str): The name of the Batch Job to pre-warm the compute instances for.
  • tag (str): The environment tag (dev, pre or prod) where the Batch Job will be executed.
  • minutes_to_start (int): The number of minutes from the current time when the job is expected to start.

Returns:

  • (bool): True if the request was successfully recorded, else False.

dt.scheduler.list_prewarm_entries()

The list_prewarm_entries function lists pre-warming entries. By default, only non-deleted entries are retrieved.

Parameters:

  • include_deleted (bool, optional): If set to True, all entries including the deleted ones are retrieved. The default value is False.

Returns:

  • (List[dict]): A list of dictionaries containing the details of each entry.
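
Example usage – inspecting the current pre-warming entries:

from dt.scheduler import list_prewarm_entries

entries = list_prewarm_entries()
for entry in entries:
    print(entry)  # each entry is a dictionary with the details of one pre-warming request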

dt.scheduler.cancel_prewarm_request()

The cancel_prewarm_request function marks a specific pre-warming entry as "deleted".

Parameters:

  • entry_id (int): The ID of the entry to be deleted.

Returns:

  • (bool): True if the operation was successful, else False.
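
A typical flow is to look up an entry with list_prewarm_entries() and cancel it by its ID. A sketch, assuming each entry dictionary exposes its ID under an 'id' key (the exact key name is not specified above) –

from dt.scheduler import cancel_prewarm_request, list_prewarm_entries

entries = list_prewarm_entries()
entry_id = entries[-1]['id']  # assumed key name; inspect the entry dict to confirm
print(cancel_prewarm_request(entry_id))  # True if the entry was marked as deleted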

Querying


dt.scheduler.BatchRunResults

The BatchRunResults class is used to retrieve the results of runs of a specified Batch Job.

Usage:

To get the output of a given batch run, instantiate this class with the Batch Job name, then call its methods to access run information and task outputs.

Results are loaded lazily, i.e. data is fetched from the database only when a specific run is requested.

Examples:

The following would print the total number of runs of 'My Batch Job' and the reports of its runs –

from dt.scheduler import BatchRunResults
import pandas as pd

batch_runs = BatchRunResults('My Batch Job')
print(batch_runs.count)
runs = batch_runs.get_runs()
print(pd.DataFrame.from_dict(runs))

Parameters:

  • batch_name (str): The name of a Batch Job to retrieve results for.

Methods:

  • count – Returns the total number of runs for the Batch Job.
  • get_runs() – Returns the list of runs for the Batch Job.
  • get_run(run_id: int) – Returns information about the tasks executed in the run with the given id.
  • get_run_graph(run_id: int) – Returns the dependencies between tasks that form a Directed Acyclic Graph.
  • get_task_result(run_id: int) – Returns the output of a task given the task run id.

dt.scheduler.BatchRunResults.count

The total number of runs for the batch.

The name of the batch is determined during BatchRunResults object creation.

Example usage:

from dt.scheduler import BatchRunResults

results = BatchRunResults("My Batch")
print(results.count)

This script will print how many times the "My Batch" batch job was run.

dt.scheduler.BatchRunResults.get_runs()

Function for getting general information about runs of a particular batch job. This does not include information about tasks within the batch or any of the task outputs.

Returns:

  • (List[dict]): The list of runs for the batch. Each run is represented by a dictionary with the following keys:
    name(str), tag(str), scheduled_time(str), started_at(str), finished_at(str),
    rundate(str), status(str), run_as(str), owner(str), group(str), permissions(str),
    id(int), schedule_run_type(str).

Example usage:

from dt.scheduler import BatchRunResults
import pandas as pd

results = BatchRunResults("My Batch")
runs = results.get_runs()
print(pd.DataFrame.from_dict(runs))

This script will print the information about every run attempt of a batch named "My Batch".

dt.scheduler.BatchRunResults.get_run()

Function for retrieving information about all tasks run within the batch execution. This does not include dependencies between tasks or any of the task outputs.

Parameters:

  • run_id(int): id of the batch run. Can be found using the get_runs() function.

Returns:

  • (List[dict]): The list of all tasks in the batch run. Each task run is represented by a dictionary with the following keys:
    id(int), name(str), description(str), start(str), end(str), stdout(str),
    stderr(str), configuration_json(dict), image(str), status(str), vm_id(str).

Note: Task ids differ between batch runs, so the id of a task from one batch run cannot be used to access the corresponding task in another run of the same batch.

Example usage:

from dt.scheduler import BatchRunResults
import pandas as pd

results = BatchRunResults("My Batch")
runs = results.get_runs()

# here you may inspect the data to get id of the run you want:
print(pd.DataFrame(runs))

# or get the latest one, which will be at the end of the list
run_id = runs[-1]['id']

task_runs = results.get_run(run_id)
print(pd.DataFrame.from_dict(task_runs))

Usually a get_runs() call is needed first to determine the id of the batch run, which can then be used to get the list of tasks in that run.

dt.scheduler.BatchRunResults.get_run_graph()

Function for getting information about task dependencies, including task names and statuses. This does not include task outputs.

Note: The only data provided here that is not present in the output of the get_run() function is the dependencies between tasks.

Parameters:

  • run_id(int): id of the batch run. Can be found using the get_runs() function

Returns:

  • (dict): Dictionary with two keys:
    • statuses(list[dict]): A list of dictionaries storing information about each task with the following keys:
      id(int), name(str), status(str), start_time(str), finish_time(str).
    • dependencies(list[dict]): A list of dictionaries, each representing dependency between two tasks. In terms of DAG, this is a list of all its edges. Each dictionary has two keys: from(str), to(str) with task names as values.
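
Example usage – printing task statuses and the edges of the DAG for the latest run:

from dt.scheduler import BatchRunResults

results = BatchRunResults("My Batch")
runs = results.get_runs()
graph = results.get_run_graph(runs[-1]['id'])

for task in graph['statuses']:
    print(task['name'], task['status'])
for edge in graph['dependencies']:
    print(f"{edge['from']} -> {edge['to']}")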

dt.scheduler.BatchRunResults.get_task_result()

Function for retrieving the result of a task run given its id.

Parameters:

  • run_id(int): id of the task run. Can be found using the get_run() or get_run_graph() function.

Returns:

  • (tuple): Tuple with two values:
    • (str): The name of the task
    • (any): The value returned by the entrypoint function of that task

Example usage:

from dt.scheduler import BatchRunResults
import pandas as pd

results = BatchRunResults("My Batch")
runs = results.get_runs()
tasks = results.get_run(runs[-1]['id'])
task_name, task_output = results.get_task_result(tasks[-1]['id'])  # pass the id of a task run (from get_run), not the batch run id

Note: If the task execution failed, its output won't be stored in the database. Calling get_task_result() on such tasks will raise ClientError.


dt.scheduler.get_batches()

The get_batches function is used to retrieve the list of all scheduled Batch Jobs, their schedules and dependencies. The same information is available on the "Batch Jobs" tab of the Job Scheduler.

Returns:

  • (List[dict]): The list of dictionaries containing the information about scheduled Batch Jobs.
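
Example usage – printing all scheduled Batch Jobs as a table:

from dt.scheduler import get_batches
import pandas as pd

batches = get_batches()
print(pd.DataFrame.from_dict(batches))  # one row per scheduled Batch Job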