Skip to content

Commit 1da37bc

Browse files
[2/2] sdks/python: enrich data with CloudSQL [PostgreSQL, MySQL, SQLServer] (#35473)
* .github+sdks+website: update docs and add exmples for CloudSQL handler * Update website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-cloudsql.md Co-authored-by: Danny McCormick <[email protected]> * sdks/python: fix issue regards generic binding parameteres in CloudSQL * sdks/python: use binding parameters instead of `{}` * CHANGES.md: update release notes * website: update beam version * sdks/python: add `ALLOYDB_PASSWORD` to `tox.ini` * sdks/python: fix unbounded local variable * CHANGES.md: fix white space issue * sdks/python: make table_id globally unique in `enrichment_test` * sdks/python: fix data type issue * sdks/python: enforce CloudSQL tests to run only on py transforms flow * sdks/python: remove `uses_testcontainer` pytest marker from CloudSQL * sdks/python: skip google cloudsql tests unless `ALLOYDB_PASSWORD` found * workflows: remove `ALLOYDB_PASSWORD` from beam precommit python coverage * sdks/python: fix duplicate data issue * sdks/python: fix linting * sdks/python: reorder table drop approach --------- Co-authored-by: Danny McCormick <[email protected]>
1 parent 33d8d70 commit 1da37bc

File tree

10 files changed

+629
-64
lines changed

10 files changed

+629
-64
lines changed

.github/workflows/beam_PreCommit_Python_Coverage.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ env:
5454
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
5555
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
5656
HF_INFERENCE_TOKEN: ${{ secrets.HF_INFERENCE_TOKEN }}
57-
ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }}
5857

5958

6059
jobs:
@@ -113,7 +112,7 @@ jobs:
113112
TESTCONTAINERS_HOST_OVERRIDE: ${{ contains(matrix.os, 'self-hosted') && env.DIND_IP || '' }}
114113
TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/docker.sock"
115114
TESTCONTAINERS_RYUK_DISABLED: "false"
116-
TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true"
115+
TESTCONTAINERS_RYUK_CONTAINER_PRIVILEGED: "true"
117116
PYTEST_ADDOPTS: "-v --tb=short --maxfail=3 --durations=20 --reruns=2 --reruns-delay=5"
118117
TC_TIMEOUT: "120"
119118
TC_MAX_TRIES: "120"

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
## New Features / Improvements
7575

7676
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
77+
* Python examples added for CloudSQL enrichment handler on [Beam website](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-cloudsql/) (Python) ([#35473](https://github.com/apache/beam/issues/36095)).
7778

7879
## Breaking Changes
7980

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy():
116116
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
117117
| "Print" >> beam.Map(print))
118118
# [END enrichment_with_vertex_ai_legacy]
119+
120+
121+
def enrichment_with_google_cloudsql_pg():
122+
# [START enrichment_with_google_cloudsql_pg]
123+
import apache_beam as beam
124+
from apache_beam.transforms.enrichment import Enrichment
125+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
126+
CloudSQLEnrichmentHandler,
127+
DatabaseTypeAdapter,
128+
TableFieldsQueryConfig,
129+
CloudSQLConnectionConfig)
130+
import os
131+
132+
database_adapter = DatabaseTypeAdapter.POSTGRESQL
133+
database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI")
134+
database_user = os.environ.get("GOOGLE_CLOUD_SQL_DB_USER")
135+
database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD")
136+
database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID")
137+
table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID")
138+
where_clause_template = "product_id = :pid"
139+
where_clause_fields = ["product_id"]
140+
141+
data = [
142+
beam.Row(product_id=1, name='A'),
143+
beam.Row(product_id=2, name='B'),
144+
beam.Row(product_id=3, name='C'),
145+
]
146+
147+
connection_config = CloudSQLConnectionConfig(
148+
db_adapter=database_adapter,
149+
instance_connection_uri=database_uri,
150+
user=database_user,
151+
password=database_password,
152+
db_id=database_id)
153+
154+
query_config = TableFieldsQueryConfig(
155+
table_id=table_id,
156+
where_clause_template=where_clause_template,
157+
where_clause_fields=where_clause_fields)
158+
159+
cloudsql_handler = CloudSQLEnrichmentHandler(
160+
connection_config=connection_config,
161+
table_id=table_id,
162+
query_config=query_config)
163+
with beam.Pipeline() as p:
164+
_ = (
165+
p
166+
| "Create" >> beam.Create(data)
167+
|
168+
"Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
169+
| "Print" >> beam.Map(print))
170+
# [END enrichment_with_google_cloudsql_pg]
171+
172+
173+
def enrichment_with_external_pg():
174+
# [START enrichment_with_external_pg]
175+
import apache_beam as beam
176+
from apache_beam.transforms.enrichment import Enrichment
177+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
178+
CloudSQLEnrichmentHandler,
179+
DatabaseTypeAdapter,
180+
TableFieldsQueryConfig,
181+
ExternalSQLDBConnectionConfig)
182+
import os
183+
184+
database_adapter = DatabaseTypeAdapter.POSTGRESQL
185+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
186+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
187+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
188+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
189+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
190+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
191+
where_clause_template = "product_id = :pid"
192+
where_clause_fields = ["product_id"]
193+
194+
data = [
195+
beam.Row(product_id=1, name='A'),
196+
beam.Row(product_id=2, name='B'),
197+
beam.Row(product_id=3, name='C'),
198+
]
199+
200+
connection_config = ExternalSQLDBConnectionConfig(
201+
db_adapter=database_adapter,
202+
host=database_host,
203+
port=database_port,
204+
user=database_user,
205+
password=database_password,
206+
db_id=database_id)
207+
208+
query_config = TableFieldsQueryConfig(
209+
table_id=table_id,
210+
where_clause_template=where_clause_template,
211+
where_clause_fields=where_clause_fields)
212+
213+
cloudsql_handler = CloudSQLEnrichmentHandler(
214+
connection_config=connection_config,
215+
table_id=table_id,
216+
query_config=query_config)
217+
with beam.Pipeline() as p:
218+
_ = (
219+
p
220+
| "Create" >> beam.Create(data)
221+
| "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler)
222+
| "Print" >> beam.Map(print))
223+
# [END enrichment_with_external_pg]
224+
225+
226+
def enrichment_with_external_mysql():
227+
# [START enrichment_with_external_mysql]
228+
import apache_beam as beam
229+
from apache_beam.transforms.enrichment import Enrichment
230+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
231+
CloudSQLEnrichmentHandler,
232+
DatabaseTypeAdapter,
233+
TableFieldsQueryConfig,
234+
ExternalSQLDBConnectionConfig)
235+
import os
236+
237+
database_adapter = DatabaseTypeAdapter.MYSQL
238+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
239+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
240+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
241+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
242+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
243+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
244+
where_clause_template = "product_id = :pid"
245+
where_clause_fields = ["product_id"]
246+
247+
data = [
248+
beam.Row(product_id=1, name='A'),
249+
beam.Row(product_id=2, name='B'),
250+
beam.Row(product_id=3, name='C'),
251+
]
252+
253+
connection_config = ExternalSQLDBConnectionConfig(
254+
db_adapter=database_adapter,
255+
host=database_host,
256+
port=database_port,
257+
user=database_user,
258+
password=database_password,
259+
db_id=database_id)
260+
261+
query_config = TableFieldsQueryConfig(
262+
table_id=table_id,
263+
where_clause_template=where_clause_template,
264+
where_clause_fields=where_clause_fields)
265+
266+
cloudsql_handler = CloudSQLEnrichmentHandler(
267+
connection_config=connection_config,
268+
table_id=table_id,
269+
query_config=query_config)
270+
with beam.Pipeline() as p:
271+
_ = (
272+
p
273+
| "Create" >> beam.Create(data)
274+
| "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler)
275+
| "Print" >> beam.Map(print))
276+
# [END enrichment_with_external_mysql]
277+
278+
279+
def enrichment_with_external_sqlserver():
280+
# [START enrichment_with_external_sqlserver]
281+
import apache_beam as beam
282+
from apache_beam.transforms.enrichment import Enrichment
283+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
284+
CloudSQLEnrichmentHandler,
285+
DatabaseTypeAdapter,
286+
TableFieldsQueryConfig,
287+
ExternalSQLDBConnectionConfig)
288+
import os
289+
290+
database_adapter = DatabaseTypeAdapter.SQLSERVER
291+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
292+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
293+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
294+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
295+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
296+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
297+
where_clause_template = "product_id = :pid"
298+
where_clause_fields = ["product_id"]
299+
300+
data = [
301+
beam.Row(product_id=1, name='A'),
302+
beam.Row(product_id=2, name='B'),
303+
beam.Row(product_id=3, name='C'),
304+
]
305+
306+
connection_config = ExternalSQLDBConnectionConfig(
307+
db_adapter=database_adapter,
308+
host=database_host,
309+
port=database_port,
310+
user=database_user,
311+
password=database_password,
312+
db_id=database_id)
313+
314+
query_config = TableFieldsQueryConfig(
315+
table_id=table_id,
316+
where_clause_template=where_clause_template,
317+
where_clause_fields=where_clause_fields)
318+
319+
cloudsql_handler = CloudSQLEnrichmentHandler(
320+
connection_config=connection_config,
321+
table_id=table_id,
322+
query_config=query_config)
323+
with beam.Pipeline() as p:
324+
_ = (
325+
p
326+
| "Create" >> beam.Create(data)
327+
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
328+
| "Print" >> beam.Map(print))
329+
# [END enrichment_with_external_sqlserver]

0 commit comments

Comments
 (0)