Map-Reduce Tutorial

Welcome to the DataTailr map-reduce tutorial! In this guide, you'll learn how to harness the power of distributed computing to process large datasets efficiently. We'll start with the basics and gradually build up to more advanced features.

What You'll Learn

By the end of this tutorial, you'll be able to:

  • Set up parallel computations using DataTailr's map-reduce framework
  • Create and manage distributed jobs on a cluster
  • Use reduce functions for post-processing results
  • Control resource usage and test your code locally
  • Monitor job progress in real-time

Prerequisites

Before starting, make sure you have:

  • Basic Python knowledge
  • A Jupyter notebook open inside DataTailr

Getting Started

Step 1: Import the Required Modules

First, let's import the tools we need:

from dt.map_reduce import Calc, create_inputs

The Calc class is our main interface for map-reduce operations, while create_inputs is a helper function that makes it easy to generate properly formatted input data.

Step 2: Understanding the Map Function

In map-reduce, the "map" function is what gets executed in parallel across multiple nodes. Let's create a simple example:

def f(a=0, b=0, c=0):
    return a * (b + c)

Important: Your map function must take named (keyword) arguments only, as in the example above, where every parameter is named and has a default value. DataTailr requires this in order to distribute your function across the cluster.

Step 3: Preparing Your Data

Now we need to create input data for our function. Let's generate some test data:

# Create three lists of values
a = [x*3 for x in range(10)]  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
b = [x*2 for x in range(10)]  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
c = [x for x in range(10)]    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# Convert to DataTailr input format
inputs = create_inputs(a=a, b=b, c=c)
print(f"Created {len(inputs)} input combinations")

The create_inputs() function takes your parameter lists and creates a list of dictionaries. Each dictionary represents one function call with specific parameter values. Here, the three lists of 10 values each give us 10 input dictionaries to process in parallel.
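To make the input format concrete, here is a plain-Python sketch of the structure described above. The helper sketch_create_inputs is hypothetical and is not part of DataTailr; it assumes the lists are paired element by element, which is consistent with the 10 inputs produced from three lists of 10 values.

```python
def sketch_create_inputs(**param_lists):
    """Hypothetical stand-in for create_inputs (assumption: element-wise pairing)."""
    names = list(param_lists)
    # zip(*lists) walks the lists in lockstep, yielding one tuple per function call
    return [dict(zip(names, values)) for values in zip(*param_lists.values())]


a = [x * 3 for x in range(10)]
b = [x * 2 for x in range(10)]
c = list(range(10))

sketch_inputs = sketch_create_inputs(a=a, b=b, c=c)
print(len(sketch_inputs))  # 10
print(sketch_inputs[1])    # {'a': 3, 'b': 2, 'c': 1}
```

Each dictionary can be unpacked straight into the map function as keyword arguments, for example f(**sketch_inputs[1]).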

Step 4: Creating Your First Calculation

Now let's set up our map-reduce job:

# Create a Calc instance
calc = Calc(f, inputs, name="my_first_map_reduce")

The Calc constructor takes:

  • Your map function (f)
  • The input data (inputs)
  • An optional name for your job

If you don't provide a name, DataTailr will generate one automatically using the format: <user_name>-<map_name>-<reduce_name>.

Step 5: Running Your Job

Time to execute your calculation on the cluster:

# Start the job
calc.run()

# Monitor progress inside your Jupyter Notebook
calc.live_status()

The run() method submits your job to the cluster, while live_status() provides real-time updates on job scheduling and completion. You'll see notifications when the job starts and finishes.

Step 6: Getting Your Results

Once your job completes, retrieve the results:

results = calc.get_result()
print("Map results:", results)

The results will be a list containing the output from each function call, in the same order as your inputs.
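As a sanity check, the expected map output for the data from Step 3 can be computed in plain Python, without the cluster. This sketch only re-applies f to the same values; the variable names inputs_sketch and expected are illustrative, and the exact container type returned by get_result() is not assumed here.

```python
def f(a=0, b=0, c=0):
    return a * (b + c)


# Same values as in Step 3, written as keyword-argument dictionaries
inputs_sketch = [{"a": 3 * x, "b": 2 * x, "c": x} for x in range(10)]

# Call f once per input, preserving input order
expected = [f(**kwargs) for kwargs in inputs_sketch]
print(expected)  # [0, 9, 36, 81, 144, 225, 324, 441, 576, 729]
```

Since a = 3x, b = 2x and c = x, each value is 3x * 3x = 9x², which makes the cluster output easy to verify by eye.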

Adding Reduce Functionality

So far, we've only used the "map" part of map-reduce. Now let's add a "reduce" function to process our results further.

Step 7: Creating a Reduce Function

A reduce function takes all the map outputs and processes them together:

def aggregate(*args):
    """
    Process all map results by dividing each by 100
    """
    result = []
    for arg in args:
        result.append(arg / 100)
    return result

This function receives all map results as arguments and performs additional processing. You can implement any logic you need here.

Step 8: Map-Reduce with Both Phases

Now let's create a calculation that includes both map and reduce:

# Create calculation with reduce function
calc_with_reduce = Calc(f, inputs, user_reduce=aggregate, name="map_reduce_example")

# Run the job
calc_with_reduce.run()
calc_with_reduce.live_status()

# Get final results
final_results = calc_with_reduce.get_result()
print("Final results after reduce:", final_results)

The reduce function processes the map outputs before returning the final result, giving you more control over the final output format.
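The two phases can be reproduced in plain Python to see what the reduce step should return for the tutorial data. This is a minimal sketch that assumes, as described above, that the reduce function receives the map outputs as separate positional arguments; it does not use DataTailr at all.

```python
def f(a=0, b=0, c=0):
    return a * (b + c)


def aggregate(*args):
    """Divide each map result by 100."""
    return [arg / 100 for arg in args]


# Map phase: one call per input, in input order
map_results = [f(a=3 * x, b=2 * x, c=x) for x in range(10)]

# Reduce phase: all map outputs are passed in together
final = aggregate(*map_results)
print(final[:4])  # [0.0, 0.09, 0.36, 0.81]
```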

Advanced Features

Step 9: Controlling Resource Usage

When you have many inputs that can be processed quickly, you might want to limit the number of compute nodes used:

# Limit to 2 workers
calc = Calc(f, inputs, N_workers=2, name="limited_workers")
calc.run()
calc.live_status()

# Different number of workers for map-reduce
calc_with_reduce = Calc(f, inputs, user_reduce=aggregate, N_workers=3, name="map_reduce_limited")
calc_with_reduce.run()
calc_with_reduce.live_status()

Note: N_workers doesn't have to be a divisor of your input count. DataTailr will handle the distribution automatically.
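To illustrate why the worker count need not divide the input count, here is one possible way to split 10 inputs across 3 workers. The helper split_evenly is hypothetical; DataTailr's actual distribution strategy is not described in this tutorial and may differ.

```python
def split_evenly(items, n_workers):
    """Hypothetical helper: split items into n_workers near-equal chunks."""
    base, extra = divmod(len(items), n_workers)
    chunks, start = [], 0
    for i in range(n_workers):
        # The first `extra` workers each take one additional item
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


chunks = split_evenly(list(range(10)), 3)
print([len(chunk) for chunk in chunks])  # [4, 3, 3]
```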

Step 10: Local Testing

Before running expensive cluster jobs, it's always a good idea to test your code locally:

# Test map-only calculation locally
calc.run(local=True)
local_results = calc.get_local_results()
print("Local map results:", local_results)

# Test map-reduce calculation locally
calc_with_reduce.run(local=True)
local_final_results = calc_with_reduce.get_local_results()
print("Local map-reduce results:", local_final_results)

Important: When running locally (local=True), the N_workers parameter is ignored since everything runs on your local machine.

Best Practices

1. Always Use Named Arguments

# ✅ Correct
def my_function(param1=0, param2=0):
    return param1 + param2

# ❌ Incorrect
def my_function(param1, param2):
    return param1 + param2
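If you want to check a function before submitting it, the standard inspect module can tell you whether every parameter is named and has a default. The helper all_params_have_defaults below is a hypothetical convenience, not a DataTailr function, and it encodes only the convention shown in the two examples above.

```python
import inspect


def all_params_have_defaults(func):
    """Hypothetical check: every parameter is a named one with a default value."""
    named_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return all(
        p.kind in named_kinds and p.default is not inspect.Parameter.empty
        for p in inspect.signature(func).parameters.values()
    )


def good(param1=0, param2=0):
    return param1 + param2


def bad(param1, param2):
    return param1 + param2


print(all_params_have_defaults(good))  # True
print(all_params_have_defaults(bad))   # False
```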

2. Test Locally First

Always test your calculations with local=True before running on the cluster to catch errors early.

3. Monitor Your Jobs

Use live_status() to keep track of job progress and identify any issues quickly.

4. Choose Appropriate Worker Count

  • Use fewer workers for quick computations
  • Use more workers for computationally intensive tasks
  • Consider your cluster's capacity and other users

5. Use Meaningful Names

Give your calculations descriptive names to make them easier to identify and manage.

Common Use Cases

Data Processing

def process_data_row(data_row=None):
    # Process a single row of data: return its mean, or None for an empty row
    if not data_row:
        return None
    return sum(data_row) / len(data_row)

# Process thousands of data rows in parallel
inputs = create_inputs(data_row=your_data_rows)
calc = Calc(process_data_row, inputs)

Machine Learning

def train_model(hyperparams=None):
    # Train a model with specific hyperparameters
    # (placeholder metrics; replace with real training code)
    hyperparams = hyperparams or {}
    return {"accuracy": 0.95, "loss": 0.1}

# Grid search across hyperparameter combinations
inputs = create_inputs(hyperparams=hyperparameter_combinations)
calc = Calc(train_model, inputs)
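The hyperparameter_combinations list used above is not defined in this tutorial. One common way to build such a grid is itertools.product from the standard library, sketched below. The parameter names learning_rate and max_depth are purely illustrative assumptions.

```python
from itertools import product

# Illustrative search space (hypothetical parameter names and values)
grid = {
    "learning_rate": [0.01, 0.1],
    "max_depth": [3, 5, 7],
}

# One dictionary per point in the Cartesian product of all value lists
hyperparameter_combinations = [
    dict(zip(grid, values)) for values in product(*grid.values())
]

print(len(hyperparameter_combinations))  # 6
print(hyperparameter_combinations[0])    # {'learning_rate': 0.01, 'max_depth': 3}
```

Each dictionary in the list then becomes the hyperparams argument of one train_model call.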

File Processing

def process_file(filename=""):
    # Process a single file
    return {"filename": filename, "word_count": 1000}

# Process multiple files in parallel
inputs = create_inputs(filename=file_list)
calc = Calc(process_file, inputs)
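The process_file function above returns a placeholder word count. A minimal sketch of a real implementation, together with one way to build file_list, might look like the following. It is tested here in a temporary directory with a made-up sample file, and it assumes the files are UTF-8 text that each worker can read from the same path.

```python
import tempfile
from pathlib import Path


def process_file(filename=""):
    """Count whitespace-separated words in a UTF-8 text file."""
    text = Path(filename).read_text(encoding="utf-8")
    return {"filename": filename, "word_count": len(text.split())}


with tempfile.TemporaryDirectory() as tmp:
    # Create a small sample file so the sketch is self-contained
    sample = Path(tmp) / "sample.txt"
    sample.write_text("one two three", encoding="utf-8")

    # Collect the files to process, in a stable order
    file_list = sorted(str(path) for path in Path(tmp).glob("*.txt"))
    results = [process_file(filename=name) for name in file_list]

print(results[0]["word_count"])  # 3
```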

Troubleshooting

Job Not Starting

  • Check your cluster connection
  • Verify your function uses named arguments
  • Ensure your input data is properly formatted

Unexpected Results

  • Test locally first to verify your function logic
  • Check that your reduce function handles all input types correctly
  • Verify your input data structure

Performance Issues

  • Adjust N_workers based on your workload
  • Consider breaking large jobs into smaller chunks
  • Monitor cluster resource usage

Next Steps

Now that you've mastered the basics of DataTailr map-reduce, you can:

  1. Explore more complex reduce functions for advanced data aggregation
  2. Combine multiple calculations for complex workflows
  3. Integrate with other DataTailr features like data storage and visualization
  4. Optimize performance for your specific use cases

Happy computing! 🚀