Watermark & Windowing: How to Implement a Custom Watermark Strategy in Apache Flink
Stream processing in Apache Flink is typically based on one of two time semantics: event time or processing time.
- Event Time: Derived from the timestamps in your source data, often from Kafka records.
- Processing Time: Corresponds to the system clock of the machine executing the Flink job.
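To make the distinction concrete, the sketch below assigns one record to a 5-second tumbling window twice: once by the timestamp it carries and once by the wall-clock time at which it is handled. The class name, method name, and sample values are illustrative and not part of Flink; the window-start formula assumes non-negative timestamps and no window offset.

```java
public class TimeSemanticsSketch {

    // Start of the tumbling window that contains the given timestamp
    // (assumes timestampMillis >= 0 and no window offset).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long windowSize = 5_000L;
        long eventTime = 23_000L;      // timestamp carried inside the record
        long processingTime = 41_500L; // hypothetical wall-clock reading when the record is processed

        // Event time: the record belongs to the window starting at 20000.
        System.out.println("event-time window start: " + windowStart(eventTime, windowSize));
        // Processing time: the same record lands in the window starting at 40000.
        System.out.println("processing-time window start: " + windowStart(processingTime, windowSize));
    }
}
```

With event time, the window assignment depends only on the data, so replaying the same records gives the same windows. With processing time, it depends on when the job happens to see each record.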
The Problem with Idle Sources
Watermarks play a critical role in event-time-based stream processing by advancing the “event clock.” A watermark generator normally derives the watermark from the timestamps of incoming events, so when a data source becomes idle (temporarily stops producing events), the watermark stops advancing. This results in:
- Stalled computation: Processing windows remain open indefinitely, causing delays in output generation.
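The stall can be illustrated with a little arithmetic. The following is a minimal sketch in plain Java, not Flink code: the class and method names are made up for illustration, and it assumes a watermark that trails the largest observed timestamp by the out-of-orderness bound plus 1 ms, and a window that may fire once the watermark reaches the window's last millisecond.

```java
public class StalledWatermarkSketch {

    // Watermark that trails the largest timestamp seen so far
    // by the out-of-orderness bound plus 1 ms.
    public static long watermark(long maxTimestampMillis, long outOfOrdernessMillis) {
        return maxTimestampMillis - outOfOrdernessMillis - 1L;
    }

    // An event-time window [start, end) can fire once the watermark
    // reaches its last included millisecond, end - 1.
    public static boolean canFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1L;
    }

    public static void main(String[] args) {
        long maxTimestamp = 23_000L;   // last event seen before the source went quiet
        long outOfOrderness = 10_000L;
        long windowEnd = 25_000L;      // window [20000, 25000) holds that event

        long wm = watermark(maxTimestamp, outOfOrderness);
        // Without new events, maxTimestamp (and therefore wm) never changes.
        System.out.println("watermark = " + wm);                            // 12999
        System.out.println("window can fire = " + canFire(windowEnd, wm));  // false
    }
}
```

Until another event raises the maximum timestamp to at least 34,999, the watermark stays below 24,999 and the window holding the last event produces no output.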
To solve this, we can implement a custom watermark strategy that keeps the watermark advancing during idle periods, so that open windows close and emit their results without waiting for the next event.
Custom Watermark Strategy
Below is an example implementation of a custom watermark generator that handles both out-of-orderness and idle source detection.
This generator calculates watermarks using the following logic:
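- While events arrive, it emits watermarks that lag behind the maximum observed event timestamp by the configured out-of-orderness duration plus 1 ms.
- If no event has arrived for longer than the configured maximum idle duration, it moves the watermark forward (to the maximum observed timestamp rounded down to a 10-second boundary, plus the out-of-orderness duration and 1 ms) so that open windows can close.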
package org.apache.flink.examples.eventtime;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

import java.time.Duration;

public class CustomBoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    // Largest event timestamp seen so far
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    // Wall-clock time at which the most recent event arrived
    private long lastMaxTimestampCalculatedAt;
    private final long sourceMaxIdlenessMillis;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public CustomBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness, Duration sourceIdleness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        Preconditions.checkNotNull(sourceIdleness, "sourceIdleness");
        Preconditions.checkArgument(!sourceIdleness.isNegative(), "sourceIdleness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.sourceMaxIdlenessMillis = sourceIdleness.toMillis();
        // Start value chosen so that the first watermark computation cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
        lastMaxTimestampCalculatedAt = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long potentialWM = maxTimestamp - outOfOrdernessMillis - 1L;
        if (potentialWM > lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        } else if ((System.currentTimeMillis() - lastMaxTimestampCalculatedAt) > sourceMaxIdlenessMillis) {
            // Source is idle: round maxTimestamp down to a 10-second boundary, then move the
            // watermark outOfOrdernessMillis + 1 ms past that boundary so open windows can close
            potentialWM = ((maxTimestamp / 10000) * 10000) + outOfOrdernessMillis + 1L;
            lastEmittedWatermark = Math.max(lastEmittedWatermark, potentialWM);
            System.out.println("Increment watermark:" + lastEmittedWatermark);
        }
        output.emitWatermark(new Watermark(this.lastEmittedWatermark));
    }
}
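To follow the two branches of onPeriodicEmit with concrete numbers, here is a dependency-free sketch that mirrors the generator's arithmetic. It is an illustration rather than a drop-in replacement: the class name IdleAwareWatermarkSketch is hypothetical, the wall-clock time is passed in as a parameter instead of being read from System.currentTimeMillis() so that the run is deterministic, and the method returns the watermark instead of emitting it to a WatermarkOutput.

```java
public class IdleAwareWatermarkSketch {
    private final long outOfOrdernessMillis;
    private final long maxIdlenessMillis;
    private long maxTimestamp;
    private long lastEventSeenAt;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public IdleAwareWatermarkSketch(long outOfOrdernessMillis, long maxIdlenessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxIdlenessMillis = maxIdlenessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1L;
    }

    // Record an event; nowMillis stands in for the wall clock.
    public void onEvent(long eventTimestamp, long nowMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventSeenAt = nowMillis;
    }

    // Return the watermark that would be emitted at wall-clock time nowMillis.
    public long onPeriodicEmit(long nowMillis) {
        long candidate = maxTimestamp - outOfOrdernessMillis - 1L;
        if (candidate > lastEmittedWatermark) {
            // Normal branch: trail the largest timestamp seen so far.
            lastEmittedWatermark = candidate;
        } else if (nowMillis - lastEventSeenAt > maxIdlenessMillis) {
            // Idle branch: round down to a 10-second boundary, then jump ahead.
            long jumped = (maxTimestamp / 10_000L) * 10_000L + outOfOrdernessMillis + 1L;
            lastEmittedWatermark = Math.max(lastEmittedWatermark, jumped);
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        // 10 s out-of-orderness, 5 s idle threshold, as in the usage example
        IdleAwareWatermarkSketch g = new IdleAwareWatermarkSketch(10_000L, 5_000L);
        g.onEvent(23_000L, 0L);                        // one event, seen at wall-clock 0
        System.out.println(g.onPeriodicEmit(200L));    // 12999: normal branch
        System.out.println(g.onPeriodicEmit(400L));    // 12999: no progress, not yet idle
        System.out.println(g.onPeriodicEmit(6_000L));  // 30001: idle for more than 5 s
    }
}
```

After the jump to 30,001, a 5-second window ending at 25,000 can close. The trade-off is that an event arriving later with a timestamp below the advanced watermark is treated as late, so the idle threshold should be chosen with the source's real gaps in mind.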
The following example uses the CustomBoundedOutOfOrdernessWatermarks generator in a job:
package org.apache.flink.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.datasource.IncrementalSourceWithIdlePeriod;
import org.apache.flink.examples.eventtime.CustomBoundedOutOfOrdernessWatermarks;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.time.Duration;

public class CustomWatermarkExample {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism (for simplicity in this example)
        env.setParallelism(1);

        // Add the source with an active period of 20 seconds and idle period of 50 seconds
        DataStream<Tuple2<String, Long>> input = env.addSource(new IncrementalSourceWithIdlePeriod(20000, 50000));

        // Assign timestamps and watermarks using the custom strategy
        DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = input.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forGenerator(ctx ->
                        new CustomBoundedOutOfOrdernessWatermarks<>(
                                Duration.ofSeconds(10), // Max out-of-orderness
                                Duration.ofSeconds(5)   // Idle source detection
                        )
                ).withTimestampAssigner((event, timestamp) -> event.f1) // Assign timestamps
        );

        // Process the data with a simple window operation
        withTimestampsAndWatermarks
                .keyBy(event -> event.f0) // Key by the event name (e.g., Event1, Event2)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) // Tumbling window of 5 seconds
                .sum(1) // Summing up the timestamps for demonstration
                .print(); // Print the result

        // Execute the Flink job
        env.execute("Custom Watermark Strategy Example");
    }
}
Outcome
- Active Period: The source produces events regularly, and watermarks progress normally.
- Idle Period: Once the source has been idle for longer than the configured idle duration, the watermark is advanced so that open windows close promptly.
A complete example is available in CustomWatermarkExample.
This approach keeps event-time windows producing output even when the source is only intermittently active.