@@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy():
116116 | "Enrich W/ Vertex AI" >> Enrichment (vertex_ai_handler )
117117 | "Print" >> beam .Map (print ))
118118 # [END enrichment_with_vertex_ai_legacy]
119+
120+
121+ def enrichment_with_google_cloudsql_pg ():
122+ # [START enrichment_with_google_cloudsql_pg]
123+ import apache_beam as beam
124+ from apache_beam .transforms .enrichment import Enrichment
125+ from apache_beam .transforms .enrichment_handlers .cloudsql import (
126+ CloudSQLEnrichmentHandler ,
127+ DatabaseTypeAdapter ,
128+ TableFieldsQueryConfig ,
129+ CloudSQLConnectionConfig )
130+ import os
131+
132+ database_adapter = DatabaseTypeAdapter .POSTGRESQL
133+ database_uri = os .environ .get ("GOOGLE_CLOUD_SQL_DB_URI" )
134+ database_user = os .environ .get ("GOOGLE_CLOUD_SQL_DB_USER" )
135+ database_password = os .environ .get ("GOOGLE_CLOUD_SQL_DB_PASSWORD" )
136+ database_id = os .environ .get ("GOOGLE_CLOUD_SQL_DB_ID" )
137+ table_id = os .environ .get ("GOOGLE_CLOUD_SQL_DB_TABLE_ID" )
138+ where_clause_template = "product_id = :pid"
139+ where_clause_fields = ["product_id" ]
140+
141+ data = [
142+ beam .Row (product_id = 1 , name = 'A' ),
143+ beam .Row (product_id = 2 , name = 'B' ),
144+ beam .Row (product_id = 3 , name = 'C' ),
145+ ]
146+
147+ connection_config = CloudSQLConnectionConfig (
148+ db_adapter = database_adapter ,
149+ instance_connection_uri = database_uri ,
150+ user = database_user ,
151+ password = database_password ,
152+ db_id = database_id )
153+
154+ query_config = TableFieldsQueryConfig (
155+ table_id = table_id ,
156+ where_clause_template = where_clause_template ,
157+ where_clause_fields = where_clause_fields )
158+
159+ cloudsql_handler = CloudSQLEnrichmentHandler (
160+ connection_config = connection_config ,
161+ table_id = table_id ,
162+ query_config = query_config )
163+ with beam .Pipeline () as p :
164+ _ = (
165+ p
166+ | "Create" >> beam .Create (data )
167+ |
168+ "Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment (cloudsql_handler )
169+ | "Print" >> beam .Map (print ))
170+ # [END enrichment_with_google_cloudsql_pg]
171+
172+
173+ def enrichment_with_external_pg ():
174+ # [START enrichment_with_external_pg]
175+ import apache_beam as beam
176+ from apache_beam .transforms .enrichment import Enrichment
177+ from apache_beam .transforms .enrichment_handlers .cloudsql import (
178+ CloudSQLEnrichmentHandler ,
179+ DatabaseTypeAdapter ,
180+ TableFieldsQueryConfig ,
181+ ExternalSQLDBConnectionConfig )
182+ import os
183+
184+ database_adapter = DatabaseTypeAdapter .POSTGRESQL
185+ database_host = os .environ .get ("EXTERNAL_SQL_DB_HOST" )
186+ database_port = int (os .environ .get ("EXTERNAL_SQL_DB_PORT" ))
187+ database_user = os .environ .get ("EXTERNAL_SQL_DB_USER" )
188+ database_password = os .environ .get ("EXTERNAL_SQL_DB_PASSWORD" )
189+ database_id = os .environ .get ("EXTERNAL_SQL_DB_ID" )
190+ table_id = os .environ .get ("EXTERNAL_SQL_DB_TABLE_ID" )
191+ where_clause_template = "product_id = :pid"
192+ where_clause_fields = ["product_id" ]
193+
194+ data = [
195+ beam .Row (product_id = 1 , name = 'A' ),
196+ beam .Row (product_id = 2 , name = 'B' ),
197+ beam .Row (product_id = 3 , name = 'C' ),
198+ ]
199+
200+ connection_config = ExternalSQLDBConnectionConfig (
201+ db_adapter = database_adapter ,
202+ host = database_host ,
203+ port = database_port ,
204+ user = database_user ,
205+ password = database_password ,
206+ db_id = database_id )
207+
208+ query_config = TableFieldsQueryConfig (
209+ table_id = table_id ,
210+ where_clause_template = where_clause_template ,
211+ where_clause_fields = where_clause_fields )
212+
213+ cloudsql_handler = CloudSQLEnrichmentHandler (
214+ connection_config = connection_config ,
215+ table_id = table_id ,
216+ query_config = query_config )
217+ with beam .Pipeline () as p :
218+ _ = (
219+ p
220+ | "Create" >> beam .Create (data )
221+ | "Enrich W/ Unmanaged PostgreSQL" >> Enrichment (cloudsql_handler )
222+ | "Print" >> beam .Map (print ))
223+ # [END enrichment_with_external_pg]
224+
225+
226+ def enrichment_with_external_mysql ():
227+ # [START enrichment_with_external_mysql]
228+ import apache_beam as beam
229+ from apache_beam .transforms .enrichment import Enrichment
230+ from apache_beam .transforms .enrichment_handlers .cloudsql import (
231+ CloudSQLEnrichmentHandler ,
232+ DatabaseTypeAdapter ,
233+ TableFieldsQueryConfig ,
234+ ExternalSQLDBConnectionConfig )
235+ import os
236+
237+ database_adapter = DatabaseTypeAdapter .MYSQL
238+ database_host = os .environ .get ("EXTERNAL_SQL_DB_HOST" )
239+ database_port = int (os .environ .get ("EXTERNAL_SQL_DB_PORT" ))
240+ database_user = os .environ .get ("EXTERNAL_SQL_DB_USER" )
241+ database_password = os .environ .get ("EXTERNAL_SQL_DB_PASSWORD" )
242+ database_id = os .environ .get ("EXTERNAL_SQL_DB_ID" )
243+ table_id = os .environ .get ("EXTERNAL_SQL_DB_TABLE_ID" )
244+ where_clause_template = "product_id = :pid"
245+ where_clause_fields = ["product_id" ]
246+
247+ data = [
248+ beam .Row (product_id = 1 , name = 'A' ),
249+ beam .Row (product_id = 2 , name = 'B' ),
250+ beam .Row (product_id = 3 , name = 'C' ),
251+ ]
252+
253+ connection_config = ExternalSQLDBConnectionConfig (
254+ db_adapter = database_adapter ,
255+ host = database_host ,
256+ port = database_port ,
257+ user = database_user ,
258+ password = database_password ,
259+ db_id = database_id )
260+
261+ query_config = TableFieldsQueryConfig (
262+ table_id = table_id ,
263+ where_clause_template = where_clause_template ,
264+ where_clause_fields = where_clause_fields )
265+
266+ cloudsql_handler = CloudSQLEnrichmentHandler (
267+ connection_config = connection_config ,
268+ table_id = table_id ,
269+ query_config = query_config )
270+ with beam .Pipeline () as p :
271+ _ = (
272+ p
273+ | "Create" >> beam .Create (data )
274+ | "Enrich W/ Unmanaged MySQL" >> Enrichment (cloudsql_handler )
275+ | "Print" >> beam .Map (print ))
276+ # [END enrichment_with_external_mysql]
277+
278+
279+ def enrichment_with_external_sqlserver ():
280+ # [START enrichment_with_external_sqlserver]
281+ import apache_beam as beam
282+ from apache_beam .transforms .enrichment import Enrichment
283+ from apache_beam .transforms .enrichment_handlers .cloudsql import (
284+ CloudSQLEnrichmentHandler ,
285+ DatabaseTypeAdapter ,
286+ TableFieldsQueryConfig ,
287+ ExternalSQLDBConnectionConfig )
288+ import os
289+
290+ database_adapter = DatabaseTypeAdapter .SQLSERVER
291+ database_host = os .environ .get ("EXTERNAL_SQL_DB_HOST" )
292+ database_port = int (os .environ .get ("EXTERNAL_SQL_DB_PORT" ))
293+ database_user = os .environ .get ("EXTERNAL_SQL_DB_USER" )
294+ database_password = os .environ .get ("EXTERNAL_SQL_DB_PASSWORD" )
295+ database_id = os .environ .get ("EXTERNAL_SQL_DB_ID" )
296+ table_id = os .environ .get ("EXTERNAL_SQL_DB_TABLE_ID" )
297+ where_clause_template = "product_id = :pid"
298+ where_clause_fields = ["product_id" ]
299+
300+ data = [
301+ beam .Row (product_id = 1 , name = 'A' ),
302+ beam .Row (product_id = 2 , name = 'B' ),
303+ beam .Row (product_id = 3 , name = 'C' ),
304+ ]
305+
306+ connection_config = ExternalSQLDBConnectionConfig (
307+ db_adapter = database_adapter ,
308+ host = database_host ,
309+ port = database_port ,
310+ user = database_user ,
311+ password = database_password ,
312+ db_id = database_id )
313+
314+ query_config = TableFieldsQueryConfig (
315+ table_id = table_id ,
316+ where_clause_template = where_clause_template ,
317+ where_clause_fields = where_clause_fields )
318+
319+ cloudsql_handler = CloudSQLEnrichmentHandler (
320+ connection_config = connection_config ,
321+ table_id = table_id ,
322+ query_config = query_config )
323+ with beam .Pipeline () as p :
324+ _ = (
325+ p
326+ | "Create" >> beam .Create (data )
327+ | "Enrich W/ Unmanaged SQL Server" >> Enrichment (cloudsql_handler )
328+ | "Print" >> beam .Map (print ))
329+ # [END enrichment_with_external_sqlserver]
0 commit comments