Real-time Data Processing with Apache Flink: A Step-by-Step Guide
In today’s world, real-time data processing is essential. From monitoring live website traffic to analyzing stock prices, companies need to process vast amounts of data quickly and efficiently.
In this guide, we’ll explore how to build a real-time data processing application using Flink, explaining key concepts and walking through practical examples.
What is Apache Flink, and Why Do We Need It?
Apache Flink is an open-source framework and distributed engine designed for stateful computations over data streams, both unbounded (endless) and bounded (finite). In simple terms, it can process data as it arrives (real-time) or in batches (large sets processed at once).
Here’s why Flink is useful:
- Real-time Analytics: Imagine tracking website activity as it happens or analyzing sensor data from IoT devices. Flink can process and respond to this data in real-time.
- Handling Big Data: Flink scales efficiently, making it a great fit for companies dealing with large datasets.
- Complex Event Processing: From detecting fraud in financial transactions to identifying patterns in real-time data, Flink can help find insights and anomalies as they occur.
Key Components of a Flink Application
Flink applications typically consist of three main components:
- Source: Where the data comes from (e.g., a Kafka topic, a database, HDFS, Apache Cassandra, or Elasticsearch).
- Process: The logic that processes or transforms the data.
- Sink: Where the processed data is sent (e.g., another Kafka topic, a database, HDFS, Apache Cassandra, or Elasticsearch).
- Flink provides two primary APIs: the DataStream API, which handles both bounded (finite) and unbounded (infinite) streams of data (see the minimal sketch after this list), and the Table API, a SQL-like expression language for relational stream and batch processing.
- Flink also offers a SQL API, which is semantically similar to the Table API and represents programs as SQL query expressions.
- Flink supports multiple languages like Java, Scala, Python, and SQL, making it flexible and easy to integrate into various environments.
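To make the Source → Process → Sink pattern concrete, here is a minimal DataStream API sketch. It is a simplified illustration that uses an in-memory collection as a stand-in source and stdout as the sink; the examples later in this guide use the Table API instead.

from pyflink.datastream import StreamExecutionEnvironment

# Set up the streaming environment
env = StreamExecutionEnvironment.get_execution_environment()

# Source: a small in-memory collection (a stand-in for Kafka, files, etc.)
stream = env.from_collection([1, 2, 3, 4, 5])

# Process: a simple transformation
doubled = stream.map(lambda x: x * 2)

# Sink: print each record to stdout
doubled.print()

env.execute('source_process_sink_sketch')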
To connect your application with these sources and sinks, Flink uses connectors. The Flink documentation lists the connectors available for the DataStream and Table APIs.
PyFlink Installation
- To use Flink’s Python API, you need to install PyFlink, which is a Python package that provides the DataStream and Table APIs for Python users.
- You can install PyFlink using pip:
pip install apache-flink
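A quick way to confirm the installation is to import the package (an informal sanity check, not an official verification step):

python -c "import pyflink; print('PyFlink is installed')"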
A Simple Flink Example: Incrementing IDs
Let’s dive into a simple Flink example where we generate some sample data using the datagen connector, increment each ID value by 1, and print the result.
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Define the source table with a sequence generator for the ID field
table_env.execute_sql("""
CREATE TABLE source_table (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '10'
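-- The bounded sequence (1..10) makes this a finite source: the job finishes once all ten rows are emitted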
)
""")
# Define the sink table to print the results
table_env.execute_sql("""
CREATE TABLE sink_table (
id INT,
data STRING
) WITH (
'connector' = 'print'
)
""")
# Process: Increment the ID and insert the result into the sink table
table_env.execute_sql("""
INSERT INTO sink_table
SELECT id + 1, data FROM source_table
""").wait()
Save the code as simple_flink_example.py and run it:
python simple_flink_example.py
Running this script prints the processed records in your terminal. You’ll see IDs incrementing by 1 and the output displayed.
Even though the data generation is sequential (IDs from 1 to 10), Flink may distribute the rows across parallel tasks, so the results can be printed out of order.
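If you want the demo’s output in order, one option is to force a single parallel task, using the same configuration call this guide uses later for the Kafka example (a tweak for demo readability, not a general recommendation):

# Run the job with a single parallel task so printed rows appear in sequence
table_env.get_config().get_configuration().set_integer('parallelism.default', 1)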
Real-World Use Case: Stock Analysis with Apache Flink
In this use case, we’ll build a real-time stock analysis application that recommends buying or selling stocks based on real-time stock price data. Flink is ideal for this scenario because it processes streaming data efficiently with low latency.
Problem Definition:
We will be receiving real-time stock price updates (known as ticks) from a data stream (e.g., from Kafka or a financial data provider). Based on certain rules or logic, we’ll generate buy or sell signals for specific stocks.
For simplicity, let’s assume we generate a buy signal when the stock price falls by 5% within a short time window and a sell signal when it rises by 5%.
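To make the rule concrete before writing it in Flink SQL, here is the same logic as a plain Python helper (make_signal is a hypothetical name used for illustration; it is not part of the Flink job):

from typing import Optional

# Illustrative only: the 5% rule that the Flink job below expresses in SQL
def make_signal(prev_price: float, price: float) -> Optional[str]:
    ratio = price / prev_price
    if ratio < 0.95:  # fell by more than 5% -> BUY
        return 'BUY'
    if ratio > 1.05:  # rose by more than 5% -> SELL
        return 'SELL'
    return None

print(make_signal(150.0, 158.0))  # 158 / 150 ≈ 1.053 -> SELL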
Components Involved:
- Source: Stock price data coming in real-time via a Kafka topic.
- Process: A simple moving window that computes the percentage change in stock prices.
- Sink: Output the buy or sell signals to another Kafka topic.
Implementation in Flink using Python
from pyflink.table import EnvironmentSettings, TableEnvironment
# Create a TableEnvironment in streaming mode, which is used for real-time data processing
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Add the Kafka connector JAR to the classpath so that the Flink job can interact with Kafka topics.
# Download the connector jar from https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/3.2.0-1.19
table_env.get_config().set("pipeline.jars", "file:///path_to_connector/flink-sql-connector-kafka-3.2.0-1.19.jar")
table_env.get_config().get_configuration().set_integer('parallelism.default', 1)
table_env.get_config().get_configuration().set_integer('taskmanager.numberOfTaskSlots', 4)
# Define the source table: Simulating stock price ticks from Kafka
table_env.execute_sql("""
CREATE TABLE stock_prices (
stock_symbol STRING,
price DOUBLE,
event_time TIMESTAMP(3),
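-- The watermark marks event_time as the event-time attribute and tolerates events arriving up to 5 seconds late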
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'stock_prices_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'json.timestamp-format.standard' = 'ISO-8601',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
)
""")
# Define the sink table: Output buy/sell signals to Kafka
table_env.execute_sql("""
CREATE TABLE stock_signals (
stock_symbol STRING,
signal STRING, -- 'BUY' or 'SELL'
price DOUBLE,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'stock_signals_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'json.timestamp-format.standard' = 'ISO-8601',
'format' = 'json'
)
""")
# Processing Logic: Generate buy/sell signals based on percentage price changes.
# Note: standard SQL does not allow the WHERE clause to reference the `signal`
# alias defined in the same SELECT, so the CASE expression is computed in an
# inner query and filtered in the outer one. .wait() keeps the script alive
# while the streaming job runs.
table_env.execute_sql("""
INSERT INTO stock_signals
SELECT stock_symbol, signal, price, event_time
FROM (
    SELECT
        stock_symbol,
        CASE
            WHEN (price / LAG(price, 1) OVER (PARTITION BY stock_symbol ORDER BY event_time) < 0.95) THEN 'BUY'
            WHEN (price / LAG(price, 1) OVER (PARTITION BY stock_symbol ORDER BY event_time) > 1.05) THEN 'SELL'
            ELSE NULL
        END AS signal,
        price,
        event_time
    FROM stock_prices
) AS t
WHERE signal IS NOT NULL
""").wait()
Sample Stock Prices:
{
"stock_symbol": "AAPL",
"price": 145,
"event_time": "2024-10-06T12:34:56"
}
{
"stock_symbol": "AAPL",
"price": 150,
"event_time": "2024-10-06T12:35:56"
}
{
"stock_symbol": "AAPL",
"price": 158,
"event_time": "2024-10-06T12:54:56"
}
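To try the pipeline end to end, you can publish these records to the input topic and watch the output topic with Kafka’s console tools (assuming a local broker at localhost:9092, as configured above, and the Kafka bin scripts on your PATH; each JSON object must be pasted as a single line):

# Publish sample ticks to the input topic
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stock_prices_topic

# In another terminal, watch the generated signals
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stock_signals_topic --from-beginning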
Output Signals:
{
"stock_symbol": "AAPL",
"signal": "SELL",
"price": 158.0,
"event_time": "2024-10-06T12:54:56"
}
Only the third tick produces a signal: 158 / 150 ≈ 1.053, a rise of more than 5%, so a SELL is emitted. The earlier move from 145 to 150 is only about +3.4%, below the threshold, and the first tick has no previous price to compare against.
In conclusion, Apache Flink is a powerful and flexible stream processing framework that excels at complex, real-time data analytics. Its unified approach to stream and batch processing, combined with advanced stateful computation, makes it a strong fit for applications that require low-latency, high-throughput performance. As Flink’s community continues to grow and its capabilities expand, it remains a compelling choice for building real-time data pipelines.
🔗 All the example code can be found in this repository: https://github.com/saurabh47/apache-flink-examples/blob/main/py-flink/04_simple_stocks_single.py