diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index 926f220370b0..e9e8435ea1a1 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -16,7 +16,7 @@ import functools import typing -from typing import cast, Any +from typing import Any, cast import bigframes_vendored.ibis.expr.api as ibis_api import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes diff --git a/packages/bigframes/bigframes/pandas/io/api.py b/packages/bigframes/bigframes/pandas/io/api.py index 6c83095ab3cd..f8c3a881eed6 100644 --- a/packages/bigframes/bigframes/pandas/io/api.py +++ b/packages/bigframes/bigframes/pandas/io/api.py @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run( def _read_gbq_colab( # type: ignore[overload-overlap] query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = ..., dry_run: Literal[False] = ..., ) -> bigframes.dataframe.DataFrame: ... @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap] def _read_gbq_colab( query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = ..., dry_run: Literal[True] = ..., ) -> pandas.Series: ... @@ -317,6 +319,7 @@ def _read_gbq_colab( def _read_gbq_colab( query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = None, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: bool = False, ) -> bigframes.dataframe.DataFrame | pandas.Series: @@ -328,6 +331,8 @@ def _read_gbq_colab( Args: query_or_table (str): SQL query or table ID (table ID not yet supported). + callback (Optional[Callable[[bigframes.core.events.Event], None]]): + Callback to receive query execution events. pyformat_args (Optional[Dict[str, Any]]): Parameters to format into the query string. dry_run (bool): @@ -379,6 +384,7 @@ def _read_gbq_colab( return global_session.with_default_session( bigframes.session.Session._read_gbq_colab, query_or_table, + callback=callback, pyformat_args=pyformat_args, dry_run=dry_run, ) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 38e92a60321b..775cb4e33e4f 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -199,9 +199,10 @@ def __init__( self._clients_provider = clients_provider self._location = context.location or "US" else: - credentials, project = ( - bigframes._config.auth.resolve_credentials_and_project(context) - ) + ( + credentials, + project, + ) = bigframes._config.auth.resolve_credentials_and_project(context) if context.location is None: with bigquery.Client( project=project, @@ -584,6 +585,7 @@ def _read_gbq_colab( self, query: str, *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., ) -> dataframe.DataFrame: ... @@ -593,6 +595,7 @@ def _read_gbq_colab( self, query: str, *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., ) -> pandas.Series: ... @@ -601,8 +604,8 @@ def _read_gbq_colab( def _read_gbq_colab( self, query: str, - # TODO: Add a callback parameter that takes some kind of Event object. *, + callback: Optional[Callable[[bigframes.core.events.Event], None]] = None, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: bool = False, ) -> Union[dataframe.DataFrame, pandas.Series]: @@ -615,6 +618,8 @@ def _read_gbq_colab( query (str): A SQL query string to execute. Results (if any) are turned into a DataFrame. + callback (Optional[Callable[[bigframes.core.events.Event], None]]): + Callback to receive query execution events. pyformat_args (dict): A dictionary of potential variables to replace in ``query``. Note: strings are _not_ escaped. Use query parameters for these, @@ -634,13 +639,21 @@ def _read_gbq_colab( dry_run=dry_run, ) - return self._loader.read_gbq_query( - query=query, - index_col=bigframes.enums.DefaultIndexKind.NULL, - force_total_order=False, - dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run), - allow_large_results=allow_large_results, - ) + def _run_query(): + return self._loader.read_gbq_query( + query=query, + index_col=bigframes.enums.DefaultIndexKind.NULL, + force_total_order=False, + dry_run=typing.cast( + Union[Literal[False], Literal[True]], dry_run + ), + allow_large_results=allow_large_results, + ) + + if callback is not None: + with self._publisher.subscribe(callback): + return _run_query() + return _run_query() @overload def read_gbq_query( # type: ignore[overload-overlap] diff --git a/packages/bigframes/tests/unit/session/test_read_gbq_colab.py b/packages/bigframes/tests/unit/session/test_read_gbq_colab.py index bb2cba0c1093..39d733dcc907 100644 --- a/packages/bigframes/tests/unit/session/test_read_gbq_colab.py +++ b/packages/bigframes/tests/unit/session/test_read_gbq_colab.py @@ -126,3 +126,13 @@ def test_read_gbq_colab_doesnt_set_destination_table(): assert query == "SELECT 'my-test-query';" assert config.destination is None + + +def test_read_gbq_colab_with_callback(): + """Make sure callback receives events during execution.""" + session = mocks.create_bigquery_session() + callback = mock.Mock() + + _ = session._read_gbq_colab("SELECT 'my-test-query';", callback=callback) + + assert callback.call_count > 0