feat: support setting max_stream_count when fetching query result (#2051)

* feat: support setting max_stream_count when fetching query result

Allow the user to set max_stream_count when fetching results
using the BigQuery Storage API with RowIterator's incremental methods:

* to_arrow_iterable
* to_dataframe_iterable

* docs: update docs about max_stream_count for ordered queries

* fix: add max_stream_count params to _EmptyRowIterator's methods

* test: add tests for RowIterator's max_stream_count parameter

* docs: add notes on valid max_stream_count range in docstring

* use a different way to iterate over the result

---------

Co-authored-by: Lingqing Gan <[email protected]>
kien-truong and Linchin authored Nov 22, 2024
1 parent fffe6ba commit d461297
Showing 2 changed files with 114 additions and 0 deletions.
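Not part of the commit itself — a minimal usage sketch of the new parameter, assuming pandas and google-cloud-bigquery-storage are installed, credentials are configured, and with a placeholder table name:

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

rows = client.query(
    "SELECT name, value FROM `my_project.my_dataset.my_table`"  # placeholder table
).result()

# Cap the parallel BigQuery Storage API download streams at 4 instead of
# letting the server decide; the value is ignored when the Storage API
# is not used for the download.
for frame in rows.to_dataframe_iterable(
    bqstorage_client=bqstorage_client, max_stream_count=4
):
    print(len(frame))  # each item is one pandas.DataFrame page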
44 changes: 44 additions & 0 deletions google/cloud/bigquery/table.py
@@ -1812,6 +1812,7 @@ def to_arrow_iterable(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore
max_stream_count: Optional[int] = None,
) -> Iterator["pyarrow.RecordBatch"]:
"""[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream.
@@ -1836,6 +1837,22 @@ def to_arrow_iterable(
created by the server. If ``max_queue_size`` is :data:`None`, the queue
size is infinite.
max_stream_count (Optional[int]):
The maximum number of parallel download streams to use when
fetching results with the BigQuery Storage API. Ignored if the
BigQuery Storage API is not used.
This setting also has no effect if the query result
is deterministically ordered with ORDER BY,
in which case the number of download streams is always 1.
If set to 0 or None (the default), the number of download
streams is determined by the BigQuery server. However, this behaviour
can require a lot of memory to store temporary download results,
especially with very large queries. In that case,
setting this parameter to a value > 0 can help
reduce system resource consumption.
Returns:
pyarrow.RecordBatch:
A generator of :class:`~pyarrow.RecordBatch`.
@@ -1852,6 +1869,7 @@ def to_arrow_iterable(
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -1978,6 +1996,7 @@ def to_dataframe_iterable(
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore
max_stream_count: Optional[int] = None,
) -> "pandas.DataFrame":
"""Create an iterable of pandas DataFrames, to process the table as a stream.
@@ -2008,6 +2027,22 @@ def to_dataframe_iterable(
.. versionadded:: 2.14.0
max_stream_count (Optional[int]):
The maximum number of parallel download streams to use when
fetching results with the BigQuery Storage API. Ignored if the
BigQuery Storage API is not used.
This setting also has no effect if the query result
is deterministically ordered with ORDER BY,
in which case the number of download streams is always 1.
If set to 0 or None (the default), the number of download
streams is determined by the BigQuery server. However, this behaviour
can require a lot of memory to store temporary download results,
especially with very large queries. In that case,
setting this parameter to a value > 0 can help
reduce system resource consumption.
Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.
@@ -2034,6 +2069,7 @@ def to_dataframe_iterable(
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_row_iterator,
@@ -2690,6 +2726,7 @@ def to_dataframe_iterable(
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
max_queue_size: Optional[int] = None,
max_stream_count: Optional[int] = None,
) -> Iterator["pandas.DataFrame"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.
@@ -2705,6 +2742,9 @@
max_queue_size:
Ignored. Added for compatibility with RowIterator.
max_stream_count:
Ignored. Added for compatibility with RowIterator.
Returns:
An iterator yielding a single empty :class:`~pandas.DataFrame`.
@@ -2719,6 +2759,7 @@ def to_arrow_iterable(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
max_queue_size: Optional[int] = None,
max_stream_count: Optional[int] = None,
) -> Iterator["pyarrow.RecordBatch"]:
"""Create an iterable of pandas DataFrames, to process the table as a stream.
@@ -2731,6 +2772,9 @@
max_queue_size:
Ignored. Added for compatibility with RowIterator.
max_stream_count:
Ignored. Added for compatibility with RowIterator.
Returns:
An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
"""
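Likewise for the Arrow path; a rough sketch under the same assumptions (the public Shakespeare sample table is used only as an illustration, and an ORDER BY query would still be read on a single stream regardless of the value passed):

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

rows = client.query(
    "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`"
).result()

total_rows = 0
# Ask for at most two parallel download streams; the server may create fewer.
for batch in rows.to_arrow_iterable(
    bqstorage_client=bqstorage_client, max_stream_count=2
):
    total_rows += batch.num_rows  # each item is a pyarrow.RecordBatch
print(total_rows)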
70 changes: 70 additions & 0 deletions tests/unit/test_table.py
@@ -5822,3 +5822,73 @@ def test_table_reference_to_bqstorage_v1_stable(table_path):
    for klass in (mut.TableReference, mut.Table, mut.TableListItem):
        got = klass.from_string(table_path).to_bqstorage()
        assert got == expected


@pytest.mark.parametrize("preserve_order", [True, False])
def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
    pytest.importorskip("pandas")
    pytest.importorskip("google.cloud.bigquery_storage")
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import table as mut
    from google.cloud import bigquery_storage

    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    bqstorage_client.create_read_session.return_value = session

    row_iterator = mut.RowIterator(
        _mock_client(),
        api_request=None,
        path=None,
        schema=[
            schema.SchemaField("colA", "INTEGER"),
        ],
        table=mut.TableReference.from_string("proj.dset.tbl"),
    )
    row_iterator._preserve_order = preserve_order

    max_stream_count = 132
    result_iterable = row_iterator.to_arrow_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
    )
    list(result_iterable)
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=mock.ANY,
        read_session=mock.ANY,
        max_stream_count=max_stream_count if not preserve_order else 1,
    )


@pytest.mark.parametrize("preserve_order", [True, False])
def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
    pytest.importorskip("pandas")
    pytest.importorskip("google.cloud.bigquery_storage")
    from google.cloud.bigquery import schema
    from google.cloud.bigquery import table as mut
    from google.cloud import bigquery_storage

    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    bqstorage_client.create_read_session.return_value = session

    row_iterator = mut.RowIterator(
        _mock_client(),
        api_request=None,
        path=None,
        schema=[
            schema.SchemaField("colA", "INTEGER"),
        ],
        table=mut.TableReference.from_string("proj.dset.tbl"),
    )
    row_iterator._preserve_order = preserve_order

    max_stream_count = 132
    result_iterable = row_iterator.to_dataframe_iterable(
        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
    )
    list(result_iterable)
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=mock.ANY,
        read_session=mock.ANY,
        max_stream_count=max_stream_count if not preserve_order else 1,
    )
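The behaviour these tests pin down can be summarised by a small, hypothetical helper (effective_stream_count is an invented name, not the library's own code; the 0/None case follows the docstring rather than these tests):

from typing import Optional


def effective_stream_count(preserve_order: bool, max_stream_count: Optional[int]) -> int:
    """Sketch of the max_stream_count value forwarded to create_read_session.

    Ordered (ORDER BY) results must be read on a single stream; otherwise the
    user's value is passed through, with 0/None meaning "let the server decide".
    """
    if preserve_order:
        return 1
    return max_stream_count or 0


assert effective_stream_count(True, 132) == 1
assert effective_stream_count(False, 132) == 132
assert effective_stream_count(False, None) == 0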
