Core: Support parallel execution when scanning entries in ManifestGroup#15426
dramaticlly wants to merge 3 commits into apache:main
Also apply a defensive copy to entries, since ManifestReader reuses containers (reuseContainers) across iterations.
Could you elaborate more on the previous behavior, and how you are changing that in this PR?
Happy to! Before this change, scanning manifest entries with ManifestGroup reads the manifests sequentially. After this change, the scan can use a thread pool to read the manifests in parallel if the ManifestGroup is created with an executorService provided via planWith(). One caveat is that parallel scanning also needs a defensive copy of the entries, because ManifestReader uses reuseContainers and therefore reuses the same entry objects across all iterations within a manifest.
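To make the object-reuse caveat concrete, here is a minimal, self-contained sketch. The `Entry` class and `reusingReader` helper are hypothetical stand-ins written for illustration; they are not Iceberg's ManifestEntry or ManifestReader, and only mimic the behavior described above, where the reader hands back the same object on every iteration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReuseContainerDemo {
  // Hypothetical stand-in for a manifest entry (not Iceberg's ManifestEntry).
  static final class Entry {
    String path;

    Entry copy() {
      Entry c = new Entry();
      c.path = path;
      return c;
    }
  }

  // Hypothetical reader: every call to next() returns the SAME Entry instance
  // with its fields overwritten, which is the assumed effect of container reuse.
  static Iterable<Entry> reusingReader(List<String> paths) {
    return () -> new Iterator<Entry>() {
      private final Entry reused = new Entry();
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < paths.size();
      }

      @Override
      public Entry next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        reused.path = paths.get(i++);
        return reused;
      }
    };
  }

  // Holds on to entries past the iteration step, with or without copying them.
  static List<String> collectPaths(List<String> paths, boolean copy) {
    List<Entry> held = new ArrayList<>();
    for (Entry e : reusingReader(paths)) {
      held.add(copy ? e.copy() : e);
    }
    List<String> out = new ArrayList<>();
    for (Entry e : held) {
      out.add(e.path);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> paths = List.of("a.parquet", "b.parquet", "c.parquet");
    System.out.println(collectPaths(paths, false)); // [c.parquet, c.parquet, c.parquet]
    System.out.println(collectPaths(paths, true));  // [a.parquet, b.parquet, c.parquet]
  }
}
```

Without the copy, every held reference points at the one reused object, so all of them show the last value written. Any consumer that keeps entries beyond the current iteration step, such as one handing them to another thread, is exposed to the same hazard.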
AtomicInteger planThreadsIndex = new AtomicInteger(0);
ExecutorService executorService =
    Executors.newFixedThreadPool(
Should we consider migrating to Executors.newVirtualThreadPerTaskExecutor() (jdk21)?
I think we still build against Java 17, so we can migrate later, once JDK 21 becomes the minimum supported version.
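For reference, a fixed pool with indexed thread names can be built on Java 17 roughly as follows. This is a sketch under assumptions: the class name, the `newPlanningPool` and `nameOfWorker` helpers, and the thread-name prefix are all made up for illustration, and the truncated snippet above may be completed differently in the actual test.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PlanningPoolSketch {
  // Builds a fixed-size pool whose threads are named "<prefix>-0", "<prefix>-1", ...
  // The prefix and this helper are hypothetical, chosen only for the example.
  static ExecutorService newPlanningPool(int size, String prefix) {
    AtomicInteger index = new AtomicInteger(0);
    ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + index.getAndIncrement());
      thread.setDaemon(true); // do not keep the JVM alive for idle workers
      return thread;
    };
    return Executors.newFixedThreadPool(size, factory);
  }

  // Runs one task on the pool and reports which worker thread executed it.
  static String nameOfWorker(ExecutorService pool) {
    Callable<String> task = () -> Thread.currentThread().getName();
    try {
      return pool.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = newPlanningPool(2, "plan");
    try {
      System.out.println(nameOfWorker(pool)); // plan-0
    } finally {
      pool.shutdown();
    }
  }
}
```

Naming the threads makes it possible for a test to check that work actually ran on the supplied pool. On JDK 21, `Executors.newVirtualThreadPerTaskExecutor()` would be the alternative raised in the comment above, but it is not available on Java 17.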
// copy entries to avoid object reuse issues when scanning manifests in parallel,
// as ManifestReader reuses entry objects during iteration
Iterable<CloseableIterable<ManifestEntry<DataFile>>> entryIterables =
    entries((manifest, entries) -> CloseableIterable.transform(entries, ManifestEntry::copy));
I'm a little worried about introducing a sort of hidden "materialize everything" setting.
We have one internal use where this could come up, and we would effectively be double copying with this change:
FindFiles
iceberg/core/src/main/java/org/apache/iceberg/FindFiles.java
Lines 201 to 223 in 8417225
So not even counting external users of this library we are already introducing a regression. I'm not saying we shouldn't do this but we should be very careful here to avoid accidentally doubling memory consumption (or greatly increasing consumption) of a downstream consumer.
Thanks Russell, you raise a good point: existing consumers of this API might already apply a defensive copy, so we don't want to double the memory. I took a stab at following what the existing code is doing and added a new method that takes a transform function. testManifestGroupEntriesWithParallelExecution() illustrates that if we only need the filePath, we can skip the defensive copy, the same way we collect DataFiles in FindFiles::collect.
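The idea of letting the caller supply the transform can be sketched as below. Everything here is hypothetical and simplified: `Entry`, `scanOne`, and `scanAll` are illustrative names, not the method added in this PR, and each "manifest" is just a list of paths. The point is only that the transform runs while the reused object is still valid, so the caller chooses between extracting an immutable value and taking a full copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class TransformScanSketch {
  // Hypothetical stand-in for a manifest entry (not Iceberg's ManifestEntry).
  static final class Entry {
    String path;

    Entry copy() {
      Entry c = new Entry();
      c.path = path;
      return c;
    }
  }

  // Scans one manifest with a single reused Entry and applies the caller's
  // transform immediately, before the Entry is overwritten by the next row.
  static <T> List<T> scanOne(List<String> manifest, Function<Entry, T> transform) {
    Entry reused = new Entry();
    List<T> out = new ArrayList<>();
    for (String path : manifest) {
      reused.path = path;
      out.add(transform.apply(reused));
    }
    return out;
  }

  // Scans manifests in parallel, one task per manifest, and concatenates the
  // results in submission order so the output is deterministic.
  static <T> List<T> scanAll(
      List<List<String>> manifests, Function<Entry, T> transform, ExecutorService pool) {
    List<Future<List<T>>> futures = new ArrayList<>();
    for (List<String> manifest : manifests) {
      Callable<List<T>> task = () -> scanOne(manifest, transform);
      futures.add(pool.submit(task));
    }
    List<T> all = new ArrayList<>();
    try {
      for (Future<List<T>> future : futures) {
        all.addAll(future.get());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
    return all;
  }

  public static void main(String[] args) {
    List<List<String>> manifests = List.of(List.of("a", "b"), List.of("c"));
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // Only the path is needed: a String is immutable, so no entry copy is made.
      System.out.println(scanAll(manifests, e -> e.path, pool)); // [a, b, c]
      // The whole entry is needed: the caller opts into the copy explicitly.
      System.out.println(scanAll(manifests, Entry::copy, pool).size()); // 3
    } finally {
      pool.shutdown();
    }
  }
}
```

With this shape, a consumer that already copies pays for one copy rather than two, and a consumer that only reads a field pays for none.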
…ction Updated the existing entries method to streamline parallel execution without defensive copying, as the caller is now responsible for managing entry copies.
Currently, executorService is only used in ManifestGroup::plan
iceberg/core/src/main/java/org/apache/iceberg/ManifestGroup.java
Lines 215 to 219 in a97b4ec
@stevenzwu @RussellSpitzer