
Conversation

@FreemanDane (Contributor)

Purpose

Linked issue: close #xxx

Tests

API and Format

Documentation

from pypaimon.write.ray_datasink import PaimonDatasink
datasink = PaimonDatasink(dataset, overwrite=overwrite)
dataset.write_datasink(datasink, concurrency=parallelism)

Contributor:

We can name it write_ray, just like write_pandas, write_arrow and so on.

Contributor Author:

However, the Dataset class is defined by Ray, and so are its write_xxx methods. I think that would need a PR on Ray? The API in Paimon is TableWrite.write_raydata.

@XiaoHongbo-Hope (Contributor), Jan 4, 2026:

> However, the Dataset class is defined by Ray, and so are its write_xxx methods. I think that would need a PR on Ray? The API in Paimon is TableWrite.write_raydata.

Maybe I commented on the wrong line; it should be line 71.

table_write = self.writer_builder.new_write()
for block in blocks:
    block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
    table_write.write_arrow(block_arrow)
Contributor:

I am afraid to_arrow will cost a lot of memory; we could introduce some streaming or iterable way.

Contributor:

+1
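As a rough sketch of what the iterable approach above could look like (write_arrow_batch is an assumed per-batch entry point here; substitute whatever batch-level API the pypaimon writer actually exposes):

import pyarrow as pa
from ray.data.block import BlockAccessor

def write_blocks_streaming(table_write, blocks) -> None:
    """Feed the writer batch by batch instead of one large Arrow table."""
    for block in blocks:
        block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
        # to_batches() slices the table without copying, so only one
        # RecordBatch at a time sits in the writer's working set.
        for batch in block_arrow.to_batches(max_chunksize=64 * 1024):
            table_write.write_arrow_batch(batch)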

staging bucket in S3.
"""
self.writer_builder: WriteBuilder = self.table.new_batch_write_builder()
if self.overwrite:
Contributor:

Could you please add a test to show that writer_builder is serializable?
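A minimal sketch of such a test, assuming a fixture that yields a pypaimon table; the point is only that the builder survives a pickle round-trip, which Ray needs in order to ship it to the write tasks:

import pickle

def test_writer_builder_is_serializable(table):
    writer_builder = table.new_batch_write_builder()
    # Round-trip through pickle, the same mechanism Ray uses for task arguments.
    restored = pickle.loads(pickle.dumps(writer_builder))
    # The restored builder should still be able to create a writer.
    assert restored.new_write() is not None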

"""
table_commit = self.writer_builder.new_commit()
table_commit.commit([commit_message for commit_messages in write_result.write_returns for commit_message in commit_messages])
table_commit.close()
@XiaoHongbo-Hope (Contributor), Dec 28, 2025:

We should handle the write failure case too.
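One possible shape for the failure path, sketched from the snippet above; the abort call is an assumption and should be replaced by whatever cleanup pypaimon's TableCommit actually provides:

def commit_write_results(writer_builder, write_result):
    commit_messages = [
        commit_message
        for commit_messages in write_result.write_returns
        for commit_message in commit_messages
    ]
    table_commit = writer_builder.new_commit()
    try:
        table_commit.commit(commit_messages)
    except Exception:
        # Hypothetical cleanup hook: discard the staged files so a retried
        # job does not commit them twice.
        if hasattr(table_commit, "abort"):
            table_commit.abort(commit_messages)
        raise
    finally:
        table_commit.close()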

def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
    datasink = PaimonDatasink(dataset, overwrite=overwrite)
    dataset.write_datasink(datasink, concurrency=parallelism)
Contributor:

A dataset is provided here, but the PaimonDatasink init method needs a table.
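Presumably the fix is to hand the sink the table rather than the dataset, along these lines (a sketch only, assuming write_raydata is defined on the table object):

def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
    # Pass the Paimon table (self) into the sink; the Ray dataset is what
    # gets written through it.
    datasink = PaimonDatasink(self, overwrite=overwrite)
    dataset.write_datasink(datasink, concurrency=parallelism)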

    return self

def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
Contributor:

How can a user get the dataset in non-test code? Can you add some sample code for that?
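For example, outside the tests the dataset could come from any Ray Data source (a hedged sketch; the parquet path is a placeholder and `table` stands for a pypaimon table obtained from a catalog):

import ray

# Build the dataset from any Ray Data source, e.g. Parquet files or pandas.
dataset = ray.data.read_parquet("s3://my-bucket/input/")   # or ray.data.from_pandas(df)

# `table` is a pypaimon table obtained from a catalog (not shown here).
table.write_raydata(dataset, overwrite=False, parallelism=4)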

    self.overwrite = overwrite

def on_write_start(self) -> None:
    """Callback for when a write job starts.

Contributor:

[image] The method change for Ray 2.5.x.

[python] update api, add test

fix code format

fix code format
@JingsongLi (Contributor):

+1

@JingsongLi JingsongLi merged commit 22a1e06 into apache:master Jan 5, 2026
4 checks passed
jerry-024 added a commit to jerry-024/paimon that referenced this pull request Jan 6, 2026
* upstream/master: (35 commits)
  [spark] Spark support vector search (apache#6950)
  [doc] update Apache Doris document with DLF 3.0 (apache#6954)
  [variant] Fix reading empty shredded variant via variantAccess (apache#6953)
  [python] support alterTable (apache#6952)
  [python] support ray data sink to paimon (apache#6883)
  [python] Rename to TableScan.withSlice to specific start_pos and end_pos
  [python] sync to_ray method args with ray data api (apache#6948)
  [python] light refactor for stats collect (apache#6941)
  [doc] Update cdc ingestion related docs
  [rest] Add tagNamePrefix definition for listTagsPaged (apache#6947)
  [python] support table scan with row range (apache#6944)
  [spark] Fix EqualNullSafe is not correct when column has null value. (apache#6943)
  [python] fix value_stats containing system fields for primary key tables (apache#6945)
  [test][rest] add test case for two sessions with cache for rest commitTable (apache#6438)
  [python] do not retry for connect exception in rest (apache#6942)
  [spark] Fix read shredded and unshredded variant both (apache#6936)
  [python] Let Python write file without value stats by default (apache#6940)
  [python] ray version compatible (apache#6937)
  [core] Unify conflict detect in FileStoreCommitImpl (apache#6932)
  [test] Fix unstable case in CompactActionITCase
  ...
