feat(table): roll parquet files based on actual compressed size #759
twuebi wants to merge 5 commits into apache:main
Conversation
zeroshade left a comment
A few questions based on my reading of the code
// current row group — matching the size estimate used by iceberg-java and
// iceberg-rust to make rolling decisions.
func (w *ParquetFileWriter) BytesWritten() int64 {
	return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()
Isn't this going to double count, since RowGroupTotalCompressedBytes would also count what has been flushed, not only what's still buffered?
// RowGroupTotalCompressedBytes returns the total number of bytes after compression
// that have been written to the current row group so far.
func (fw *FileWriter) RowGroupTotalCompressedBytes() int64 {
if fw.rgw != nil {
return fw.rgw.TotalCompressedBytes()
}
return 0
}
// RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in
// the current row group.
func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
if fw.rgw != nil {
return fw.rgw.TotalBytesWritten()
}
return 0
}

Since we're just using RowGroupTotalCompressedBytes here, I don't think we're double counting: w.counter wraps the FileWriter, so it should only count what has been flushed, and RowGroupTotalCompressedBytes only counts what has been compressed into the current row group, in contrast to RowGroupTotalBytesWritten, which would also include the flushed bytes.
Also added a test, TestBytesWrittenNoDoubleCountAcrossRowGroups.
TotalCompressedBytes would include what has already been flushed plus what is still in memory. The important bytes it won't count are the footers. RowGroupTotalBytesWritten only counts what has been flushed.
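To make the counting scheme under discussion concrete, here is a minimal, self-contained sketch (the types and names are illustrative stand-ins, not the actual iceberg-go or arrow-go code): the wrapping counter only advances when a row group is flushed to the underlying sink, while the in-progress row group reports only its own buffered compressed bytes, so summing the two never counts a byte twice.

```go
package main

import "fmt"

// countingSink models w.counter: it records only bytes actually
// flushed to the underlying file.
type countingSink struct{ Count int64 }

func (c *countingSink) Write(p []byte) (int, error) {
	c.Count += int64(len(p))
	return len(p), nil
}

// rowGroup models the in-progress row group: compressed bytes buffered
// in memory but not yet flushed to the sink.
type rowGroup struct{ compressed int64 }

// mockFileWriter ties the two together the way the PR's BytesWritten does.
type mockFileWriter struct {
	sink *countingSink
	rg   *rowGroup
}

func (w *mockFileWriter) WriteBatch(compressedLen int64) {
	w.rg.compressed += compressedLen
}

// FlushRowGroup moves the buffered bytes to the sink and starts a fresh
// row group, so the same bytes are never reported by both sources.
func (w *mockFileWriter) FlushRowGroup() {
	buf := make([]byte, w.rg.compressed)
	w.sink.Write(buf)
	w.rg = &rowGroup{}
}

// BytesWritten = flushed bytes + current (unflushed) row group bytes.
func (w *mockFileWriter) BytesWritten() int64 {
	return w.sink.Count + w.rg.compressed
}

func main() {
	w := &mockFileWriter{sink: &countingSink{}, rg: &rowGroup{}}
	w.WriteBatch(100)
	fmt.Println(w.BytesWritten()) // 100: all buffered, nothing flushed
	w.FlushRowGroup()
	fmt.Println(w.BytesWritten()) // still 100: all flushed, nothing buffered
	w.WriteBatch(50)
	fmt.Println(w.BytesWritten()) // 150: 100 flushed + 50 buffered
}
```

A test like the PR's TestBytesWrittenNoDoubleCountAcrossRowGroups would assert exactly this invariant: the reported size is unchanged across a flush boundary.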
table/rolling_data_writer.go (outdated)
	fileSchema = sanitized
}

format := tblutils.GetFileFormat(iceberg.ParquetFile)
This shouldn't be hardcoded as Parquet, should it? Shouldn't we get this from a table config/write config property?
table/rolling_data_writer.go (outdated)
	ID:          cnt,
	PartitionID: partitionID,
	FileCount:   fileCount,
}.GenerateDataFileName("parquet")
Same as above: shouldn't we pull "parquet" from the config options instead of hardcoding it?
	}
}()

binPackedRecords := binPackRecords(recordIter, defaultBinPackLookback, r.factory.targetFileSize)
Why are we dropping the bin-packing of the records? I don't see it being used elsewhere. Unless the user explicitly says they are providing sorted records and sorted data, we should allow the bin-packing to keep file sizes down, right? Or is there a reason why we're removing it?
My understanding is that the bin-packing was primarily used to write files equal to or smaller than the target file size, without considering compression. With this change, we're tracking actual file sizes and no longer need to bin-pack records based on estimates. This is modeled after iceberg-java's RollingFileWriter / BaseRollingWriter.
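The rolling pattern described here can be sketched as follows. This is a simplified model under stated assumptions, not the PR's implementation: the writer and its size reporting are stand-ins, and the point is only the control flow of rolling on actual written size rather than bin-packing up front on estimated size.

```go
package main

import "fmt"

// fileWriter stands in for a single data-file writer that can report
// its actual compressed size so far (the role BytesWritten plays above).
type fileWriter struct{ written int64 }

func (f *fileWriter) writeBatch(compressed int64) { f.written += compressed }
func (f *fileWriter) bytesWritten() int64         { return f.written }

// rollingWriter rolls to a new file whenever the current file's actual
// written size reaches the target, instead of partitioning records in
// advance based on in-memory size estimates.
type rollingWriter struct {
	target  int64
	current *fileWriter
	closed  []int64 // sizes of completed files
}

func (r *rollingWriter) write(batchCompressed int64) {
	if r.current == nil {
		r.current = &fileWriter{}
	}
	r.current.writeBatch(batchCompressed)
	// Roll after the write, once the actual size crosses the target.
	if r.current.bytesWritten() >= r.target {
		r.roll()
	}
}

// roll closes the current file, if any, and records its final size.
func (r *rollingWriter) roll() {
	if r.current != nil {
		r.closed = append(r.closed, r.current.bytesWritten())
		r.current = nil
	}
}

func main() {
	w := &rollingWriter{target: 100}
	for _, batch := range []int64{40, 40, 40, 40, 40} {
		w.write(batch)
	}
	w.roll() // close the tail file, if any
	fmt.Println(w.closed) // [120 80]
}
```

Note the trade-off visible even in this toy: because the check runs after each batch, a file can overshoot the target by up to one batch, which matches rolling on actual size rather than pre-sizing via bin-packing.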
This change refactors data file writing to use the actual written file size, as iceberg-java and iceberg-rust do, instead of the in-memory size.