spark: Don't use table FileIO for checkpointing files #15239
c2zwdjnlcg wants to merge 1 commit into apache:main
Conversation
c2zwdjnlcg force-pushed from f994146 to 13f02a1
@nastra could you take a look at this PR and see if you are aligned with separating the checkpoint IO from the table IO?
@c2zwdjnlcg I currently don't have any cycles to review this. Maybe @huaxingao, @RussellSpitzer or @aokolnychyi have some time to review it.
First look: please do larger changes like this on a single module first, then backport to the others in a follow-up. Having duplicated changes makes reviewing a bit more difficult. Taking an in-depth pass now.
Rather than changing the IO here to something a user wouldn't expect, I think it's probably better for us to change InitialOffsetStore itself directly.

Since Spark checkpoints are expected to go through Hadoop FS, we should probably just use Hadoop FileSystem directly instead of the Iceberg FileIO class. This is of course a breaking change, so we probably also need to gate it, at least initially.

Maybe build two OffsetStores with the same interface and allow users to opt into the Hadoop-based one with a Spark read conf property?
```java
interface InitialOffsetStore {
  StreamingOffset initialOffset();
}

class TableIOOffsetStore implements InitialOffsetStore {
}

class HadoopOffsetStore implements InitialOffsetStore {
}
```

c2zwdjnlcg force-pushed from 257b264 to 0f5ead4
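The gated split being suggested could be wired up roughly as follows; a minimal, self-contained sketch in which StreamingOffset is stubbed as a placeholder and the Spark read conf is simulated with a plain Map (the real code would consult SparkReadConf and back the two stores with the table's FileIO and Hadoop's FileSystem respectively; the property name below is just an assumed placeholder):

```java
import java.util.Map;

class OffsetStoreSketch {
  // Stand-in for Iceberg's StreamingOffset; the real class carries more state.
  static class StreamingOffset {
    final long snapshotId;

    StreamingOffset(long snapshotId) {
      this.snapshotId = snapshotId;
    }
  }

  interface InitialOffsetStore {
    StreamingOffset initialOffset();
  }

  // Would read/write the initial offset through the table's FileIO.
  static class TableIOOffsetStore implements InitialOffsetStore {
    @Override
    public StreamingOffset initialOffset() {
      return new StreamingOffset(1L);
    }
  }

  // Would read/write the initial offset through Hadoop's FileSystem API.
  static class HadoopOffsetStore implements InitialOffsetStore {
    @Override
    public StreamingOffset initialOffset() {
      return new StreamingOffset(1L);
    }
  }

  // Gate the behavior change behind a read option, defaulting to the
  // existing table-IO behavior so it is not breaking by default.
  static InitialOffsetStore create(Map<String, String> readOptions) {
    boolean useTableIO = Boolean.parseBoolean(
        readOptions.getOrDefault("streaming-checkpoint-use-table-io", "true"));
    return useTableIO ? new TableIOOffsetStore() : new HadoopOffsetStore();
  }
}
```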
@RussellSpitzer Thanks for the review.

Sorry about that, will keep it in mind for next time. Hopefully this is more in line with what you were thinking. I named the setting
```java
protected StreamingOffset readOffset() {
  try (FSDataInputStream inputStream = fileSystem.open(initialOffsetPath);
      InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    String json = CharStreams.toString(reader);
```
Don't we already have a StreamingOffset.fromJson(InputStream in)?
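Mechanically, the CharStreams-based read in the hunk above just drains the stream as UTF-8 before parsing; a self-contained sketch of that step using only the JDK (the checkpoint file is stubbed with a ByteArrayInputStream here, and if a fromJson(InputStream) overload does exist, the stream could be handed to it directly instead):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ReadOffsetSketch {
  // JDK-only equivalent of CharStreams.toString(new InputStreamReader(in, UTF_8)).
  static String readJson(InputStream in) {
    try {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Stubbed checkpoint content; a real offset file holds serialized StreamingOffset JSON.
    InputStream in = new ByteArrayInputStream(
        "{\"version\":1}".getBytes(StandardCharsets.UTF_8));
    System.out.println(readJson(in));
  }
}
```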
```diff
  private final Long fromTimestamp;

- InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
+ BaseOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
```
Could be "long fromTimestamp"

This was preserved from the previous implementation, but I've changed it now; I don't think it will do any harm.
```java
// Controls whether streaming checkpoint operations use table FileIO or Hadoop FileSystem
public static final String STREAMING_CHECKPOINT_USE_TABLE_IO =
    "streaming-checkpoint-use-table-io";
```
I'm not sure if it should be use-table-io or use-hdfs... I think either is probably fine, but maybe I slightly prefer use-hdfs because then I know the opposite should be using the table IO.

I do think using an enum may be overboard here, but maybe that's just cleaner all around:

streaming-checkpoint = {table-io, hdfs}
streaming-checkpoint_default = table-io

Wdyt?
I went with

streaming-checkpoint-storage = {table-io, hadoop-fs}

since I didn't want it to seem like it was just for HDFS. But I can revert to your original naming if you prefer.
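An enum backing the proposed values could look like this; a hypothetical sketch (the enum and method names are assumed, not taken from the PR) showing how the property string would resolve to a constant:

```java
import java.util.Locale;

class CheckpointStorageSketch {
  // Hypothetical enum for the proposed streaming-checkpoint-storage values.
  enum CheckpointStorage {
    TABLE_IO("table-io"),
    HADOOP_FS("hadoop-fs");

    private final String propertyValue;

    CheckpointStorage(String propertyValue) {
      this.propertyValue = propertyValue;
    }

    // Resolve a property value such as "hadoop-fs" to its enum constant.
    static CheckpointStorage fromPropertyValue(String value) {
      for (CheckpointStorage storage : values()) {
        if (storage.propertyValue.equals(value.toLowerCase(Locale.ROOT))) {
          return storage;
        }
      }
      throw new IllegalArgumentException(
          "Unknown streaming-checkpoint-storage value: " + value);
    }
  }
}
```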
RussellSpitzer left a comment:

Looking pretty close to me! I have some feelings about the parameter name and whether it should be boolean.
```java
@Override
public OutputFile newOutputFile(String path) {
  if (path.contains("/offsets/")) {
```
nit: could use a constant here and in the tests above
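The nit above would amount to something like this; a trivial sketch with an assumed constant name, so the literal can be shared between the production check and the tests:

```java
class OffsetPathSketch {
  // Hypothetical constant replacing the inline "/offsets/" literal.
  static final String OFFSETS_DIR = "/offsets/";

  static boolean isOffsetPath(String path) {
    return path.contains(OFFSETS_DIR);
  }
}
```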
c2zwdjnlcg force-pushed from 0f5ead4 to 1e60a05
c2zwdjnlcg force-pushed from 1e60a05 to bf98b1f
Fixes: #14762