fix: add timeout parameter to to_dataframe and to_arrow met… #2354
Summary of Changes
Hello @chalmerlowe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request enhances the robustness of the BigQuery client library by integrating a …
Code Review
This pull request introduces a timeout parameter to the to_dataframe() and to_arrow() methods, which is a valuable addition to prevent indefinite hangs during data download. The core timeout logic in _download_table_bqstorage is well-implemented, correctly using manual thread pool management to handle timeouts gracefully.
I've identified a critical issue where the method signatures for _EmptyRowIterator in google/cloud/bigquery/table.py have not been updated to include the timeout parameter. This will cause a TypeError for empty result sets and needs to be addressed. I've left a detailed comment on this.
Additionally, I noticed that QueryJob.to_dataframe in google/cloud/bigquery/job/query.py is missing the timeout parameter, which is inconsistent with the other updated methods. Please consider adding it for API consistency.
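To illustrate the signature mismatch flagged above, here is a minimal sketch. The classes are hypothetical stand-ins, not the library's RowIterator or _EmptyRowIterator; they only show how a subclass override that lacks the new keyword fails once callers start passing it.

```python
class RowIteratorSketch:
    """Hypothetical stand-in for an iterator whose method gained a timeout."""

    def to_dataframe(self, timeout=None):
        return f"rows (timeout={timeout})"


class StaleEmptyIterator(RowIteratorSketch):
    """Override that was NOT updated: it does not accept timeout."""

    def to_dataframe(self):
        return "empty"


class UpdatedEmptyIterator(RowIteratorSketch):
    """Override whose signature matches the parent."""

    def to_dataframe(self, timeout=None):
        # Nothing to download for an empty result, so timeout is ignored.
        return "empty"


def fetch(rows, timeout):
    # Callers treat every iterator the same way and always pass timeout.
    return rows.to_dataframe(timeout=timeout)
```

Calling fetch(StaleEmptyIterator(), 5.0) raises a TypeError about an unexpected keyword argument, while fetch(UpdatedEmptyIterator(), 5.0) returns normally.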
…ntiate new _EmptyRowIterator for each test in test_methods_w_timeout to prevent ValueError
- Remove total_rows property from _EmptyRowIterator to fix AssertionError
- Remove redundant __iter__ method in _EmptyRowIterator to fix lint error
# Call result() on any finished threads to raise any
# exceptions encountered.
future.result()
# Manually manage the pool to control shutdown behavior on timeout.
You may wonder why we switched away from using a context manager here.
Here's a breakdown of the change:
- Manual Pool Creation: Instead of the with statement, the code now creates the ThreadPoolExecutor directly:
  pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
- Conditional Shutdown: The finally block now uses a wait_on_shutdown boolean flag:
  pool.shutdown(wait=wait_on_shutdown)
  This flag is set to False only when a TimeoutError is raised.
- Timeout Handling: Inside the while not_done: loop, there's new logic to check whether the elapsed time has exceeded the specified timeout. If it has, it sets wait_on_shutdown to False and raises a concurrent.futures.TimeoutError.
Why this change?
The standard context manager for ThreadPoolExecutor always waits for all futures to complete
upon exiting the block. This behavior is not desirable when a timeout is implemented. If the
download times out, we want to stop waiting for the worker threads immediately and not block
until they all finish.
By manually managing the pool and using the wait_on_shutdown flag, we can tell the
pool.shutdown() method not to wait for the threads to complete if a timeout has occurred.
This allows the TimeoutError to be propagated up quickly, rather than being stuck waiting for
threads that are potentially hanging.
So, the change was necessary to ensure the timeout parameter works effectively and the
function doesn't hang unnecessarily when the timeout duration is exceeded. The core logic of
submitting tasks to the pool and retrieving results from the queue remains very similar, but
the shutdown process is now more nuanced to handle timeouts.
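For readers who want to see the pattern in isolation, here is a minimal, self-contained sketch. It is not the code from _download_table_bqstorage: the function name run_with_timeout, the poll_interval parameter, and the task list are assumptions made for illustration. Only the shape (manual pool, wait_on_shutdown flag, elapsed-time check inside the while not_done: loop) follows the description above.

```python
import concurrent.futures
import time


def run_with_timeout(tasks, timeout=None, poll_interval=0.05):
    """Run callables in a thread pool; raise TimeoutError past the deadline.

    Hypothetical helper for illustration only.
    """
    # Create the pool manually so we control how it is shut down.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(tasks)))
    wait_on_shutdown = True
    start = time.monotonic()
    results = []
    try:
        not_done = [pool.submit(task) for task in tasks]
        while not_done:
            done, pending = concurrent.futures.wait(
                not_done,
                timeout=poll_interval,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
            not_done = list(pending)
            for future in done:
                # result() re-raises any exception from the worker thread.
                results.append(future.result())
            elapsed = time.monotonic() - start
            if timeout is not None and not_done and elapsed > timeout:
                # Do not block in shutdown() waiting for stuck workers.
                wait_on_shutdown = False
                raise concurrent.futures.TimeoutError(
                    f"download did not finish within {timeout} seconds"
                )
        return results
    finally:
        pool.shutdown(wait=wait_on_shutdown)
```

Note that shutdown(wait=False) only stops the caller from blocking; tasks that are already running are not interrupted and keep going in the background until they return.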
Good catch!
pages = ()
total_rows = 0
A few words on why these were deleted/modified.
Summary of Changes to _EmptyRowIterator
In the _EmptyRowIterator class, the class-level attributes pages = () and total_rows = 0 were replaced with new handling.
Reasoning:
These class-level attributes were causing issues with how instances of _EmptyRowIterator were behaving, particularly in test scenarios.
- total_rows = 0: While seemingly correct, having this at the class level interfered with the instance-level _total_rows attribute inherited from the RowIterator parent class. The parent class expects to manage _total_rows at the instance level. Removing the class-level attribute allows the instance-level attribute to be a single source of truth, initialized to 0 in the __init__ method, and prevents potential conflicts or unexpected behavior.
- pages = (): This class-level attribute was not used and added unnecessary clutter. The _EmptyRowIterator is designed to represent an empty result set, so it never has any pages. The __iter__ method already correctly returns an empty iterator (iter(())). Additionally, to ensure methods like to_arrow_iterable function correctly without making network requests, a pages property was added to return an empty iterator.
Replacement and Improvement:
- total_rows: The class-level total_rows = 0 was removed. Instead, self._total_rows is now consistently initialized to 0 within the __init__ method of _EmptyRowIterator. This aligns with the parent class's instance-level attribute management and ensures each instance has its own _total_rows set to 0, reflecting an empty iterator.
- pages: The class-level pages = () was removed. To provide the necessary empty iterable for methods like to_arrow_iterable, a pages property was added:
  @property
  def pages(self):
      return iter(())
  This approach is cleaner as it explicitly provides an empty iterator when the pages attribute is accessed, making the class's intent clearer and ensuring compatibility with methods expecting an iterable of pages.
These changes make the _EmptyRowIterator more robust and predictable by adhering to the instance attribute patterns of its parent class and explicitly defining an empty pages iterable.
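As a rough illustration of one way a class-level attribute can conflict with a parent's instance-level state, consider the sketch below. ParentIterator, ClassLevelEmpty, and InstanceLevelEmpty are hypothetical classes, not the library's; the sketch assumes the parent exposes total_rows and pages as properties, which may differ from the real RowIterator.

```python
class ParentIterator:
    """Hypothetical parent that tracks the row count on each instance."""

    def __init__(self):
        self._total_rows = None

    @property
    def total_rows(self):
        return self._total_rows

    @property
    def pages(self):
        # Stand-in for a real iterator fetching pages over the network.
        raise RuntimeError("would make a network request")


class ClassLevelEmpty(ParentIterator):
    # These class attributes shadow the parent's properties entirely,
    # so total_rows no longer reflects the instance's _total_rows.
    total_rows = 0
    pages = ()


class InstanceLevelEmpty(ParentIterator):
    def __init__(self):
        super().__init__()
        # Single source of truth, stored on the instance.
        self._total_rows = 0

    @property
    def pages(self):
        # A fresh, empty iterator on every access; no network involved.
        return iter(())
```

With ClassLevelEmpty, assigning to _total_rows on an instance has no effect on what total_rows reports, because the class attribute hides the parent's property. With InstanceLevelEmpty, total_rows follows the instance state and pages always yields nothing.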
We worked up a script to reproduce/simulate the customer's issue. After updating the code, when we run the repro script, we get the following results depending on whether we include an argument for the new
vchudnov-g left a comment
This looks good to me. I'm not very knowledgeable about bigquery in particular, but the changes from the status quo look like they do what's intended.
I really appreciate your detailed explanations of the various changes. Those made the review easier.
Description
This PR adds a timeout parameter to the to_dataframe() and to_arrow() methods (and their corresponding *_iterable, *_geodataframe and QueryJob wrappers) in the BigQuery client library.
This addresses an issue where these methods could hang indefinitely if the underlying BigQuery Storage API stream blocked (e.g., due to firewall issues or network interruptions) during the download phase. The added timeout parameter ensures that the download operation respects the specified time limit and raises a concurrent.futures.TimeoutError if it exceeds the duration.
Changes
- google/cloud/bigquery/_pandas_helpers.py:
  - Updated _download_table_bqstorage to accept a timeout argument.
  - Updated download_dataframe_bqstorage and download_arrow_bqstorage to accept and pass the timeout parameter.
- google/cloud/bigquery/table.py:
  - Updated RowIterator methods (to_arrow_iterable, to_arrow, to_dataframe_iterable, to_dataframe, to_geodataframe) to accept and pass timeout.
  - Updated _EmptyRowIterator methods to match the RowIterator signature, preventing TypeError when a timeout is provided for empty result sets.
- google/cloud/bigquery/job/query.py:
  - Updated QueryJob methods (to_arrow, to_dataframe, to_geodataframe) to accept timeout and pass it to the result iterator.
- Updated tests/unit/job/test_query_pandas.py, tests/unit/test_table.py, and tests/unit/test_table_pandas.py to reflect the signature changes.
Fixes internal bug: b/468091307