Skip to content

Core: Support parallel execution when scanning entries in ManifestGroup#15426

Open
dramaticlly wants to merge 3 commits intoapache:mainfrom
dramaticlly:manifestGroupEntriesParallel
Open

Core: Support parallel execution when scanning entries in ManifestGroup#15426
dramaticlly wants to merge 3 commits intoapache:mainfrom
dramaticlly:manifestGroupEntriesParallel

Conversation

@dramaticlly
Copy link
Contributor

Currently, executorService is only used in ManifestGroup::plan

if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
} else {
return CloseableIterable.concat(tasks);
}
But it can also benefit when scanning entries in multiple manifests.

Also apply defensive copy on entries as ManifestReader reuseContainers across iteration

@stevenzwu @RussellSpitzer

Also apply defensive copy on entries as ManifestReader reuseContainers across iteration
@github-actions github-actions bot added the core label Feb 24, 2026
@RussellSpitzer
Copy link
Member

Could you elaborate more on the previous behavior, and how you are changing that in this PR?

@dramaticlly
Copy link
Contributor Author

Could you elaborate more on the previous behavior, and how you are changing that in this PR?

Happy to! Before this change when we scan the manifest entries using ManifestGroup, we will scan manifest sequentially using CloseableIterable.concat even when executorService is provided.

After this change, we now can benefit from using threadpool to scan the manifests if ManifestGroup is created with executorService provided in planWith(). One caveat is that we also need to make defensive copy of entries in parallel scanning as entries are reused across all iterations within a manifest given the reuseContainers in ManifestReader.


AtomicInteger planThreadsIndex = new AtomicInteger(0);
ExecutorService executorService =
Executors.newFixedThreadPool(
Copy link
Member

@deniskuzZ deniskuzZ Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider migrating to Executors.newVirtualThreadPerTaskExecutor() (jdk21)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still build against java17, so we can migrate this later once we move to jdk21 as minimal support.

// copy entries to avoid object reuse issues when scanning manifests in parallel,
// as ManifestReader reuses entry objects during iteration
Iterable<CloseableIterable<ManifestEntry<DataFile>>> entryIterables =
entries((manifest, entries) -> CloseableIterable.transform(entries, ManifestEntry::copy));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little worried about introducing a sort of hidden "materialize everything" setting.

We have one internal use where this could come up and we are effectively double copying with this change

FindFiles

public CloseableIterable<DataFile> collect() {
Snapshot snapshot =
snapshotId != null ? ops.current().snapshot(snapshotId) : ops.current().currentSnapshot();
// snapshot could be null when the table just gets created
if (snapshot == null) {
return CloseableIterable.empty();
}
// when snapshot is not null
CloseableIterable<ManifestEntry<DataFile>> entries =
new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io()))
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
.filterPartitions(partitionFilter)
.ignoreDeleted()
.caseSensitive(caseSensitive)
.planWith(executorService)
.entries();
return CloseableIterable.transform(entries, entry -> entry.file().copy(includeColumnStats));
}

So not even counting external users of this library we are already introducing a regression. I'm not saying we shouldn't do this but we should be very careful here to avoid accidentally doubling memory consumption (or greatly increasing consumption) of a downstream consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Russell, I think you raised a good point as existing consumer of this API might already apply the defensive copy so we dont want to double the memory. I took the stab to follow what's existing

public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> createTasksFunc) {
is doing and add a new method which takes a function for transform.

So testManifestGroupEntriesWithParallelExecution() kind of illustrate that if we only need filePath, we can skip defensive copy like how we collect DataFiles in FindFiles::collect

…ction

Updated the existing entries method to streamline parallel execution without defensive copying, as the caller is now responsible for managing entry copies.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants