Map-Reduce API Reference

dt.map_reduce.Calc()

The Calc class evaluates a user-defined function on an array of inputs, distributes the evaluations across multiple workers (locally or via Nomad), and optionally aggregates the results with a reduce step.

Parameters:

  • user_map (function): User-defined "map" function to evaluate at each input. All of its arguments must be keyword arguments.
  • inputs (list): List of input data items. Each input must be a dict of keyword arguments for the user-defined "map" function.
  • user_reduce=None (function): Optional user-defined reduce function that aggregates the "map" results. It should accept only *args, which receive the outputs of the user_map function.
  • name (str): Optional name for the job, displayed in Datatailr's Job Scheduler when the job runs in distributed mode.
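Because each input dict is unpacked into keyword arguments for user_map, a misspelled or unexpected key makes that call fail. A minimal pre-flight check, assuming you want to catch such mismatches before submitting a job, can use the standard library's inspect.signature. check_inputs is a hypothetical helper, not part of dt.map_reduce.

```python
import inspect


def check_inputs(user_map, inputs):
    # Hypothetical helper (not part of dt.map_reduce): verify that every input
    # dict can be bound to user_map's parameters as keyword arguments.
    sig = inspect.signature(user_map)
    for i, kwargs in enumerate(inputs):
        if not isinstance(kwargs, dict):
            raise TypeError(f"input {i} is not a dict")
        try:
            sig.bind(**kwargs)  # raises TypeError on unknown or missing arguments
        except TypeError as exc:
            raise TypeError(f"input {i} does not match {user_map.__name__}: {exc}") from None


def f(a=0, b=0, c=0):
    return a * (b + c)


check_inputs(f, [{'a': 1, 'b': 2, 'c': 3}, {'a': 4, 'b': 5, 'c': 6}])  # passes silently
```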

Example usage:

from dt.map_reduce import Calc

def f(a=0, b=0, c=0):
    return a * (b + c)

inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

calc = Calc(f, inputs)

def aggregate(*args):
    result = []
    for arg in args:
        result.append(arg / 100)
    return result

calc_with_reduce = Calc(f, inputs, user_reduce=aggregate)
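To make the data flow concrete, the pure-Python sketch below reproduces the semantics described above in a single process: user_map is called once per input dict with its entries as keyword arguments, and the optional user_reduce receives all map outputs as positional *args. map_reduce_sketch is a hypothetical name for illustration; it does no distribution and is not how Calc is implemented.

```python
def map_reduce_sketch(user_map, inputs, user_reduce=None):
    # Hypothetical single-process illustration; not the Calc implementation.
    # Map step: one call per input dict, entries passed as keyword arguments.
    mapped = [user_map(**kwargs) for kwargs in inputs]
    # Optional reduce step: every map output becomes one positional argument.
    if user_reduce is None:
        return mapped
    return user_reduce(*mapped)


def f(a=0, b=0, c=0):
    return a * (b + c)


def aggregate(*args):
    return [arg / 100 for arg in args]


inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

print(map_reduce_sketch(f, inputs))             # [5, 44]
print(map_reduce_sketch(f, inputs, aggregate))  # [0.05, 0.44]
```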

dt.map_reduce.Calc.run()

Executes the map-reduce job. The Calc object remembers the ID of the last run, and results are always fetched from that run.

The calculations are performed on the Datatailr Job Scheduler, and this method does not block execution.

Parameters:

  • local=False (bool): If True, runs the map and reduce functions locally and ignores n_jobs.
  • n_jobs=None (int): Number of workers to distribute evaluations across.
  • CPU=DEFAULT_TASK_CPU (int): If passed, replaces the default value (1000 MHz) when requesting compute.
  • Memory=DEFAULT_TASK_MEMORY (int): If passed, replaces the default value (2000 MB) when requesting compute.
  • SpotInstance=False (bool): If True, requests spot instances.
  • Image=None (str): If passed, replaces your IDE's image for the workers.
  • VirtualEnv=None (str): If passed, uses the specified environment for the workers.

Returns:

  • None

Example usage:

from dt.map_reduce import Calc

def f(a=0, b=0, c=0):
    return a * (b + c)

inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

calc = Calc(f, inputs)
calc.run()
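How run() assigns inputs to its n_jobs workers is not described here. As a rough local analogy only, the sketch below spreads the same evaluations over a pool of n_jobs threads using the standard library's ThreadPoolExecutor, which stands in for remote workers. Unlike run(), it blocks until every evaluation has finished. parallel_map_sketch is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map_sketch(user_map, inputs, n_jobs=None):
    # Hypothetical local analogy: threads play the role of remote workers.
    # max_workers=None lets ThreadPoolExecutor choose its own default pool size.
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        # Executor.map returns results in input order, whatever the completion order.
        return list(pool.map(lambda kwargs: user_map(**kwargs), inputs))


def f(a=0, b=0, c=0):
    return a * (b + c)


inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

print(parallel_map_sketch(f, inputs, n_jobs=2))  # [5, 44]
```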

dt.map_reduce.Calc.live_status()

Waits for the remote job to start and complete, printing periodic status updates.

This method blocks execution; live updates are displayed only in Jupyter Notebook (via IPython).

Parameters:

  • None

Returns:

  • Results of the last run.

Example usage:

from dt.map_reduce import Calc

def f(a=0, b=0, c=0):
    return a * (b + c)

inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

calc = Calc(f, inputs)

# In a separate cell:
calc.run()
calc.live_status()
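live_status() blocks while reporting progress. A generic version of that pattern is a polling loop that re-reads the job state at a fixed interval, reports each change, and stops at a terminal state. In the sketch below, get_state, the state names, and wait_with_updates are all hypothetical; it illustrates the pattern, not Datatailr's implementation.

```python
import time


def wait_with_updates(get_state, interval=5.0, terminal=("succeeded", "failed")):
    # Hypothetical polling loop: get_state() returns the current job state as a string.
    last = None
    while True:
        state = get_state()
        if state != last:  # report only when the state changes
            print(f"job state: {state}")
            last = state
        if state in terminal:
            return state
        time.sleep(interval)


# Simulated job that is pending, then running, then finishes.
states = iter(["pending", "running", "running", "succeeded"])
final = wait_with_updates(lambda: next(states), interval=0)
print(final)  # succeeded
```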

dt.map_reduce.Calc.get_result()

Gets the results of the last run.

Parameters:

  • None

Returns:

  • (list): a list of the same length as inputs, containing the evaluation of the "map" function at each input.

Example usage:

from dt.map_reduce import Calc

def f(a=0, b=0, c=0):
    return a * (b + c)

inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]

calc = Calc(f, inputs)
calc.run()

status = calc.live_status()  # Blocks execution until the job finishes or fails

if status == 0:  # Only fetch results if the job succeeded
    results = calc.get_result()
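Because get_result() returns one evaluation per input, each result can be matched back to the arguments that produced it, assuming the list is ordered like inputs. The sketch below does this with zip; pair_results is a hypothetical helper, and the literal list [5, 44] stands in for the value calc.get_result() would return for the example inputs.

```python
def pair_results(inputs, results):
    # Hypothetical helper; assumes results[i] is the evaluation at inputs[i].
    if len(results) != len(inputs):
        raise ValueError(f"expected {len(inputs)} results, got {len(results)}")
    return list(zip(inputs, results))


inputs = [{'a': 1, 'b': 2, 'c': 3},
          {'a': 4, 'b': 5, 'c': 6}]
results = [5, 44]  # stand-in for calc.get_result() with f(a, b, c) = a * (b + c)

for kwargs, value in pair_results(inputs, results):
    print(kwargs, "->", value)
```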

dt.map_reduce.Calc.get_local_results()

Gets the results of the last local run.

Parameters:

  • None

Returns:

  • (list): a list of the same length as inputs, containing the evaluation of the "map" function at each input.

dt.map_reduce.create_inputs()

Formats inputs: creates a list of input dictionaries for map-reduce from lists of values for each variable.

Parameters:

  • kwargs: Key-value pairs whose values are constants or arrays. Keys must match the argument names of the user-defined map function. Either all values are arrays of equal length, or exactly one value is an array and the rest are constants.

Returns:

  • (list): a list of dictionaries, each containing every key from kwargs with a single value for each key.

Example usage:

from dt.map_reduce import create_inputs

a = [x*3 for x in range(10)]
b = [x*2 for x in range(10)]
c = [x for x in range(10)]

inputs = create_inputs(a=a, b=b, c=c)
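The rules for create_inputs can be expressed in a few lines of plain Python. The sketch below is a hypothetical re-implementation for illustration, assuming "array" means a Python list or tuple; the real function may treat other sequence types differently and may report errors in another way.

```python
def create_inputs_sketch(**kwargs):
    # Hypothetical re-implementation of the documented create_inputs rules.
    # Assumption: lists and tuples count as arrays; anything else is a constant.
    arrays = {k: v for k, v in kwargs.items() if isinstance(v, (list, tuple))}
    if not arrays:
        raise ValueError("at least one value must be an array")
    if len(arrays) not in (1, len(kwargs)):
        raise ValueError("either all values must be arrays, or exactly one")
    lengths = {len(v) for v in arrays.values()}
    if len(lengths) != 1:
        raise ValueError("all arrays must have the same length")
    n = lengths.pop()
    # One dict per position: array values are indexed, constants are repeated.
    return [{k: (v[i] if k in arrays else v) for k, v in kwargs.items()}
            for i in range(n)]


a = [x * 3 for x in range(10)]
b = [x * 2 for x in range(10)]
c = [x for x in range(10)]

inputs = create_inputs_sketch(a=a, b=b, c=c)
print(len(inputs), inputs[1])  # 10 {'a': 3, 'b': 2, 'c': 1}

print(create_inputs_sketch(a=[1, 2], b=10))  # [{'a': 1, 'b': 10}, {'a': 2, 'b': 10}]
```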