Skip to content

Conversation

@FreemanDane
Copy link

Purpose

Linked issue: close #xxx

Tests

API and Format

Documentation

from pypaimon.write.ray_datasink import PaimonDatasink
datasink = PaimonDatasink(dataset, overwrite=overwrite)
dataset.write_datasink(datasink, concurrency=parallelism)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can named it as write_ray, just list write_pandas, write_arrow and so on

table_write = self.writer_builder.new_write()
for block in blocks:
block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
table_write.write_arrow(block_arrow)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid to_arrow will cost a lot of memory, we can introduce some stream or iterable way.

staging bucket in S3.
"""
self.writer_builder: WriteBuilder= self.table.new_batch_write_builder()
if self.overwrite:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add test to show that writer_builder is serializable

"""
table_commit = self.writer_builder.new_commit()
table_commit.commit([commit_message for commit_messages in write_result.write_returns for commit_message in commit_messages])
table_commit.close()
Copy link
Contributor

@XiaoHongbo-Hope XiaoHongbo-Hope Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should handle write failure case too.

def write_raydata(self, dataset, overwrite=False, parallelism=1):
from pypaimon.write.ray_datasink import PaimonDatasink
datasink = PaimonDatasink(dataset, overwrite=overwrite)
dataset.write_datasink(datasink, concurrency=parallelism)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provided dataset, but PaimonDatasink init method needs a table here

return self

def write_raydata(self, dataset, overwrite=False, parallelism=1):
from pypaimon.write.ray_datasink import PaimonDatasink
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can user get the dataset in non test mode code, can you add a sample code for that

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants