Recently I started working on a Data Engineering project where I was assigned the task of learning about Apache Kafka and setting up an infrastructure to consume different feeds in real time. This was a great challenge for my skills, mainly because I'm not very experienced, but also because my background is more related to Machine Learning than to Data Engineering. So, as soon as I had everything set up, I was eager to show how I managed to create a pipeline using Kafka and, hopefully, be helpful to others who may go through the same process in the future.
The idea is to show how to implement an end-to-end pipeline using Python and Amazon Managed Streaming for Apache Kafka (MSK).
Overall, streaming data empowers machine learning applications to operate in real-time, handle massive volumes of data, adapt to dynamic environments, detect anomalies, continually learn and improve, and provide personalized experiences. Its integration with machine learning enables the development of intelligent systems capable of making timely, data-driven decisions in today's fast-paced world.
So, what's Kafka?
The best way to introduce it is by citing the Confluent web page.
"Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale." [...] Kafka provides a system with high throughput, scalability, low latency, storage, and availability. This is very useful in the world we live in, where real-time streaming data is fundamental: many apps we use every day, such as LinkedIn, Uber, and Spotify, use or have used Kafka.
Kafka provides three main functions to its users:
1. To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
2. To store streams of events durably and reliably for as long as you want.
3. To process streams of events as they occur or retrospectively.
Now that we have an understanding of Kafka, let's delve into the pipeline we aim to establish. The primary objective of the pipeline is to consume data from a real-time streaming feed. Although we won't be building a specific application using this data in this article, you can utilize the pipeline to send data to a database, train machine learning models, or transform it for ETL processes.
To set up MSK, we will follow the tutorial provided by AWS, making a few modifications for our purposes.
In the "Create a cluster" section, I suggest:
Additionally, in the "Security settings" section:
Finally, in the next part, we will produce and consume messages from this topic.
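If the topic does not exist yet, it can be created from the Kafka installation folder on the client machine with a command along these lines. The name msk-demo-topic matches the topic consumed later in this article, client.properties is the security settings file used again for the console consumer, and the partition and replication counts are assumptions you should adjust:
bin/kafka-topics.sh --create --bootstrap-server <bootstrap.servers> --command-config client.properties --topic msk-demo-topic --partitions 1 --replication-factor 3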
Having a Stream API is crucial in modern data-driven environments for several reasons.
Overall, having a Stream API empowers organizations to leverage real-time data, make data-driven decisions, and gain a competitive advantage in today's fast-paced and dynamic business landscape.
To simulate a real-time streaming API, we are going to use the Flask library in Python. Code snippet below:
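The snippet below is a minimal sketch of that API; the one-second generation interval, port 5000, and newline-delimited JSON output are assumptions you can adjust.

```python
import json
import threading
import time
from queue import Queue

from flask import Flask, Response

app = Flask(__name__)
data_queue = Queue()


def generate_data():
    """Continuously generate data points in the background."""
    counter = 0
    start_time = time.time()
    while True:
        point = {
            "counter": counter,
            "timestamp": round(time.time() - start_time, 2),  # seconds since start
        }
        data_queue.put(point)
        counter += 1
        time.sleep(1)  # assumed generation interval


@app.route("/stream")
def stream():
    def event_stream():
        while True:
            # Yield whatever is currently queued, then wait for new data.
            while not data_queue.empty():
                yield json.dumps(data_queue.get()) + "\n"
            time.sleep(0.5)  # assumed polling interval
    return Response(event_stream(), mimetype="application/json")


if __name__ == "__main__":
    threading.Thread(target=generate_data, daemon=True).start()
    app.run(host="0.0.0.0", port=5000)
```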
The Flask application is set up with an API endpoint that can be accessed to retrieve the streaming data. The data generation is decoupled from the API endpoint, allowing it to generate data independently in the background.
The generate_data() function is responsible for continuously generating data. It uses a timestamp to track the time difference in seconds since the start of data generation. Each data point consists of a counter and the timestamp. The generated data is stored in a queue for efficient retrieval.
The API endpoint /stream is defined to stream the data from the queue. When a client accesses this endpoint, the streaming response is initiated. Initially, it yields any existing data from the queue. Then, it enters a loop where it periodically checks for new data. If new data is available, it yields the new data and clears the queue. This approach ensures that both existing and real-time data can be consumed by the client.
The real-time streaming API code is now complete, providing the capability to generate and stream data. To utilize this streaming functionality effectively, we need to ingest the generated data into a Kafka cluster. This involves setting up a Kafka producer, which will consume the data from the generator and publish it to the Kafka cluster. By integrating the Kafka producer into our streaming API code, we establish a seamless data flow from the generator to the Kafka infrastructure, enabling real-time data ingestion for further processing and analysis.
In the following code snippet, we will utilize the confluent_kafka library, as mentioned earlier, to establish a connection with a Kafka cluster and set up a Kafka producer. The producer will play a crucial role in ingesting the data generated by the streaming API and sending it to the Kafka cluster for further processing.
The main function initiates a GET request to the streaming API endpoint, enabling the streaming functionality. As the streaming response is received, the code iterates over each line of data and sends it to the Kafka cluster using the Kafka producer. The producer is configured with the necessary parameters retrieved from environment variables.
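Here is a minimal sketch of such a producer, assuming the Flask API above runs on localhost:5000 and the cluster uses SASL/SCRAM authentication; the environment variable names and security settings are assumptions to adapt to your MSK configuration.

```python
import os

import requests
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Called once per produced message; reports failed deliveries."""
    if err is not None:
        print(f"Delivery failed: {err}")


def main():
    # Security settings are an assumption (SASL/SCRAM); adapt them to your MSK setup.
    producer = Producer({
        "bootstrap.servers": os.environ["BOOTSTRAP_SERVERS"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
    })
    topic = os.environ.get("KAFKA_TOPIC", "msk-demo-topic")

    # Open a streaming GET request to the Flask API and forward each line to Kafka.
    with requests.get("http://localhost:5000/stream", stream=True) as response:
        for line in response.iter_lines():
            if line:
                producer.produce(topic, value=line, callback=delivery_report)
                producer.poll(0)  # serve delivery callbacks

    producer.flush()


if __name__ == "__main__":
    main()
```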
To consume data from the Kafka cluster, we can use the Kafka console consumer directly from the command-line interface (CLI).
To start the consumer, run the following command from the Kafka installation folder:
../bin/kafka-console-consumer.sh --bootstrap-server <bootstrap.servers> --consumer.config client.properties --topic msk-demo-topic --from-beginning
Executing this command will retrieve and display all the messages that have been produced and stored within the Kafka cluster, enabling you to review the data and perform any necessary analysis or processing.
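For reference, the client.properties file passed via --consumer.config contains the security settings the consumer needs to reach the cluster. Assuming the same SASL/SCRAM setup as in the producer sketch above, it could look roughly like this (with your own credentials):

```properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<your-username>" password="<your-password>";
```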
To begin producing data, copy both the API code and the producer code onto the EC2 instance and run them simultaneously in separate terminals. One terminal will show the API running, while the other runs the producer, where you can add print statements for logging purposes. This allows us to monitor the code and make sure everything is running smoothly.
Within the producer code, you can enhance the delivery report function by implementing the else clause. This allows you to print a message for each successful event. Be aware that this may result in a flood of logs, but it can be useful for verifying that everything is functioning correctly.
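As a sketch, that could look like this (the wording of the log message is of course up to you):

```python
def delivery_report(err, msg):
    """Callback invoked by confluent_kafka once per produced message."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        # One line per successfully delivered event; verbose, but useful for checks.
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")
```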
The configuration details passed to the producer (the bootstrap servers and the security settings) are what enable it to establish a secure connection with the Kafka cluster.
By following these steps, we can successfully set up and run the API, the producer and the consumer code on our EC2 instance. Monitoring the logs and verifying the successful delivery of events will provide reassurance that the process is functioning as expected.
In the API terminal, we can observe api.py running and the requests being sent to the /stream endpoint. The producer, in turn, shows no visible output unless an event fails to be processed. In the consumer, on the other hand, we can see all the events that were produced and stored in the Kafka cluster.
To verify the successful transmission of data to the Kafka cluster, we can leverage CloudWatch dashboards for monitoring purposes. By creating a dashboard, we gain access to a variety of metrics that allow us to track different aspects of our topics, brokers, consumers, connectors, and more.
Among the useful metrics available, two stand out: BytesInPerSec and BytesOutPerSec. These metrics provide insights into the data flow, enabling us to assess the health of both the producer and consumer components.
For instance, by monitoring BytesInPerSec, we can confirm whether the producer is functioning correctly and sending data to the cluster as expected. Similarly, monitoring BytesOutPerSec helps us determine if the consumers are processing the data effectively.
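If you prefer to check these numbers programmatically rather than in the console, the same metrics can be pulled with boto3. In this sketch, the region, cluster name, and broker ID are placeholders to replace with your own values:

```python
from datetime import datetime, timedelta, timezone

import boto3

# Placeholders: replace the region, cluster name, and broker ID with your own values.
cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kafka",
    MetricName="BytesInPerSec",
    Dimensions=[
        {"Name": "Cluster Name", "Value": "my-msk-cluster"},
        {"Name": "Broker ID", "Value": "1"},
    ],
    StartTime=datetime.now(timezone.utc) - timedelta(hours=1),
    EndTime=datetime.now(timezone.utc),
    Period=300,  # 5-minute datapoints
    Statistics=["Average"],
)

for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], round(point["Average"], 2), "bytes/sec")
```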
As an example, such a dashboard can visualize BytesInPerSec, BytesOutPerSec, and MessagesInPerSec for a six-broker cluster. By leveraging CloudWatch dashboards and monitoring the appropriate metrics, we can gain valuable insights into the performance and behavior of the Kafka ecosystem.
The pipeline we created enables the consumption of real-time streaming data from a variety of sources. Although we did not build a specific application in this article, the pipeline can be extended to send data to databases, train machine learning models, or perform ETL processes.
Overall, this project has showcased the importance and value of streaming data in the modern data-driven landscape. Streaming data enables organizations to leverage real-time insights, make data-driven decisions, and gain a competitive edge. By harnessing the power of Apache Kafka and the capabilities of the pipeline we established, organizations can build intelligent systems that adapt, learn, and process data in real time to drive innovation and success.