diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 35c312f60..9ff802a17 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -49,6 +49,7 @@ set(ICEBERG_SOURCES manifest/manifest_group.cc manifest/manifest_list.cc manifest/manifest_reader.cc + manifest/manifest_util.cc manifest/manifest_writer.cc manifest/rolling_manifest_writer.cc manifest/v1_metadata.cc @@ -85,6 +86,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/expire_snapshots.cc + update/fast_append.cc update/pending_update.cc update/snapshot_update.cc update/update_location.cc diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h index 89001f09c..1d5941626 100644 --- a/src/iceberg/constants.h +++ b/src/iceberg/constants.h @@ -32,6 +32,7 @@ namespace iceberg { constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; constexpr int64_t kInvalidSnapshotId = -1; +constexpr int64_t kInvalidSequenceNumber = -1; /// \brief Stand-in for the current sequence number that will be assigned when the commit /// is successful. This is replaced when writing a manifest list by the ManifestFile /// adapter. diff --git a/src/iceberg/inheritable_metadata.cc b/src/iceberg/inheritable_metadata.cc index 1d740b5c3..7ff2ddbcb 100644 --- a/src/iceberg/inheritable_metadata.cc +++ b/src/iceberg/inheritable_metadata.cc @@ -21,14 +21,16 @@ #include -#include - #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" -#include "iceberg/snapshot.h" namespace iceberg { +InheritableMetadata::~InheritableMetadata() = default; +BaseInheritableMetadata::~BaseInheritableMetadata() = default; +CopyInheritableMetadata::~CopyInheritableMetadata() = default; +EmptyInheritableMetadata::~EmptyInheritableMetadata() = default; + BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id, int64_t sequence_number, std::string manifest_location) diff --git a/src/iceberg/inheritable_metadata.h b/src/iceberg/inheritable_metadata.h index f06693a42..8b5ddadc7 100644 --- a/src/iceberg/inheritable_metadata.h +++ b/src/iceberg/inheritable_metadata.h @@ -39,7 +39,7 @@ namespace iceberg { /// from the manifest file. This interface provides a way to apply such inheritance rules. class ICEBERG_EXPORT InheritableMetadata { public: - virtual ~InheritableMetadata() = default; + virtual ~InheritableMetadata(); /// \brief Apply inheritable metadata to a manifest entry. /// \param entry The manifest entry to modify. @@ -61,6 +61,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata { Status Apply(ManifestEntry& entry) override; + ~BaseInheritableMetadata() override; + private: int32_t spec_id_; int64_t snapshot_id_; @@ -72,6 +74,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata { class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata { public: Status Apply(ManifestEntry& entry) override; + + ~EmptyInheritableMetadata() override; }; /// \brief Metadata inheritance for copying manifests before commit. @@ -83,6 +87,8 @@ class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata { Status Apply(ManifestEntry& entry) override; + ~CopyInheritableMetadata() override; + private: int64_t snapshot_id_; }; diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 693b9fc55..53100b236 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -32,6 +32,7 @@ #include "iceberg/expression/expression.h" #include "iceberg/expression/projections.h" #include "iceberg/file_format.h" +#include "iceberg/inheritable_metadata.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader_internal.h" @@ -998,18 +999,22 @@ Result> ManifestReader::Make( } Result> ManifestReader::Make( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec) { - if (file_io == nullptr || schema == nullptr || spec == nullptr) { - return InvalidArgument( - "FileIO, Schema, and PartitionSpec cannot be null to create ManifestReader"); + std::string_view manifest_location, std::optional manifest_length, + std::shared_ptr file_io, std::shared_ptr schema, + std::shared_ptr spec, + std::unique_ptr inheritable_metadata, + std::optional first_row_id) { + ICEBERG_PRECHECK(file_io != nullptr, "FileIO cannot be null to read manifest"); + ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null to read manifest"); + ICEBERG_PRECHECK(spec != nullptr, "PartitionSpec cannot be null to read manifest"); + + if (inheritable_metadata == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(inheritable_metadata, InheritableMetadataFactory::Empty()); } - // No metadata to inherit in this case. - ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty()); return std::make_unique( - std::string(manifest_location), std::nullopt, std::move(file_io), std::move(schema), - std::move(spec), std::move(inheritable_metadata), std::nullopt); + std::string(manifest_location), manifest_length, std::move(file_io), + std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id); } Result> ManifestListReader::Make( diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index ddfefc57d..1a1420216 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -22,7 +22,9 @@ /// \file iceberg/manifest/manifest_reader.h /// Data reader interface for manifest files. +#include #include +#include #include #include #include @@ -92,13 +94,19 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Creates a reader for a manifest file. /// \param manifest_location Path to the manifest file. + /// \param manifest_length Length of the manifest file. /// \param file_io File IO implementation to use. /// \param schema Schema used to bind the partition type. /// \param spec Partition spec used for this manifest file. + /// \param inheritable_metadata Inheritable metadata. + /// \param first_row_id First row ID to use for the manifest entries. /// \return A Result containing the reader or an error. static Result> Make( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec); + std::string_view manifest_location, std::optional manifest_length, + std::shared_ptr file_io, std::shared_ptr schema, + std::shared_ptr spec, + std::unique_ptr inheritable_metadata, + std::optional first_row_id = std::nullopt); /// \brief Add stats columns to the column list if needed. static std::vector WithStatsColumns( diff --git a/src/iceberg/manifest/manifest_util.cc b/src/iceberg/manifest/manifest_util.cc new file mode 100644 index 000000000..12805452d --- /dev/null +++ b/src/iceberg/manifest/manifest_util.cc @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "iceberg/inheritable_metadata.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result CopyAppendManifest( + const ManifestFile& manifest, const std::shared_ptr& file_io, + const std::shared_ptr& schema, const std::shared_ptr& spec, + int64_t snapshot_id, const std::string& output_path, int8_t format_version, + SnapshotSummaryBuilder* summary_builder) { + // use metadata that will add the current snapshot's ID for the rewrite + // read first_row_id as null because this copies the incoming manifest before commit + ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, + InheritableMetadataFactory::ForCopy(snapshot_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, file_io, + schema, spec, std::move(inheritable_metadata), + /*first_row_id=*/std::nullopt)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + + // do not produce row IDs for the copy + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter( + format_version, snapshot_id, output_path, file_io, spec, schema, + ManifestContent::kData, /*first_row_id=*/std::nullopt)); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.status == ManifestStatus::kAdded, + "Manifest to copy must only contain added entries"); + if (summary_builder != nullptr && entry.data_file != nullptr) { + ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file)); + } + + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); +} + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_util_internal.h b/src/iceberg/manifest/manifest_util_internal.h new file mode 100644 index 000000000..e68437d08 --- /dev/null +++ b/src/iceberg/manifest/manifest_util_internal.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_util_internal.h +/// Internal utility functions for manifest operations. + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Copy an append manifest with a new snapshot ID. +/// +/// This function copies a manifest file that contains only ADDED entries, +/// rewriting it with a new snapshot ID. This is similar to Java's +/// ManifestFiles.copyAppendManifest. +/// +/// \param manifest The manifest file to copy +/// \param file_io File IO implementation to use +/// \param schema Table schema +/// \param spec Partition spec for the manifest +/// \param snapshot_id The new snapshot ID to assign to entries +/// \param output_path Path where the new manifest will be written +/// \param format_version Table format version +/// \param summary_builder Optional summary builder to update with file metrics +/// \return The copied manifest file, or an error +ICEBERG_EXPORT Result CopyAppendManifest( + const ManifestFile& manifest, const std::shared_ptr& file_io, + const std::shared_ptr& schema, const std::shared_ptr& spec, + int64_t snapshot_id, const std::string& output_path, int8_t format_version, + SnapshotSummaryBuilder* summary_builder = nullptr); + +} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 317b4fa9e..febe39c9a 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -67,6 +67,7 @@ iceberg_sources = files( 'manifest/manifest_group.cc', 'manifest/manifest_list.cc', 'manifest/manifest_reader.cc', + 'manifest/manifest_util.cc', 'manifest/manifest_writer.cc', 'manifest/rolling_manifest_writer.cc', 'manifest/v1_metadata.cc', @@ -103,6 +104,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/expire_snapshots.cc', + 'update/fast_append.cc', 'update/pending_update.cc', 'update/snapshot_update.cc', 'update/update_location.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 5c406debc..28ee285ab 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -199,6 +199,13 @@ Result> Table::NewUpdateLocation() { return transaction->NewUpdateLocation(); } +Result> Table::NewFastAppend() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewFastAppend(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index fd346e15a..75cad6e1e 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewUpdateLocation(); + /// \brief Create a new FastAppend to append data files and commit the changes. + virtual Result> NewFastAppend(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index a4165b814..6631b9d2d 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata { static constexpr int8_t kMinFormatVersionRowLineage = 3; static constexpr int8_t kMinFormatVersionDefaultValues = 3; static constexpr int64_t kInitialSequenceNumber = 0; - static constexpr int64_t kInvalidSequenceNumber = -1; static constexpr int64_t kInitialRowId = 0; static inline const std::unordered_map kMinFormatVersions = {}; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index d243a48bf..3414a862e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES expire_snapshots_test.cc + fast_append_test.cc transaction_test.cc update_location_test.cc update_partition_spec_test.cc diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc new file mode 100644 index 000000000..7c79d5e9e --- /dev/null +++ b/src/iceberg/test/fast_append_test.cc @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/fast_append.h" + +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/test_resource.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +class FastAppendTest : public UpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + InitializeFileIO(); + // Use minimal metadata for FastAppend tests + RegisterTableFromResource("TableMetadataV2ValidMinimal.json"); + + // Get partition spec and schema from the base table + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + // Create test data files + file_a_ = + CreateDataFile("/data/file_a.parquet", /*size=*/100, /*partition_value=*/1024); + file_b_ = + CreateDataFile("/data/file_b.parquet", /*size=*/200, /*partition_value=*/2048); + } + + std::shared_ptr CreateDataFile(const std::string& path, int64_t record_count, + int64_t size, int64_t partition_value = 0) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = table_location_ + path; + data_file->file_format = FileFormatType::kParquet; + // The base table has partition spec with identity(x), so we need 1 partition value + data_file->partition = + PartitionValues(std::vector{Literal::Long(partition_value)}); + data_file->file_size_in_bytes = size; + data_file->record_count = record_count; + data_file->partition_spec_id = spec_->spec_id(); + return data_file; + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(FastAppendTest, AppendDataFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), "1"); + EXPECT_EQ(snapshot->summary.at("added-records"), "100"); + EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024"); +} + +TEST_F(FastAppendTest, AppendMultipleDataFiles) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + fast_append->AppendFile(file_b_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), "2"); + EXPECT_EQ(snapshot->summary.at("added-records"), "300"); + EXPECT_EQ(snapshot->summary.at("added-files-size"), "3072"); +} + +TEST_F(FastAppendTest, AppendManyFiles) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + + int64_t total_records = 0; + int64_t total_size = 0; + constexpr int kFileCount = 10; + for (int index = 0; index < kFileCount; ++index) { + auto data_file = CreateDataFile(std::format("/data/file_{}.parquet", index), + /*record_count=*/10 + index, + /*size=*/100 + index * 10, + /*partition_value=*/index % 2); + total_records += data_file->record_count; + total_size += data_file->file_size_in_bytes; + fast_append->AppendFile(std::move(data_file)); + } + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), std::to_string(kFileCount)); + EXPECT_EQ(snapshot->summary.at("added-records"), std::to_string(total_records)); + EXPECT_EQ(snapshot->summary.at("added-files-size"), std::to_string(total_size)); +} + +TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) { + EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot")); + const int64_t base_sequence_number = table_->metadata()->last_sequence_number; + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->sequence_number, base_sequence_number + 1); + EXPECT_EQ(table_->metadata()->last_sequence_number, base_sequence_number + 1); +} + +TEST_F(FastAppendTest, AppendNullFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(nullptr); + + auto result = fast_append->Commit(); + EXPECT_FALSE(result.has_value()); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); + EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot")); +} + +TEST_F(FastAppendTest, AppendDuplicateFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + fast_append->AppendFile(file_a_); // Add same file twice + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + // Should only count the file once + EXPECT_EQ(snapshot->summary.at("added-data-files"), "1"); + EXPECT_EQ(snapshot->summary.at("added-records"), "100"); +} + +TEST_F(FastAppendTest, SetSnapshotProperty) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->Set("custom-property", "custom-value"); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); +} + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_writer_versions_test.cc b/src/iceberg/test/manifest_writer_versions_test.cc index 70d00504a..cc4e804be 100644 --- a/src/iceberg/test/manifest_writer_versions_test.cc +++ b/src/iceberg/test/manifest_writer_versions_test.cc @@ -27,6 +27,7 @@ #include "iceberg/arrow/arrow_file_io.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" @@ -411,12 +412,11 @@ class ManifestWriterVersionsTest : public ::testing::Test { TEST_F(ManifestWriterVersionsTest, TestV1Write) { auto manifest = WriteManifest(/*format_version=*/1, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData); } TEST_F(ManifestWriterVersionsTest, TestV1WriteDelete) { @@ -449,13 +449,12 @@ TEST_F(ManifestWriterVersionsTest, TestV1WriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV2Write) { auto manifest = WriteManifest(/*format_version=*/2, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kData); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData); } TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) { @@ -470,8 +469,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV2PlusWriteDeleteV2) { auto manifest = WriteDeleteManifest(/*format_version=*/2, delete_file_); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kDeletes); @@ -507,7 +505,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) { // rewrite the manifest file using a v2 manifest auto rewritten_manifest = RewriteManifest(manifests[0], 2); - CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber, + CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber, TableMetadata::kInitialSequenceNumber); // add the v2 manifest to a v2 manifest list, with a sequence number @@ -525,14 +523,12 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV3Write) { auto manifest = WriteManifest(/*format_version=*/3, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kData); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData, - ManifestStatus::kAdded, kFirstRowId); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData, ManifestStatus::kAdded, kFirstRowId); } TEST_F(ManifestWriterVersionsTest, TestV3WriteWithInheritance) { @@ -598,7 +594,7 @@ TEST_F(ManifestWriterVersionsTest, TestV3ManifestRewriteWithInheritance) { // rewrite the manifest file using a v3 manifest auto rewritten_manifest = RewriteManifest(manifests[0], 3); - CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber, + CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber, TableMetadata::kInitialSequenceNumber); // add the v3 manifest to a v3 manifest list, with a sequence number diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index c78dc4d0e..c14cb76b9 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -41,6 +41,12 @@ namespace iceberg { class UpdateTestBase : public ::testing::Test { protected: void SetUp() override { + InitializeFileIO(); + RegisterTableFromResource("TableMetadataV2Valid.json"); + } + + /// \brief Initialize file IO and create necessary directories. + void InitializeFileIO() { file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); catalog_ = InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{}); @@ -50,12 +56,19 @@ class UpdateTestBase : public ::testing::Test { static_cast(*file_io_).fs()); ASSERT_TRUE(arrow_fs != nullptr); ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + } + + /// \brief Register a table from a metadata resource file. + /// + /// \param resource_name The name of the metadata resource file + void RegisterTableFromResource(const std::string& resource_name) { + // Drop existing table if it exists + std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false); // Write table metadata to the table location. auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", table_location_, Uuid::GenerateV7().ToString()); - ICEBERG_UNWRAP_OR_FAIL(auto metadata, - ReadTableMetadataFromResource("TableMetadataV2Valid.json")); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name)); metadata->location = table_location_; ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), IsOk()); diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 10a87e653..d10586a4c 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -23,7 +23,6 @@ #include #include "iceberg/catalog.h" -#include "iceberg/schema.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -31,6 +30,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" @@ -293,4 +293,11 @@ Result> Transaction::NewUpdateLocation() { return update_location; } +Result> Transaction::NewFastAppend() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr fast_append, + FastAppend::Make(table_->name().name, shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append)); + return fast_append; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 7133a3b5d..0f567312a 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -86,6 +86,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateLocation(); + /// \brief Create a new FastAppend to append data files and commit the changes. + Result> NewFastAppend(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 251334c14..5bf03a00d 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -120,6 +120,8 @@ struct SnapshotLogEntry; struct SnapshotRef; struct StatisticsFile; struct TableMetadata; +class InheritableMetadata; +class SnapshotSummaryBuilder; /// \brief Expression. class BoundPredicate; @@ -188,6 +190,7 @@ class Transaction; /// \brief Update family. class ExpireSnapshots; +class FastAppend; class PendingUpdate; class SnapshotUpdate; class UpdateLocation; @@ -200,7 +203,6 @@ class UpdateSortOrder; /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- -class AppendFiles; class EncryptedKey; } // namespace iceberg diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc new file mode 100644 index 000000000..c7f66f2fb --- /dev/null +++ b/src/iceberg/update/fast_append.cc @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/fast_append.h" + +#include +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> FastAppend::Make( + std::string table_name, std::shared_ptr transaction) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create FastAppend without a transaction"); + return std::unique_ptr( + new FastAppend(std::move(table_name), std::move(transaction))); +} + +FastAppend::FastAppend(std::string table_name, std::shared_ptr transaction) + : SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {} + +FastAppend& FastAppend::AppendFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(), + "Data file must have partition spec ID"); + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec, Spec(spec_id)); + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [iter, inserted] = data_files.insert(file); + if (inserted) { + has_new_files_ = true; + ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*spec, *file)); + } + + return *this; +} + +FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) { + ICEBERG_BUILDER_CHECK(!manifest.has_existing_files(), + "Cannot append manifest with existing files"); + ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(), + "Cannot append manifest with deleted files"); + ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId, + "Snapshot id must be assigned during commit"); + ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber, + "Sequence number must be assigned during commit"); + + if (can_inherit_snapshot_id() && manifest.added_snapshot_id == kInvalidSnapshotId) { + summary_.AddedManifest(manifest); + append_manifests_.push_back(manifest); + } else { + // The manifest must be rewritten with this update's snapshot ID + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest)); + rewritten_append_manifests_.push_back(std::move(copied_manifest)); + } + + return *this; +} + +std::string FastAppend::operation() { return DataOperation::kAppend; } + +Result> FastAppend::Apply( + const TableMetadata& metadata_to_update, const std::shared_ptr& snapshot) { + std::vector manifests; + + ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests()); + manifests.reserve(new_written_manifests.size() + append_manifests_.size() + + rewritten_append_manifests_.size()); + if (!new_written_manifests.empty()) { + manifests.insert(manifests.end(), + std::make_move_iterator(new_written_manifests.begin()), + std::make_move_iterator(new_written_manifests.end())); + } + + // Transform append manifests and rewritten append manifests with snapshot ID + int64_t snapshot_id = SnapshotId(); + for (auto& manifest : append_manifests_) { + manifest.added_snapshot_id = snapshot_id; + } + for (auto& manifest : rewritten_append_manifests_) { + manifest.added_snapshot_id = snapshot_id; + } + manifests.insert(manifests.end(), append_manifests_.begin(), append_manifests_.end()); + manifests.insert(manifests.end(), rewritten_append_manifests_.begin(), + rewritten_append_manifests_.end()); + + // Add all manifests from the snapshot + if (snapshot != nullptr) { + auto cached_snapshot = SnapshotCache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, + cached_snapshot.Manifests(transaction_->table()->io())); + manifests.insert(manifests.end(), snapshot_manifests.begin(), + snapshot_manifests.end()); + } + + return manifests; +} + +std::unordered_map FastAppend::Summary() { + summary_.SetPartitionSummaryLimit( + base().properties.Get(TableProperties::kWritePartitionSummaryLimit)); + return summary_.Build(); +} + +void FastAppend::CleanUncommitted(const std::unordered_set& committed) { + // Clean up new manifests that were written but not committed + if (!new_manifests_.empty()) { + for (const auto& manifest : new_manifests_) { + if (!committed.contains(manifest.manifest_path)) { + std::ignore = DeleteFile(manifest.manifest_path); + } + } + new_manifests_.clear(); + } + + // Clean up only rewritten append manifests as they are always owned by the table + // Don't clean up append manifests as they are added to the manifest list and are + // not compacted + if (!rewritten_append_manifests_.empty()) { + for (const auto& manifest : rewritten_append_manifests_) { + if (!committed.contains(manifest.manifest_path)) { + std::ignore = DeleteFile(manifest.manifest_path); + } + } + } +} + +bool FastAppend::CleanupAfterCommit() const { + // Cleanup after committing is disabled for FastAppend unless there are + // rewritten_append_manifests_ because: + // 1.) Appended manifests are never rewritten + // 2.) Manifests which are written out as part of AppendFile are already cleaned + // up between commit attempts in WriteNewManifests + return !rewritten_append_manifests_.empty(); +} + +Result> FastAppend::Spec(int32_t spec_id) { + return base().PartitionSpecById(spec_id); +} + +Result FastAppend::CopyManifest(const ManifestFile& manifest) { + const TableMetadata& current = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + current.PartitionSpecById(manifest.partition_spec_id)); + + // Generate a unique manifest path using the transaction's metadata location + std::string new_manifest_path = ManifestPath(); + int64_t snapshot_id = SnapshotId(); + + // Copy the manifest with the new snapshot ID. + return CopyAppendManifest(manifest, transaction_->table()->io(), schema, spec, + snapshot_id, new_manifest_path, current.format_version, + &summary_); +} + +Result> FastAppend::WriteNewManifests() { + // If there are new files and manifests were already written, clean them up + if (has_new_files_ && !new_manifests_.empty()) { + for (const auto& manifest : new_manifests_) { + std::ignore = DeleteFile(manifest.manifest_path); + } + new_manifests_.clear(); + } + + // Write new manifests if there are new data files + if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) { + for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id)); + std::vector> files; + files.reserve(data_files.size()); + std::ranges::copy(data_files, std::back_inserter(files)); + ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec)); + new_manifests_.insert(new_manifests_.end(), + std::make_move_iterator(written_manifests.begin()), + std::make_move_iterator(written_manifests.end())); + } + has_new_files_ = false; + } + + return new_manifests_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h new file mode 100644 index 000000000..87887c74d --- /dev/null +++ b/src/iceberg/update/fast_append.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/fast_append.h + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/snapshot_update.h" +#include "iceberg/util/content_file_util.h" + +namespace iceberg { + +/// \brief Appending new files in a table. +/// +/// FastAppend is optimized for appending new data files to a table, it creates new +/// manifest files for the added data without compacting or rewriting existing manifests, +/// making it faster for write-heavy workloads. +class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { + public: + /// \brief Create a new FastAppend instance. + /// + /// \param table_name The name of the table + /// \param transaction The transaction to use for this update + /// \return A Result containing the FastAppend instance or an error + static Result> Make( + std::string table_name, std::shared_ptr transaction); + + /// \brief Append a data file to this update. + /// + /// \param file The data file to append + /// \return Reference to this for method chaining + FastAppend& AppendFile(const std::shared_ptr& file); + + /// \brief Append a manifest file to this update. + /// + /// The manifest must only contain added files (no existing or deleted files). + /// If the manifest doesn't have a snapshot ID assigned and snapshot ID inheritance + /// is enabled, it will be used directly. Otherwise, it will be copied with the + /// new snapshot ID. + /// + /// \param manifest The manifest file to append + /// \return Reference to this for method chaining + FastAppend& AppendManifest(const ManifestFile& manifest); + + std::string operation() override; + + Result> Apply( + const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) override; + std::unordered_map Summary() override; + void CleanUncommitted(const std::unordered_set& committed) override; + bool CleanupAfterCommit() const override; + + private: + explicit FastAppend(std::string table_name, std::shared_ptr transaction); + + /// \brief Get the partition spec by spec ID. + Result> Spec(int32_t spec_id); + + /// \brief Copy a manifest file with a new snapshot ID. + /// + /// \param manifest The manifest to copy + /// \return The copied manifest file + Result CopyManifest(const ManifestFile& manifest); + + /// \brief Write new manifests for the accumulated data files. + /// + /// \return A vector of manifest files, or an error + Result> WriteNewManifests(); + + private: + std::string table_name_; + std::unordered_map new_data_files_by_spec_; + std::vector append_manifests_; + std::vector rewritten_append_manifests_; + std::vector new_manifests_; + bool has_new_files_{false}; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 3387fd11a..4ca406842 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'expire_snapshots.h', + 'fast_append.h', 'pending_update.h', 'snapshot_update.h', 'update_location.h', diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 48ef1676f..f31327fcd 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -51,6 +51,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { ~SnapshotUpdate() override; + Kind kind() const override { return Kind::kUpdateSnapshot; } + /// \brief Set a callback to delete files instead of the table's default. /// /// \param delete_func A function used to delete file locations @@ -74,6 +76,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Set a summary property. + /// + /// \param property The property name + /// \param value The property value + /// \return Reference to this for method chaining + auto& Set(this auto& self, const std::string& property, const std::string& value) { + self.summary_.Set(property, value); + return self; + } + /// \brief Apply the update's changes to create a new snapshot. /// /// This method validates the changes, applies them to the metadata, @@ -95,6 +107,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \param spec The partition spec to use /// \param data_sequence_number Optional data sequence number for the files /// \return A vector of manifest files + /// TODO(xxx): Change signature to accept iterator begin/end instead of vector to avoid + /// intermediate vector allocations (e.g., from DataFileSet) Result> WriteDataManifests( const std::vector>& data_files, const std::shared_ptr& spec, @@ -167,6 +181,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Get or generate the snapshot ID for the new snapshot. int64_t SnapshotId(); + /// \brief Delete a file at the given path. + /// + /// \param path The path of the file to delete + /// \return A status indicating the result of the deletion + Status DeleteFile(const std::string& path); + + std::string ManifestPath(); + std::string ManifestListPath(); + SnapshotSummaryBuilder& summary_builder() { return summary_; } + private: /// \brief Returns the snapshot summary from the implementation and updates totals. Result> ComputeSummary( @@ -175,9 +199,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Clean up all uncommitted files void CleanAll(); - Status DeleteFile(const std::string& path); - std::string ManifestListPath(); - std::string ManifestPath(); + protected: + SnapshotSummaryBuilder summary_; private: const bool can_inherit_snapshot_id_{true}; diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index e173a41aa..95a8d6343 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -27,6 +27,7 @@ #include #include #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_entry.h" @@ -35,6 +36,72 @@ namespace iceberg { +/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by +/// file path. +class ICEBERG_EXPORT DataFileSet { + public: + using value_type = std::shared_ptr; + using iterator = typename std::vector::iterator; + using const_iterator = typename std::vector::const_iterator; + using difference_type = typename std::vector::difference_type; + + DataFileSet() = default; + + /// \brief Insert a data file into the set. + /// \param file The data file to insert + /// \return A pair with an iterator to the inserted element (or the existing one) and + /// a bool indicating whether insertion took place + std::pair insert(const value_type& file) { return InsertImpl(file); } + + /// \brief Insert a data file into the set (move version). + std::pair insert(value_type&& file) { + return InsertImpl(std::move(file)); + } + + /// \brief Get the number of elements in the set. + size_t size() const { return elements_.size(); } + + /// \brief Check if the set is empty. + bool empty() const { return elements_.empty(); } + + /// \brief Clear all elements from the set. + void clear() { + elements_.clear(); + index_by_path_.clear(); + } + + /// \brief Get iterator to the beginning. + iterator begin() { return elements_.begin(); } + const_iterator begin() const { return elements_.begin(); } + const_iterator cbegin() const { return elements_.cbegin(); } + + /// \brief Get iterator to the end. + iterator end() { return elements_.end(); } + const_iterator end() const { return elements_.end(); } + const_iterator cend() const { return elements_.cend(); } + + private: + std::pair InsertImpl(value_type file) { + if (!file) { + return {elements_.end(), false}; + } + + auto [index_iter, inserted] = + index_by_path_.try_emplace(file->file_path, elements_.size()); + if (!inserted) { + auto pos = static_cast(index_iter->second); + return {elements_.begin() + pos, false}; + } + + elements_.push_back(std::move(file)); + return {std::prev(elements_.end()), true}; + } + + // Vector to preserve insertion order + std::vector elements_; + std::unordered_map index_by_path_; +}; + /// \brief Utility functions for content files. struct ICEBERG_EXPORT ContentFileUtil { /// \brief Check if a delete file is a deletion vector (DV).