
Hadoop MapReduce Streaming with Python

Contents
1. Summary statistics
2. Logistic regression
3. SVM
4. Decision Trees

1. Summary statistics
To calculate summary statistics (e.g., count, min, max, sum, and average) using
Hadoop MapReduce Streaming with Python, you can write a mapper and reducer
script. Below is an example implementation.

Problem Statement:
Assume you have a dataset where each line contains a numeric value. The goal
is to calculate the count, minimum, maximum, sum, and average of these values
using Hadoop MapReduce Streaming.

Python Mapper Script (`mapper.py`):


This script reads input data and emits each value with a key (e.g., `"value"`).

```python
#!/usr/bin/env python3
import sys

# Read input from stdin
for line in sys.stdin:
    # Remove leading/trailing whitespace
    value = line.strip()
    # Emit the value with a key (e.g., "value")
    if value:
        print(f"value\t{value}")
```

Python Reducer Script (`reducer.py`):


This script aggregates the values to calculate count, min, max, sum, and
average.

```python
#!/usr/bin/env python3
import sys

# Initialize variables
count = 0
min_value = None
max_value = None
total_sum = 0

# Process each key-value pair from the mapper
for line in sys.stdin:
    # Split the input line into key and value
    key, value = line.strip().split('\t')
    # Convert the value to a float
    value = float(value)

    # Update statistics
    if min_value is None or value < min_value:
        min_value = value
    if max_value is None or value > max_value:
        max_value = value
    total_sum += value
    count += 1

# Calculate the average
if count > 0:
    average = total_sum / count
else:
    average = 0

# Emit the summary statistics
print(f"Count\t{count}")
print(f"Min\t{min_value}")
print(f"Max\t{max_value}")
print(f"Sum\t{total_sum}")
print(f"Average\t{average}")
```

Running the Hadoop MapReduce Job

1. Prepare the Input Data:


Save your input data in a file (e.g., `input.txt`) and upload it to HDFS:
```bash
hdfs dfs -put input.txt /user/hadoop/input
```

2. Run the Hadoop Streaming Job:


Use the `hadoop jar` command to run the MapReduce job with the Python
scripts:
```bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /user/hadoop/input/input.txt \
    -output /user/hadoop/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
```

3. Check the Output:


The results will be stored in the HDFS output directory. You can view them
using:
```bash
hdfs dfs -cat /user/hadoop/output/part-00000
```

Example Input and Output

# Input (`input.txt`):
```
10
20
30
40
50
```

# Output:
```
Count 5
Min 10.0
Max 50.0
Sum 150.0
Average 30.0
```

Explanation:
- Mapper: Emits each value with a key (`"value"`).
- Reducer: Aggregates the values to compute count, min, max, sum, and
average.
- Hadoop Streaming: Allows you to use Python scripts for MapReduce jobs.

This approach is scalable and can handle large datasets distributed across a
Hadoop cluster.
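
For a quick sanity check before running on the cluster, you can also simulate the map-sort-reduce pipeline locally with a shell pipe (this assumes `mapper.py`, `reducer.py`, and `input.txt` are in the current directory):

```bash
# Local simulation: map, shuffle/sort, reduce
cat input.txt | python3 mapper.py | sort | python3 reducer.py
```

For the sample input above, this should print the same Count, Min, Max, Sum, and Average values shown in the expected output.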

2. Logistic regression
Implementing Logistic Regression using Hadoop MapReduce with Python involves
writing a Mapper and Reducer in Python and running them using Hadoop
Streaming. Below is an example of how you can implement a simple logistic
regression algorithm using Hadoop MapReduce.

Logistic Regression Overview


Logistic Regression is a supervised learning algorithm used for binary
classification. The goal is to minimize the logistic loss function using gradient
descent.
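
For reference, the per-example gradient that the mapper below emits follows directly from this loss; a minimal sketch of the math (binary label y, bias-augmented feature vector x, weights w):

```latex
% Logistic loss for a single training example (x, y):
L(w) = -\left[ y \log \sigma(w^\top x) + (1 - y) \log\left(1 - \sigma(w^\top x)\right) \right]
% Gradient with respect to the weights, emitted component-wise by the mapper:
\nabla_w L = \left( \sigma(w^\top x) - y \right) x
```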

Hadoop MapReduce Streaming Python Code

1. Mapper (`mapper.py`)
The mapper processes input data and emits intermediate key-value pairs for the
reducer.

```python
#!/usr/bin/env python
import sys
import numpy as np

# Initialize weights (can be passed as a file or hardcoded)
weights = np.array([0.0, 0.0, 0.0])  # Example: [w0, w1, w2] for 2 features + bias

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def map_function(line):
    # Parse input line
    data = line.strip().split(',')
    y = float(data[0])  # Label (0 or 1)
    x = np.array([1.0] + [float(val) for val in data[1:]])  # Features with bias term

    # Compute prediction
    z = np.dot(weights, x)
    prediction = sigmoid(z)

    # Emit gradient contributions, keyed by feature index
    gradient = (prediction - y) * x
    for i, val in enumerate(gradient):
        print(f"{i}\t{val}")

if __name__ == "__main__":
    for line in sys.stdin:
        map_function(line)
```

2. Reducer (`reducer.py`)
The reducer aggregates the gradients and updates the weights.

```python
#!/usr/bin/env python
import sys
import numpy as np

# Learning rate
learning_rate = 0.01

# Current weights: read from weights.txt if it was shipped with the job
# (see the -files option in section 7); otherwise fall back to zeros.
try:
    weights = np.loadtxt("weights.txt")
except OSError:
    weights = np.array([0.0, 0.0, 0.0])

def emit_updated_weight(index, total_gradient):
    # Gradient-descent step for the weight at this feature index
    updated_weight = weights[int(index)] - learning_rate * total_gradient
    print(f"{index}\t{updated_weight}")

def reduce_function():
    current_key = None
    total_gradient = 0.0

    for line in sys.stdin:
        # Hadoop Streaming sorts mapper output, so identical keys arrive together
        key, val = line.strip().split('\t')
        val = float(val)

        if current_key == key:
            total_gradient += val
        else:
            if current_key is not None:
                # Emit the updated weight for the previous key
                emit_updated_weight(current_key, total_gradient)
            current_key = key
            total_gradient = val

    # Emit the last key
    if current_key is not None:
        emit_updated_weight(current_key, total_gradient)

if __name__ == "__main__":
    reduce_function()
```

3. Input Data
Assume the input data is in CSV format, where the first column is the label (`0`
or `1`), and the remaining columns are features.

Example (`input_data.txt`):
```
1,2.0,3.0
0,1.0,1.5
1,3.0,4.0
0,2.0,2.5
```
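
With the zero-initialized weights above, sigmoid(0) = 0.5, so for the first row (label y = 1, bias-augmented features x = [1.0, 2.0, 3.0]) the mapper emits the gradient components (0.5 - 1) * x, keyed by feature index:

```
0   -0.5
1   -1.0
2   -1.5
```
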
4. Running the Hadoop Streaming Job
Save the mapper and reducer scripts as `mapper.py` and `reducer.py`,
respectively. Then run the Hadoop Streaming job using the following command:

```bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /path/to/input_data.txt \
    -output /path/to/output_directory \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
```

5. Explanation
- Mapper:
- Reads input data line by line.
- Computes the gradient contribution for each feature and emits it.
- Reducer:
- Aggregates the gradients for each weight.
- Updates the weights using gradient descent and emits the new weights.
- Hadoop Streaming:
- Runs the Python scripts in a distributed manner using Hadoop.

6. Iterative Process
Logistic regression typically requires multiple iterations to converge. You can run
the MapReduce job iteratively, updating the weights file (`weights.txt`) after
each iteration and passing it to the mapper.
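
A minimal driver-loop sketch for this iterative scheme is shown below; the paths and iteration count are placeholders, and it assumes the reducer emits `index<TAB>weight` lines while `weights.txt` holds one weight per line:

```bash
# One MapReduce job per gradient-descent pass
for i in $(seq 1 10); do
    hdfs dfs -rm -r -f /path/to/output_directory
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
        -files mapper.py,reducer.py,weights.txt \
        -input /path/to/input_data.txt \
        -output /path/to/output_directory \
        -mapper mapper.py \
        -reducer reducer.py
    # Rebuild weights.txt from the reducer output: sort by feature index, keep the weight column
    hdfs dfs -cat /path/to/output_directory/part-* | sort -n -k1,1 | cut -f2 > weights.txt
done
```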

7. Weight Initialization
You can initialize the weights in a file (`weights.txt`) and pass it to the mapper
using the `-files` option in Hadoop Streaming.

```bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /path/to/input_data.txt \
    -output /path/to/output_directory \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py \
    -files weights.txt
```
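
Inside `mapper.py`, the shipped `weights.txt` can then be read from the task's working directory instead of hardcoding the array; a minimal sketch, assuming the file holds one weight value per line:

```python
import numpy as np

# Files passed via -files are made available in the task's working directory
weights = np.loadtxt("weights.txt")
```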

This is a basic implementation. For a production-level solution, you may need to handle additional details like convergence criteria, regularization, and distributed weight updates.

3. SVM
To implement a Support Vector Machine (SVM) using Hadoop MapReduce with
Python, you can use Hadoop Streaming. Hadoop Streaming allows you to run
MapReduce jobs with any executable or script as the mapper and/or reducer.
Below is an example of how you can implement a basic SVM using Python for the
mapper and reducer.

Steps:
1. Mapper: Preprocess the data and pass it to the reducer.
2. Reducer: Train the SVM model using a library like `scikit-learn`.

Prerequisites:

- Install `scikit-learn` on all nodes in the Hadoop cluster.
- Ensure Python is installed on all nodes.
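
For example, the dependencies can be installed with pip on each node (the exact mechanism depends on how your cluster is provisioned):

```bash
pip install scikit-learn numpy
```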

Python Code for Hadoop MapReduce Streaming

# Mapper (`mapper.py`):
The mapper reads the input data and passes it to the reducer.

```python
#!/usr/bin/env python
import sys

# Read input from stdin
for line in sys.stdin:
    # Remove leading/trailing whitespace
    line = line.strip()
    # Emit the line as a key-value pair (the key can be a constant)
    print(f"1\t{line}")
```

# Reducer (`reducer.py`):
The reducer trains the SVM model using `scikit-learn`.

```python
#!/usr/bin/env python
import sys
from sklearn import svm
import numpy as np

# Initialize lists to store data
X = []
y = []

# Read input from the mapper
for line in sys.stdin:
    # Split the line into key and value
    key, value = line.strip().split('\t')

    # Parse the input data (assume CSV format: label,feature1,feature2,...)
    parts = value.split(',')
    label = float(parts[0])
    features = list(map(float, parts[1:]))

    # Append to the dataset
    X.append(features)
    y.append(label)

# Convert lists to numpy arrays
X = np.array(X)
y = np.array(y)

# Train the SVM model
clf = svm.SVC(kernel='linear')  # Use a linear kernel
clf.fit(X, y)

# Output the model coefficients (for demonstration purposes)
print("Model coefficients:")
print(clf.coef_)
print("Intercept:")
print(clf.intercept_)
```

Input Data Format


The input data should be in CSV format, where the first column is the label, and
the remaining columns are features. For example:
```
1,2.3,4.5
0,1.2,3.4
1,5.6,7.8
0,9.0,1.2
```

Running the Hadoop MapReduce Job


1. Save the mapper and reducer scripts as `mapper.py` and `reducer.py`,
respectively.
2. Make the scripts executable:
```bash
chmod +x mapper.py
chmod +x reducer.py
```
3. Run the Hadoop Streaming job:
```bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /path/to/input \
    -output /path/to/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
```

Explanation:
- Mapper: Reads the input data and emits it to the reducer. The key is a constant
(`1`), and the value is the data line.
- Reducer: Collects all the data, trains an SVM model using `scikit-learn`, and
outputs the model coefficients.
- Hadoop Streaming: Runs the Python scripts as part of the MapReduce job.
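
To apply the trained linear model outside the job, you can plug the printed coefficients into the standard linear decision rule; a minimal sketch (the coefficient and intercept values below are placeholders, not real output):

```python
import numpy as np

# Placeholder values copied from the reducer's printed output
coef = np.array([0.4, 0.3])   # first row of clf.coef_
intercept = -1.2              # clf.intercept_[0]

def predict(features):
    # Linear SVM decision rule: class 1 if w.x + b > 0, otherwise class 0
    return 1 if np.dot(coef, features) + intercept > 0 else 0

print(predict([2.3, 4.5]))
```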

Notes:

- This example assumes the dataset fits into memory on the reducer node. For
large datasets, you may need to use distributed machine learning libraries like
`Spark MLlib` or `H2O`.
- The SVM model is trained on the reducer side, which is not ideal for very large
datasets. For distributed training, consider using frameworks like `PySpark` or
`TensorFlow`.


4. Decision Trees
Implementing a Decision Tree algorithm from scratch using Hadoop MapReduce
is complex because MapReduce is not inherently designed for iterative machine
learning tasks. However, you can use Hadoop Streaming to process data in a
distributed manner and implement parts of the Decision Tree algorithm (e.g.,
calculating splits, entropy, or information gain).

Below is an example of how you can use Hadoop Streaming with Python to
calculate entropy (a key component of Decision Trees) for a dataset. This is a
simplified example and focuses on one step of the Decision Tree algorithm.

Hadoop MapReduce Streaming Example: Calculating Entropy

Input Data Format

Assume the input data is in CSV format, where each row represents a record, and
the last column is the label (class). For example:

```
feature1,feature2,feature3,label
1,0,1,A
0,1,0,B
1,1,1,A
0,0,0,B
```

Python Code for Mapper and Reducer

Mapper (`mapper.py`)
The mapper will emit the label and a count of 1 for each record.

```python
#!/usr/bin/env python
import sys

for line in sys.stdin:
    # Remove leading/trailing whitespace
    line = line.strip()
    # Split the line into fields
    fields = line.split(',')
    # Assume the last field is the label
    label = fields[-1]
    # Skip the CSV header row, if present
    if label == "label":
        continue
    # Emit the label and a count of 1
    print(f"{label}\t1")
```

Reducer (`reducer.py`)
The reducer will calculate the entropy for the labels.

```python
#!/usr/bin/env python
import sys
from math import log2

def calculate_entropy(counts, total):
    entropy = 0.0
    for count in counts.values():
        probability = count / total
        entropy -= probability * log2(probability)
    return entropy

def main():
    label_counts = {}
    total = 0

    for line in sys.stdin:
        # Remove leading/trailing whitespace
        line = line.strip()
        # Split the line into label and count
        label, count = line.split('\t')
        count = int(count)
        # Update the label count
        if label in label_counts:
            label_counts[label] += count
        else:
            label_counts[label] = count
        # Update the total count
        total += count

    # Calculate entropy
    entropy = calculate_entropy(label_counts, total)
    print(f"Entropy: {entropy}")

if __name__ == "__main__":
    main()
```

Running the Hadoop Streaming Job

1. Prepare the input data: Upload your dataset to HDFS.

```bash
hdfs dfs -put input_data.txt /user/hadoop/input
```

2. Run the Hadoop Streaming job:

```bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /user/hadoop/input/input_data.txt \
    -output /user/hadoop/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
```

3. Check the output:

```bash
hdfs dfs -cat /user/hadoop/output/part-00000
```
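
You can also verify the pair locally before submitting the job (assuming `mapper.py`, `reducer.py`, and `input_data.txt` are in the current directory):

```bash
cat input_data.txt | python3 mapper.py | sort | python3 reducer.py
# With the sample data above (two A and two B records) this prints: Entropy: 1.0
```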

Explanation of the Code


- Mapper: Emits each label with a count of 1.
- Reducer: Aggregates the counts for each label and calculates the entropy using the formula:

  Entropy = -Σ p(label) * log2(p(label))

  where p(label) is the fraction of records with that label. For the sample data above (two A and two B records), p(A) = p(B) = 0.5, so the entropy is 1.0.

Limitations
1. Scalability: This example only calculates entropy for the labels. A full Decision
Tree implementation would require multiple MapReduce jobs to handle feature
splitting, recursive partitioning, and tree construction.
2. Iterative Process: Decision Trees are inherently iterative, which is not a natural
fit for MapReduce. Frameworks like Apache Spark (with MLlib) are better suited
for such tasks.

Alternative: Use Apache Spark for Decision Trees


If you need to implement a full Decision Tree algorithm, consider using Apache
Spark with its MLlib library, which provides a distributed implementation of
Decision Trees. Here's an example:

```python
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DecisionTreeExample").getOrCreate()

# Load data
data = spark.read.format("libsvm").load("data.txt")

# Train a Decision Tree model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model = dt.fit(data)

# Make predictions
predictions = model.transform(data)
predictions.select("prediction", "label", "features").show()
```
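
The `libsvm` reader used above expects sparse rows of the form `label index:value ...` with 1-based feature indices; for example, the four labeled rows from the logistic regression section would be written in `data.txt` as:

```
1 1:2.0 2:3.0
0 1:1.0 2:1.5
1 1:3.0 2:4.0
0 1:2.0 2:2.5
```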

This approach is much more efficient and scalable for machine learning tasks.
