Data Deduplication in Real-Time with Apache Flink’s DataStream API
In real-time data processing, ensuring clean, accurate data is essential. Duplicates in data streams can disrupt analytics, inflate metrics, and lead to faulty insights. In this post, I’ll explore some simple but effective techniques for removing duplicate records in a real-time data stream using Apache Flink’s DataStream API.
While Flink’s documentation provides examples of deduplication using Flink SQL, the DataStream API offers added flexibility, especially for complex or custom use cases where SQL alone might not suffice.
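For reference, the documented SQL approach boils down to a ROW_NUMBER()-based query. Here is a minimal sketch of what that could look like, assuming a StreamTableEnvironment named tableEnv with a registered Orders table that has a processing-time attribute called proctime (the table and column names here are illustrative, not from this post's examples):
// Hypothetical sketch of Flink SQL deduplication, keeping the first
// row seen per orderId; assumes tableEnv and the Orders table exist.
tableEnv.executeSql(
    "SELECT orderId, customer, item " +
    "FROM (" +
    "   SELECT *, ROW_NUMBER() OVER (" +
    "       PARTITION BY orderId ORDER BY proctime ASC) AS row_num" +
    "   FROM Orders)" +
    " WHERE row_num = 1");
The rest of this post shows how to get the same effect, plus extras like side outputs, directly in the DataStream API.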
Example 1: Removing Duplicate Orders
In this example, we receive a stream of incoming orders, each identified by a unique orderId. To remove duplicates effectively, we use the keyBy function to partition the stream by orderId, allowing us to process each unique ID separately.
Within the process function, we maintain a ValueState variable called previousOrderState to store the last seen order for each orderId. This stateful mechanism is crucial for tracking which orders have already been processed.
When a new order arrives, we first check previousOrderState. If it’s empty, this is the first time we’ve encountered this orderId, so we emit the order and update the state accordingly.
Conversely, if an order with the same orderId already exists in previousOrderState, we skip it, effectively filtering out duplicates. This approach ensures that only unique orders pass through the pipeline.
Note: In this example, we store the previous order in ValueState. Alternatively, we could use a boolean flag to track duplicates. However, by storing the full previous order, we enable more complex business logic: we can compare the details of the previous order with the current one if needed (a short sketch of this appears right after the example below).
// Imports used by the examples (these go at the top of the class file):
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

List<Order> ordersList = Arrays.asList(
        new Order(1, "John", "apple"),
        new Order(2, "Chris", "tomato"),
        new Order(3, "Ethan", "mango"),
        new Order(1, "John", "apple"),  // Duplicate
        new Order(4, "Chris", "banana"),
        new Order(3, "Ethan", "mango")  // Duplicate
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Order> ordersDataStream = env.fromData(ordersList);

DataStream<Order> uniqueOrdersDataStream = ordersDataStream
        .keyBy(order -> order.orderId)
        .process(new KeyedProcessFunction<>() {

            // Holds the last seen order; Flink scopes this state to the current key (orderId).
            private transient ValueState<Order> previousOrderState;

            @Override
            public void open(OpenContext openContext) {
                ValueStateDescriptor<Order> previousOrderStateDescriptor =
                        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
                previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);
            }

            @Override
            public void processElement(Order order, KeyedProcessFunction<Integer, Order, Order>.Context context, Collector<Order> collector) throws Exception {
                Order previousOrder = previousOrderState.value();
                if (previousOrder == null) {
                    // First time we see this orderId: emit it and remember it.
                    collector.collect(order);
                    previousOrderState.update(order);
                }
                // Otherwise the state already holds an order for this key,
                // so this element is a duplicate and is dropped.
            }
        });

uniqueOrdersDataStream.print();
env.execute("Deduplication Job");
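As mentioned in the note above, keeping the whole previous order rather than a boolean flag lets us compare records. A minimal sketch of what that comparison could look like inside processElement — the item check is a hypothetical business rule, not part of the example above:
// Hypothetical variation: same orderId with a changed item is treated
// as an update to re-emit, rather than a duplicate to drop.
Order previousOrder = previousOrderState.value();
if (previousOrder == null) {
    collector.collect(order);          // first occurrence
    previousOrderState.update(order);
} else if (!previousOrder.item.equals(order.item)) {
    collector.collect(order);          // changed order: emit the update
    previousOrderState.update(order);
}
// else: exact duplicate, drop it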
Managing Memory with State Cleanup
In a high-volume scenario, this ValueState will accumulate entries for a large number of order IDs, so the state grows without bound and can eventually exhaust memory. To avoid this, we can leverage a cleanup mechanism using a timer, which clears the state periodically. For instance, if we assume duplicates only occur within a 10-minute interval, we can set up a timer to clean the ValueState every 10 minutes.
Here’s how to add a cleanup timer to the deduplication logic:
// Additional import: org.apache.flink.api.common.typeinfo.Types
private static final long ONE_MINUTE = 60_000L; // milliseconds

private transient ValueState<Long> timerState;

@Override
public void open(OpenContext openContext) {
    ValueStateDescriptor<Order> previousOrderStateDescriptor =
            new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
    previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);

    // Tracks the currently registered cleanup timer for this key.
    ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timer-state",
            Types.LONG);
    timerState = getRuntimeContext().getState(timerDescriptor);
}

@Override
public void processElement(Order order, KeyedProcessFunction<Integer, Order, Order>.Context context, Collector<Order> collector) throws Exception {
    Order previousOrder = previousOrderState.value();
    if (previousOrder == null) {
        collector.collect(order);
        previousOrderState.update(order);
    }
    // Otherwise this is a duplicate for the current key; drop it.

    // Replace any previously registered cleanup timer, so the state is
    // kept for 10 minutes after the *latest* order seen for this key.
    Long currentTimer = timerState.value();
    if (currentTimer != null) {
        context.timerService().deleteProcessingTimeTimer(currentTimer);
    }
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE * 10;
    context.timerService().registerProcessingTimeTimer(timer);
    timerState.update(timer);
}

@Override
public void onTimer(long timestamp, KeyedProcessFunction<Integer, Order, Order>.OnTimerContext ctx, Collector<Order> out) {
    // No order seen for this key in the last 10 minutes: clear its state.
    System.out.println("Timer fired!");
    timerState.clear();
    previousOrderState.clear();
}
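Manual timers give full control, but if all you need is expiry, Flink’s built-in state TTL achieves a similar cleanup with less code. A minimal sketch, assuming a Flink version whose StateTtlConfig builder accepts java.time.Duration (imports: org.apache.flink.api.common.state.StateTtlConfig and java.time.Duration):
// Alternative to manual timers: let Flink expire the state itself.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Duration.ofMinutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Order> previousOrderStateDescriptor =
        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
previousOrderStateDescriptor.enableTimeToLive(ttlConfig);
With TTL you lose the per-key timer hook (no onTimer callback), so the timer approach remains the better fit when cleanup should trigger additional logic.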
Example 2: Removing Duplicate Orders & Side-Outputting Duplicates
In this example, we enhance the deduplication logic by adding a side output stream to capture duplicate records separately. We define an OutputTag (duplicateOrdersSideOutputTag) for this purpose, which allows us to label and route duplicate orders.
When a duplicate order is detected, instead of discarding it, we send it to the side output using context.output(duplicateOrdersSideOutputTag, order). This lets us retrieve all duplicates from the main stream using getSideOutput(duplicateOrdersSideOutputTag), allowing for dedicated processing or logging of these records without disrupting the flow of unique orders.
// In addition to the imports from Example 1:
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can capture the Order type information.
final OutputTag<Order> duplicateOrdersSideOutputTag = new OutputTag<Order>("duplicate-orders-side-output"){};

List<Order> ordersList = Arrays.asList(
        new Order(1, "John", "apple"),
        new Order(2, "Chris", "tomato"),
        new Order(3, "Ethan", "mango"),
        new Order(1, "John", "apple"),  // Duplicate
        new Order(4, "Chris", "banana"),
        new Order(3, "Ethan", "mango")  // Duplicate
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Order> ordersDataStream = env.fromData(ordersList);

SingleOutputStreamOperator<Order> uniqueOrdersDataStream = ordersDataStream
        .keyBy(order -> order.orderId)
        .process(new KeyedProcessFunction<>() {

            private transient ValueState<Order> previousOrderState;

            @Override
            public void open(OpenContext openContext) {
                ValueStateDescriptor<Order> previousOrderStateDescriptor =
                        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
                previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);
            }

            @Override
            public void processElement(Order order, KeyedProcessFunction<Integer, Order, Order>.Context context, Collector<Order> collector) throws Exception {
                Order previousOrder = previousOrderState.value();
                if (previousOrder == null) {
                    // First occurrence of this orderId: emit to the main output.
                    collector.collect(order);
                    previousOrderState.update(order);
                } else {
                    // Duplicate: route it to the side output instead of
                    // silently dropping it.
                    context.output(duplicateOrdersSideOutputTag, order);
                }
            }
        });

DataStream<Order> duplicatesOrdersDataStream = uniqueOrdersDataStream.getSideOutput(duplicateOrdersSideOutputTag);
duplicatesOrdersDataStream.print("Duplicate Order");
uniqueOrdersDataStream.print("Unique Order");
env.execute("Deduplication Job");
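With the sample data above, four unique orders reach the main output and the two duplicates land in the side output. The exact interleaving (and the subtask index Flink prepends to the prefix when parallelism > 1) will vary, but assuming the hypothetical Order.toString() sketched earlier, the printed lines look roughly like:
Unique Order> Order{1, John, apple}
Unique Order> Order{2, Chris, tomato}
Unique Order> Order{3, Ethan, mango}
Unique Order> Order{4, Chris, banana}
Duplicate Order> Order{1, John, apple}
Duplicate Order> Order{3, Ethan, mango}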
Wrapping Up
By implementing deduplication in Apache Flink with stateful processing and timers, you can handle duplicates effectively, even in high-throughput, real-time data streams. This approach is both flexible and scalable, making it ideal for various real-time data processing applications.
🔗 All the example code can be found in this repository: https://github.com/saurabh47/apache-flink-examples/tree/main/java/flink-examples/src/main/java/org/apache/flink/examples