Skip to content

Commit 40f595f

Browse files
.github+sdks+website: update docs and add exmples for CloudSQL handler
1 parent 2e313a9 commit 40f595f

5 files changed

Lines changed: 441 additions & 6 deletions

File tree

.github/workflows/beam_PreCommit_Python_Examples.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ env:
5353
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
5454
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
5555
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
56+
ALLOYDB_PASSWORD: ${{ secrets.ALLOYDB_PASSWORD }}
5657

5758
jobs:
5859
beam_PreCommit_Python_Examples:

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py

Lines changed: 212 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# Unless required by applicable law or agreed to in writing, software
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
# See the License for the specific language governing permissions and
15+
# See the License for the specific langutage governing permissions and
1616
# limitations under the License.
1717
#
1818

@@ -116,3 +116,214 @@ def enrichment_with_vertex_ai_legacy():
116116
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
117117
| "Print" >> beam.Map(print))
118118
# [END enrichment_with_vertex_ai_legacy]
119+
120+
121+
def enrichment_with_google_cloudsql_pg():
122+
# [START enrichment_with_google_cloudsql_pg]
123+
import apache_beam as beam
124+
from apache_beam.transforms.enrichment import Enrichment
125+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
126+
CloudSQLEnrichmentHandler,
127+
DatabaseTypeAdapter,
128+
TableFieldsQueryConfig,
129+
CloudSQLConnectionConfig)
130+
import os
131+
132+
database_adapter = DatabaseTypeAdapter.POSTGRESQL
133+
database_uri = os.environ.get("GOOGLE_CLOUD_SQL_DB_URI")
134+
database_user = int(os.environ.get("GOOGLE_CLOUD_SQL_DB_USER"))
135+
database_password = os.environ.get("GOOGLE_CLOUD_SQL_DB_PASSWORD")
136+
database_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_ID")
137+
table_id = os.environ.get("GOOGLE_CLOUD_SQL_DB_TABLE_ID")
138+
where_clause_template = "product_id = {}"
139+
where_clause_fields = ["product_id"]
140+
141+
data = [
142+
beam.Row(product_id=1, name='A'),
143+
beam.Row(product_id=2, name='B'),
144+
beam.Row(product_id=3, name='C'),
145+
]
146+
147+
connection_config = CloudSQLConnectionConfig(
148+
db_adapter=database_adapter,
149+
instance_connection_uri=database_uri,
150+
user=database_user,
151+
password=database_password,
152+
db_id=database_id)
153+
154+
query_config = TableFieldsQueryConfig(
155+
table_id=table_id,
156+
where_clause_template=where_clause_template,
157+
where_clause_fields=where_clause_fields)
158+
159+
cloudsql_handler = CloudSQLEnrichmentHandler(
160+
connection_config=connection_config,
161+
table_id=table_id,
162+
query_config=query_config)
163+
with beam.Pipeline() as p:
164+
_ = (
165+
p
166+
| "Create" >> beam.Create(data)
167+
|
168+
"Enrich W/ Google CloudSQL PostgreSQL" >> Enrichment(cloudsql_handler)
169+
| "Print" >> beam.Map(print))
170+
# [END enrichment_with_google_cloudsql_pg]
171+
172+
173+
def enrichment_with_external_pg():
174+
# [START enrichment_with_external_pg]
175+
import apache_beam as beam
176+
from apache_beam.transforms.enrichment import Enrichment
177+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
178+
CloudSQLEnrichmentHandler,
179+
DatabaseTypeAdapter,
180+
TableFieldsQueryConfig,
181+
ExternalSQLDBConnectionConfig)
182+
import os
183+
184+
database_adapter = DatabaseTypeAdapter.POSTGRESQL
185+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
186+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
187+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
188+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
189+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
190+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
191+
where_clause_template = "product_id = {}"
192+
where_clause_fields = ["product_id"]
193+
194+
data = [
195+
beam.Row(product_id=1, name='A'),
196+
beam.Row(product_id=2, name='B'),
197+
beam.Row(product_id=3, name='C'),
198+
]
199+
200+
connection_config = ExternalSQLDBConnectionConfig(
201+
db_adapter=database_adapter,
202+
host=database_host,
203+
port=database_port,
204+
user=database_user,
205+
password=database_password,
206+
db_id=database_id)
207+
208+
query_config = TableFieldsQueryConfig(
209+
table_id=table_id,
210+
where_clause_template=where_clause_template,
211+
where_clause_fields=where_clause_fields)
212+
213+
cloudsql_handler = CloudSQLEnrichmentHandler(
214+
connection_config=connection_config,
215+
table_id=table_id,
216+
query_config=query_config)
217+
with beam.Pipeline() as p:
218+
_ = (
219+
p
220+
| "Create" >> beam.Create(data)
221+
| "Enrich W/ Unmanaged PostgreSQL" >> Enrichment(cloudsql_handler)
222+
| "Print" >> beam.Map(print))
223+
# [END enrichment_with_external_pg]
224+
225+
226+
def enrichment_with_external_mysql():
227+
# [START enrichment_with_external_mysql]
228+
import apache_beam as beam
229+
from apache_beam.transforms.enrichment import Enrichment
230+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
231+
CloudSQLEnrichmentHandler,
232+
DatabaseTypeAdapter,
233+
TableFieldsQueryConfig,
234+
ExternalSQLDBConnectionConfig)
235+
import os
236+
237+
database_adapter = DatabaseTypeAdapter.MYSQL
238+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
239+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
240+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
241+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
242+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
243+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
244+
where_clause_template = "product_id = {}"
245+
where_clause_fields = ["product_id"]
246+
247+
data = [
248+
beam.Row(product_id=1, name='A'),
249+
beam.Row(product_id=2, name='B'),
250+
beam.Row(product_id=3, name='C'),
251+
]
252+
253+
connection_config = ExternalSQLDBConnectionConfig(
254+
db_adapter=database_adapter,
255+
host=database_host,
256+
port=database_port,
257+
user=database_user,
258+
password=database_password,
259+
db_id=database_id)
260+
261+
query_config = TableFieldsQueryConfig(
262+
table_id=table_id,
263+
where_clause_template=where_clause_template,
264+
where_clause_fields=where_clause_fields)
265+
266+
cloudsql_handler = CloudSQLEnrichmentHandler(
267+
connection_config=connection_config,
268+
table_id=table_id,
269+
query_config=query_config)
270+
with beam.Pipeline() as p:
271+
_ = (
272+
p
273+
| "Create" >> beam.Create(data)
274+
| "Enrich W/ Unmanaged MySQL" >> Enrichment(cloudsql_handler)
275+
| "Print" >> beam.Map(print))
276+
# [END enrichment_with_external_mysql]
277+
278+
279+
def enrichment_with_external_sqlserver():
280+
# [START enrichment_with_external_sqlserver]
281+
import apache_beam as beam
282+
from apache_beam.transforms.enrichment import Enrichment
283+
from apache_beam.transforms.enrichment_handlers.cloudsql import (
284+
CloudSQLEnrichmentHandler,
285+
DatabaseTypeAdapter,
286+
TableFieldsQueryConfig,
287+
ExternalSQLDBConnectionConfig)
288+
import os
289+
290+
database_adapter = DatabaseTypeAdapter.SQLSERVER
291+
database_host = os.environ.get("EXTERNAL_SQL_DB_HOST")
292+
database_port = int(os.environ.get("EXTERNAL_SQL_DB_PORT"))
293+
database_user = os.environ.get("EXTERNAL_SQL_DB_USER")
294+
database_password = os.environ.get("EXTERNAL_SQL_DB_PASSWORD")
295+
database_id = os.environ.get("EXTERNAL_SQL_DB_ID")
296+
table_id = os.environ.get("EXTERNAL_SQL_DB_TABLE_ID")
297+
where_clause_template = "product_id = {}"
298+
where_clause_fields = ["product_id"]
299+
300+
data = [
301+
beam.Row(product_id=1, name='A'),
302+
beam.Row(product_id=2, name='B'),
303+
beam.Row(product_id=3, name='C'),
304+
]
305+
306+
connection_config = ExternalSQLDBConnectionConfig(
307+
db_adapter=database_adapter,
308+
host=database_host,
309+
port=database_port,
310+
user=database_user,
311+
password=database_password,
312+
db_id=database_id)
313+
314+
query_config = TableFieldsQueryConfig(
315+
table_id=table_id,
316+
where_clause_template=where_clause_template,
317+
where_clause_fields=where_clause_fields)
318+
319+
cloudsql_handler = CloudSQLEnrichmentHandler(
320+
connection_config=connection_config,
321+
table_id=table_id,
322+
query_config=query_config)
323+
with beam.Pipeline() as p:
324+
_ = (
325+
p
326+
| "Create" >> beam.Create(data)
327+
| "Enrich W/ Unmanaged SQL Server" >> Enrichment(cloudsql_handler)
328+
| "Print" >> beam.Map(print))
329+
# [END enrichment_with_external_sqlserver]

0 commit comments

Comments
 (0)