feat(table): roll parquet files based on actual compressed size#759

Open
twuebi wants to merge 5 commits into apache:main from twuebi:tp/parquet-file-writer

Conversation

Contributor

@twuebi twuebi commented Feb 26, 2026

This change refactors datafile writing to use the actual written file size, as iceberg-java and iceberg-rust do, instead of the in-memory size.

@twuebi twuebi changed the title wip: feat(table): roll parquet files based on actual compressed size feat(table): roll parquet files based on actual compressed size Mar 2, 2026
Member

@zeroshade zeroshade left a comment


A few questions based on my reading of the code

// current row group — matching the size estimate used by iceberg-java and
// iceberg-rust to make rolling decisions.
func (w *ParquetFileWriter) BytesWritten() int64 {
	return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()
}
Member


Isn't this going to double count, since RowGroupTotalCompressedBytes would also count what has been flushed, not only what's still buffered?

Contributor Author


// RowGroupTotalCompressedBytes returns the total number of bytes after compression
// that have been written to the current row group so far.
func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 {
	if fw.rgw != nil {
		return fw.rgw.TotalCompressedBytes()
	}
	return 0
}

// RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in
// the current row group.
func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
	if fw.rgw != nil {
		return fw.rgw.TotalBytesWritten()
	}
	return 0
}

Since we're just using RowGroupTotalCompressedBytes here, I don't think we're double counting: w.counter wraps the FileWriter, so it should only count what has been flushed, and RowGroupTotalCompressedBytes only counts what has been compressed into the current row group. RowGroupTotalBytesWritten, in contrast, would also include the flushed bytes.

I also added a test, TestBytesWrittenNoDoubleCountAcrossRowGroups.
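A toy model of the accounting as this comment describes it (all names hypothetical, not the PR's actual types): the flushed counter and the per-row-group total cover disjoint bytes, because the per-row-group total resets to zero at the moment a row group is flushed into the counter.

```go
package main

import "fmt"

// toyWriter models the two quantities under discussion: flushed holds bytes
// already written to the file (the counter), rowGroup holds compressed bytes
// still buffered in the current row group.
type toyWriter struct {
	flushed  int64 // what w.counter.Count would report
	rowGroup int64 // what RowGroupTotalCompressedBytes would report
}

func (t *toyWriter) writeCompressed(n int64) { t.rowGroup += n }

// flushRowGroup moves the buffered bytes into the flushed total and resets
// the per-row-group count, mirroring a row group being closed and written.
func (t *toyWriter) flushRowGroup() {
	t.flushed += t.rowGroup
	t.rowGroup = 0
}

func (t *toyWriter) bytesWritten() int64 { return t.flushed + t.rowGroup }

func main() {
	w := &toyWriter{}
	w.writeCompressed(100) // first row group buffered
	w.flushRowGroup()      // first row group hits the file
	w.writeCompressed(40)  // second row group still buffered
	fmt.Println(w.bytesWritten()) // each byte counted exactly once
}
```

Under this reading, bytesWritten is 140 here, not 240: the 100 flushed bytes are never counted again through the row-group total.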

Member


TotalCompressedBytes would include what has already been flushed plus what is still in memory. The important bits it won't count are the footers. RowGroupTotalBytesWritten only counts what has been flushed.

fileSchema = sanitized
}

format := tblutils.GetFileFormat(iceberg.ParquetFile)
Member


This shouldn't be hardcoded as Parquet, should it? Shouldn't we get it from a table config/write config property?
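One way the suggested lookup could go, as a sketch: Iceberg tables carry a write.format.default property (defaulting to parquet) that the writer could consult instead of hardcoding the format. The helper name and map-based signature below are illustrative assumptions, not the PR's actual API.

```go
package main

import (
	"fmt"
	"strings"
)

// writeFormat is a hypothetical helper: it reads the Iceberg table property
// "write.format.default" from the table's properties and falls back to
// "parquet" when the property is unset, instead of hardcoding the format.
func writeFormat(props map[string]string) string {
	if f, ok := props["write.format.default"]; ok && f != "" {
		return strings.ToLower(f)
	}
	return "parquet"
}

func main() {
	fmt.Println(writeFormat(map[string]string{}))                              // default case
	fmt.Println(writeFormat(map[string]string{"write.format.default": "ORC"})) // explicit property
}
```

The same lookup would also cover the hardcoded "parquet" string passed to GenerateDataFileName in the hunk below.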

ID: cnt,
PartitionID: partitionID,
FileCount: fileCount,
}.GenerateDataFileName("parquet")
Member


Same as above: shouldn't we pull "parquet" from the config options instead of hardcoding it?

}
}()

binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback, r.factory.targetFileSize)
Member


Why are we dropping the binpacking of the records? I don't see it getting used elsewhere. Unless the user explicitly says they are providing sorted records and sorted data, we should allow the binpacking to keep file sizes down, right? Or is there a reason why we're removing it?

Contributor Author


My understanding is that the binpacking was primarily used to write files equal to or smaller than the target file size, without considering compression. With this change we track actual file sizes and no longer need to binpack records based on estimates. This is modeled after iceberg-java's RollingFileWriter / BaseRollingWriter.
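A minimal sketch of the iceberg-java-style rolling approach described above, with hypothetical names: rather than pre-binpacking batches by estimated size, the writer appends batches and rolls to a new file once the actual written size reaches the target.

```go
package main

import "fmt"

// rollingWriter is a hypothetical sketch of a rolling file writer: it tracks
// the actual bytes written to the current file and starts a new file once the
// target size is reached, instead of binpacking records up front by estimate.
type rollingWriter struct {
	targetSize int64
	current    int64   // actual bytes written to the current file
	files      []int64 // final sizes of completed files
}

func (r *rollingWriter) writeBatch(compressedSize int64) {
	r.current += compressedSize
	// Roll after the write, based on the real on-disk size, not an estimate.
	if r.current >= r.targetSize {
		r.files = append(r.files, r.current)
		r.current = 0
	}
}

// close finalizes the last, possibly undersized, file.
func (r *rollingWriter) close() {
	if r.current > 0 {
		r.files = append(r.files, r.current)
	}
}

func main() {
	w := &rollingWriter{targetSize: 100}
	for _, n := range []int64{60, 60, 30, 30, 10} {
		w.writeBatch(n)
	}
	w.close()
	fmt.Println(w.files) // sizes of the rolled files
}
```

The design trade-off matches the comment above: files may overshoot the target by up to one batch, but the decision is driven by real compressed bytes rather than uncompressed in-memory estimates.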

