Watermark & Windowing: How to Implement a Custom Watermark Strategy in Apache Flink

Saurabh Gangamwar

When working with Apache Flink, stream data processing is typically based on two types of time semantics: event time and processing time.

  • Event Time: Derived from the timestamps in your source data, often from Kafka records.
  • Processing Time: Corresponds to the system clock of the machine executing the Flink job.
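To make the difference concrete, here is a minimal plain-Java sketch (no Flink dependency). The timestamps and the 5-second window size are made-up values for illustration. It only shows that the same record can land in different windows depending on which clock is used.

```java
/** Illustrative only: compares window assignment under the two time semantics. */
final class TimeSemanticsSketch {

    /** Start of the fixed-size window containing the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long eventTimestampMillis = 4_000L;   // hypothetical timestamp carried in the record
        long arrivalClockMillis = 12_000L;    // hypothetical machine clock when the job sees the record
        long windowSizeMillis = 5_000L;

        // Event time: the record belongs to the window that covers its own timestamp.
        System.out.println("event-time window start: "
                + windowStart(eventTimestampMillis, windowSizeMillis));      // 0
        // Processing time: the record belongs to the window that covers the arrival clock.
        System.out.println("processing-time window start: "
                + windowStart(arrivalClockMillis, windowSizeMillis));        // 10000
    }
}
```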

The Problem with Idle Sources

Watermarks play a critical role in event-time stream processing by advancing the “event clock.” However, when a data source becomes idle (temporarily stops producing events), a watermark that is derived only from event timestamps stops progressing. This results in:

  • Stalled computation: Event-time windows stay open until new events arrive, which delays output.
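The stall is easy to reproduce without Flink. The following is a simplified plain-Java sketch of a bounded-out-of-orderness watermark. The class name and the sample timestamps are assumptions made for this illustration, and it is not Flink's own implementation.

```java
/** Plain-Java sketch: a watermark that only moves when events arrive. */
final class StalledWatermarkDemo {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    StalledWatermarkDemo(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start value chosen so that currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1L;
    }

    /** Track the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark lags the largest timestamp by the out-of-orderness bound plus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1L;
    }

    public static void main(String[] args) {
        StalledWatermarkDemo wm = new StalledWatermarkDemo(10_000L);
        wm.onEvent(21_000L);
        wm.onEvent(27_500L);
        // The source goes idle: three periodic emits happen, but no events arrive.
        for (int tick = 1; tick <= 3; tick++) {
            System.out.println("tick " + tick + " watermark=" + wm.currentWatermark()); // always 17499
        }
    }
}
```

With these values the watermark stays at 17499, so a 5-second window covering 25000 to 29999 never sees the watermark reach its end while the source is silent.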

To solve this, we can implement a custom watermark strategy that keeps the watermark progressing even during idle periods, so that windows close on time and output keeps flowing. The trade-off is that events arriving after the watermark has been advanced this way may be treated as late.

Custom Watermark Strategy

Below is an example implementation of a custom watermark generator that handles both out-of-orderness and idle source detection.

This generator calculates watermarks using the following logic:

  1. It emits watermarks that lag behind the maximum observed event timestamp by a specified out-of-orderness duration.
  2. If the source has been idle for longer than a defined max idle duration, it progresses the watermark to ensure timely window closure.

package org.apache.flink.examples.eventtime;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

import java.time.Duration;

/**
 * Bounded-out-of-orderness watermark generator that also advances the watermark
 * when no event has arrived for longer than the configured idle duration.
 */
public class CustomBoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    // Largest event timestamp seen so far
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    // Wall-clock time at which the most recent event arrived
    private long lastMaxTimestampCalculatedAt;
    private final long sourceMaxIdlenessMillis;

    private long lastEmittedWatermark = Long.MIN_VALUE;

    public CustomBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness, Duration sourceIdleness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        Preconditions.checkNotNull(sourceIdleness, "sourceIdleness");
        Preconditions.checkArgument(!sourceIdleness.isNegative(), "sourceIdleness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.sourceMaxIdlenessMillis = sourceIdleness.toMillis();
        // Start value chosen so that the first watermark computation cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
        // Remember when the last event arrived so idleness can be detected
        lastMaxTimestampCalculatedAt = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long potentialWM = maxTimestamp - outOfOrdernessMillis - 1L;

        if (potentialWM > lastEmittedWatermark) {
            // Normal case: the watermark trails the largest timestamp by outOfOrdernessMillis + 1 ms
            lastEmittedWatermark = potentialWM;
        } else if ((System.currentTimeMillis() - lastMaxTimestampCalculatedAt) > sourceMaxIdlenessMillis) {
            // Idle case: round maxTimestamp down to a 10-second boundary (hard-coded here),
            // then move the watermark outOfOrdernessMillis + 1 ms past that boundary
            // so that open windows can close. The watermark never moves backwards.
            potentialWM = ((maxTimestamp / 10000) * 10000) + outOfOrdernessMillis + 1L;
            lastEmittedWatermark = Math.max(lastEmittedWatermark, potentialWM);
            System.out.println("Increment watermark:" + lastEmittedWatermark);
        }
        output.emitWatermark(new Watermark(this.lastEmittedWatermark));
    }
}
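To see what the two branches of onPeriodicEmit produce, the arithmetic can be pulled out into plain Java. The helper names and the sample timestamp 27500 below are hypothetical and used only for illustration. The formulas are copied from the generator above.

```java
/** Worked arithmetic for the active and idle branches of the generator. */
final class IdleWatermarkArithmetic {

    /** Active branch: lag behind the largest timestamp seen so far. */
    static long activeWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1L;
    }

    /** Idle branch: round down to a 10-second boundary, then add the bound plus 1 ms. */
    static long idleWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return ((maxTimestamp / 10_000L) * 10_000L) + outOfOrdernessMillis + 1L;
    }

    public static void main(String[] args) {
        long maxTimestamp = 27_500L; // hypothetical largest event timestamp
        long bound = 10_000L;        // same as Duration.ofSeconds(10)

        System.out.println(activeWatermark(maxTimestamp, bound)); // 17499
        System.out.println(idleWatermark(maxTimestamp, bound));   // 30001

        // With a bound below 10 seconds the idle value can stay behind maxTimestamp.
        System.out.println(idleWatermark(maxTimestamp, 1_000L));  // 21001
    }
}
```

With a 10-second bound the idle watermark (30001) jumps past the last event (27500). With a 1-second bound it would be 21001, which is lower than the active watermark of 26499, so Math.max keeps the old value and the watermark does not move. The idle branch therefore appears to rely on the out-of-orderness bound being at least as large as the hard-coded 10-second rounding interval.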

Example code that uses the CustomBoundedOutOfOrdernessWatermarks strategy:

package org.apache.flink.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.datasource.IncrementalSourceWithIdlePeriod;
import org.apache.flink.examples.eventtime.CustomBoundedOutOfOrdernessWatermarks;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.time.Duration;

public class CustomWatermarkExample {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism (for simplicity in this example)
        env.setParallelism(1);

        // Add the source with an active period of 20 seconds and idle period of 50 seconds
        DataStream<Tuple2<String, Long>> input =
                env.addSource(new IncrementalSourceWithIdlePeriod(20000, 50000));

        // Assign timestamps and watermarks using the custom strategy
        DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = input.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forGenerator(ctx ->
                        new CustomBoundedOutOfOrdernessWatermarks<>(
                                Duration.ofSeconds(10), // Max out-of-orderness
                                Duration.ofSeconds(5)   // Idle source detection
                        )
                ).withTimestampAssigner((event, timestamp) -> event.f1) // Assign timestamps
        );

        // Process the data with a simple window operation
        withTimestampsAndWatermarks
                .keyBy(event -> event.f0) // Key by the event name (e.g., Event1, Event2)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) // Tumbling window of 5 seconds
                .sum(1) // Summing up the timestamps for demonstration
                .print(); // Print the result

        // Execute the Flink job
        env.execute("Custom Watermark Strategy Example");
    }
}

Records were processed because the watermark progressed.

Outcome

  • Active Period: The source produces events regularly, and watermarks progress normally.
  • Idle Period: Once the source has been idle for longer than the configured idle duration, the watermark advances anyway, allowing windows to close on time.
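Why the idle watermark closes the window can be checked with a little arithmetic. This sketch assumes Flink's default event-time trigger, which fires a window once the watermark reaches the window's last timestamp (its end minus 1 ms). The helper names and sample values are hypothetical.

```java
/** Checks whether a 5-second tumbling window can fire for a given watermark. */
final class WindowCloseCheck {
    static final long WINDOW_SIZE_MILLIS = 5_000L;

    /** Start of the tumbling window containing the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MILLIS);
    }

    /** A window [start, end) can fire once the watermark has reached end - 1. */
    static boolean canFire(long windowStart, long watermark) {
        long lastTimestampInWindow = windowStart + WINDOW_SIZE_MILLIS - 1L;
        return watermark >= lastTimestampInWindow;
    }

    public static void main(String[] args) {
        long lastEventTimestamp = 27_500L;            // hypothetical last event before the idle period
        long start = windowStart(lastEventTimestamp); // 25000, window covers 25000..29999

        // Active-period watermark: 27500 - 10000 - 1 = 17499
        System.out.println(canFire(start, 17_499L));  // false
        // Idle-period watermark: 20000 + 10000 + 1 = 30001
        System.out.println(canFire(start, 30_001L));  // true
    }
}
```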

A complete example is available on CustomWatermarkExample.

This approach keeps event-time windows closing and results flowing even when the source is only intermittently active.
