Data Ingestion from Kafka to MySQL using Python

In today’s data-driven world, real-time data processing is crucial for businesses to make informed decisions and stay competitive. Apache Kafka has emerged as a popular choice for building real-time data pipelines due to its distributed and fault-tolerant nature. Meanwhile, MySQL remains a widely used relational database for storing and managing structured data. Combining these two technologies, we can create a robust solution for ingesting data from Kafka and inserting it into MySQL using Python.

Understanding the Components

Before we dive into the code, let’s understand the components involved in this process:

  1. Kafka: Apache Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of records, store them in a fault-tolerant manner, and process them in real time. Kafka organizes records into topics: producers write records to a topic, and consumers read them from it.
  2. Python: We will use Python, a versatile and widely adopted programming language, to interact with both Kafka and MySQL. The confluent-kafka-python library (installed from PyPI as confluent-kafka) will help us consume data from Kafka, while the mysql-connector-python library will allow us to interact with MySQL.
  3. MySQL: MySQL is an open-source relational database management system known for its speed and reliability. We will store the data ingested from Kafka into a MySQL database table.

Setting Up the Environment

To get started, make sure you have Kafka and MySQL installed and running. You’ll also need Python installed on your system.

Next, install the required Python libraries:

pip install confluent-kafka
pip install mysql-connector-python
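
The script in the next section inserts into a table called my_table with a column called data_column, so that table has to exist before the script runs. Here is a minimal sketch for creating it, assuming the same placeholder connection settings used later in this article. The id and created_at columns, the TEXT type, and the create_table helper name are illustrative assumptions, not requirements of the script.

```python
# Sketch: create the target table used by the ingestion script.
# Everything except the names my_table and data_column is an assumption.

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS my_table (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    data_column TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""


def create_table(db_config):
    """Connect to MySQL and create my_table if it does not exist yet."""
    # Imported here so the SQL above can be inspected without the driver installed
    import mysql.connector

    connection = mysql.connector.connect(**db_config)
    try:
        cursor = connection.cursor()
        cursor.execute(CREATE_TABLE_SQL)
        connection.commit()
        cursor.close()
    finally:
        connection.close()
```

Calling create_table with a dictionary containing host, user, password, and database creates the table on the first run; because of IF NOT EXISTS, later runs leave an existing table untouched.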

Writing Python Code

Let’s create a Python script, saved as kafka_to_mysql.py, to ingest data from Kafka and insert it into MySQL. The complete script is shown below:

from confluent_kafka import Consumer, KafkaError
import mysql.connector

# Kafka consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker(s)
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest'  # Start from the beginning of the topic
}

# Create a Kafka consumer
consumer = Consumer(conf)

# Subscribe to a Kafka topic
consumer.subscribe(['my-topic'])  # Replace with your Kafka topic name

# MySQL database connection configuration
db_config = {
    'host': 'localhost',  # Replace with your MySQL server hostname
    'user': 'username',  # Replace with your MySQL username
    'password': 'password',  # Replace with your MySQL password
    'database': 'my_database'  # Replace with your database name
}

# Create a MySQL database connection
db_connection = mysql.connector.connect(**db_config)
cursor = db_connection.cursor()

# Main data ingestion loop; press Ctrl+C to stop
try:
    while True:
        msg = consumer.poll(1.0)  # Wait up to 1 second for a message
        if msg is None:
            continue  # No message arrived within the timeout
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Reached end of partition')
            else:
                print('Error while polling for messages: {}'.format(msg.error()))
            continue
        if msg.value() is None:
            continue  # Skip messages without a payload
        # Insert the Kafka message value into MySQL
        data = msg.value().decode('utf-8')  # Assuming data is in UTF-8 encoding
        insert_query = "INSERT INTO my_table (data_column) VALUES (%s)"
        cursor.execute(insert_query, (data,))
        db_connection.commit()
        print('Inserted: {}'.format(data))
except KeyboardInterrupt:
    print('Stopping consumer')
finally:
    # Clean up: leave the consumer group and release the database resources
    consumer.close()
    cursor.close()
    db_connection.close()

In this script:

  • We configure the Kafka consumer to connect to your Kafka broker and subscribe to a specific topic.
  • We set up the MySQL database connection using the provided credentials and database name. The target table and column are named in the INSERT statement itself.
  • Inside the main loop, we continuously poll for messages from Kafka. When a message is received, we decode it (assuming it’s in UTF-8 encoding) and insert it into the MySQL database table.
  • We commit the transaction after each insert, so every row is persisted as soon as it is written.
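
One caveat on the last point: by default the consumer commits Kafka offsets automatically in the background, independently of the MySQL commit, so a crash between the two can lose or duplicate messages. A possible refinement is sketched below, under the assumption that 'enable.auto.commit' is set to False in the consumer configuration. It buffers messages, writes them with executemany, commits to MySQL, and only then commits the Kafka offsets. The helper name flush_batch and the batch size are hypothetical.

```python
# Sketch: commit to MySQL first, then commit Kafka offsets (at-least-once).
# Assumes the consumer was created with 'enable.auto.commit': False.

BATCH_SIZE = 100  # hypothetical batch size; tune for your workload

INSERT_QUERY = "INSERT INTO my_table (data_column) VALUES (%s)"


def flush_batch(cursor, db_connection, consumer, batch):
    """Write buffered messages to MySQL, then commit their Kafka offsets.

    `batch` is a list of Kafka messages as returned by consumer.poll().
    Returns the number of rows written.
    """
    if not batch:
        return 0
    rows = [(msg.value().decode('utf-8'),) for msg in batch]
    try:
        cursor.executemany(INSERT_QUERY, rows)
        db_connection.commit()
    except Exception:
        # Offsets stay uncommitted, so these messages are read again after a restart
        db_connection.rollback()
        raise
    # Synchronously commit the current offsets of all assigned partitions
    consumer.commit(asynchronous=False)
    batch.clear()
    return len(rows)
```

In the main loop, each valid message would be appended to a list and flush_batch called once the list reaches BATCH_SIZE, and once more before shutting down. Because offsets are committed only after the rows are stored, a failure can cause rows to be inserted twice but not silently dropped.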

Running the Code

To run the script, simply execute it using Python:

python kafka_to_mysql.py

Ensure that your Kafka topic exists and has data to consume, and that the target table exists in your MySQL database. As messages are consumed from Kafka, the script prints each inserted value and the corresponding rows appear in the table.
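
If the topic is empty, a few test messages can be published with the same confluent-kafka library. The following is a minimal sketch, assuming a broker at localhost:9092 and the topic my-topic from the script. The function names and the sample payloads are made up for illustration.

```python
# Sketch: publish a few test messages so the consumer has data to ingest.

def build_test_messages(count):
    """Return `count` UTF-8 encoded sample payloads (illustrative data)."""
    return ['test message {}'.format(i).encode('utf-8') for i in range(count)]


def delivery_report(err, msg):
    """Called once per message to report delivery success or failure."""
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def publish_test_messages(topic='my-topic', count=5):
    """Send `count` sample messages to `topic` on a local broker."""
    # Imported lazily; requires the confluent-kafka package
    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    for payload in build_test_messages(count):
        producer.produce(topic, value=payload, callback=delivery_report)
        producer.poll(0)  # Serve delivery callbacks for earlier messages
    producer.flush()  # Block until all queued messages are delivered
```

Calling publish_test_messages() once and then starting the ingestion script should result in one new row per test message.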

Conclusion

Ingesting data from Kafka and inserting it into MySQL using Python is a straightforward way to build real-time data pipelines for your applications. Once the data is in MySQL, it can be queried and analyzed shortly after it is produced, enabling faster and more informed decision-making. By following the steps and code provided in this article, you can get started on your journey to building robust data pipelines with Kafka and MySQL.