
CMR Engineering College CSD

Department Of Computer Science & Engineering


(DATA SCIENCE)

STUDENT MANUAL

Name of lab : Skill Development Course V


(ETL- Kafka)
Class : III Year I Sem

Branch : Computer Science & Engineering


(DATA SCIENCE)

Regulation : R22

A.Y. : 2024- 2025


DEPARTMENT VISION
To create next-generation, globally competent data scientists and data engineers in the
field of Data Science by providing quality engineering education along with cutting-edge
technologies.

DEPARTMENT MISSION

● To provide value-based engineering education through continuous learning and research
by imparting a solid foundation in applied mathematics, algorithms, and programming
paradigms to build software models and simulations.
● To develop the concept-building, logical, and problem-solving skills of graduates to
address current global challenges of industry and society.
● To offer excellence in the teaching and learning process, industry collaboration activities,
and research to mould graduates into industry-ready professionals.

PROGRAM EDUCATIONAL OBJECTIVES (PEO)


1. To prepare graduates with a varied range of expertise in different aspects of data science
such as data collection, processing, modeling and visualization of large data sets
2. To acquire good knowledge of both the theory and application of existing data science
models based on applied statistics, mathematics, and computer science to analyze huge
data sets originating from different application areas.
3. To create models using the knowledge acquired from the program to solve future
challenges and real-world problems requiring large scale data analysis.
4. To produce better-trained professionals to cater to the growing demand for data scientists,
data analysts, data architects and data engineers in industry.

SD509PC: Skill Development Course V (ETL-KAFKA)


B.Tech. III Year I Sem. L T P C
0 0 2 1
Course Objectives:
● Develop a comprehensive understanding of Extract, Transform, Load (ETL) processes
using Apache Kafka and Talend.


● Understand how to scale Kafka clusters seamlessly to handle growing data volumes,
ensuring optimal performance for ETL operations.
Course Outcomes:
● Learn to design and deploy fault-tolerant Kafka clusters, ensuring data integrity and
availability in real-world scenarios.
● Gain practical experience in cluster management, topic creation, and basic operations
such as producing and consuming messages.
LIST OF EXPERIMENTS:
1. Install Apache Kafka on a single node.
2. Demonstrate setting up a single-node, single-broker Kafka cluster and show basic
operations such as creating topics and producing/consuming messages.
3. Extend the cluster to multiple brokers on a single node.
4. Write a simple Java program to create a Kafka producer and produce messages to a topic.
5. Implement sending messages both synchronously and asynchronously in the producer.
6. Develop a Java program to create a Kafka consumer and subscribe to a topic and
consume messages.
7. Write a script to create a topic with specific partition and replication factor settings.
8. Simulate fault tolerance by shutting down one broker and observing the cluster behavior.
9. Implement operations such as listing topics, modifying configurations, and deleting
topics.
10. Introduce Kafka Connect and demonstrate how to use connectors to integrate with
external systems.
11. Implement a simple word count stream processing application using Kafka Streams.
12. Implement Kafka integration with the Hadoop ecosystem.
TEXT BOOK:
1. Neha Narkhede, Gwen Shapira, Todd Palino, Kafka: The Definitive Guide – Real-Time Data and
Stream Processing at Scale, O'Reilly.

LAB CODE

Students should report to the concerned lab as per the timetable.
Students who turn up late to the labs will in no case be permitted to perform the programs scheduled
for the day.
After completion of the program, certification by the concerned staff in-charge in the
observation book is necessary.
Students should bring a notebook of 100 pages and should enter the readings/observations into
the notebook while performing the experiment.
The record of observations, along with the detailed experimental procedure of the experiment
performed in the immediate last session, should be submitted to and certified by the staff member
in-charge.
The group-wise division made in the beginning should be adhered to, and no mixing up of students
among different groups will be permitted.
When the experiment is completed, students should disconnect the setup made by them and should
return all the components/instruments taken for the purpose.
Any damage to the equipment or burnt-out components will be viewed seriously, either by imposing a
penalty or by dismissing the entire group of students from the lab for the semester/year.
Students should be present in the labs for the total scheduled duration.
Students are required to prepare thoroughly to perform the experiment before coming to the
laboratory.

INDEX

1. Install Apache Kafka on a single node.
2. Demonstrate setting up a single-node, single-broker Kafka cluster and show basic operations such as creating topics and producing/consuming messages.
3. Extend the cluster to multiple brokers on a single node.
4. Write a simple Java program to create a Kafka producer and produce messages to a topic.
5. Implement sending messages both synchronously and asynchronously in the producer.
6. Develop a Java program to create a Kafka consumer and subscribe to a topic and consume messages.
7. Write a script to create a topic with specific partition and replication factor settings.
8. Simulate fault tolerance by shutting down one broker and observing the cluster behavior.
9. Implement operations such as listing topics, modifying configurations, and deleting topics.
10. Introduce Kafka Connect and demonstrate how to use connectors to integrate with external systems.
11. Implement a simple word count stream processing application using Kafka Streams.
12. Implement Kafka integration with the Hadoop ecosystem.

Apache Kafka Overview:

**Apache Kafka** is a distributed event streaming platform designed to handle large-scale
data streaming in real time. Originally developed at LinkedIn, Kafka is now an open-source
project under the Apache Software Foundation. Kafka is known for its high throughput,
fault tolerance, scalability, and durability.

Key Concepts in Kafka:

1. **Producer:**

- Publishes messages to Kafka topics.

2. **Consumer:**

- Subscribes to Kafka topics and processes the messages.

3. **Broker:**

- Kafka server that stores and manages the topics and messages.

4. **Topic:**

- A category or feed name to which records are published.

5. **Partition:**

- Topics are divided into partitions to parallelize processing.

6. **Zookeeper:**

- Coordinates and manages distributed brokers and topics.

Use Cases for Kafka:

- **Real-time Data Pipeline:**

- Used to build robust and scalable data pipelines for real-time analytics.

- **Log Aggregation:**

- Centralized logging for applications, enabling easy analysis of logs.

- **Event Sourcing:**

- Stores events as a source of truth for system state.

- **Metrics and Monitoring:**

- Streams and processes metrics data in real-time.

Talend Overview:

**Talend** is an open-source integration platform that provides a set of tools and technologies
to connect, access, and manage different systems and data sources. Talend supports a wide
range of data integration and transformation tasks, including ETL (Extract, Transform, Load)
processes.

Key Features of Talend:

1. **Data Integration:**

- Enables the extraction, transformation, and loading of data between different systems.

2. **Big Data Integration:**

- Supports integration with big data technologies such as Apache Hadoop, Apache Spark, and
Apache Kafka.

3. **Cloud Integration:**

- Provides connectors for popular cloud platforms like AWS, Azure, and Google Cloud.

4. **Data Quality and Governance:**

- Includes features for data profiling, cleansing, and governance.

5. **Real-time Data Integration:**

- Supports real-time data processing and integration.

6. **Master Data Management (MDM):**

- Manages master data across an organization.

Apache Kafka and Talend Integration:

Talend provides connectors and components for integrating with Apache Kafka, allowing users
to build end-to-end data integration and streaming solutions. With Talend, you can easily design
workflows that involve Kafka as a source or destination for data.

Use Cases for Talend and Kafka Integration:

- **Real-time Data Processing:**

- Use Kafka as a streaming source or destination for real-time data processing.

- **Data Ingestion:**

- Ingest data from various sources into Kafka for centralized processing.

- **Event-Driven Architectures:**

- Build event-driven architectures by integrating Talend with Kafka.

- **Data Integration Pipelines:**

- Design complex data integration pipelines that involve Kafka as a key component.

Integration Steps:

1. **Talend Kafka Component:**

- Talend includes components specifically designed for interacting with Kafka, allowing you to
easily configure and manage Kafka connections within your data integration jobs.

2. **Designing Jobs:**

- Use Talend Studio to design jobs that involve reading from or writing to Kafka topics.

3. **Configuration:**

- Configure the Kafka connection settings, topic information, and other parameters within
Talend components.

4. **Deployment:**

- Deploy the Talend jobs to your runtime environment, where they can interact with Kafka in a
production environment.

By combining the strengths of Apache Kafka and Talend, organizations can achieve
robust, scalable, and real-time data integration solutions that meet their business needs. The
integration allows for seamless handling of streaming data within the broader context of data
integration and processing workflows.

1. Installing Apache Kafka on a single node

Please follow the steps below:

Prerequisites:

1. **Java Installation:**
- Ensure that Java is installed on your Windows machine. Kafka requires Java to run.
- You can download Java from [Oracle's website](https://www.oracle.com/java/technologies/javase-downloads.html) or use OpenJDK.

2. **Environment Variables:**
- Set the `JAVA_HOME` environment variable to the path where Java is installed.
- Add the `%JAVA_HOME%\bin` directory to your system's `PATH` variable.

Step-by-Step Installation:

1. **Download Apache Kafka:**


- Visit the [Apache Kafka download page](https://kafka.apache.org/downloads).
- Download the latest stable release for Windows.

2. **Extract Kafka Archive:**


- Extract the downloaded Kafka archive to a directory of your choice (e.g., `C:\kafka`).

3. **Configure Kafka:**
- Navigate to the Kafka installation directory.
- Open the `config` directory.
- Edit the `server.properties` file using a text editor.
- Set the following properties:
```properties
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
```

4. **Start Zookeeper (required for Kafka):**


- Open a command prompt in the Kafka directory.
- Run the following command to start Zookeeper:
```bash
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
```

5. **Start Kafka:**
- Open a new command prompt in the Kafka directory.
- Run the following command to start Kafka:
```bash
.\bin\windows\kafka-server-start.bat .\config\server.properties
```

6. **Create a Kafka Topic:**


- Open a new command prompt in the Kafka directory.
- Run the following command to create a topic named "test" (you can change the topic name):
```bash
.\bin\windows\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```

7. **Produce and Consume Messages:**


- Open a command prompt for producing messages:
```bash
.\bin\windows\kafka-console-producer.bat --topic test --bootstrap-server localhost:9092
```

- Open another command prompt for consuming messages:


```bash
.\bin\windows\kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092
```

Now you have Apache Kafka installed and running on a single node in your Windows
environment. You can start producing and consuming messages in the created topic. Remember
to check the [official Kafka documentation](https://kafka.apache.org/documentation/) for the
latest information and any updates.
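
Besides the console scripts, you can also verify the installation from code. The sketch below is a minimal check using Kafka's `AdminClient` (from the standard `kafka-clients` library); it assumes the broker started above is listening on `localhost:9092` and simply prints the cluster metadata. If the broker is not running, the calls fail with an exception, which is itself a useful diagnostic.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaInstallCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Point the admin client at the broker started in the steps above
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Ask the broker for its cluster metadata
            DescribeClusterResult cluster = admin.describeCluster();
            System.out.println("Cluster ID : " + cluster.clusterId().get());
            System.out.println("Brokers    : " + cluster.nodes().get());
        }
    }
}
```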


2. Demonstrate setting up a single-node, single-broker Kafka cluster and show basic operations
such as creating topics and producing/consuming messages.

Setting Up a Single-Node, Single-Broker Kafka Cluster on Windows:

Prerequisites:

1. **Java Installation:**
- Ensure Java is installed on your Windows machine. Download it from [Oracle's website](https://www.oracle.com/java/technologies/javase-downloads.html) or use OpenJDK.

2. **Environment Variables:**
- Set the `JAVA_HOME` environment variable to the Java installation path.
- Add `%JAVA_HOME%\bin` to your system's `PATH`.

Steps:

1. **Download Apache Kafka:**


- Visit [Apache Kafka download page](https://kafka.apache.org/downloads).
- Download the latest stable release for Windows.

2. **Extract Kafka Archive:**


- Extract the downloaded Kafka archive to a directory (e.g., `C:\kafka`).

3. **Configure Kafka:**
- Navigate to the Kafka installation directory.
- Open the `config` directory.
- Edit `server.properties` using a text editor.
- Set `listeners=PLAINTEXT://localhost:9092`.
- Set `advertised.listeners=PLAINTEXT://localhost:9092`.

4. **Start Zookeeper (required for Kafka):**


- Open a command prompt in the Kafka directory.
- Run the following command:
```bash
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
```

5. **Start Kafka Broker:**


- Open another command prompt in the Kafka directory.
- Run the following command:
```bash
.\bin\windows\kafka-server-start.bat .\config\server.properties
```

Basic Kafka Operations:


Create a Topic:

```bash
.\bin\windows\kafka-topics.bat --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```

List Topics:

```bash
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
```

Produce Messages:

Open a command prompt and run:

```bash
.\bin\windows\kafka-console-producer.bat --topic my-topic --bootstrap-server localhost:9092
```

Type messages and press Enter.

Consume Messages:

Open another command prompt and run:

```bash
.\bin\windows\kafka-console-consumer.bat --topic my-topic --bootstrap-server localhost:9092 --from-beginning
```

You should see the messages you produced.

Cleanup:

To stop Kafka and Zookeeper, press `Ctrl+C` in their respective command prompt windows.

These steps demonstrate a basic setup of a single-node, single-broker Kafka cluster on Windows
and showcase fundamental operations. Adjust topic names and configurations as needed for
your use case. Always refer to the [official Kafka documentation](https://kafka.apache.org/documentation/) for the latest information and updates.

3. Extending the Kafka cluster to multiple brokers on a single node


Extending the Kafka cluster to multiple brokers on a single node involves starting additional
Kafka broker instances and adjusting configurations. Here's a step-by-step guide to set up a
multi-broker Kafka cluster on a single node in a Windows environment:

1. Clone Configuration:

1. Copy the existing Kafka directory to create multiple broker instances. For example, if your
current Kafka directory is `C:\kafka`, you can copy it to `C:\kafka2` and `C:\kafka3`.

2. Update Broker Configurations:

1. Navigate to the configuration directory of each copied Kafka instance (`C:\kafka2\config` and
`C:\kafka3\config`).

2. In each configuration directory, open the `server.properties` file and adjust the following
properties:

- For `C:\kafka2\config\server.properties`:
```properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dirs=C:/kafka2/data
```

- For `C:\kafka3\config\server.properties`:
```properties
broker.id=2
listeners=PLAINTEXT://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9094
log.dirs=C:/kafka3/data
```

Adjust the `broker.id`, `listeners`, `advertised.listeners`, and `log.dirs` properties for each
broker.

3. Start Additional Brokers:

1. Open new command prompt windows for each additional Kafka broker.

2. Start each broker with the following command, replacing the paths accordingly:

```bash
.\bin\windows\kafka-server-start.bat .\config\server.properties
```

For example:
```bash
C:\kafka2> .\bin\windows\kafka-server-start.bat .\config\server.properties
C:\kafka3> .\bin\windows\kafka-server-start.bat .\config\server.properties
```
4. Verify Broker Status:

1. Open a command prompt and run the following command to check the status of each broker:

```bash
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092,localhost:9093,localhost:9094
```

5. Create Topics and Produce/Consume Messages:

1. You can create topics, produce, and consume messages as before, but now you can specify
any of the brokers in the `--bootstrap-server` parameter:

```bash
.\bin\windows\kafka-topics.bat --create --topic my-topic --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --partitions 3 --replication-factor 2
```

Cleanup:

To stop Kafka and Zookeeper for all instances, press `Ctrl+C` in their respective command
prompt windows.

This setup now demonstrates a multi-broker Kafka cluster on a single node with three brokers.
Adjust the configuration and number of brokers based on your requirements. Always refer to the
[official Kafka documentation](https://kafka.apache.org/documentation/) for the latest
information and updates.
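
To confirm that the new brokers actually participate in replication, you can inspect where the partitions of a topic were placed. The following is a small sketch using the Admin API, assuming the three brokers above and the `my-topic` topic created with `--partitions 3 --replication-factor 2`.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeTopicReplicas {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Any of the three brokers can serve as the bootstrap address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singletonList("my-topic"))
                    .all().get()
                    .get("my-topic");

            // Print the leader and replica brokers for each partition
            description.partitions().forEach(p ->
                    System.out.println("Partition " + p.partition() +
                            " leader=" + p.leader() +
                            " replicas=" + p.replicas()));
        }
    }
}
```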

4. Write a simple Java program to create a Kafka producer and produce messages to a topic.

Below is a simple Java program that demonstrates how to create a Kafka producer and
produce messages to a topic using the Kafka Producer API. Make sure you have the Kafka
libraries included in your project. You can download them from the [Apache Kafka
website](https://kafka.apache.org/downloads).

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {

    public static void main(String[] args) {
        // Set up producer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // Kafka broker addresses
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Specify the topic to which you want to send messages
        String topic = "my-topic";

        // Produce messages to the topic
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);

            // Send the message
            producer.send(record);
            System.out.println("Produced message: " + message);
        }

        // Close the producer to release resources
        producer.close();
    }
}

This program uses the Kafka Producer API to create a producer, set up necessary
properties (such as bootstrap servers, key and value serializers), and produce ten
messages to the specified topic ("my-topic" in this case). Adjust the properties and topic
name according to your Kafka setup.
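
The records above carry only a value, so the producer spreads them over the topic's partitions on its own. If related messages must stay in order, you can also give each record a key, because records with the same key always land on the same partition. As a hedged sketch, the loop below could replace the produce loop in the program above; the `user-` keys are purely illustrative.

```java
// Keyed records: the same key always maps to the same partition,
// so per-key ordering is preserved.
for (int i = 0; i < 10; i++) {
    String key = "user-" + (i % 3);          // illustrative keys: user-0, user-1, user-2
    String message = "Keyed message " + i;
    producer.send(new ProducerRecord<>(topic, key, message));
    System.out.println("Produced " + key + " -> " + message);
}
```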


5. Implement sending messages both synchronously and asynchronously in the producer.

Below is an example of a Java program that uses the Kafka Producer API to send
messages both synchronously and asynchronously.

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Set up producer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Send messages synchronously
        sendMessagesSynchronously(producer);

        // Send messages asynchronously
        sendMessagesAsynchronously(producer);

        // Close the producer to release resources (this also flushes pending messages)
        producer.close();
    }

    private static void sendMessagesSynchronously(Producer<String, String> producer) {
        for (int i = 0; i < 5; i++) {
            String message = "Synchronous Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);

            try {
                // Send the message and wait for acknowledgement
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Synchronous message sent to partition " + metadata.partition() +
                        " with offset " + metadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    private static void sendMessagesAsynchronously(Producer<String, String> producer) {
        for (int i = 0; i < 5; i++) {
            String message = "Asynchronous Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);

            // Send the message asynchronously with a callback for the acknowledgement
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Asynchronous message sent to partition " +
                                metadata.partition() + " with offset " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
    }
}

In this example, the `sendMessagesSynchronously` method sends messages and waits for
acknowledgment using the `get()` method. On the other hand, the
`sendMessagesAsynchronously` method sends messages asynchronously using the
`send` method with a callback (`Callback`) to handle the acknowledgment.


Adjust the number of messages, topic name, and other properties according to your
requirements. Additionally, handle exceptions appropriately in production code.
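
In production code you would also usually tighten the delivery guarantees of the producer itself. The lines below are a hedged sketch of commonly used settings that could be added to the `properties` object before the producer is created (the `ProducerConfig` constants come from the `org.apache.kafka.clients.producer` package already imported above); the values shown are illustrative, not required by this exercise.

```java
// Stronger delivery guarantees (illustrative values)
properties.put(ProducerConfig.ACKS_CONFIG, "all");                  // wait for all in-sync replicas
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);     // avoid duplicates on retry
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);   // retry transient failures
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);  // overall delivery deadline (ms)
```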

6. Develop a Java program to create a Kafka consumer and subscribe to a topic and consume messages.

Below is a simple Java program that demonstrates how to create a Kafka consumer,
subscribe to a topic, and consume messages using the Kafka Consumer API. Make sure
you have the Kafka libraries included in your project.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        // Set up consumer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // Kafka broker addresses
        properties.put("group.id", "my-consumer-group");       // Consumer group ID
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());

        // Create a Kafka consumer
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to a topic
        String topic = "my-topic";
        consumer.subscribe(Collections.singletonList(topic));

        // Consume messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record ->
                        System.out.printf("Consumed record with key %s and value %s%n",
                                record.key(), record.value()));
            }
        } finally {
            // Close the consumer to release resources
            consumer.close();
        }
    }
}

This program uses the Kafka Consumer API to create a consumer, set up the necessary
properties (such as bootstrap servers, group ID, key and value deserializers), and subscribe
to a specified topic ("my-topic" in this case). It then enters an infinite loop to continuously
poll for new messages and print their keys and values.

Adjust the properties and topic name according to your Kafka setup. Remember to handle
exceptions appropriately in production code. Also, consider implementing a graceful
shutdown mechanism for the consumer, as sketched below. This example is simplified for clarity.
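
One common shutdown pattern is to call `consumer.wakeup()` from a JVM shutdown hook: the blocked `poll()` then throws a `WakeupException`, the loop exits, and the consumer is closed cleanly. The sketch below shows how the consume loop of the program above could be adapted; it reuses the same `consumer` object and adds one extra import.

```java
import org.apache.kafka.common.errors.WakeupException;

// ... inside main(), after consumer.subscribe(...):
final Thread mainThread = Thread.currentThread();

// On Ctrl+C, interrupt the blocked poll() and wait for the loop to finish
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record ->
                System.out.printf("Consumed record with key %s and value %s%n",
                        record.key(), record.value()));
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to handle
} finally {
    consumer.close();
    System.out.println("Consumer closed.");
}
```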


7. Write a script to create a topic with specific partition and replication factor settings.

To create a topic with specific partition and replication factor settings using the Kafka
command-line tools, you can use the `kafka-topics.sh` script (or `kafka-topics.bat` on
Windows). Below is an example script to create a topic named "my-topic" with three
partitions and a replication factor of two:

Unix/Linux Script (`create-topic.sh`):

```bash
#!/bin/bash

# Set the Kafka home directory
KAFKA_HOME="/path/to/your/kafka"

# Kafka broker addresses
BROKER="localhost:9092"

# Topic settings
TOPIC_NAME="my-topic"
PARTITIONS=3
REPLICATION_FACTOR=2

# Create the topic
${KAFKA_HOME}/bin/kafka-topics.sh \
  --create \
  --topic ${TOPIC_NAME} \
  --bootstrap-server ${BROKER} \
  --partitions ${PARTITIONS} \
  --replication-factor ${REPLICATION_FACTOR}
```

Windows Script (`create-topic.bat`):

```batch
@echo off

rem Set the Kafka home directory
set KAFKA_HOME=C:\path\to\your\kafka

rem Kafka broker addresses
set BROKER=localhost:9092

rem Topic settings
set TOPIC_NAME=my-topic
set PARTITIONS=3
set REPLICATION_FACTOR=2

rem Create the topic
%KAFKA_HOME%\bin\windows\kafka-topics.bat ^
  --create ^
  --topic %TOPIC_NAME% ^
  --bootstrap-server %BROKER% ^
  --partitions %PARTITIONS% ^
  --replication-factor %REPLICATION_FACTOR%
```

Replace `/path/to/your/kafka` with the actual path to your Kafka installation directory.

Save the script in a file (e.g., `create-topic.sh` for Unix/Linux or `create-topic.bat` for
Windows) and make it executable (Unix/Linux: `chmod +x create-topic.sh`). Then, you
can run the script to create the topic with the specified settings.

Adjust the `TOPIC_NAME`, `PARTITIONS`, `REPLICATION_FACTOR`, and `BROKER`
variables as needed for your use case. After running the script, you should have a Kafka
topic with the specified partition and replication factor settings.
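
If you prefer to create the topic from Java instead of a shell script, the Admin API offers an equivalent. The sketch below assumes the same settings as the scripts above (topic `my-topic`, three partitions, replication factor two, broker at `localhost:9092`); note that a replication factor of two still requires at least two running brokers.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopicExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Same settings as the scripts: 3 partitions, replication factor 2
            NewTopic topic = new NewTopic("my-topic", 3, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Topic created: " + topic.name());
        }
    }
}
```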


8. Simulate fault tolerance by shutting down one broker and observing the cluster behavior.

To simulate fault tolerance in a Kafka cluster, you can intentionally shut down one of the Kafka
brokers and observe how the remaining brokers handle the situation. Here are the steps:

1. **Identify the Broker to Shutdown:**


- Determine the ID or address of the broker you want to shut down. You can find this
information in the `server.properties` file of each Kafka broker.

2. **Shutdown the Broker:**


- Use the appropriate script to stop the Kafka broker. For example, if you are using Unix/Linux,
you might run:
```bash
./bin/kafka-server-stop.sh config/server.properties
```
For Windows, use:
```batch
.\bin\windows\kafka-server-stop.bat .\config\server.properties
```

3. **Observe Cluster Behavior:**


- After shutting down a broker, observe the behavior of the remaining brokers in the Kafka
cluster.
- Check the logs of the remaining brokers for any information about leadership changes,
reassignments, or other activities related to the fault tolerance mechanisms.

4. **Produce and Consume Messages:**

- While the cluster adapts to the loss of a broker, you can continue to produce and consume
messages to observe how Kafka handles the situation.
- You may notice that some partitions get reassigned, and the remaining brokers take over the
responsibilities of the shutdown broker.

5. **Restart the Shutdown Broker:**


- After observing the behavior, you can restart the broker that you shut down.
- Observe how the cluster redistributes partitions and returns to a stable state.

It's important to note that Kafka is designed to handle fault tolerance gracefully. The replication
factor you set when creating a topic plays a crucial role in ensuring data availability and
durability. If a broker goes down, partitions with a replication factor greater than 1 will still
have copies on other brokers.

Keep in mind that this simulation is for educational or testing purposes. In a production
environment, you should plan for fault tolerance and ensure that Kafka is properly configured
for your specific use case. Always refer to the [official Kafka documentation](https://kafka.apache.org/documentation/) for the latest information and best practices regarding fault
tolerance and high availability.
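
To make the failover easier to observe, you can print the leader and in-sync replicas (ISR) of each partition before and after shutting the broker down. The following is a small sketch under the same assumptions as the earlier experiments (topic `my-topic`, brokers on `localhost:9092`, `9093`, and `9094`); run it twice and compare the output.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class WatchPartitionLeaders {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // List more than one broker so metadata stays reachable when one goes down
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription topic = admin.describeTopics(Collections.singletonList("my-topic"))
                    .all().get().get("my-topic");

            // Leadership and ISR change as brokers leave and rejoin the cluster
            topic.partitions().forEach(p ->
                    System.out.println("Partition " + p.partition() +
                            " leader=" + p.leader() +
                            " isr=" + p.isr()));
        }
    }
}
```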

9. Implement operations such as listing topics, modifying configurations, and deleting topics.

To perform operations like listing topics, modifying configurations, and deleting topics in Kafka,
you can use the `kafka-topics.sh` and `kafka-configs.sh` scripts (or `kafka-topics.bat` and
`kafka-configs.bat` on Windows) provided by Kafka. Below are examples of how you can use
these scripts for each operation:

1. List Topics:

**Unix/Linux:**

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

**Windows:**

.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2. Modify Configurations:

**Unix/Linux:**
./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=2000000

**Windows:**
.\bin\windows\kafka-configs.bat --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=2000000

This example modifies the `max.message.bytes` configuration for the topic named `my-topic`.
Adjust the configuration and topic name as needed. On newer Kafka versions, where the
ZooKeeper option has been removed, use `--bootstrap-server localhost:9092` instead of
`--zookeeper localhost:2181`.
3. Delete Topics:

**Unix/Linux:**
./bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

**Windows:**
.\bin\windows\kafka-topics.bat --delete --topic my-topic --bootstrap-server localhost:9092

This example deletes the topic named `my-topic`. Be cautious when deleting topics in a
production environment as it can result in data loss.

Make sure to replace `localhost:9092` with the actual Kafka broker address and port.

Remember to handle these operations carefully, especially in a production environment, to avoid
unintended consequences. Always refer to the [official Kafka documentation](https://kafka.apache.org/documentation/) for the latest and most accurate information.
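
The same three operations can also be performed programmatically through the Admin API, which avoids shelling out to the scripts. The sketch below is a minimal example under the same assumptions (broker on `localhost:9092`, topic `my-topic`); the delete step is just as destructive as its command-line counterpart.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicAdminExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1. List topics
            System.out.println("Topics: " + admin.listTopics().names().get());

            // 2. Modify a topic configuration (max.message.bytes, as in the CLI example)
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            AlterConfigOp setMaxBytes = new AlterConfigOp(
                    new ConfigEntry("max.message.bytes", "2000000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singleton(setMaxBytes))).all().get();

            // 3. Delete the topic (irreversible -- be careful outside a lab setup)
            admin.deleteTopics(Collections.singletonList("my-topic")).all().get();
        }
    }
}
```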

10. Introduce Kafka Connect and demonstrate how to use connectors to integrate with external systems.

**Introduction to Kafka Connect:**

Kafka Connect is a framework in Apache Kafka that simplifies the integration of Kafka with other
systems. It provides a scalable and fault-tolerant way to stream data between Apache Kafka and
various data storage systems, databases, and other data processing frameworks. Kafka Connect
aims to eliminate the need for custom data integration code by providing a set of pre-built
connectors.

Connectors in Kafka Connect are plugins that define how data should be ingested or egressed
from Kafka. Kafka Connect includes both source connectors (for bringing data into Kafka) and
sink connectors (for pushing data from Kafka to external systems).

**Using Connectors:**

Here is a simple demonstration of how to use Kafka Connect to integrate with an external
system using a source connector and a sink connector.

**1. Start Kafka Connect:**

- Navigate to your Kafka installation directory.


- Start Kafka Connect in standalone mode using the following command:

./bin/connect-standalone.sh config/connect-standalone.properties config/your-source-config.properties config/your-sink-config.properties

The `connect-standalone.properties` file is the Kafka Connect standalone configuration file,
and `your-source-config.properties` and `your-sink-config.properties` are your specific
connector configurations.

**2. Source Connector:**

In this example, let's use the [Debezium connector](https://debezium.io/) as a source connector
to capture changes from a MySQL database and send them to Kafka.

- Download the Debezium MySQL connector JAR from the [Debezium website](https://debezium.io/documentation/reference/1.7/install.html).
- Place the JAR file in the `plugin.path` directory specified in `connect-standalone.properties`.

**3. Sink Connector:**

For the sink connector, let's use the Kafka Connect JDBC sink connector to write the data to a
relational database (e.g., PostgreSQL).

- Download the Kafka Connect JDBC sink connector JAR from the [Confluent Hub](https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc).
- Place the JAR file in the `plugin.path` directory.

**4. Configure Connectors:**

Create configuration files for your source and sink connectors (`your-source-config.properties`
and `your-sink-config.properties`). Here are simplified examples:

**`your-source-config.properties` (Debezium MySQL Source Connector):**

name=my-source-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=localhost
database.port=3306
database.user=mydbuser
database.password=mydbpassword
database.server.id=1
database.server.name=mydbserver
database.whitelist=mydatabase

**`your-sink-config.properties` (JDBC Sink Connector for PostgreSQL):**

name=my-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-topic
connection.url=jdbc:postgresql://localhost:5432/mydatabase
connection.user=mydbuser
connection.password=mydbpassword
auto.create=true

**5. Verify and Monitor:**

- Verify that your source system (MySQL) and target system (PostgreSQL) are running.
- Monitor the connectors using the Kafka Connect REST API (a minimal example follows below) or the Confluent Control Center.
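
As a minimal monitoring sketch, the standalone Connect worker exposes a REST API (port 8083 by default). The Java 11+ snippet below lists the deployed connectors and queries the status of the source connector from the example configuration; adjust the host, port, and connector name to match your setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectStatusCheck {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // List all connectors deployed on the worker
        HttpRequest listConnectors = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connectors")).GET().build();
        System.out.println(client.send(listConnectors, HttpResponse.BodyHandlers.ofString()).body());

        // Check the status of the source connector from the example configuration
        HttpRequest sourceStatus = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connectors/my-source-connector/status")).GET().build();
        System.out.println(client.send(sourceStatus, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```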

**6. Produce and Consume Data:**

Produce data to your source system (MySQL), and observe that changes are captured by the
Debezium connector and sent to Kafka. The JDBC sink connector will then consume these
changes and write them to the target system (PostgreSQL).

**Note:** This is a simplified example, and configurations may vary based on your specific use
case and systems.

By leveraging Kafka Connect, you can streamline data integration, ensure scalability, and
simplify the process of connecting Kafka to various external systems. Always refer to the official
documentation for the connectors and systems you are using for detailed configuration options
and best practices.

11. Implement a simple word count stream processing application using Kafka Streams

Let's create a simple word count stream processing application using Kafka Streams in Java. This
example assumes you have Kafka and Zookeeper running and a topic named "word-count-input"
where you'll be producing messages.

Dependencies:

Make sure you include the required dependencies in your project. For a Maven project, add the
following to your `pom.xml`:

```xml
<dependencies>
  <!-- Kafka Streams Dependency -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version> <!-- Replace with the latest version -->
  </dependency>
</dependencies>
```

WordCountStreamApp.java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountStreamApp {

    public static void main(String[] args) {
        // Set up Kafka Streams properties
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Build the Kafka Streams topology
        StreamsBuilder builder = new StreamsBuilder();

        // Read the input stream
        KStream<String, String> textLines = builder.stream("word-count-input",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Tokenize the input, map each word to a key, and count occurrences
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(value -> {
                    String[] words = value.toLowerCase().split("\\W+");
                    return Arrays.asList(words);
                })
                .groupBy((key, word) -> word)
                .count(Materialized.as("counts"));

        // Send the result to a Kafka topic
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        // Start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // Shutdown hook to handle graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

This Kafka Streams application reads from a topic named "word-count-input," tokenizes the
input lines, counts the occurrences of each word, and then writes the word counts to a topic
named "word-count-output."

Remember to replace "localhost:9092" with your actual Kafka bootstrap server address.

Compile and run this application, and make sure you have a Kafka topic named "word-count-input" where you can produce messages.

./bin/kafka-console-producer.sh --topic word-count-input --bootstrap-server localhost:9092

After typing some sentences, you can consume the output from the "word-count-output" topic:
./bin/kafka-console-consumer.sh --topic word-count-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

You should see the word counts being updated in real-time as new messages are produced to the
input topic.

This is a basic example, and you can extend and customize it based on your specific
requirements. Always refer to the Kafka Streams documentation for more advanced features and
configurations: [Kafka Streams Documentation](https://kafka.apache.org/documentation/streams/).


12. Implement Kafka integration with the Hadoop ecosystem.

Integrating Kafka with the Hadoop ecosystem often involves connecting Kafka producers
and consumers with Hadoop tools like HDFS (Hadoop Distributed File System) and
Apache Hive. Below, I'll provide a general overview and examples for integrating Kafka
with HDFS and Hive.

1. Kafka to HDFS Integration:

Prerequisites:

1. **Hadoop Installation:**

- Ensure Hadoop is installed and running in your environment.

2. **Kafka Installation:**

- Have Kafka installed and running.

Integration Steps:

**a. Configure HDFS Sink Connector:**

- Download the Confluent HDFS Sink Connector JAR from [Confluent Hub](https://www.confluent.io/hub/confluentinc/kafka-connect-hdfs).

- Place the JAR file in the `plugin.path` directory specified in your Kafka Connect
properties file.

**b. Create HDFS Sink Connector Configuration:**

Create a configuration file for the HDFS Sink Connector (e.g., `hdfs-sink-config.properties`):

name=hdfs-sink-connector
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my-topic
hdfs.url=hdfs://localhost:9000
flush.size=3

Adjust the `topics` and `hdfs.url` properties based on your Kafka topic and HDFS
configuration.


**c. Start Kafka Connect:**

Start Kafka Connect in standalone mode with the HDFS Sink Connector configuration:

./bin/connect-standalone.sh config/connect-standalone.properties hdfs-sink-config.properties

This setup will write messages from the specified Kafka topic to HDFS.
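
To give the sink something to flush, you can produce a few test records to `my-topic`; with `flush.size=3`, the connector writes a file to HDFS after every three records. The sketch below is a plain string producer and assumes the connector's format and converters are configured to accept such records (the Confluent HDFS connector defaults to Avro, so one side or the other may need adjusting).

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class HdfsSinkTestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Three records -> one committed file, given flush.size=3 in the sink config
            for (int i = 0; i < 3; i++) {
                producer.send(new ProducerRecord<>("my-topic", "record-" + i));
            }
        }
        System.out.println("Produced 3 test records to my-topic");
    }
}
```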

2. Kafka to Hive Integration:

Prerequisites:

1. **Hadoop Installation:**

- Ensure Hadoop and Hive are installed and running in your environment.

2. **Kafka Installation:**

- Have Kafka installed and running.

Integration Steps:

**a. Configure Hive Sink Connector:**

- Download the Confluent Hive Sink Connector JAR from [Confluent Hub](https://www.confluent.io/hub/confluentinc/kafka-connect-hive).

- Place the JAR file in the `plugin.path` directory specified in your Kafka Connect
properties file.

**b. Create Hive Sink Connector Configuration:**


Create a configuration file for the Hive Sink Connector (e.g., `hive-sink-config.properties`):

name=hive-sink-connector
connector.class=io.confluent.connect.hive.HiveSinkConnector
tasks.max=1
topics=my-topic
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=NONE
auto.create=true

Adjust the `topics` and `hive.metastore.uris` properties based on your Kafka topic and
Hive configuration.

**c. Start Kafka Connect:**

Start Kafka Connect in standalone mode with the Hive Sink Connector configuration:

./bin/connect-standalone.sh config/connect-standalone.properties hive-sink-config.properties

This setup will write messages from the specified Kafka topic to Hive.

Important Notes:

1. **Serialization/Deserialization:**

- Ensure that your Kafka producers and consumers use compatible serializers/
deserializers with your data formats.


2. **Topic Configuration:**

- Adjust `topics` in the sink connector configurations to match the Kafka topic you
want to integrate with.

3. **Hadoop/Hive Configuration:**

- Configure Hadoop and Hive appropriately based on your specific environment.

4. **Connector Versions:**

- Ensure that the versions of Kafka Connect connectors are compatible with your Kafka
and Hadoop/Hive versions.

5. **Security Considerations:**

- If your environment is secured, configure authentication and authorization for HDFS
and Hive accordingly.

Always refer to the official documentation for Kafka Connect and the specific
connectors you're using for the most accurate and detailed information:

- [Kafka Connect Documentation](https://docs.confluent.io/platform/current/connect/index.html)
- [Confluent Hub](https://www.confluent.io/hub/)
- [HDFS Sink Connector Documentation](https://docs.confluent.io/platform/current/connect/kafka-connect-hdfs/index.html)
- [Hive Sink Connector Documentation](https://docs.confluent.io/platform/current/connect/kafka-connect-hive/index.html)
