
Core: Detect and merge duplicate DVs for a data file and merge them before committing#15006

Open
amogh-jahagirdar wants to merge 29 commits into apache:main from amogh-jahagirdar:always-merge-duplicates-on-driver

Conversation


@amogh-jahagirdar (Contributor) commented Jan 9, 2026

Writers are generally expected to merge DVs for a given data file before attempting to commit, but we want a safeguard in the commit path in case that assumption is violated. This has been observed when AQE is enabled in Spark and a data file is split across multiple tasks (it really just depends on how files and deletes are split); multiple DVs are then produced for the same data file and committed. Currently, reads after such a commit fail because the DeleteFileIndex detects the duplicates.

Arguably, there should be a safeguard on the commit path that detects duplicates and fixes them up to prevent any invalid table states. Doing this behind the API covers any engine integration using the library.

This change updates MergingSnapshotProducer to track duplicate DVs for a data file and merge them, producing one Puffin file per merged DV. Since we generally expect duplicates to be rare, we don't expect many small Puffin files to be produced, so we don't add logic to coalesce them into larger files; they can also be compacted later. In the case of large-scale duplication, engines should arguably fix things up before handing off to the commit path.
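The merge step can be sketched roughly as follows. This is a standalone sketch, not the Iceberg implementation: `DV` is a hypothetical record, and `java.util.BitSet` stands in for Iceberg's `PositionDeleteIndex`.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Group incoming DVs by the data file they reference, then OR the position
// bitmaps of any duplicates so that exactly one DV per data file remains.
public class MergeDuplicateDVs {
    public record DV(String referencedDataFile, BitSet deletedPositions) {}

    public static List<DV> mergeDuplicates(List<DV> dvs) {
        Map<String, BitSet> byFile = new LinkedHashMap<>();
        for (DV dv : dvs) {
            byFile.merge(
                dv.referencedDataFile(),
                (BitSet) dv.deletedPositions().clone(),
                (merged, next) -> { merged.or(next); return merged; });
        }
        List<DV> result = new ArrayList<>();
        byFile.forEach((file, positions) -> result.add(new DV(file, positions)));
        return result;
    }

    public static void main(String[] args) {
        BitSet a = new BitSet(); a.set(0); a.set(2);
        BitSet b = new BitSet(); b.set(5);
        List<DV> merged = mergeDuplicates(List.of(
            new DV("data-1.parquet", a), new DV("data-1.parquet", b)));
        System.out.println(merged.size());                    // 1
        System.out.println(merged.get(0).deletedPositions()); // {0, 2, 5}
    }
}
```

In the actual change, the merged index is handed to a DV file writer that produces the Puffin file; the sketch only shows the grouping and bitmap union.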

@github-actions github-actions bot added the core label Jan 9, 2026

amogh-jahagirdar commented Jan 9, 2026

Still cleaning some stuff up, so leaving this in draft, but feel free to comment. Basically, there are some cases in Spark where a file can be split across multiple tasks, and if deletes happen to touch every part of the file, we'd incorrectly produce multiple DVs for a given data file (discovered this recently with a user who had Spark AQE enabled, but I think file splitting can happen in more cases).

We currently throw on read in such cases, but ideally we prevent this on write by detecting and merging pre-commit.

The reason this is done behind the API is largely to be defensive from a library perspective: if an engine/integration happens to produce multiple DVs, we can at least fix them up pre-commit.

If there are too many to reasonably rewrite on a single node, engines could do distributed rewrites before handing the files to the API, but from a library perspective it seems reasonable to pay this overhead to prevent bad commits across any integration.

@github-actions github-actions bot added the spark label Jan 9, 2026
addedFilesSummary.addedFile(spec, file);
hasNewDeleteFiles = true;
if (ContentFileUtil.isDV(file)) {
newDVRefs.add(file.referencedDataFile());
Contributor Author

Probably keep a boolean in case we detect a duplicate. That way we don't have to pay the price of grouping by referenced file every time to detect possible duplicates; only if we detect one at the time of adding do we do the dedupe/merge.

@amogh-jahagirdar (Contributor Author) Jan 10, 2026

We could also keep a mapping specific to duplicates. That shrinks the work because instead of grouping by every referenced data file, we just go through the duplicates set. It uses a little more memory, but given that we expect duplicates to be rare, it feels like the better solution.
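The duplicate-only bookkeeping described above could look something like this sketch (standalone, all names hypothetical): every referenced data file goes into a map, but the duplicates map is only populated when an add collides, so the common no-duplicate path stays cheap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Track the first DV seen per referenced data file; on a collision, start
// accumulating all DVs for that file in the duplicates map.
public class DuplicateTracker {
    private final Map<String, String> firstDVByFile = new HashMap<>();
    private final Map<String, List<String>> duplicates = new LinkedHashMap<>();

    public void add(String referencedDataFile, String dvPath) {
        String first = firstDVByFile.putIfAbsent(referencedDataFile, dvPath);
        if (first != null) {
            // Second (or later) DV for the same data file: remember all of them.
            duplicates
                .computeIfAbsent(referencedDataFile, f -> new ArrayList<>(List.of(first)))
                .add(dvPath);
        }
    }

    public Map<String, List<String>> duplicates() { return duplicates; }

    public static void main(String[] args) {
        DuplicateTracker tracker = new DuplicateTracker();
        tracker.add("data-1.parquet", "dv-a.puffin");
        tracker.add("data-2.parquet", "dv-b.puffin");
        tracker.add("data-1.parquet", "dv-c.puffin");
        System.out.println(tracker.duplicates());
        // prints {data-1.parquet=[dv-a.puffin, dv-c.puffin]}
    }
}
```

At merge time only `duplicates()` needs to be walked; files with a single DV never appear in it.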

@amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from 374b567 to c04d0e0 on January 11, 2026
DeleteFileSet deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
if (deleteFiles.add(file)) {
addedFilesSummary.addedFile(spec, file);
Contributor Author

Because we may be merging duplicates, we don't update the summary for delete files until after we dedupe, just before writing the new manifests.

Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
@amogh-jahagirdar (Contributor Author) Jan 12, 2026

This fix surfaced an issue in some of the TestPositionDeletesTable tests where we were setting the wrong data file for a delete file; we'd add a DV for the same data file, it would get merged by the new logic, and some of the later assertions would break.

// Add Data Files with EQ and POS deletes
DeleteFile fileADeletes = fileADeletes();
DeleteFile fileA2Deletes = fileA2Deletes();
DeleteFile fileBDeletes = fileBDeletes();
Contributor Author

This test had to be fixed after the recent changes because the file paths for data files B and B2 were previously set to the same value, so the DVs for both referenced the same file (probably not the intention of these tests) and counted as duplicates. After this change we'd merge the DVs in the commit, and the result would then be treated as a dangling delete and fail some of the assertions.

Since these tests only exercise the equality delete case, we can simplify by removing the usage of fileB deletes; the more minimal test covers the same thing.

Also note that I'd generally take this in a separate PR, but I think there's a good argument that this change should go into a 1.10.2 patch release to prevent invalid table states; in that case we'd need to keep these changes together.

@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review January 12, 2026 17:15
@amogh-jahagirdar amogh-jahagirdar changed the title Core: Merge DVs referencing the same data files as a safeguard Core: Track duplicate DVs for data file and merge them before committing Jan 12, 2026
String referencedLocation = dvsToMergeForDataFile.getKey();
mergedDVs.put(
referencedLocation,
mergeAndWriteDV(referencedLocation, dvsToMergeForDataFile.getValue()));
Member

A little weird that we are now writing delete vectors on the driver?

OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();

DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2);
Member

Probably doesn't matter, but in the real world these could also potentially have existing overlapping deletes, i.e.:

Task 1 has an existing DV and merges a few new deletes.
Task 2 has an existing DV and merges a few new deletes.

I think the logic is fine though.

@RussellSpitzer (Member)

I'm mostly on board here, but I have a few concerns:

  1. I think the implementation here is trying to be efficient with memory by populating the map only when a duplicate is found, but I'm not sure it's really saving us much when, every time we hit a match, we have to scan the entire list of DVs again to find the duplicate. I'd consider whether just keeping a mapping of referenced data files to delete vectors is really that much more expensive. It would make the lookup much simpler.

  2. Intrinsically we are cleaning up after something that shouldn't be happening. Given that fixing this behavior in Spark may take a while, or may just be impossible without further mods, I think having a fix here is better than throwing an exception (which could be another solution). But it feels weird that we are rewriting things from the driver; I think this is the first time the Spark integration writes data/delete files from the driver.

  3. I agree with the requests for logging. This is probably not good for performance, and we should have some notes for users that it is happening, although that's not really actionable unless we have a way to avoid it from Spark.

// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
Contributor

These tests are failing with the fix to core, right? I think we generally try to update one Spark version per commit unless there are failures that would leave main in a broken state.

Contributor Author

Yeah, that's right. It's failing with the changes to core in this PR because the test was setting up duplicates (unintentionally, as far as I can tell) and now we'd merge them, breaking some of the assertions in the test.

@amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from 647ae79 to 7b5bd0c on February 20, 2026
cachedNewDeleteManifests.clear();
// On cache invalidation of delete files, clear the whole summary.
// Since the summary contained both data files and DVs, add back the data files.
addedFilesSummary.clear();
@amogh-jahagirdar (Contributor Author) Feb 20, 2026

Worth noting that we could add a targeted clearDeleteFiles that only zeroes out the delete file state in the summary. Then we wouldn't have to add the data files back again. Since this is contained in this method I think it's probably OK as is, but it's worth considering in a follow-up. The main concern is correctness of the stats.

I'm also not a fan of a targeted clearDeleteFiles because there may be more complicated state in the future where it becomes ambiguous to a caller exactly what delete file state is in the summary (the semantics become tricky to maintain over time).

@amogh-jahagirdar (Contributor Author) Feb 20, 2026

I don't think there's a clean way to make the field track specifically data files without splitting the summary state into two fields, and we largely want to avoid new state in MergingSnapshotProducer. But that is another option: separate addedDataFilesSummary and addedDeleteFilesSummary fields, combined in apply().

@amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch 10 times, most recently from f39d27e to afdb314 on February 24, 2026
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
private final Set<String> newDVRefs = Sets.newHashSet();
private final List<DeleteFile> v2Deletes = Lists.newArrayList();
private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap();
Contributor Author

I commented this elsewhere, but the reason this field is a LinkedHashMap is that quite a lot of tests verify the exact ordering of entries in a manifest. I think we should change those tests, because we really shouldn't guarantee any ordering of the entries in the output manifests, but that's a larger change. The current implementation, which tracks newDeleteFilesBySpec, uses a DeleteFileSet, which is an insertion-order-preserving structure.
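As a minimal illustration of the ordering property being relied on here: a LinkedHashMap iterates in insertion order, so manifest entries come out in the order the delete files were added, which the existing tests assert on. A plain HashMap gives no such guarantee.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// LinkedHashMap preserves insertion order during iteration, unlike HashMap.
public class OrderingDemo {
    public static void main(String[] args) {
        Map<String, Integer> linked = new LinkedHashMap<>();
        linked.put("c.puffin", 3);
        linked.put("a.puffin", 1);
        linked.put("b.puffin", 2);
        // Iteration order matches insertion order, not key order.
        System.out.println(linked.keySet()); // prints [c.puffin, a.puffin, b.puffin]
        System.out.println(List.copyOf(linked.keySet())
            .equals(List.of("c.puffin", "a.puffin", "b.puffin"))); // prints true
    }
}
```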

@amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from afdb314 to 1efc041 on February 24, 2026
return toPositionIndexes(posDeletes, null /* unknown delete file */);
}

public static PositionDeleteIndex readDV(DeleteFile deleteFile, FileIO fileIO) {
Contributor

Should this be here or in DVUtil?

Deletes seems to be utilities for working with position delete structs coming from an engine. DVUtil seems more appropriate to me.

* Merges duplicate DVs for the same data file and writes the merged DV Puffin files. If there is
* exactly one DV for a given data file, it is returned as is
*
* @param dvsByFile map of data file location to DVs
Contributor

Missing quite a few params here.

*/
static List<DeleteFile> mergeAndWriteDVsIfRequired(
Map<String, List<DeleteFile>> dvsByFile,
Supplier<OutputFile> dvOutputFile,
Contributor

Since this has a FileIO, it's a little awkward to have a Supplier<OutputFile> passed in since a big part of that is the job of FileIO. Also, if this supplier is only called once then it doesn't need to be a supplier. What about passing a string location instead?

* @return a list containing both any newly merged DVs and any DVs that are already valid
*/
static List<DeleteFile> mergeAndWriteDVsIfRequired(
Map<String, List<DeleteFile>> dvsByFile,
Contributor

dvsByReferencedFile?

for (int i = 0; i < duplicateDVPositions.length; i++) {
PositionDeleteIndex dvPositions = duplicateDVPositions[i];
DeleteFile dv = duplicateDVs.get(i);
mergedIndexByFile.merge(
Contributor

I think this is correct, but I find a more straightforward implementation that uses get instead of merge easier to read:

    Map<String, PositionDeleteIndex> mergedDVs = Maps.newHashMap();
    for (int i = 0; i < duplicateDVPositions.length; i++) {
      DeleteFile dv = duplicateDVs.get(i);
      PositionDeleteIndex previousDV = mergedDVs.get(dv.referencedDataFile());
      if (previousDV != null) {
        previousDV.merge(duplicateDVPositions[i]);
      } else {
        mergedDVs.put(dv.referencedDataFile(), duplicateDVPositions[i]);
      }
    }


List<DeleteFile> mergedDVs = Lists.newArrayList();
Map<String, PartitionSpec> specByFile = Maps.newHashMap();
Map<String, StructLike> partitionByFile = Maps.newHashMap();
Contributor

Rather than 2 maps and lookups, why not use Map<String, Pair<PartitionSpec, StructLike>>?

FileIO fileIO,
Map<Integer, PartitionSpec> specs,
ExecutorService pool) {
Map<String, List<DeleteFile>> duplicateDVsByFile =
@rdblue (Contributor) Feb 25, 2026

There are a couple of things that make this method fairly complicated. First, using a map of lists ends up causing this to do a fair amount of stream manipulation. Using a Guava multimap cleans that up quite a bit.

Second, this doesn't need to process the input map multiple times. This is cleaner if you process it once, keep the DVs that don't need to be merged in an output list, and also accumulate the partition and sequence number info for later at the same time. Doing that also simplifies the write and validate methods because you can identify the expected data sequence number and partition information here rather than complicating the loops in later methods.

Here's what I came up with:

  static List<DeleteFile> mergeAndWriteDVsIfRequired(
      Map<String, List<DeleteFile>> dvsByFile,
      Supplier<OutputFile> dvOutputFile,
      FileIO fileIO,
      Map<Integer, PartitionSpec> specs,
      ExecutorService pool) {

    List<DeleteFile> finalDVs = Lists.newArrayList();
    Multimap<String, DeleteFile> duplicates =
        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
    Map<String, Pair<PartitionSpec, StructLike>> partitions = Maps.newHashMap();

    for (Map.Entry<String, List<DeleteFile>> entry : dvsByFile.entrySet()) {
      if (entry.getValue().size() > 1) {
        duplicates.putAll(entry.getKey(), entry.getValue());
        DeleteFile first = entry.getValue().get(0);
        partitions.put(entry.getKey(), Pair.of(specs.get(first.specId()), first.partition()));
      } else {
        finalDVs.addAll(entry.getValue());
      }
    }

    if (duplicates.isEmpty()) {
      return finalDVs;
    }

    validateCanMerge(duplicates, partitions);

    Map<String, PositionDeleteIndex> deletes =
        readAndMergeDVs(duplicates.values().toArray(DeleteFile[]::new), fileIO, pool);

    finalDVs.addAll(writeDVs(deletes, partitions, dvOutputFile));

    return finalDVs;
  }

return mergedDVs;
}

private static void validateCanMerge(
@rdblue (Contributor) Feb 25, 2026

I think this can be simpler if you accumulate partition map in the caller while looping over its inputs just once. Here's the version that does that and reuses partition data from the caller:

  private static void validateCanMerge(
      Multimap<String, DeleteFile> duplicates,
      Map<String, Pair<PartitionSpec, StructLike>> partitions) {
    Map<Integer, Comparator<StructLike>> comparatorsBySpecId = Maps.newHashMap();
    for (Map.Entry<String, Collection<DeleteFile>> entry : duplicates.asMap().entrySet()) {
      String referencedFile = entry.getKey();

      // validate that each file matches the expected partition
      Pair<PartitionSpec, StructLike> partition = partitions.get(referencedFile);
      long sequenceNumber = Iterables.getFirst(entry.getValue(), null).dataSequenceNumber();
      PartitionSpec spec = partition.first();
      StructLike tuple = partition.second();
      Comparator<StructLike> comparator =
          comparatorsBySpecId.computeIfAbsent(
              spec.specId(), id -> Comparators.forType(spec.partitionType()));

      for (DeleteFile dv : entry.getValue()) {
        Preconditions.checkArgument(
            sequenceNumber == dv.dataSequenceNumber(),
            "Cannot merge DVs, mismatched sequence numbers (%s, %s) for %s",
            sequenceNumber,
            dv.dataSequenceNumber(),
            referencedFile);

        Preconditions.checkArgument(
            spec.specId() == dv.specId(),
            "Cannot merge DVs, mismatched partition specs (%s, %s) for %s",
            spec.specId(),
            dv.specId(),
            referencedFile);

        Preconditions.checkArgument(
            comparator.compare(tuple, dv.partition()) == 0,
            "Cannot merge DVs, mismatched partition tuples (%s, %s) for %s",
            tuple,
            dv.partition(),
            referencedFile);
      }
    }
  }

I also combined this with validateCanMerge since having a separate method for the inner loop didn't seem useful.

Contributor

I don't think I handled sequenceNumber correctly here. It should probably use Objects.equals to ensure that null is handled correctly.
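The concern is that dataSequenceNumber() returns a boxed Long, so comparing with == either compares references or throws a NullPointerException when unboxing null. A minimal standalone illustration of why Objects.equals is the null-safe choice:

```java
import java.util.Objects;

// Boxed Long comparison pitfalls: == on two Longs compares references,
// and unboxing a null Long throws NullPointerException. Objects.equals
// handles equal values and nulls correctly.
public class NullSafeSequenceCompare {
    public static void main(String[] args) {
        Long assigned = 1000L;  // e.g. a DV's data sequence number
        Long unassigned = null; // e.g. a sequence number that is not yet set

        System.out.println(Objects.equals(assigned, 1000L));      // true
        System.out.println(Objects.equals(unassigned, assigned)); // false, no NPE
        System.out.println(Objects.equals(unassigned, null));     // true
    }
}
```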

}

// Produces a single Puffin file containing the merged DVs
private static List<DeleteFile> writeDVs(
Contributor

I updated this to work with a Map<String, Pair> to fit with my suggestion for the caller:

  private static List<DeleteFile> writeDVs(
      Map<String, PositionDeleteIndex> mergedIndexByFile,
      Map<String, Pair<PartitionSpec, StructLike>> partitions,
      Supplier<OutputFile> dvOutputFile) {
    try (DVFileWriter dvFileWriter = new BaseDVFileWriter(dvOutputFile, path -> null)) {
      for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndexByFile.entrySet()) {
        String referencedLocation = entry.getKey();
        PositionDeleteIndex mergedPositions = entry.getValue();
        Pair<PartitionSpec, StructLike> partition = partitions.get(referencedLocation);
        dvFileWriter.delete(
            referencedLocation, mergedPositions, partition.first(), partition.second());
      }

      dvFileWriter.close();
      return dvFileWriter.result().deleteFiles();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

I didn't change the handling of the OutputFile, which you may want to do. You could just pass in OutputFile and wrap it in a lambda.

cachedNewDeleteManifests.clear();
// On cache invalidation of delete files, clear the whole summary.
// Since the summary contained both data files and DVs, add back the data files.
addedFilesSummary.clear();
Contributor

Is this an added files summary, or is it a commit summary? Maybe we should change the name to be more clear.

// On cache invalidation of delete files, clear the whole summary.
// Since the summary contained both data files and DVs, add back the data files.
addedFilesSummary.clear();
newDataFilesBySpec.forEach(
Contributor

I think it would be better not to handle data files in newDeleteFilesAsManifests. It isn't obvious that this method is going to clear the data file summary and no one would look here for it. Why not keep a separate summary for data files and merge it with the deletes for the final summary? The data file one doesn't need to change.

dvsByReferencedFile,
() -> {
String filename = FileFormat.PUFFIN.addExtension(String.valueOf(snapshotId()));
return fileIO.newOutputFile(ops().locationProvider().newDataLocation(filename));
Contributor

I think we should add a little more to this path. This doesn't show what the file is, and it will overwrite the same file name on each retry.

I would use an instance field so that we can embed a file counter:

private final AtomicInteger dvMergeAttempt = new AtomicInteger(0);

String filename = FileFormat.PUFFIN.addExtension(String.format("merged-dvs-%s-%s", snapshotId(), dvMergeAttempt.incrementAndGet()));
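The naming scheme suggested above can be sketched in isolation (standalone, names hypothetical): embedding an attempt counter in the Puffin file name means each commit retry writes a fresh file rather than overwriting the previous attempt's output.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Per-producer counter embedded in the merged-DV file name so that each
// retry of the same snapshot commit gets a distinct file.
public class MergedDVNaming {
    private final AtomicInteger dvMergeAttempt = new AtomicInteger(0);

    public String nextFilename(long snapshotId) {
        return "merged-dvs-" + snapshotId + "-" + dvMergeAttempt.incrementAndGet() + ".puffin";
    }

    public static void main(String[] args) {
        MergedDVNaming naming = new MergedDVNaming();
        System.out.println(naming.nextFilename(42)); // merged-dvs-42-1.puffin
        System.out.println(naming.nextFilename(42)); // merged-dvs-42-2.puffin
    }
}
```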

ThreadPools.getDeleteWorkerPool());

return finalDVs.stream()
.map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
Contributor

As I noted, I'd prefer to remove the wrapper here.


Labels

bug (Something isn't working), core, data, spark



8 participants