Map Reduce Tutorial
Welcome to the DataTailr map-reduce tutorial! In this guide, you'll learn how to harness the power of distributed computing to process large datasets efficiently. We'll start with the basics and gradually build up to more advanced features.
What You'll Learn
By the end of this tutorial, you'll be able to:
- Set up parallel computations using DataTailr's map-reduce framework
- Create and manage distributed jobs on a cluster
- Use reduce functions for post-processing results
- Control resource usage and test your code locally
- Monitor job progress in real-time
Prerequisites
Before starting, make sure you have:
- Basic Python knowledge
- A Jupyter notebook open inside DataTailr
Getting Started
Step 1: Import the Required Modules
First, let's import the tools we need:
from dt.map_reduce import Calc, create_inputs
The `Calc` class is our main interface for map-reduce operations, while `create_inputs` is a helper function that makes it easy to generate properly formatted input data.
Step 2: Understanding the Map Function
In map-reduce, the "map" function is what gets executed in parallel across multiple nodes. Let's create a simple example:
def f(a=0, b=0, c=0):
    return a * (b + c)
Important: Your map function must use named arguments only (keyword arguments). This is a requirement for DataTailr to properly distribute your function across the cluster.
Step 3: Preparing Your Data
Now we need to create input data for our function. Let's generate some test data:
# Create three lists of values
a = [x*3 for x in range(10)] # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
b = [x*2 for x in range(10)] # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
c = [x for x in range(10)] # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Convert to DataTailr input format
inputs = create_inputs(a=a, b=b, c=c)
print(f"Created {len(inputs)} input combinations")
The `create_inputs()` function takes your parameter lists and creates a list of dictionaries. Each dictionary represents one function call with specific parameter values. This gives us 10 different combinations to process in parallel.
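Based on the description above, you can peek at the generated inputs; each entry should be a dictionary pairing one value from `a`, `b`, and `c` by position (the exact representation may differ slightly):
# Inspect the first couple of generated inputs (assumed dictionary format)
print(inputs[0])  # expected to resemble {'a': 0, 'b': 0, 'c': 0}
print(inputs[1])  # expected to resemble {'a': 3, 'b': 2, 'c': 1}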
Step 4: Creating Your First Calculation
Now let's set up our map-reduce job:
# Create a Calc instance
calc = Calc(f, inputs, name="my_first_map_reduce")
The `Calc` constructor takes:
- Your map function (`f`)
- The input data (`inputs`)
- An optional name for your job
If you don't provide a name, DataTailr will generate one automatically using the format `<user_name>-<map_name>-<reduce_name>`.
Step 5: Running Your Job
Time to execute your calculation on the cluster:
# Start the job
calc.run()
# Monitor progress inside your Jupyter Notebook
calc.live_status()
The `run()` method submits your job to the cluster, while `live_status()` provides real-time updates on job scheduling and completion. You'll see notifications when the job starts and finishes.
Step 6: Getting Your Results
Once your job completes, retrieve the results:
results = calc.get_result()
print("Map results:", results)
The results will be a list containing the output from each function call, in the same order as your inputs.
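Since `f` simply computes `a * (b + c)`, you can sanity-check the output against a plain Python list comprehension (a quick check, assuming `get_result()` returns a plain list as described above):
# For our inputs, f(a, b, c) = 3x * (2x + x) = 9x^2
expected = [9 * x * x for x in range(10)]
print(expected)             # [0, 9, 36, 81, 144, 225, 324, 441, 576, 729]
print(results == expected)  # should print True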
Adding Reduce Functionality
So far, we've only used the "map" part of map-reduce. Now let's add a "reduce" function to process our results further.
Step 7: Creating a Reduce Function
A reduce function takes all the map outputs and processes them together:
def aggregate(*args):
    """
    Process all map results by dividing each by 100
    """
    result = []
    for arg in args:
        result.append(arg / 100)
    return result
This function receives all map results as arguments and performs additional processing. You can implement any logic you need here.
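For example, instead of returning a transformed list, a reduce function could collapse everything into a single summary value. Here is a minimal sketch (assuming, as above, that the reduce function receives each map output as a separate positional argument):
def average(*args):
    # Collapse all map outputs into a single number
    return sum(args) / len(args)
You would pass it to `Calc` via `user_reduce=average`, exactly as shown in the next step.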
Step 8: Map-Reduce with Both Phases
Now let's create a calculation that includes both map and reduce:
# Create calculation with reduce function
calc_with_reduce = Calc(f, inputs, user_reduce=aggregate, name="map_reduce_example")
# Run the job
calc_with_reduce.run()
calc_with_reduce.live_status()
# Get final results
final_results = calc_with_reduce.get_result()
print("Final results after reduce:", final_results)
The reduce function processes the map outputs before returning the final result, giving you more control over the final output format.
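For this example, the map outputs are `[0, 9, 36, ..., 729]`, so after `aggregate()` divides each by 100 the final result should look like this (again assuming `get_result()` returns the reduce output as a plain list):
# Each map output divided by 100, mirroring what aggregate() does
expected_final = [v / 100 for v in [0, 9, 36, 81, 144, 225, 324, 441, 576, 729]]
print(expected_final)                  # [0.0, 0.09, 0.36, ..., 7.29]
print(final_results == expected_final) # should print True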
Advanced Features
Step 9: Controlling Resource Usage
When you have many inputs that can be processed quickly, you might want to limit the number of compute nodes used:
# Limit to 2 workers
calc = Calc(f, inputs, N_workers=2, name="limited_workers")
calc.run()
calc.live_status()
# Different number of workers for map-reduce
calc_with_reduce = Calc(f, inputs, user_reduce=aggregate, N_workers=3, name="map_reduce_limited")
calc_with_reduce.run()
calc_with_reduce.live_status()
Note: `N_workers` doesn't have to be a divisor of your input count. DataTailr will handle the distribution automatically.
Step 10: Local Testing
Before running expensive cluster jobs, it's always a good idea to test your code locally:
# Test map-only calculation locally
calc.run(local=True)
local_results = calc.get_local_results()
print("Local map results:", local_results)
# Test map-reduce calculation locally
calc_with_reduce.run(local=True)
local_final_results = calc_with_reduce.get_local_results()
print("Local map-reduce results:", local_final_results)
Important: When running locally (`local=True`), the `N_workers` parameter is ignored since everything runs on your local machine.
Best Practices
1. Always Use Named Arguments
# ✅ Correct
def my_function(param1=0, param2=0):
    return param1 + param2

# ❌ Incorrect
def my_function(param1, param2):
    return param1 + param2
2. Test Locally First
Always test your calculations with `local=True` before running on the cluster to catch errors early.
3. Monitor Your Jobs
Use `live_status()` to keep track of job progress and identify any issues quickly.
4. Choose Appropriate Worker Count
- Use fewer workers for quick computations
- Use more workers for computationally intensive tasks
- Consider your cluster's capacity and other users
5. Use Meaningful Names
Give your calculations descriptive names to make them easier to identify and manage.
Common Use Cases
Data Processing
def process_data_row(data_row=[]):
    # Process a single row of data
    return sum(data_row) / len(data_row)
# Process thousands of data rows in parallel
inputs = create_inputs(data_row=your_data_rows)
calc = Calc(process_data_row, inputs)
Machine Learning
def train_model(hyperparams={}):
    # Train a model with specific hyperparameters
    return {"accuracy": 0.95, "loss": 0.1}
# Grid search across hyperparameter combinations
inputs = create_inputs(hyperparams=hyperparameter_combinations)
calc = Calc(train_model, inputs)
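`hyperparameter_combinations` above is a placeholder for whatever grid you want to search; one hypothetical way to build it is a cross product of the values to try (the parameter names here are made up for illustration):
import itertools

learning_rates = [0.1, 0.01, 0.001]
batch_sizes = [32, 64]

# One dictionary per (learning_rate, batch_size) pair
hyperparameter_combinations = [
    {"learning_rate": lr, "batch_size": bs}
    for lr, bs in itertools.product(learning_rates, batch_sizes)
]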
File Processing
def process_file(filename=""):
# Process a single file
return {"filename": filename, "word_count": 1000}
# Process multiple files in parallel
inputs = create_inputs(filename=file_list)
calc = Calc(process_file, inputs)
Troubleshooting
Job Not Starting
- Check your cluster connection
- Verify your function uses named arguments
- Ensure your input data is properly formatted
Unexpected Results
- Test locally first to verify your function logic
- Check that your reduce function handles all input types correctly
- Verify your input data structure
Performance Issues
- Adjust `N_workers` based on your workload
- Consider breaking large jobs into smaller chunks
- Monitor cluster resource usage
Next Steps
Now that you've mastered the basics of DataTailr map-reduce, you can:
- Explore more complex reduce functions for advanced data aggregation
- Combine multiple calculations for complex workflows
- Integrate with other DataTailr features like data storage and visualization
- Optimize performance for your specific use cases
Happy computing! 🚀