Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
Expand Down
10 changes: 2 additions & 8 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/

#include <charconv>
#include <format>
#include <mutex>
#include <sstream>
Expand All @@ -40,6 +39,7 @@
#include "iceberg/schema_util_internal.h"
#include "iceberg/util/formatter.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/visit_type.h"

namespace iceberg::avro {
Expand Down Expand Up @@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
return InvalidSchema("Missing avro attribute: {}", attr_name);
}

int32_t id;
const auto& id_value = id_str.value();
auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id);
if (ec != std::errc()) {
return InvalidSchema("Invalid {}: {}", attr_name, id_value);
}
return id;
return StringUtils::ParseInt<int32_t>(id_str.value());
}

Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
Expand Down
16 changes: 12 additions & 4 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
if (snapshot.operation().has_value()) {
if (snapshot.Operation().has_value()) {
json[kSummary] = snapshot.summary;
}
SetOptionalField(json, kSchemaId, snapshot.schema_id);
Expand Down Expand Up @@ -1553,9 +1553,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), snapshot_id, type,
min_snapshots, max_snapshot_age,
max_ref_age);
if (type == SnapshotRefType::kTag) {
ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *tag);
} else {
ICEBERG_CHECK(type == SnapshotRefType::kBranch,
"Expected branch type for snapshot ref");
ICEBERG_ASSIGN_OR_RAISE(auto branch,
SnapshotRef::MakeBranch(snapshot_id, min_snapshots,
max_snapshot_age, max_ref_age));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *branch);
}
}
if (action == kActionSetProperties) {
using StringMap = std::unordered_map<std::string, std::string>;
Expand Down
11 changes: 3 additions & 8 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,23 +369,18 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
std::optional<ManifestContent> content, std::optional<int64_t> first_row_id) {
ManifestContent content, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema));
case 2:
ICEBERG_PRECHECK(content.has_value(),
"ManifestContent is required for format version 2");
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema),
content.value());
std::move(partition_spec), std::move(current_schema), content);
case 3:
ICEBERG_PRECHECK(content.has_value(),
"ManifestContent is required for format version 3");
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
std::move(file_io), std::move(partition_spec),
std::move(current_schema), content.value());
std::move(current_schema), content);
default:
return NotSupported("Format version {} is not supported", format_version);
}
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/manifest/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "iceberg/file_writer.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/metrics.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
Expand Down Expand Up @@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter {
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
std::optional<ManifestContent> content = std::nullopt,
ManifestContent content = ManifestContent::kData,
std::optional<int64_t> first_row_id = std::nullopt);

private:
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
Expand Down
103 changes: 102 additions & 1 deletion src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

#include "iceberg/snapshot.h"

#include <memory>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"

namespace iceberg {

Expand All @@ -49,6 +52,55 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}

Status SnapshotRef::Validate() const {
if (type() == SnapshotRefType::kBranch) {
const auto& branch = std::get<Branch>(this->retention);
ICEBERG_CHECK(!branch.min_snapshots_to_keep.has_value() ||
branch.min_snapshots_to_keep.value() > 0,
"Min snapshots to keep must be greater than 0");
ICEBERG_CHECK(
!branch.max_snapshot_age_ms.has_value() || branch.max_snapshot_age_ms.value() > 0,
"Max snapshot age must be greater than 0 ms");
ICEBERG_CHECK(!branch.max_ref_age_ms.has_value() || branch.max_ref_age_ms.value() > 0,
"Max reference age must be greater than 0");
} else {
const auto& tag = std::get<Tag>(this->retention);
ICEBERG_CHECK(!tag.max_ref_age_ms.has_value() || tag.max_ref_age_ms.value() > 0,
"Max reference age must be greater than 0");
}
return {};
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeBranch(
int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep,
std::optional<int64_t> max_snapshot_age_ms, std::optional<int64_t> max_ref_age_ms) {
auto ref = std::make_unique<SnapshotRef>(
SnapshotRef{.snapshot_id = snapshot_id,
.retention = Branch{
.min_snapshots_to_keep = min_snapshots_to_keep,
.max_snapshot_age_ms = max_snapshot_age_ms,
.max_ref_age_ms = max_ref_age_ms,
}});
ICEBERG_RETURN_UNEXPECTED(ref->Validate());
return ref;
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeTag(
int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms) {
auto ref = std::make_unique<SnapshotRef>(SnapshotRef{
.snapshot_id = snapshot_id, .retention = Tag{.max_ref_age_ms = max_ref_age_ms}});
ICEBERG_RETURN_UNEXPECTED(ref->Validate());
return ref;
}

std::unique_ptr<SnapshotRef> SnapshotRef::Clone(
std::optional<int64_t> new_snapshot_id) const {
auto ref = std::make_unique<SnapshotRef>();
ref->snapshot_id = new_snapshot_id.value_or(snapshot_id);
ref->retention = retention;
return ref;
}

bool SnapshotRef::Equals(const SnapshotRef& other) const {
if (this == &other) {
return true;
Expand All @@ -67,14 +119,32 @@ bool SnapshotRef::Equals(const SnapshotRef& other) const {
}
}

std::optional<std::string_view> Snapshot::operation() const {
std::optional<std::string_view> Snapshot::Operation() const {
auto it = summary.find(SnapshotSummaryFields::kOperation);
if (it != summary.end()) {
return it->second;
}
return std::nullopt;
}

Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
auto it = summary.find(SnapshotSummaryFields::kFirstRowId);
if (it == summary.end()) {
return std::nullopt;
}

return StringUtils::ParseInt<int64_t>(it->second);
}

Result<std::optional<int64_t>> Snapshot::AddedRows() const {
auto it = summary.find(SnapshotSummaryFields::kAddedRows);
if (it == summary.end()) {
return std::nullopt;
}

return StringUtils::ParseInt<int64_t>(it->second);
}

bool Snapshot::Equals(const Snapshot& other) const {
if (this == &other) {
return true;
Expand All @@ -85,6 +155,37 @@ bool Snapshot::Equals(const Snapshot& other) const {
schema_id == other.schema_id;
}

Result<std::unique_ptr<Snapshot>> Snapshot::Make(
int64_t sequence_number, int64_t snapshot_id,
std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
std::string operation, std::unordered_map<std::string, std::string> summary,
std::optional<int32_t> schema_id, std::string manifest_list,
std::optional<int64_t> first_row_id, std::optional<int64_t> added_rows) {
ICEBERG_PRECHECK(!operation.empty(), "Operation cannot be empty");
ICEBERG_PRECHECK(!first_row_id.has_value() || first_row_id.value() >= 0,
"Invalid first-row-id (cannot be negative): {}", first_row_id.value());
ICEBERG_PRECHECK(!added_rows.has_value() || added_rows.value() >= 0,
"Invalid added-rows (cannot be negative): {}", added_rows.value());
ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(),
"Missing added-rows when first-row-id is set");
summary[SnapshotSummaryFields::kOperation] = operation;
if (first_row_id.has_value()) {
summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value());
}
if (added_rows.has_value()) {
summary[SnapshotSummaryFields::kAddedRows] = std::to_string(added_rows.value());
}
return std::make_unique<Snapshot>(Snapshot{
.snapshot_id = snapshot_id,
.parent_snapshot_id = parent_snapshot_id,
.sequence_number = sequence_number,
.timestamp_ms = timestamp_ms,
.manifest_list = std::move(manifest_list),
.summary = std::move(summary),
.schema_id = schema_id,
});
}

Result<SnapshotCache::ManifestsCache> SnapshotCache::InitManifestsCache(
const Snapshot* snapshot, std::shared_ptr<FileIO> file_io) {
if (file_io == nullptr) {
Expand Down
74 changes: 71 additions & 3 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <variant>

#include "iceberg/iceberg_export.h"
Expand Down Expand Up @@ -114,6 +113,39 @@ struct ICEBERG_EXPORT SnapshotRef {

SnapshotRefType type() const noexcept;

/// \brief Create a branch reference
///
/// \param snapshot_id The snapshot ID for the branch
/// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
/// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
/// validation failed
static Result<std::unique_ptr<SnapshotRef>> MakeBranch(
int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep = std::nullopt,
std::optional<int64_t> max_snapshot_age_ms = std::nullopt,
std::optional<int64_t> max_ref_age_ms = std::nullopt);

/// \brief Create a tag reference
///
/// \param snapshot_id The snapshot ID for the tag
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
/// validation failed
static Result<std::unique_ptr<SnapshotRef>> MakeTag(
int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms = std::nullopt);

/// \brief Clone this SnapshotRef with an optional new snapshot ID
///
/// \param new_snapshot_id Optional new snapshot ID. If not provided, uses the current
/// snapshot_id
/// \return A unique_ptr to the cloned SnapshotRef
std::unique_ptr<SnapshotRef> Clone(
std::optional<int64_t> new_snapshot_id = std::nullopt) const;

/// \brief Validate the SnapshotRef
Status Validate() const;

/// \brief Compare two snapshot refs for equality
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
return lhs.Equals(rhs);
Expand All @@ -125,9 +157,13 @@ struct ICEBERG_EXPORT SnapshotRef {
};

/// \brief Optional Snapshot Summary Fields
struct SnapshotSummaryFields {
struct ICEBERG_EXPORT SnapshotSummaryFields {
/// \brief The operation field key
inline static const std::string kOperation = "operation";
/// \brief The first row id field key
inline static const std::string kFirstRowId = "first-row-id";
/// \brief The added rows field key
inline static const std::string kAddedRows = "added-rows";

/// Metrics, see https://iceberg.apache.org/spec/#metrics

Expand Down Expand Up @@ -246,12 +282,44 @@ struct ICEBERG_EXPORT Snapshot {
/// ID of the table's current schema when the snapshot was created.
std::optional<int32_t> schema_id;

/// \brief Create a new Snapshot instance with validation on the inputs.
static Result<std::unique_ptr<Snapshot>> Make(
int64_t sequence_number, int64_t snapshot_id,
std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
std::string operation, std::unordered_map<std::string, std::string> summary,
std::optional<int32_t> schema_id, std::string manifest_list,
std::optional<int64_t> first_row_id = std::nullopt,
std::optional<int64_t> added_rows = std::nullopt);

/// \brief Return the name of the DataOperations data operation that produced this
/// snapshot.
///
/// \return the operation that produced this snapshot, or nullopt if the operation is
/// unknown.
std::optional<std::string_view> operation() const;
std::optional<std::string_view> Operation() const;

/// \brief The row-id of the first newly added row in this snapshot.
///
/// All rows added in this snapshot will have a row-id assigned to them greater than
/// this value. All rows with a row-id less than this value were created in a snapshot
/// that was added to the table (but not necessarily committed to this branch) in the
/// past.
///
/// \return the first row-id to be used in this snapshot or nullopt when row lineage
/// is not supported
Result<std::optional<int64_t>> FirstRowId() const;

/// \brief The upper bound of number of rows with assigned row IDs in this snapshot.
///
/// It can be used safely to increment the table's `next-row-id` during a commit. It
/// can be more than the number of rows added in this snapshot and include some
/// existing rows.
///
/// This field is optional but is required when the table version supports row lineage.
///
/// \return the upper bound of number of rows with assigned row IDs in this snapshot
/// or nullopt if the value was not stored.
Result<std::optional<int64_t>> AddedRows() const;

/// \brief Compare two snapshots for equality.
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
Expand Down
Loading
Loading