Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import functools
import typing
from typing import cast, Any
from typing import Any, cast

import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
Expand Down
6 changes: 6 additions & 0 deletions packages/bigframes/bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run(
def _read_gbq_colab( # type: ignore[overload-overlap]
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame: ...
Expand All @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap]
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -317,6 +319,7 @@ def _read_gbq_colab(
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
Expand All @@ -328,6 +331,8 @@ def _read_gbq_colab(
Args:
query_or_table (str):
SQL query or table ID (table ID not yet supported).
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (Optional[Dict[str, Any]]):
Parameters to format into the query string.
dry_run (bool):
Expand Down Expand Up @@ -379,6 +384,7 @@ def _read_gbq_colab(
return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
query_or_table,
callback=callback,
pyformat_args=pyformat_args,
dry_run=dry_run,
)
Expand Down
35 changes: 24 additions & 11 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ def __init__(
self._clients_provider = clients_provider
self._location = context.location or "US"
else:
credentials, project = (
bigframes._config.auth.resolve_credentials_and_project(context)
)
(
credentials,
project,
) = bigframes._config.auth.resolve_credentials_and_project(context)
if context.location is None:
with bigquery.Client(
project=project,
Expand Down Expand Up @@ -584,6 +585,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
Expand All @@ -593,6 +595,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -601,8 +604,8 @@ def _read_gbq_colab(
def _read_gbq_colab(
self,
query: str,
# TODO: Add a callback parameter that takes some kind of Event object.
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> Union[dataframe.DataFrame, pandas.Series]:
Expand All @@ -615,6 +618,8 @@ def _read_gbq_colab(
query (str):
A SQL query string to execute. Results (if any) are turned into
a DataFrame.
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (dict):
A dictionary of potential variables to replace in ``query``.
Note: strings are _not_ escaped. Use query parameters for these,
Expand All @@ -634,13 +639,21 @@ def _read_gbq_colab(
dry_run=dry_run,
)

return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)
def _run_query():
return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(
Union[Literal[False], Literal[True]], dry_run
),
allow_large_results=allow_large_results,
)

if callback is not None:
with self._publisher.subscribe(callback):
return _run_query()
return _run_query()

@overload
def read_gbq_query( # type: ignore[overload-overlap]
Expand Down
10 changes: 10 additions & 0 deletions packages/bigframes/tests/unit/session/test_read_gbq_colab.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,13 @@ def test_read_gbq_colab_doesnt_set_destination_table():

assert query == "SELECT 'my-test-query';"
assert config.destination is None


def test_read_gbq_colab_with_callback():
"""Make sure callback receives events during execution."""
session = mocks.create_bigquery_session()
callback = mock.Mock()

_ = session._read_gbq_colab("SELECT 'my-test-query';", callback=callback)

assert callback.call_count > 0
Loading