
spark: Don't use table FileIO for checkpointing files #15239

Open · c2zwdjnlcg wants to merge 1 commit into apache:main from c2zwdjnlcg:fix-checkpoint-fs-impl

Conversation

@c2zwdjnlcg

Fixes: #14762

@github-actions github-actions bot added the spark label Feb 5, 2026
@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from f994146 to 13f02a1 Compare February 19, 2026 23:44
@c2zwdjnlcg
Author

@nastra could you take a look at this PR and see if you are aligned with separating the checkpoint IO from the table IO?

@nastra
Contributor

nastra commented Feb 20, 2026

@c2zwdjnlcg I currently don't have any cycles to review this. Maybe @huaxingao, @RussellSpitzer or @aokolnychyi have some time to review it

@RussellSpitzer
Member

First look: please do larger changes like this on a single module first, then backport to the others in a follow-up. Duplicated changes make reviewing more difficult. Taking an in-depth pass now.

@RussellSpitzer (Member) left a comment:

Rather than changing the IO here to something a user wouldn't expect, I think it's probably better for us to change InitialOffsetStore itself directly.

Since Spark checkpoints are expected to go through Hadoop FS, we should probably just use Hadoop FileSystem directly instead of the Iceberg FileIO class. This is of course a breaking change, so we probably also need to gate it, at least initially.

Maybe build two OffsetStores with the same interface and allow users to opt to Hadoop based with a spark read conf property?

interface InitialOffsetStore {
  StreamingOffset initialOffset();

  class TableIOOffsetStore implements InitialOffsetStore {
  }

  class HadoopOffsetStore implements InitialOffsetStore {
  }
}
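A minimal, self-contained sketch of the shape this suggestion could take (class and method names are illustrative, not the actual Iceberg classes): two offset stores behind one interface, with a factory that keeps the table-IO path as the default behind an opt-in flag.

```java
// Illustrative sketch only; not the Iceberg implementation.
public class OffsetStores {
    interface InitialOffsetStore {
        String initialOffset();
    }

    // Reads the initial offset through the table's FileIO (current behavior).
    static class TableIOOffsetStore implements InitialOffsetStore {
        @Override
        public String initialOffset() {
            return "offset-from-table-io";
        }
    }

    // Reads the initial offset through Hadoop FileSystem, which Spark
    // checkpointing normally expects.
    static class HadoopOffsetStore implements InitialOffsetStore {
        @Override
        public String initialOffset() {
            return "offset-from-hadoop-fs";
        }
    }

    // Gate the new behavior behind a flag (e.g. a Spark read conf property),
    // so existing deployments keep the table-IO path unless they opt out.
    static InitialOffsetStore create(boolean useTableIO) {
        return useTableIO ? new TableIOOffsetStore() : new HadoopOffsetStore();
    }
}
```

The point of the factory is that both stores share one interface, so the streaming source only decides once, at construction time, which storage backs the checkpoint offsets.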

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch 3 times, most recently from 257b264 to 0f5ead4 Compare February 24, 2026 23:37
@c2zwdjnlcg
Copy link
Author

@RussellSpitzer Thanks for the review.

please do larger changes like this only on a single module first, then backport to the others in a follow up.

Sorry about that, I'll keep it in mind for next time.

Hopefully this is more in line with what you were thinking.

I named the setting streaming-checkpoint-use-table-io. If you are generally ok with this approach and name I'll also add documentation to this PR.

Comment on lines 381 to 384:

protected StreamingOffset readOffset() {
  try (FSDataInputStream inputStream = fileSystem.open(initialOffsetPath);
      InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    String json = CharStreams.toString(reader);
Member:

Don't we already have a StreamingOffset.fromJson(InputStream in) ?

Author:

done
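For context on the suggestion above: the original code drained the stream into a String and parsed that, whereas parsing straight from the InputStream (as a `fromJson(InputStream)` overload allows) skips the intermediate copy. A standalone JDK-only sketch of the pattern being replaced (no Guava CharStreams; names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ReadAllDemo {
    // JDK equivalent of CharStreams.toString(reader): drain the stream
    // into a UTF-8 String, which a JSON parser would then consume.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.transferTo(buf); // JDK 9+: copies all remaining bytes
        return buf.toString(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
            "{\"version\":3}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(in));
    }
}
```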

  private final Long fromTimestamp;

-  InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
+  BaseOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
Member:

Could be "long fromTimestamp"

Author:

This was carried over from the previous implementation, but I've changed it; I don't think it will do any harm.


// Controls whether streaming checkpoint operations use table FileIO or Hadoop FileSystem
public static final String STREAMING_CHECKPOINT_USE_TABLE_IO =
    "streaming-checkpoint-use-table-io";
Member:

I'm not sure if it should be use-table-io or use-hdfs ... I think either is probably fine but maybe I slightly prefer use-hdfs because I know the opposite should be using the table io?

I do think using an enum may be overboard here but maybe that's just cleaner all around
streaming-checkpoint = {table-io, hdfs}
streaming-checkpoint_default = table-io
?

Wdyt?

Author:

I went with

streaming-checkpoint-storage = {table-io, hadoop-fs}

since I didn't want it to seem like it was just for hdfs. But can revert to your original naming if you prefer.
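A hedged sketch of how an enum-valued option like this could be parsed. The property name `streaming-checkpoint-storage` and the values `table-io` / `hadoop-fs` come from the discussion above; everything else (class name, method names) is assumed for illustration:

```java
// Illustrative only; not the merged Iceberg API.
public class CheckpointStorageOption {
    // Values for the proposed streaming-checkpoint-storage read option.
    public enum CheckpointStorage {
        TABLE_IO("table-io"),
        HADOOP_FS("hadoop-fs");

        private final String optionValue;

        CheckpointStorage(String optionValue) {
            this.optionValue = optionValue;
        }

        // Map the string from the read conf to the enum, rejecting
        // anything outside the two supported values.
        public static CheckpointStorage fromOption(String value) {
            for (CheckpointStorage storage : values()) {
                if (storage.optionValue.equalsIgnoreCase(value)) {
                    return storage;
                }
            }
            throw new IllegalArgumentException(
                "Invalid streaming-checkpoint-storage value: " + value);
        }
    }
}
```

An enum keeps the option closed to exactly the supported backends, which is the main advantage over a boolean if a third storage option ever appears.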

@RussellSpitzer (Member) left a comment:

Looks pretty close to me! I have some thoughts about the parameter name and whether it should be a boolean.


@Override
public OutputFile newOutputFile(String path) {
  if (path.contains("/offsets/")) {
Member:

nit: could use a constant here and in the tests above

Author:

done

@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from 0f5ead4 to 1e60a05 Compare February 26, 2026 00:54
@github-actions github-actions bot added the docs label Feb 26, 2026
@c2zwdjnlcg c2zwdjnlcg force-pushed the fix-checkpoint-fs-impl branch from 1e60a05 to bf98b1f Compare February 26, 2026 01:01


Successfully merging this pull request may close this issue:

Spark Iceberg streaming - checkpoint leverages S3fileIo signer path instead of hadoop's S3AFileSystem
