diff --git a/build.gradle b/build.gradle index 0e3571eec145..7279e16d80f9 100644 --- a/build.gradle +++ b/build.gradle @@ -638,7 +638,8 @@ project(':iceberg-delta-lake') { annotationProcessor libs.immutables.value compileOnly libs.immutables.value - compileOnly "io.delta:delta-standalone_${scalaVersion}:${libs.versions.delta.standalone.get()}" + compileOnly "io.delta:delta-kernel-api:${libs.versions.delta.kernel.get()}" + compileOnly "io.delta:delta-kernel-defaults:${libs.versions.delta.kerneldefaults.get()}" compileOnly(libs.hadoop3.common) { exclude group: 'org.apache.avro', module: 'avro' diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java index b2dae682d296..000615e54236 100644 --- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java +++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.delta; +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; +import static io.delta.kernel.utils.CloseableIterable.inMemoryIterable; import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.current_date; import static org.apache.spark.sql.functions.date_add; @@ -25,30 +27,36 @@ import static org.apache.spark.sql.functions.expr; import static org.assertj.core.api.Assertions.assertThat; -import io.delta.standalone.DeltaLog; -import io.delta.standalone.Operation; -import io.delta.standalone.OptimisticTransaction; -import io.delta.standalone.VersionLog; -import io.delta.standalone.actions.Action; -import io.delta.standalone.actions.AddFile; -import io.delta.standalone.actions.RemoveFile; -import io.delta.standalone.exceptions.DeltaConcurrentModificationException; +import io.delta.kernel.Operation; +import io.delta.kernel.Scan; +import io.delta.kernel.Table; 
+import io.delta.kernel.Transaction; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.TableImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.utils.CloseableIterator; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; -import java.sql.Timestamp; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; -import java.util.stream.Collectors; +import java.util.Set; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.net.URLCodec; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -56,7 +64,6 @@ import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.util.LocationUtil; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.connector.catalog.CatalogPlugin; import org.apache.spark.sql.delta.catalog.DeltaCatalog; @@ -80,8 +87,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase { "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync ); - private static Dataset typeTestDataFrame; - private static Dataset nestedDataFrame; + private static Dataset typeTestDataFrame; + 
private static Dataset nestedDataFrame; @TempDir private File tempA; @TempDir private File tempB; @@ -383,24 +390,28 @@ private void checkSnapshotIntegrity( String icebergTableIdentifier, SnapshotDeltaLakeTable.Result snapshotReport, long firstConstructableVersion) { - DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); + Configuration hadoopConf = spark.sessionState().newHadoopConf(); + Engine engine = DefaultEngine.create(hadoopConf); + Table deltaTable = Table.forPath(engine, deltaTableLocation); - List deltaTableContents = + List deltaTableContents = spark.sql("SELECT * FROM " + deltaTableIdentifier).collectAsList(); - List icebergTableContents = + List icebergTableContents = spark.sql("SELECT * FROM " + icebergTableIdentifier).collectAsList(); assertThat(deltaTableContents).hasSize(icebergTableContents.size()); assertThat(snapshotReport.snapshotDataFilesCount()) - .isEqualTo(countDataFilesInDeltaLakeTable(deltaLog, firstConstructableVersion)); + .isEqualTo(countDataFilesInDeltaLakeTable(engine, deltaTable, firstConstructableVersion)); assertThat(icebergTableContents).containsExactlyInAnyOrderElementsOf(deltaTableContents); } private void checkTagContentAndOrder( String deltaTableLocation, String icebergTableIdentifier, long firstConstructableVersion) { - DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); - long currentVersion = deltaLog.snapshot().getVersion(); - Table icebergTable = getIcebergTable(icebergTableIdentifier); + Configuration hadoopConf = spark.sessionState().newHadoopConf(); + Engine engine = DefaultEngine.create(hadoopConf); + Table deltaTable = Table.forPath(engine, deltaTableLocation); + long currentVersion = deltaTable.getLatestSnapshot(engine).getVersion(); + org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableIdentifier); Map icebergSnapshotRefs = icebergTable.refs(); List icebergSnapshots = 
Lists.newArrayList(icebergTable.snapshots()); @@ -417,9 +428,11 @@ private void checkTagContentAndOrder( assertThat(icebergSnapshotRefs.get(expectedVersionTag).snapshotId()) .isEqualTo(currentIcebergSnapshot.snapshotId()); - Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp(); - assertThat(deltaVersionTimestamp).isNotNull(); - String expectedTimestampTag = "delta-ts-" + deltaVersionTimestamp.getTime(); + io.delta.kernel.Snapshot deltaSnapshot = + deltaTable.getSnapshotAsOfVersion(engine, deltaVersion); + long deltaVersionTimestamp = deltaSnapshot.getTimestamp(engine); + assertThat(deltaVersionTimestamp).isGreaterThan(0); + String expectedTimestampTag = "delta-ts-" + deltaVersionTimestamp; assertThat(icebergSnapshotRefs.get(expectedTimestampTag)).isNotNull(); assertThat(icebergSnapshotRefs.get(expectedTimestampTag).isTag()).isTrue(); @@ -429,7 +442,7 @@ private void checkTagContentAndOrder( } private void checkIcebergTableLocation(String icebergTableIdentifier, String expectedLocation) { - Table icebergTable = getIcebergTable(icebergTableIdentifier); + org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableIdentifier); assertThat(icebergTable.location()) .isEqualTo(LocationUtil.stripTrailingSlash(expectedLocation)); } @@ -438,7 +451,7 @@ private void checkIcebergTableProperties( String icebergTableIdentifier, Map expectedAdditionalProperties, String deltaTableLocation) { - Table icebergTable = getIcebergTable(icebergTableIdentifier); + org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableIdentifier); ImmutableMap.Builder expectedPropertiesBuilder = ImmutableMap.builder(); // The snapshot action will put some fixed properties to the table expectedPropertiesBuilder.put(SNAPSHOT_SOURCE_PROP, DELTA_SOURCE_VALUE); @@ -451,14 +464,30 @@ private void checkIcebergTableProperties( private void checkDataFilePathsIntegrity( String icebergTableIdentifier, String deltaTableLocation) { - Table icebergTable = 
getIcebergTable(icebergTableIdentifier); - DeltaLog deltaLog = DeltaLog.forTable(spark.sessionState().newHadoopConf(), deltaTableLocation); + org.apache.iceberg.Table icebergTable = getIcebergTable(icebergTableIdentifier); + Configuration hadoopConf = spark.sessionState().newHadoopConf(); + Engine engine = DefaultEngine.create(hadoopConf); + Table deltaTable = Table.forPath(engine, deltaTableLocation); + io.delta.kernel.Snapshot deltaSnapshot = deltaTable.getLatestSnapshot(engine); + // checkSnapshotIntegrity already checks the number of data files in the snapshot iceberg table // equals that in the original delta lake table - List deltaTableDataFilePaths = - deltaLog.update().getAllFiles().stream() - .map(f -> getFullFilePath(f.getPath(), deltaLog.getPath().toString())) - .collect(Collectors.toList()); + List deltaTableDataFilePaths = Lists.newArrayList(); + Scan scan = deltaSnapshot.getScanBuilder().build(); + CloseableIterator scanFileIter = scan.getScanFiles(engine); + while (scanFileIter.hasNext()) { + FilteredColumnarBatch batch = scanFileIter.next(); + CloseableIterator rows = batch.getRows(); + while (rows.hasNext()) { + Row scanFileRow = rows.next(); + if (!scanFileRow.isNullAt(scanFileRow.getSchema().indexOf("add"))) { + Row addFileRow = scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")); + AddFile addFile = new AddFile(addFileRow); + deltaTableDataFilePaths.add(getFullFilePath(addFile.getPath(), deltaTableLocation)); + } + } + } + icebergTable .currentSnapshot() .addedDataFiles(icebergTable.io()) @@ -469,7 +498,7 @@ private void checkDataFilePathsIntegrity( }); } - private Table getIcebergTable(String icebergTableIdentifier) { + private org.apache.iceberg.Table getIcebergTable(String icebergTableIdentifier) { CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog(); Spark3Util.CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier( @@ -487,39 +516,65 @@ private String destName(String catalogName, 
String dest) { /** * Add parquet files manually to a delta lake table to mock the situation that some data files are - * not in the same location as the delta lake table. The case that {@link AddFile#getPath()} or - * {@link RemoveFile#getPath()} returns absolute path. + * not in the same location as the delta lake table. This simulates the case where data files have + * absolute paths. * - *

The known issue makes it - * necessary to manually rebuild the AddFile to avoid deserialization error when committing the - * transaction. + *

Note: This method uses the Delta Kernel Transaction API to rebuild and commit the AddFile + actions directly, so the external files keep their absolute paths. */ private void addExternalDatafiles( String targetDeltaTableLocation, String sourceDeltaTableLocation) { - DeltaLog targetLog = - DeltaLog.forTable(spark.sessionState().newHadoopConf(), targetDeltaTableLocation); - OptimisticTransaction transaction = targetLog.startTransaction(); - DeltaLog sourceLog = - DeltaLog.forTable(spark.sessionState().newHadoopConf(), sourceDeltaTableLocation); - List newFiles = - sourceLog.update().getAllFiles().stream() - .map( - f -> - AddFile.builder( - getFullFilePath(f.getPath(), sourceLog.getPath().toString()), - f.getPartitionValues(), - f.getSize(), - System.currentTimeMillis(), - true) - .build()) - .collect(Collectors.toList()); + Configuration hadoopConf = spark.sessionState().newHadoopConf(); + Engine engine = DefaultEngine.create(hadoopConf); + Table targetTable = Table.forPath(engine, targetDeltaTableLocation); + Transaction transaction = + targetTable + .createTransactionBuilder(engine, "Delta-Lake/4.0.0", Operation.MANUAL_UPDATE) + .build(engine); + Table sourceTable = Table.forPath(engine, sourceDeltaTableLocation); + io.delta.kernel.Snapshot sourceSnapshot = sourceTable.getLatestSnapshot(engine); + List addFileActions = Lists.newArrayList(); + Scan scan = sourceSnapshot.getScanBuilder().build(); + try (CloseableIterator batches = scan.getScanFiles(engine)) { + while (batches.hasNext()) { + FilteredColumnarBatch batch = batches.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + if (DeltaActionUtils.isAdd(row)) { + addFileActions.add( + rebuildAddFile(DeltaActionUtils.getAdd(row), sourceDeltaTableLocation)); + } + } + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to scan source Delta table", e); + } try { - transaction.commit(newFiles, new Operation(Operation.Name.UPDATE), "Delta-Lake/2.2.0"); - } catch 
(DeltaConcurrentModificationException e) { - throw new RuntimeException(e); + transaction.commit(engine, inMemoryIterable(toCloseableIterator(addFileActions.iterator()))); + } catch (Exception e) { + throw new RuntimeException("Failed to commit external AddFiles", e); } } + private Row rebuildAddFile(AddFile original, String sourceTableLocation) { + String absolutePath = getFullFilePath(original.getPath(), sourceTableLocation); + return SingleAction.createAddFileSingleAction( + AddFile.createAddFileRow( + AddFile.FULL_SCHEMA, + absolutePath, + original.getPartitionValues(), + original.getSize(), + original.getModificationTime(), + true, + Optional.empty(), + original.getTags(), + original.getBaseRowId(), + original.getDefaultRowCommitVersion(), + original.getStats())); + } + private static String getFullFilePath(String path, String tableRoot) { URI dataFileUri = URI.create(path); try { @@ -535,7 +590,10 @@ private static String getFullFilePath(String path, String tableRoot) { } private void writeDeltaTable( - Dataset df, String identifier, String path, String... partitionColumns) { + Dataset df, + String identifier, + String path, + String... 
partitionColumns) { spark.sql(String.format("DROP TABLE IF EXISTS %s", identifier)); if (partitionColumns.length > 0) { df.write() @@ -549,25 +607,52 @@ private void writeDeltaTable( } } - private long countDataFilesInDeltaLakeTable(DeltaLog deltaLog, long firstConstructableVersion) { - long dataFilesCount = 0; - - List initialDataFiles = - deltaLog.getSnapshotForVersionAsOf(firstConstructableVersion).getAllFiles(); - dataFilesCount += initialDataFiles.size(); - - Iterator versionLogIterator = - deltaLog.getChanges( - firstConstructableVersion + 1, false // not throw exception when data loss detected - ); - - while (versionLogIterator.hasNext()) { - VersionLog versionLog = versionLogIterator.next(); - List addFiles = - versionLog.getActions().stream() - .filter(action -> action instanceof AddFile) - .collect(Collectors.toList()); - dataFilesCount += addFiles.size(); + private long countDataFilesInDeltaLakeTable( + Engine engine, Table deltaTable, long firstConstructableVersion) { + long dataFilesCount = 0L; + + // Count files in initial snapshot + io.delta.kernel.Snapshot initialSnapshot = + deltaTable.getSnapshotAsOfVersion(engine, firstConstructableVersion); + Scan initialScan = initialSnapshot.getScanBuilder().build(); + try (CloseableIterator scanIter = initialScan.getScanFiles(engine)) { + while (scanIter.hasNext()) { + FilteredColumnarBatch batch = scanIter.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + if (DeltaActionUtils.isAdd(row)) { + dataFilesCount++; + } + } + } + } + } catch (IOException e) { + throw new RuntimeException( + "Failed to scan initial Delta snapshot at version " + firstConstructableVersion, e); + } + + // Count files in subsequent versions + long latestVersion = deltaTable.getLatestSnapshot(engine).getVersion(); + TableImpl tableImpl = (TableImpl) deltaTable; + for (long version = firstConstructableVersion + 1; version <= latestVersion; version++) { + try (CloseableIterator 
changes = + tableImpl.getChanges( + engine, version, version, Set.of(DeltaLogActionUtils.DeltaAction.ADD))) { + while (changes.hasNext()) { + ColumnarBatch batch = changes.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + if (DeltaActionUtils.isAdd(row)) { + dataFilesCount++; + } + } + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to read Delta change logs at version " + version, e); + } } return dataFilesCount; diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java index 69bbaed71997..669f2fa0083f 100644 --- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java @@ -18,20 +18,34 @@ */ package org.apache.iceberg.delta; -import io.delta.standalone.DeltaLog; -import io.delta.standalone.VersionLog; -import io.delta.standalone.actions.Action; -import io.delta.standalone.actions.AddFile; -import io.delta.standalone.actions.RemoveFile; -import io.delta.standalone.exceptions.DeltaStandaloneException; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; +import io.delta.kernel.exceptions.TableNotFoundException; +import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction; +import io.delta.kernel.internal.TableImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import 
io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.utils.CloseableIterator; import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; @@ -84,7 +98,8 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable { private static final String DELTA_TIMESTAMP_TAG_PREFIX = "delta-ts-"; private final ImmutableMap.Builder additionalPropertiesBuilder = ImmutableMap.builder(); - private DeltaLog deltaLog; + private Engine deltaEngine; + private io.delta.kernel.Table deltaTable; private Catalog icebergCatalog; private final String deltaTableLocation; private TableIdentifier newTableIdentifier; @@ -138,10 +153,10 @@ public SnapshotDeltaLakeTable icebergCatalog(Catalog catalog) { @Override public SnapshotDeltaLakeTable deltaLakeConfiguration(Configuration conf) { - this.deltaLog = DeltaLog.forTable(conf, deltaTableLocation); + this.deltaEngine = DefaultEngine.create(conf); this.deltaLakeFileIO = new HadoopFileIO(conf); - // get the earliest version available in the delta lake table - this.deltaStartVersion = deltaLog.getVersionAtOrAfterTimestamp(0L); + this.deltaTable = io.delta.kernel.Table.forPath(deltaEngine, deltaTableLocation); + return this; } @@ -151,15 +166,23 @@ public SnapshotDeltaLakeTable.Result execute() { icebergCatalog != null && newTableIdentifier != null, "Iceberg catalog and identifier cannot be null. 
Make sure to configure the action with a valid Iceberg catalog and identifier."); Preconditions.checkArgument( - deltaLog != null && deltaLakeFileIO != null, + deltaTable != null && deltaEngine != null && deltaLakeFileIO != null, "Make sure to configure the action with a valid deltaLakeConfiguration"); - Preconditions.checkArgument( - deltaLog.tableExists(), - "Delta Lake table does not exist at the given location: %s", - deltaTableLocation); + try { + // Validate table exists by attempting to load latest snapshot + this.deltaTable.getLatestSnapshot(deltaEngine); + } catch (TableNotFoundException e) { + throw new IllegalArgumentException( + String.format( + "Delta Lake table does not exist at the given location: %s", deltaTableLocation), + e); + } + // get the earliest recreatable commit version available in the Delta Lake table + Path logPath = new Path(deltaTableLocation, "_delta_log"); + deltaStartVersion = DeltaHistoryManager.getEarliestRecreatableCommit(deltaEngine, logPath); ImmutableSet.Builder migratedDataFilesBuilder = ImmutableSet.builder(); - io.delta.standalone.Snapshot updatedSnapshot = deltaLog.update(); - Schema schema = convertDeltaLakeSchema(updatedSnapshot.getMetadata().getSchema()); + Snapshot updatedSnapshot = deltaTable.getLatestSnapshot(deltaEngine); + Schema schema = convertDeltaLakeSchema(updatedSnapshot.getSchema()); PartitionSpec partitionSpec = getPartitionSpecFromDeltaSnapshot(schema, updatedSnapshot); Transaction icebergTransaction = icebergCatalog.newCreateTableTransaction( @@ -178,14 +201,11 @@ public SnapshotDeltaLakeTable.Result execute() { long constructableStartVersion = commitInitialDeltaSnapshotToIcebergTransaction( updatedSnapshot.getVersion(), icebergTransaction, migratedDataFilesBuilder); - Iterator versionLogIterator = - deltaLog.getChanges( - constructableStartVersion + 1, false // not throw exception when data loss detected - ); - while (versionLogIterator.hasNext()) { - VersionLog versionLog = versionLogIterator.next(); + 
for (long version = constructableStartVersion + 1; + version <= updatedSnapshot.getVersion(); + version++) { commitDeltaVersionLogToIcebergTransaction( - versionLog, icebergTransaction, migratedDataFilesBuilder); + version, icebergTransaction, migratedDataFilesBuilder); } icebergTransaction.commitTransaction(); @@ -200,15 +220,14 @@ public SnapshotDeltaLakeTable.Result execute() { .build(); } - private Schema convertDeltaLakeSchema(io.delta.standalone.types.StructType deltaSchema) { + private Schema convertDeltaLakeSchema(io.delta.kernel.types.StructType deltaSchema) { Type converted = DeltaLakeDataTypeVisitor.visit(deltaSchema, new DeltaLakeTypeToType(deltaSchema)); return new Schema(converted.asNestedType().asStructType().fields()); } - private PartitionSpec getPartitionSpecFromDeltaSnapshot( - Schema schema, io.delta.standalone.Snapshot deltaSnapshot) { - List partitionNames = deltaSnapshot.getMetadata().getPartitionColumns(); + private PartitionSpec getPartitionSpecFromDeltaSnapshot(Schema schema, Snapshot deltaSnapshot) { + List partitionNames = deltaSnapshot.getPartitionColumnNames(); if (partitionNames.isEmpty()) { return PartitionSpec.unpartitioned(); } @@ -245,8 +264,9 @@ private long commitInitialDeltaSnapshotToIcebergTransaction( long constructableStartVersion = deltaStartVersion; while (constructableStartVersion <= latestVersion) { try { - List initDataFiles = - deltaLog.getSnapshotForVersionAsOf(constructableStartVersion).getAllFiles(); + Snapshot snapshot = + deltaTable.getSnapshotAsOfVersion(deltaEngine, constructableStartVersion); + List initDataFiles = getAddFilesFromSnapshot(snapshot); List filesToAdd = Lists.newArrayList(); for (AddFile addFile : initDataFiles) { DataFile dataFile = buildDataFileFromAction(addFile, transaction.table()); @@ -261,7 +281,7 @@ private long commitInitialDeltaSnapshotToIcebergTransaction( tagCurrentSnapshot(constructableStartVersion, transaction); return constructableStartVersion; - } catch (NotFoundException | 
IllegalArgumentException | DeltaStandaloneException e) { + } catch (NotFoundException | IllegalArgumentException | KernelException e) { constructableStartVersion++; } } @@ -271,7 +291,38 @@ private long commitInitialDeltaSnapshotToIcebergTransaction( } /** - * Iterate through the {@code VersionLog} to determine the update type and commit the update to + * Returns all {@link AddFile} actions visible in the given Delta snapshot. + * + *

The snapshot is scanned to collect the set of data files that constitute the table state at + * that version. + * + * @param snapshot the Delta snapshot + * @return list of {@link AddFile} actions in the snapshot + */ + private List getAddFilesFromSnapshot(Snapshot snapshot) { + List addFiles = Lists.newArrayList(); + Scan scan = snapshot.getScanBuilder().build(); + try (CloseableIterator scanFileIter = scan.getScanFiles(deltaEngine)) { + while (scanFileIter.hasNext()) { + FilteredColumnarBatch batch = scanFileIter.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + Row scanFileRow = rows.next(); + if (DeltaActionUtils.isAdd(scanFileRow)) { + addFiles.add(DeltaActionUtils.getAdd(scanFileRow)); + } + } + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return addFiles; + } + + /** + * Iterate through the Delta Lake change log to determine the update type and commit the update to * the given {@code Transaction}. * *

There are 3 cases: @@ -283,22 +334,19 @@ private long commitInitialDeltaSnapshotToIcebergTransaction( * *

3. OverwriteFiles - when there are a mix of AddFile and RemoveFile (a DELETE/UPDATE) * - * @param versionLog the delta log version to commit to iceberg table transaction + * @param version the delta log version to commit to iceberg table transaction * @param transaction the iceberg table transaction to commit to */ private void commitDeltaVersionLogToIcebergTransaction( - VersionLog versionLog, + long version, Transaction transaction, ImmutableSet.Builder migratedDataFilesBuilder) { // Only need actions related to data change: AddFile and RemoveFile - List dataFileActions = - versionLog.getActions().stream() - .filter(action -> action instanceof AddFile || action instanceof RemoveFile) - .collect(Collectors.toList()); + List dataFileActions = getDataFileActions(version); List filesToAdd = Lists.newArrayList(); List filesToRemove = Lists.newArrayList(); - for (Action action : dataFileActions) { + for (RowBackedAction action : dataFileActions) { DataFile dataFile = buildDataFileFromAction(action, transaction.table()); if (action instanceof AddFile) { filesToAdd.add(dataFile); @@ -332,10 +380,46 @@ private void commitDeltaVersionLogToIcebergTransaction( transaction.newAppend().commit(); } - tagCurrentSnapshot(versionLog.getVersion(), transaction); + tagCurrentSnapshot(version, transaction); } - private DataFile buildDataFileFromAction(Action action, Table table) { + /** + * Returns data file change actions ({@link AddFile} and {@link RemoveFile}) for the specified + * Delta Lake version. + * + *

This method reads the Delta change log using the Kernel getChanges API and extracts only + * file-level actions. + * + * @param version the Delta Lake version + * @return list of file add and remove actions for the version + */ + private List getDataFileActions(long version) { + List dataFileActions = Lists.newArrayList(); + TableImpl tableImpl = (TableImpl) deltaTable; + Set actions = Set.of(DeltaAction.ADD, DeltaAction.REMOVE); + + try (CloseableIterator changes = + tableImpl.getChanges(deltaEngine, version, version, actions)) { + while (changes.hasNext()) { + ColumnarBatch batch = changes.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + if (DeltaActionUtils.isAdd(row)) { + dataFileActions.add(DeltaActionUtils.getAdd(row)); + } else if (DeltaActionUtils.isRemove(row)) { + dataFileActions.add(DeltaActionUtils.getRemove(row)); + } + } + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return dataFileActions; + } + + private DataFile buildDataFileFromAction(RowBackedAction action, Table table) { PartitionSpec spec = table.spec(); String path; long fileSize; @@ -346,22 +430,18 @@ private DataFile buildDataFileFromAction(Action action, Table table) { AddFile addFile = (AddFile) action; path = addFile.getPath(); nullableFileSize = addFile.getSize(); - partitionValues = addFile.getPartitionValues(); + partitionValues = extractPartitionValues(addFile); } else if (action instanceof RemoveFile) { RemoveFile removeFile = (RemoveFile) action; path = removeFile.getPath(); nullableFileSize = removeFile.getSize().orElse(null); - partitionValues = removeFile.getPartitionValues(); + partitionValues = extractPartitionValues(removeFile); } else { throw new ValidationException( "Unexpected action type for Delta Lake: %s", action.getClass().getSimpleName()); } - String fullFilePath = getFullFilePath(path, deltaLog.getPath().toString()); - // For unpartitioned table, the partitionValues should 
be an empty map rather than null - Preconditions.checkArgument( - partitionValues != null, - String.format("File %s does not specify a partitionValues", fullFilePath)); + String fullFilePath = getFullFilePath(path, deltaTableLocation); FileFormat format = determineFileFormatFromPath(fullFilePath); InputFile file = deltaLakeFileIO.newInputFile(fullFilePath); @@ -400,6 +480,28 @@ private DataFile buildDataFileFromAction(Action action, Table table) { .build(); } + /** + * Extracts partition values from an AddFile action. + * + * @param addFile the AddFile action + * @return map of partition column names to values + */ + private Map extractPartitionValues(AddFile addFile) { + return VectorUtils.toJavaMap(addFile.getPartitionValues()); + } + + /** + * Extracts partition values from a RemoveFile action. + * + * @param removeFile the RemoveFile action + * @return map of partition column names to values, or empty map if not present + */ + private Map extractPartitionValues(RemoveFile removeFile) { + return removeFile.getPartitionValues().isPresent() + ? 
VectorUtils.toJavaMap(removeFile.getPartitionValues().get()) + : Collections.emptyMap(); + } + private FileFormat determineFileFormatFromPath(String path) { if (path.endsWith(PARQUET_SUFFIX)) { return FileFormat.PARQUET; @@ -416,9 +518,9 @@ private Metrics getMetricsForFile( throw new ValidationException("Cannot get metrics from file format: %s", format); } - private Map destTableProperties( - io.delta.standalone.Snapshot deltaSnapshot, String originalLocation) { - additionalPropertiesBuilder.putAll(deltaSnapshot.getMetadata().getConfiguration()); + private Map destTableProperties(Snapshot deltaSnapshot, String originalLocation) { + additionalPropertiesBuilder.putAll( + ((io.delta.kernel.internal.SnapshotImpl) deltaSnapshot).getMetadata().getConfiguration()); additionalPropertiesBuilder.putAll( ImmutableMap.of( SNAPSHOT_SOURCE_PROP, DELTA_SOURCE_VALUE, ORIGINAL_LOCATION_PROP, originalLocation)); @@ -432,10 +534,11 @@ private void tagCurrentSnapshot(long deltaVersion, Transaction transaction) { ManageSnapshots manageSnapshots = transaction.manageSnapshots(); manageSnapshots.createTag(DELTA_VERSION_TAG_PREFIX + deltaVersion, currentSnapshotId); - Timestamp deltaVersionTimestamp = deltaLog.getCommitInfoAt(deltaVersion).getTimestamp(); - if (deltaVersionTimestamp != null) { + long deltaVersionTimestamp = + deltaTable.getSnapshotAsOfVersion(deltaEngine, deltaVersion).getTimestamp(deltaEngine); + if (deltaVersionTimestamp > 0) { manageSnapshots.createTag( - DELTA_TIMESTAMP_TAG_PREFIX + deltaVersionTimestamp.getTime(), currentSnapshotId); + DELTA_TIMESTAMP_TAG_PREFIX + deltaVersionTimestamp, currentSnapshotId); } manageSnapshots.commit(); } diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaActionUtils.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaActionUtils.java new file mode 100644 index 000000000000..7759e8648cdb --- /dev/null +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaActionUtils.java @@ -0,0 +1,54 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.delta; + +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; + +public final class DeltaActionUtils { + private static final String ADD = "add"; + private static final String REMOVE = "remove"; + + private DeltaActionUtils() {} + + public static boolean isAdd(Row row) { + return !row.isNullAt(indexOf(row, ADD)); + } + + public static boolean isRemove(Row row) { + return !row.isNullAt(indexOf(row, REMOVE)); + } + + public static AddFile getAdd(Row row) { + return new AddFile(row.getStruct(indexOf(row, ADD))); + } + + public static RemoveFile getRemove(Row row) { + return new RemoveFile(row.getStruct(indexOf(row, REMOVE))); + } + + private static int indexOf(Row row, String name) { + int idx = row.getSchema().indexOf(name); + if (idx < 0) { + throw new IllegalArgumentException("Missing column in Delta action row: " + name); + } + return idx; + } +} diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java index 8af654a97b38..e726b6af8777 100644 
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeDataTypeVisitor.java @@ -18,19 +18,19 @@ */ package org.apache.iceberg.delta; -import io.delta.standalone.types.ArrayType; -import io.delta.standalone.types.DataType; -import io.delta.standalone.types.MapType; -import io.delta.standalone.types.StructField; -import io.delta.standalone.types.StructType; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; import java.util.List; import org.apache.iceberg.relocated.com.google.common.collect.Lists; abstract class DeltaLakeDataTypeVisitor { public static T visit(DataType type, DeltaLakeDataTypeVisitor visitor) { if (type instanceof StructType) { - StructField[] fields = ((StructType) type).getFields(); - List fieldResults = Lists.newArrayListWithExpectedSize(fields.length); + List fields = ((StructType) type).fields(); + List fieldResults = Lists.newArrayListWithExpectedSize(fields.size()); for (StructField field : fields) { fieldResults.add(visitor.field(field, visit(field.getDataType(), visitor))); diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java index 752fb11ba04c..6673785a0cbc 100644 --- a/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java +++ b/delta-lake/src/main/java/org/apache/iceberg/delta/DeltaLakeTypeToType.java @@ -18,23 +18,24 @@ */ package org.apache.iceberg.delta; -import io.delta.standalone.types.ArrayType; -import io.delta.standalone.types.BinaryType; -import io.delta.standalone.types.BooleanType; -import io.delta.standalone.types.ByteType; -import io.delta.standalone.types.DataType; -import io.delta.standalone.types.DateType; -import 
io.delta.standalone.types.DecimalType; -import io.delta.standalone.types.DoubleType; -import io.delta.standalone.types.FloatType; -import io.delta.standalone.types.IntegerType; -import io.delta.standalone.types.LongType; -import io.delta.standalone.types.MapType; -import io.delta.standalone.types.ShortType; -import io.delta.standalone.types.StringType; -import io.delta.standalone.types.StructField; -import io.delta.standalone.types.StructType; -import io.delta.standalone.types.TimestampType; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampNTZType; +import io.delta.kernel.types.TimestampType; import java.util.List; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -51,7 +52,7 @@ class DeltaLakeTypeToType extends DeltaLakeDataTypeVisitor { DeltaLakeTypeToType(StructType root) { this.root = root; - this.nextId = root.getFields().length; + this.nextId = root.fields().size(); } private int getNextId() { @@ -63,11 +64,11 @@ private int getNextId() { @Override @SuppressWarnings("ReferenceEquality") public Type struct(StructType struct, List types) { - StructField[] fields = struct.getFields(); - List newFields = Lists.newArrayListWithExpectedSize(fields.length); + List fields = struct.fields(); + List newFields = 
Lists.newArrayListWithExpectedSize(fields.size()); boolean isRoot = root == struct; - for (int i = 0; i < fields.length; i += 1) { - StructField field = fields[i]; + for (int i = 0; i < fields.size(); i += 1) { + StructField field = fields.get(i); Type type = types.get(i); int id; @@ -109,7 +110,7 @@ public Type array(ArrayType array, Type elementType) { @Override public Type map(MapType map, Type keyType, Type valueType) { - if (map.valueContainsNull()) { + if (map.isValueContainsNull()) { return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType); } else { return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType); @@ -145,6 +146,9 @@ public Type atomic(DataType atomic) { } else if (atomic instanceof TimestampType) { return Types.TimestampType.withZone(); + } else if (atomic instanceof TimestampNTZType) { + return Types.TimestampType.withoutZone(); + } else if (atomic instanceof DecimalType) { return Types.DecimalType.of( ((DecimalType) atomic).getPrecision(), ((DecimalType) atomic).getScale()); @@ -152,6 +156,6 @@ public Type atomic(DataType atomic) { return Types.BinaryType.get(); } - throw new ValidationException("Not a supported type: %s", atomic.getCatalogString()); + throw new ValidationException("Not a supported type: %s", atomic.toString()); } } diff --git a/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java b/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java index 85dc5c502998..b7ba8b4c01cd 100644 --- a/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java +++ b/delta-lake/src/test/java/org/apache/iceberg/delta/TestDeltaLakeTypeToType.java @@ -19,19 +19,17 @@ package org.apache.iceberg.delta; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import io.delta.standalone.types.ArrayType; -import io.delta.standalone.types.BinaryType; -import 
io.delta.standalone.types.BooleanType; -import io.delta.standalone.types.DoubleType; -import io.delta.standalone.types.LongType; -import io.delta.standalone.types.MapType; -import io.delta.standalone.types.NullType; -import io.delta.standalone.types.StringType; -import io.delta.standalone.types.StructType; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; import org.apache.iceberg.Schema; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.BeforeEach; @@ -44,30 +42,25 @@ public class TestDeltaLakeTypeToType { private static final String STRUCT_ARRAY_TYPE = "testStructArrayType"; private static final String INNER_ATOMIC_SCHEMA = "testInnerAtomicSchema"; private static final String STRING_LONG_MAP_TYPE = "testStringLongMap"; - private static final String NULL_TYPE = "testNullType"; private StructType deltaAtomicSchema; private StructType deltaNestedSchema; - private StructType deltaShallowNullTypeSchema; - private StructType deltaNullTypeSchema; @BeforeEach public void constructDeltaLakeSchema() { deltaAtomicSchema = new StructType() - .add(OPTIONAL_BOOLEAN_TYPE, new BooleanType()) - .add(REQUIRED_BINARY_TYPE, new BinaryType(), false); + .add(new StructField(OPTIONAL_BOOLEAN_TYPE, BooleanType.BOOLEAN, true)) + .add(new StructField(REQUIRED_BINARY_TYPE, BinaryType.BINARY, false)); deltaNestedSchema = new StructType() - .add(INNER_ATOMIC_SCHEMA, deltaAtomicSchema) - .add(DOUBLE_ARRAY_TYPE, new ArrayType(new DoubleType(), true), false) - .add(STRUCT_ARRAY_TYPE, new ArrayType(deltaAtomicSchema, true), false) - .add(STRING_LONG_MAP_TYPE, new 
MapType(new StringType(), new LongType(), false), false); - deltaNullTypeSchema = - new StructType() - .add(INNER_ATOMIC_SCHEMA, deltaAtomicSchema) - .add(DOUBLE_ARRAY_TYPE, new ArrayType(new DoubleType(), true), false) - .add(STRING_LONG_MAP_TYPE, new MapType(new NullType(), new LongType(), false), false); - deltaShallowNullTypeSchema = new StructType().add(NULL_TYPE, new NullType(), false); + .add(new StructField(INNER_ATOMIC_SCHEMA, deltaAtomicSchema, true)) + .add(new StructField(DOUBLE_ARRAY_TYPE, new ArrayType(DoubleType.DOUBLE, true), false)) + .add(new StructField(STRUCT_ARRAY_TYPE, new ArrayType(deltaAtomicSchema, true), false)) + .add( + new StructField( + STRING_LONG_MAP_TYPE, + new MapType(StringType.STRING, LongType.LONG, false), + false)); } @Test @@ -162,21 +155,4 @@ public void testNestedTypeConversion() { .isRequired()) .isTrue(); } - - @Test - public void testNullTypeConversion() { - assertThatThrownBy( - () -> - DeltaLakeDataTypeVisitor.visit( - deltaNullTypeSchema, new DeltaLakeTypeToType(deltaNullTypeSchema))) - .isInstanceOf(ValidationException.class) - .hasMessage("Not a supported type: %s", new NullType().getCatalogString()); - assertThatThrownBy( - () -> - DeltaLakeDataTypeVisitor.visit( - deltaShallowNullTypeSchema, - new DeltaLakeTypeToType(deltaShallowNullTypeSchema))) - .isInstanceOf(ValidationException.class) - .hasMessage("Not a supported type: %s", new NullType().getCatalogString()); - } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4501b135bf77..812c92516a06 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -41,7 +41,8 @@ caffeine = "2.9.3" calcite = "1.41.0" comet = "0.12.0" datasketches = "6.2.0" -delta-standalone = "3.3.2" +delta-kernel = "4.0.0" +delta-kerneldefaults = "4.0.0" delta-spark = "3.3.2" derby = "10.15.2.0" esotericsoftware-kryo = "4.0.3" @@ -115,7 +116,8 @@ caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "c calcite-core = { module = 
"org.apache.calcite:calcite-core", version.ref = "calcite" } calcite-druid = { module = "org.apache.calcite:calcite-druid", version.ref = "calcite" } datasketches = { module = "org.apache.datasketches:datasketches-java", version.ref = "datasketches" } -delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" } +delta-kernel-api = { module = "io.delta:delta-kernel-api", version.ref = "delta-kernel" } +delta-kernel-defaults = { module = "io.delta:delta-kernel-defaults", version.ref = "delta-kernel" } errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" } failsafe = { module = "dev.failsafe:failsafe", version.ref = "failsafe"} findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" }