[python] support ray data sink to paimon #6883
Conversation
```python
from pypaimon.write.ray_datasink import PaimonDatasink
datasink = PaimonDatasink(dataset, overwrite=overwrite)
dataset.write_datasink(datasink, concurrency=parallelism)
```
We can name it write_ray, just like write_pandas, write_arrow, and so on.
However, the Dataset class is defined by Ray, and so are its write_xxx methods. I think that would require a PR against Ray? The API in Paimon is called TableWrite.write_raydata.
> However, the Dataset class is defined by Ray, and so are its write_xxx methods. I think that would require a PR against Ray? The API in Paimon is called TableWrite.write_raydata.

Maybe I commented on the wrong line; it should be line 71.
```python
table_write = self.writer_builder.new_write()
for block in blocks:
    block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
    table_write.write_arrow(block_arrow)
```
I am afraid to_arrow will consume a lot of memory; we could introduce a streaming or iterable approach instead.
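A minimal sketch of that idea, assuming TableWrite exposes a batch-level entry point (write_arrow_batch is an assumption here; only write_arrow appears in the diff):

```python
import pyarrow as pa
from ray.data.block import BlockAccessor

def write_blocks_streaming(table_write, blocks) -> None:
    for block in blocks:
        block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
        # Feed the writer batch by batch rather than as one large pa.Table,
        # so no whole-block copy has to be buffered at once.
        for batch in block_arrow.to_batches():
            table_write.write_arrow_batch(batch)  # assumed batch-level API
```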
+1
```python
    staging bucket in S3.
    """
    self.writer_builder: WriteBuilder = self.table.new_batch_write_builder()
    if self.overwrite:
```
Could you please add a test showing that writer_builder is serializable?
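A minimal sketch of such a test, assuming a `table` fixture like those in the existing test suite (the fixture name is illustrative):

```python
import pickle

def test_writer_builder_is_serializable(table):
    writer_builder = table.new_batch_write_builder()
    # Ray pickles the datasink (and therefore this builder) when it ships
    # write tasks to worker processes, so a round trip must succeed.
    restored = pickle.loads(pickle.dumps(writer_builder))
    assert restored.new_write() is not None
```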
| """ | ||
| table_commit = self.writer_builder.new_commit() | ||
| table_commit.commit([commit_message for commit_messages in write_result.write_returns for commit_message in commit_messages]) | ||
| table_commit.close() |
We should handle the write-failure case too.
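A hedged sketch of that handling, assuming the commit object exposes an abort() cleanup hook as the Java TableCommit does (its presence in pypaimon is an assumption):

```python
all_messages = [
    msg for msgs in write_result.write_returns for msg in msgs
]
table_commit = self.writer_builder.new_commit()
try:
    table_commit.commit(all_messages)
except Exception:
    # Clean up staged files from the failed write before re-raising.
    table_commit.abort(all_messages)  # assumed cleanup hook
    raise
finally:
    table_commit.close()
```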
```python
def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
    datasink = PaimonDatasink(dataset, overwrite=overwrite)
    dataset.write_datasink(datasink, concurrency=parallelism)
```
`dataset` is provided here, but PaimonDatasink's init method needs a table.
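A hedged sketch of the fix, assuming this method lives on the Table class so that `self` is the table the datasink should write into:

```python
def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
    # Pass the target table, not the dataset being written.
    datasink = PaimonDatasink(self, overwrite=overwrite)
    dataset.write_datasink(datasink, concurrency=parallelism)
```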
```python
    return self

def write_raydata(self, dataset, overwrite=False, parallelism=1):
    from pypaimon.write.ray_datasink import PaimonDatasink
```
How can a user get the dataset in non-test code? Could you add sample code for that?
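A hedged example of how a user might obtain a dataset outside of tests, using a standard ray.data reader (the path and parallelism are illustrative):

```python
import ray

ray.init()
# Any ray.data source works; reading Parquet is a common case.
dataset = ray.data.read_parquet("s3://my-bucket/input/")  # illustrative path
# `table` is a pypaimon Table; write_raydata is the API added in this PR.
table.write_raydata(dataset, overwrite=False, parallelism=4)
```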
```python
        self.overwrite = overwrite

    def on_write_start(self) -> None:
        """Callback for when a write job starts.
```
Force-pushed 078a485 to 26fc28d: [python] update api, add test · fix code format · fix code format
Force-pushed 26fc28d to 0dda717
+1
* upstream/master: (35 commits)
  - [spark] Spark support vector search (apache#6950)
  - [doc] update Apache Doris document with DLF 3.0 (apache#6954)
  - [variant] Fix reading empty shredded variant via variantAccess (apache#6953)
  - [python] support alterTable (apache#6952)
  - [python] support ray data sink to paimon (apache#6883)
  - [python] Rename to TableScan.withSlice to specific start_pos and end_pos
  - [python] sync to_ray method args with ray data api (apache#6948)
  - [python] light refactor for stats collect (apache#6941)
  - [doc] Update cdc ingestion related docs
  - [rest] Add tagNamePrefix definition for listTagsPaged (apache#6947)
  - [python] support table scan with row range (apache#6944)
  - [spark] Fix EqualNullSafe is not correct when column has null value. (apache#6943)
  - [python] fix value_stats containing system fields for primary key tables (apache#6945)
  - [test][rest] add test case for two sessions with cache for rest commitTable (apache#6438)
  - [python] do not retry for connect exception in rest (apache#6942)
  - [spark] Fix read shredded and unshredded variant both (apache#6936)
  - [python] Let Python write file without value stats by default (apache#6940)
  - [python] ray version compatible (apache#6937)
  - [core] Unify conflict detect in FileStoreCommitImpl (apache#6932)
  - [test] Fix unstable case in CompactActionITCase
  - ...

Purpose
Linked issue: close #xxx
Tests
API and Format
Documentation