Refactoring table creation in Big Query Pipeline #343
aniketsinghrawat wants to merge 12 commits into google:main
Conversation
if known_args.output_table:
    # Checking if the output table is in the format <project>.<dataset>.<table>.
    output_table_pattern = r'^[\w-]+\.[\w-]+\.[\w-]+$'
Are the []s necessary? Can it just be r'^\w+\.\w+\.\w+$'? (Please test my hand-written regex :))
Project IDs can contain hyphens: https://cloud.google.com/resource-manager/docs/creating-managing-projects
\w+ doesn't match a word like my-project.
Thanks, that makes sense! Then, would it be [\w\-]+? IIRC, [] and - together are often used to express a range.
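For reference, a quick check of both patterns (inside a character class a trailing - is literal, so [\w-]+ and [\w\-]+ are equivalent; \w alone covers only letters, digits, and underscores):

```python
import re

# The pattern from the diff: allows hyphens in each component.
with_hyphens = r'^[\w-]+\.[\w-]+\.[\w-]+$'
# The suggested simplification: \w is [A-Za-z0-9_], so no hyphens.
without_hyphens = r'^\w+\.\w+\.\w+$'

table = 'my-project.my_dataset.my_table'
assert re.match(with_hyphens, table)         # accepts the hyphenated project
assert not re.match(without_hyphens, table)  # rejects it
```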
weather_mv/loader_pipeline/bq.py
| """Initializes Sink by creating a BigQuery table based on user input.""" | ||
| """Initializes BigQuery table based on user input.""" | ||
| self.project, self.dataset_id, self.table_id = self.output_table.split('.') | ||
| self.table = None |
| table=self.table.table_id, | ||
| project=self.project, | ||
| dataset=self.dataset_id, | ||
| table=self.table_id, |
If we wanted to create the BQ table in the pipeline, a simpler solution would be to change this step's disposition: https://beam.apache.org/documentation/io/built-in/google-bigquery/#create-disposition
On second thought, we may want to keep your step since we create an opinionated schema...
@alxmrs
I tried putting the create-table step as a transform in the pipeline but ran into several issues:
- I tried the Sample transform as you suggested, but there is no way to ensure table creation happens before the usual pipeline flow (the apache_beam Python SDK doesn't support Wait.on, which could have made this possible).
- I also tried stateful processing, but since we window the Pub/Sub reads and state is discarded when a window expires, that solution didn't work either.
I think there is potential in using the create-disposition flag in WriteToBigQuery. Can you please elaborate on what you mean by an opinionated schema?
cc: @mahrsee1997
Now that you mention it, I agree that using the create disposition is easiest! We need to make sure that we pass in our computed schema (say, from the init) into this transform instead of having it automatically make the schema. Thinking it over now, that's what my concern was about: I wasn't sure if we'd get the schema we wanted if it computed it automatically.
I was surfing the web for another issue and found this; it might be helpful:
https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#schemas
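A sketch of how the precomputed schema could be handed to the write step instead of letting BigQuery infer one; to_beam_schema and inferred_fields are made-up names for illustration, not the PR's actual helpers:

```python
# Hypothetical: convert (name, BigQuery type) pairs computed in the Sink's
# init into the table-schema dict form that WriteToBigQuery accepts.
def to_beam_schema(inferred_fields):
    """Turn (name, BigQuery type) pairs into a table schema dict."""
    return {
        'fields': [
            {'name': name, 'type': bq_type, 'mode': 'NULLABLE'}
            for name, bq_type in inferred_fields
        ]
    }

schema = to_beam_schema([('time', 'TIMESTAMP'), ('temperature', 'FLOAT64')])

# In the pipeline, this would be passed alongside the create disposition:
#   | beam.io.WriteToBigQuery(
#         table=known_args.output_table,
#         schema=schema,
#         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```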
| """Extract rows of variables from data paths into a BigQuery table.""" | ||
| extracted_rows = ( | ||
| paths | ||
| | 'CreateTable' >> beam.Map(self.create_bq_table) |
Concern: I am pretty sure this will try to create a BQ table for every element of paths. Remember, self.table won't refer to the state of the class, since global state is not really a thing for parallel steps like this.
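A pure-Python sketch of this concern: a Map-style step applies its function once per element, so any side effect (such as creating a table) fires for every path; create_bq_table here is a stand-in, not the code in bq.py:

```python
# Record each creation attempt so we can see how many times it fires.
creation_attempts = []

def create_bq_table(path):
    creation_attempts.append(path)  # stands in for a CREATE TABLE call
    return path

paths = ['a.nc', 'b.nc', 'c.nc']
rows = [create_bq_table(p) for p in paths]  # what beam.Map effectively does

assert len(creation_attempts) == 3  # one attempt per element, not one total
```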
Consider using something like sample: https://beam.apache.org/documentation/transforms/python/aggregation/sample/
weather-mv
fixes: #311
As mentioned in PR #250 (comment), skipping file-prefix validation does not work for BigQuery, since the pipeline depended on first_uri to infer the BigQuery schema.
Fixed this issue by creating a pipeline stage that infers the schema and creates the BigQuery table using the first URI. This stage is skipped for all subsequent URIs.
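Conceptually, the fix described above boils down to something like this sketch; infer_schema and create_table are hypothetical stand-ins for the pipeline's helpers, not the PR's actual functions:

```python
# Infer the schema and create the table from the first URI only; all
# subsequent URIs reuse the already-created table.
def initialize_table(uris, infer_schema, create_table):
    """Create the BigQuery table once, from the first URI."""
    first_uri = next(iter(uris))
    schema = infer_schema(first_uri)
    create_table(schema)
    return schema

created = []
schema = initialize_table(
    ['gs://bucket/a.nc', 'gs://bucket/b.nc'],
    infer_schema=lambda uri: {'source': uri},
    create_table=created.append)

assert schema == {'source': 'gs://bucket/a.nc'}
assert len(created) == 1  # the table is created exactly once
```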