Data Deduplication in Real-Time with Apache Flink’s DataStream API

Saurabh Gangamwar
5 min read · Oct 27, 2024


In real-time data processing, ensuring clean, accurate data is essential. Duplicates in data streams can disrupt analytics, inflate metrics, and lead to faulty insights. In this post, I’ll explore some simple but effective techniques for removing duplicate records in a real-time data stream using Apache Flink’s DataStream API.

While Flink’s documentation provides examples of deduplication with Flink SQL, the DataStream API offers added flexibility, especially for complex or custom use cases where SQL alone might not suffice.
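For reference, the SQL route relies on the ROW_NUMBER() pattern described in the Flink documentation. A minimal sketch of that pattern, assuming a registered table named Orders with a processing-time attribute proc_time (the table and column names here are illustrative, not from this post’s code):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// keep only the first row seen per orderId, ordered by processing time
tableEnv.executeSql(
        "SELECT orderId, customer, item " +
        "FROM ( " +
        "  SELECT *, ROW_NUMBER() OVER ( " +
        "    PARTITION BY orderId ORDER BY proc_time ASC) AS row_num " +
        "  FROM Orders) " +
        "WHERE row_num = 1");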

Example 1: Removing Duplicate Orders

In this example, we receive a stream of incoming orders, each identified by a unique orderId. To effectively remove duplicates, we use the keyBy function to partition the stream by orderId, allowing us to process each unique ID separately.

Within the process function, we maintain a ValueState variable called previousOrderState to store the last seen order for each orderId. This stateful mechanism is crucial for tracking which orders have already been processed.
When a new order arrives, we first check previousOrderState. If it’s empty, this means it’s the first time we’ve encountered this orderId, so we emit the order and update the state accordingly.

Conversely, if previousOrderState is already populated, an order with the same orderId has been seen before (the state is scoped to the current key, i.e. the orderId), so we skip the new arrival, effectively filtering out duplicates. This approach ensures that only unique orders pass through the pipeline.

Note: In this example, we store the previous order in ValueState. Alternatively, we could track duplicates with just a boolean flag, as sketched below. Storing the full previous order, however, enables more complex business logic, where we can compare the details of the previous order with the current one if needed.
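A minimal sketch of that boolean-flag variant (seenState is an illustrative name; only the state type and the check change):

// ValueState<Boolean> is enough when the previous order's contents are not needed
// (initialize seenState in open(), exactly as previousOrderState is below)
private transient ValueState<Boolean> seenState;

@Override
public void processElement(Order order,
                           KeyedProcessFunction<Integer, Order, Order>.Context context,
                           Collector<Order> collector) throws Exception {
    if (seenState.value() == null) {
        collector.collect(order);  // first occurrence of this orderId
        seenState.update(true);    // remember only that we saw it
    }
    // non-null flag -> duplicate, drop it
}

Here is the full example, storing the complete previous order: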

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

List<Order> ordersList = Arrays.asList(
        new Order(1, "John", "apple"),
        new Order(2, "Chris", "tomato"),
        new Order(3, "Ethan", "mango"),
        new Order(1, "John", "apple"),   // Duplicate
        new Order(4, "Chris", "banana"),
        new Order(3, "Ethan", "mango")   // Duplicate
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> ordersDataStream = env.fromData(ordersList);

DataStream<Order> uniqueOrdersDataStream = ordersDataStream
        .keyBy(order -> order.orderId)
        .process(new KeyedProcessFunction<>() {
            // last order seen for the current key (orderId)
            private transient ValueState<Order> previousOrderState;

            @Override
            public void open(OpenContext openContext) {
                ValueStateDescriptor<Order> previousOrderStateDescriptor =
                        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
                previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);
            }

            @Override
            public void processElement(Order order,
                                       KeyedProcessFunction<Integer, Order, Order>.Context context,
                                       Collector<Order> collector) throws Exception {
                Order previousOrder = previousOrderState.value();
                if (previousOrder == null) {
                    // first occurrence of this orderId: emit it and remember it
                    collector.collect(order);
                    previousOrderState.update(order);
                }
                // otherwise: the stream is keyed by orderId, so non-null state
                // means this orderId was already processed -> drop the duplicate
            }
        });

uniqueOrdersDataStream.print();

env.execute("Deduplication Job");
Output: the four unique orders (orderIds 1, 2, 3 and 4) are printed; the two duplicates are dropped.
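For completeness, the snippets in this post assume Order is a simple POJO along these lines (the field names customer and item are inferred from the constructor calls above, so treat them as illustrative):

public class Order {
    public int orderId;
    public String customer;
    public String item;

    public Order() {}  // no-arg constructor required for Flink's POJO serialization

    public Order(int orderId, String customer, String item) {
        this.orderId = orderId;
        this.customer = customer;
        this.item = item;
    }

    @Override
    public String toString() {
        return "Order{orderId=" + orderId + ", customer='" + customer + "', item='" + item + "'}";
    }
}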

Managing Memory with State Cleanup

In a high-volume scenario, the keyed ValueState accumulates an entry for every orderId ever seen, so state grows without bound and can eventually exhaust memory or bloat checkpoints. To avoid this, we can add a cleanup mechanism using a timer. For instance, if we assume duplicates only occur within a 10-minute interval, we can register a processing-time timer that clears a key’s state ten minutes after its last order arrived.

Here’s how to add a cleanup timer to the deduplication logic:

// additional import: org.apache.flink.api.common.typeinfo.Types
private static final long ONE_MINUTE = 60_000L;

private transient ValueState<Long> timerState;

@Override
public void open(OpenContext openContext) {
    ValueStateDescriptor<Order> previousOrderStateDescriptor =
            new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
    previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);

    ValueStateDescriptor<Long> timerDescriptor =
            new ValueStateDescriptor<>("timer-state", Types.LONG);
    timerState = getRuntimeContext().getState(timerDescriptor);
}

@Override
public void processElement(Order order,
                           KeyedProcessFunction<Integer, Order, Order>.Context context,
                           Collector<Order> collector) throws Exception {
    Order previousOrder = previousOrderState.value();
    if (previousOrder == null) {
        collector.collect(order);
        previousOrderState.update(order);
    }
    // non-null state means this orderId was already processed -> drop the duplicate

    // delete any previously registered cleanup timer before scheduling a fresh one
    // ten minutes from now; otherwise every element would leave an extra timer behind
    Long previousTimer = timerState.value();
    if (previousTimer != null) {
        context.timerService().deleteProcessingTimeTimer(previousTimer);
    }
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE * 10;
    context.timerService().registerProcessingTimeTimer(timer);
    timerState.update(timer);
}

@Override
public void onTimer(long timestamp,
                    KeyedProcessFunction<Integer, Order, Order>.OnTimerContext ctx,
                    Collector<Order> out) {
    // no order arrived for this orderId in the last 10 minutes: release its state
    System.out.println("Timer fired!");
    timerState.clear();
    previousOrderState.clear();
}
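An alternative to hand-rolled timers is Flink’s built-in state TTL, which expires state entries automatically and needs no onTimer bookkeeping. A minimal sketch, assuming a Flink version whose StateTtlConfig builder accepts java.time.Duration (older releases use org.apache.flink.api.common.time.Time instead):

import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Duration.ofMinutes(10))  // entries expire 10 minutes after the last write
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Order> previousOrderStateDescriptor =
        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
previousOrderStateDescriptor.enableTimeToLive(ttlConfig);  // replaces the timer/onTimer logic above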

Example 2: Removing Duplicate Orders & Side-Outputting Duplicates

In this example, we enhance the deduplication logic by adding a side output stream that captures duplicate records separately. We define an OutputTag (duplicateOrdersSideOutputTag) for this purpose, which lets us label and route duplicate orders. Note that the tag is declared as an anonymous subclass (the trailing {} in the code below) so that Flink can capture its generic element type.

When a duplicate order is detected, instead of discarding it, we send it to the side output using context.output(duplicateOrdersSideOutputTag, order). We can then pull all duplicates out of the returned stream with getSideOutput(duplicateOrdersSideOutputTag), enabling dedicated processing or logging of these records without disrupting the flow of unique orders.

// additional imports for side outputs:
// org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
// org.apache.flink.util.OutputTag

// anonymous subclass (note the trailing {}) so Flink can capture the generic type
final OutputTag<Order> duplicateOrdersSideOutputTag =
        new OutputTag<Order>("duplicate-orders-side-output"){};

List<Order> ordersList = Arrays.asList(
        new Order(1, "John", "apple"),
        new Order(2, "Chris", "tomato"),
        new Order(3, "Ethan", "mango"),
        new Order(1, "John", "apple"),   // Duplicate
        new Order(4, "Chris", "banana"),
        new Order(3, "Ethan", "mango")   // Duplicate
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> ordersDataStream = env.fromData(ordersList);

SingleOutputStreamOperator<Order> uniqueOrdersDataStream = ordersDataStream
        .keyBy(order -> order.orderId)
        .process(new KeyedProcessFunction<>() {
            private transient ValueState<Order> previousOrderState;

            @Override
            public void open(OpenContext openContext) {
                ValueStateDescriptor<Order> previousOrderStateDescriptor =
                        new ValueStateDescriptor<>("previousOrderState", TypeInformation.of(Order.class));
                previousOrderState = getRuntimeContext().getState(previousOrderStateDescriptor);
            }

            @Override
            public void processElement(Order order,
                                       KeyedProcessFunction<Integer, Order, Order>.Context context,
                                       Collector<Order> collector) throws Exception {
                if (previousOrderState.value() == null) {
                    // first occurrence of this orderId: emit on the main output
                    collector.collect(order);
                    previousOrderState.update(order);
                } else {
                    // duplicate: route to the side output instead of dropping it
                    context.output(duplicateOrdersSideOutputTag, order);
                }
            }
        });

DataStream<Order> duplicatesOrdersDataStream = uniqueOrdersDataStream.getSideOutput(duplicateOrdersSideOutputTag);

duplicatesOrdersDataStream.print("Duplicate Order");

uniqueOrdersDataStream.print("Unique Order");

env.execute("Deduplication Job");
Output: the four unique orders are printed under the “Unique Order” prefix, and the two duplicates under the “Duplicate Order” prefix.

Wrapping Up

With stateful processing and timers, Apache Flink can remove duplicates reliably even in high-throughput, real-time data streams, and the DataStream API keeps the logic flexible enough to extend with side outputs, custom comparison logic, or TTL-based cleanup as your use case demands.

🔗 All the example code can be found in this repository: https://github.com/saurabh47/apache-flink-examples/tree/main/java/flink-examples/src/main/java/org/apache/flink/examples
