Flink: Add passthroughRecords option to DynamicIcebergSink (#15433)

Open

sqd wants to merge 1 commit into apache:main from sqd:oss_passthrough_records

Conversation


@sqd sqd commented Feb 24, 2026

When enabled, records are forwarded directly from the record generator to the writer using a forward edge instead of a hash edge. This allows Flink to chain the two operators, avoiding serialization/deserialization overhead and drastically increasing throughput in high-volume pipelines.
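To make the routing difference concrete, here is a deliberately simplified sketch (this is not Flink code, and not Flink's actual key-group arithmetic): a hash edge picks the target subtask from the record's key, so records typically cross subtask boundaries and must be serialized for the network, while a forward edge keeps every record on the subtask that produced it, which is what allows Flink to chain the two operators into one task.

```java
// Simplified illustration only; real Flink uses murmur-hashed key groups.
public class EdgeRouting {
  // Hash edge: target subtask is derived from the key, independent of
  // which subtask produced the record.
  static int hashEdgeTarget(int keyHash, int parallelism) {
    return Math.floorMod(keyHash, parallelism);
  }

  // Forward edge: target subtask is always the producer's own subtask,
  // so the record never leaves the task and no serde is needed.
  static int forwardEdgeTarget(int producerSubtask) {
    return producerSubtask;
  }

  public static void main(String[] args) {
    int parallelism = 4;
    // Producer subtask 1 emits a record whose key hashes to 7:
    System.out.println(hashEdgeTarget(7, parallelism)); // 3 -> crosses subtasks
    System.out.println(forwardEdgeTarget(1));           // 1 -> stays local, chainable
  }
}
```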

Current topology:
[screenshot]

Same pipeline, with the new change enabled:
[screenshot]

Serdes of Flink RowData can be very expensive:
[screenshot]
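As a generic illustration of the per-record cost a chained forward edge avoids (this uses plain Java serialization, not Flink's RowData serializers): on a non-chained edge every record is written to bytes and read back, paying CPU and allocation cost twice per record.

```java
import java.io.*;

// Illustration of a serialization round trip; Flink's serializers are
// faster than java.io, but the per-record write/read cycle is the same idea.
public class SerdeRoundTrip {
  static byte[] serialize(Serializable value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value); // CPU + allocation cost paid per record
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject(); // paid again on the receiving side
    }
  }

  public static void main(String[] args) throws Exception {
    String record = "row-data-payload";
    Object copy = deserialize(serialize(record));
    System.out.println(copy.equals(record)); // equal value, but two full copies were made
  }
}
```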

@sqd (Author) commented Feb 24, 2026

@mxm @pvary I would appreciate it if you could take a look; I'm happy to provide any detail or context. I have tested this on an internal pipeline that processes roughly 10–20 TB of data per hour, where this change drastically reduced resource usage and increased output.

Comment on lines +451 to +457
if (passthroughRecords) {
  if (!immediateUpdate) {
    throw new UnsupportedOperationException(
        "Immediate update must be enabled to pass through records");
  }
  rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
} else {
Contributor
This will ignore DistributionMode and the partitioning in DynamicRecord. I saw that you listed this in the docs, but I'm not sure we should diverge too much from the normal mode of operation. I think what we can do is add a new chained side output with an extra DynamicWriter for this quick path.

It may be worth adding a new DistributionMode. Currently NONE does a round-robin, which is slightly confusing; we could rename it to ROUND_ROBIN and use NONE for this direct path.
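A hypothetical sketch of the naming proposal above (Iceberg's existing DistributionMode enum has NONE, HASH, and RANGE; ROUND_ROBIN and the repurposed NONE are only the reviewer's suggestion, not merged API):

```java
// Hypothetical enum illustrating the suggested renaming; not actual Iceberg API.
enum ProposedDistributionMode {
  NONE,        // proposed meaning: forward edge, records stay on the producer subtask
  ROUND_ROBIN, // proposed explicit name for what NONE does today
  HASH,        // unchanged: distribute by a hash of the partition key
  RANGE        // unchanged: range-distribute by sort key
}
```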

Contributor

How do we handle DistributionMode in the normal Sink? We should be consistent.

Contributor

pvary commented Feb 25, 2026

@sqd Could you share a bit more about your use case? Ignoring DistributionMode and chaining directly to writers feels quite risky to me, even if the performance gains are tempting.

This approach might work if your input records are already correctly distributed. But any mistake there will lead to small files or skewed writes: fast for the writers, but potentially very costly for the readers.



3 participants