Flink: Add passthroughRecords option to DynamicIcebergSink#15433
Flink: Add passthroughRecords option to DynamicIcebergSink#15433sqd wants to merge 1 commit intoapache:mainfrom
Conversation
When enabled, records are forwarded directly from the record generator to the writer using a forward edge instead of a hash edge. This allows Flink to chain the two operators, avoiding serialization/deserialization overhead and drastically increasing throughput in high-volume pipelines.
| if (passthroughRecords) { | ||
| if (!immediateUpdate) { | ||
| throw new UnsupportedOperationException( | ||
| "Immediate update must be enabled to pass through records"); | ||
| } | ||
| rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink")); | ||
| } else { |
There was a problem hiding this comment.
This will ignore DistributionMode and partitioning in DynamicRecord. I saw that you listed this in the docs, but I'm not sure we should diverge too much from the normal mode of operation. I think what we can do, is to add a new chained side output with an extra DynamicWriter for this quick path.
It may be worth adding a new DistributionMode. Currently NONE does a round-robin, which is slightly confusing, we could rename it to ROUND_ROBIN and use NONE for this direct path.
There was a problem hiding this comment.
How do we handle DistributionMode in the normal Sink?
We should be consistent
|
@sqd Could you share a bit more about your use case? Ignoring This approach might work if your input records are already correctly distributed. But any mistake there will lead to small files or skewed writes—fast for the writers, but potentially very costly for the readers. |
When enabled, records are forwarded directly from the record generator to the writer using a forward edge instead of a hash edge. This allows Flink to chain the two operators, avoiding serialization/deserialization overhead and drastically increasing throughput in high-volume pipelines.
Current topology:

Same pipeline, with the new change enabled:

Serdes of Flink RowData can be very expensive:
