From 05247b1c5a28306b2a4588da05a1b93f1ed708d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Pawlu=C5=9B?=
Date: Sat, 6 Jul 2024 18:14:12 +0200
Subject: [PATCH] batch size updates

---
 README.md                              |  7 ++-----
 src/pyarrow/bigquery/write/__init__.py | 15 +++++----------
 src/pyarrow/bigquery/write/upload.py   |  2 +-
 3 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index 636a105..462e3cb 100644
--- a/README.md
+++ b/README.md
@@ -148,8 +148,8 @@ Writes a PyArrow Table to a BigQuery Table. No return value.
 - `worker_count`: `int`, *default* `os.cpu_count()`
   The number of threads or processes to use for fetching data from BigQuery.
 
-- `batch_size`: `int`, *default* `100`
-  The batch size for fetched rows.
+- `batch_size`: `int`, *default* `10`
+  The batch size used for uploads.
 
 ```python
 bq.write_table(table, 'gcp_project.dataset.table')
@@ -185,9 +185,6 @@ Context manager version of the write method. Useful when the PyArrow table is la
 - `worker_count`: `int`, *default* `os.cpu_count()`
   The number of threads or processes to use for writing data to BigQuery.
 
-- `batch_size`: `int`, *default* `100`
-  The batch size used for writes. The table will be automatically split to this value.
-
 Depending on your use case, you might want to use one of the methods below to write your data to a BigQuery table, using either `pa.Table` or `pa.RecordBatch`.
 
 #### `pyarrow.bigquery.writer.write_table` (Context Manager Method)
diff --git a/src/pyarrow/bigquery/write/__init__.py b/src/pyarrow/bigquery/write/__init__.py
index 5b9210b..c9c3277 100644
--- a/src/pyarrow/bigquery/write/__init__.py
+++ b/src/pyarrow/bigquery/write/__init__.py
@@ -131,7 +131,6 @@ def __init__(
         table_overwrite: bool = False,
         worker_count: int = multiprocessing.cpu_count(),
         worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
-        batch_size: int = 100,
     ):
         self.project = project
         self.where = where
@@ -144,8 +143,6 @@ def __init__(
         self.worker_count = worker_count
         self.worker_type = worker_type
 
-        self.batch_size = batch_size
-
         project_id, dataset_id, table_id = where.split(".")
 
         self.parent = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
@@ -187,10 +184,9 @@ def __enter__(self):
         return self
 
     def write_table(self, table):
-        for table_chunk in some_itertools.to_chunks(table, self.batch_size):
-            element = tempfile.mktemp(dir=self.temp_dir)
-            fa.write_feather(table_chunk, element)
-            self.queue_results.put(element)
+        element = tempfile.mktemp(dir=self.temp_dir)
+        fa.write_feather(table, element)
+        self.queue_results.put(element)
 
     def write_batch(self, batch):
         element = tempfile.mktemp(dir=self.temp_dir)
@@ -219,7 +215,7 @@ def write_table(
     table_overwrite: bool = False,
     worker_count: int = multiprocessing.cpu_count(),
     worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
-    batch_size: int = 100,
+    batch_size: int = 10,
 ):
     assert table.num_rows > 0, "Table is empty"
 
@@ -232,7 +228,6 @@ def write_table(
         table_overwrite=table_overwrite,
         worker_count=worker_count,
         worker_type=worker_type,
-        batch_size=batch_size,
     ) as w:
-        for table_chunk in some_itertools.to_split(table, w.worker_count):
+        for table_chunk in some_itertools.to_chunks(table, batch_size):
             w.write_table(table_chunk)
diff --git a/src/pyarrow/bigquery/write/upload.py b/src/pyarrow/bigquery/write/upload.py
index a1d3420..aebec0f 100644
--- a/src/pyarrow/bigquery/write/upload.py
+++ b/src/pyarrow/bigquery/write/upload.py
@@ -18,7 +18,7 @@ def _send(stream, serialized_rows, offset):
     request.offset = offset
     request.proto_rows = proto_data
 
-    stream.append_rows_stream.send(request).result()
+    stream.append_rows_stream.send(request)
 
 
 def upload_data(stream, pa_table, protobuf_definition, offset):
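
For readers skimming the diff, a minimal usage sketch of how the write API behaves after this patch: the one-shot `bq.write_table` still accepts `batch_size` (default now `10`) and splits the table itself via `some_itertools.to_chunks`, while the `writer` context manager no longer takes `batch_size`, so callers chunk large tables themselves before each `write_table` call. The sample table, the `schema=` keyword, and the 1,000-row slicing below are illustrative assumptions, not taken from the diff.

```python
# Post-patch usage sketch; the sample table, the schema= keyword on
# bq.writer, and the 1,000-row slicing are illustrative assumptions.
import pyarrow as pa
import pyarrow.bigquery as bq

table = pa.Table.from_pylist([{"key": i, "value": i * 2} for i in range(10_000)])

# One-shot write: write_table() now chunks the table itself into
# batch_size-row pieces (default 10) before queuing them for upload.
bq.write_table(table, "gcp_project.dataset.table", batch_size=500)

# Context-manager write: batch_size is gone here, so the caller decides
# how to split a large table before each write_table() call.
with bq.writer("gcp_project.dataset.table", schema=table.schema) as w:
    for offset in range(0, table.num_rows, 1_000):
        w.write_table(table.slice(offset, 1_000))
```

The `upload.py` change is related: `_send` now issues the append-rows request without waiting on `.result()` for each batch.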