1414#
1515"""Utility functions for multimodal dataset."""
1616
17+ import asyncio
1718from typing import Any , Type , TypeVar
1819import uuid
1920
2021import google .auth .credentials
2122from vertexai ._genai .types import common
2223from pydantic import BaseModel
2324
25+
2426METADATA_SCHEMA_URI = (
2527 "gs://google-cloud-aiplatform/schema/dataset/metadata/multimodal_1.0.0.yaml"
2628)
@@ -169,14 +171,48 @@ def _normalize_and_validate_table_id(
169171 return f"{ table_ref .project } .{ table_ref .dataset_id } .{ table_ref .table_id } "
170172
171173
async def _normalize_and_validate_table_id_async(
    *,
    table_id: str,
    project: str,
    location: str,
    credentials: google.auth.credentials.Credentials,
) -> str:
    """Validates a BigQuery table id against the multimodal dataset (async).

    The table must live in the same project as the multimodal dataset, and its
    parent BigQuery dataset must be in an allowed location.

    Args:
        table_id: Table id, optionally project-qualified; `project` is used as
            the default project when unqualified.
        project: Project of the multimodal dataset.
        location: Location of the multimodal dataset.
        credentials: Credentials used to build the BigQuery client.

    Returns:
        The fully qualified `project.dataset.table` id.

    Raises:
        ValueError: If the table's project or its dataset's location does not
            match the multimodal dataset.
    """
    bigquery = _try_import_bigquery()

    ref = bigquery.TableReference.from_string(table_id, default_project=project)
    if ref.project != project:
        raise ValueError(
            f"The BigQuery table `{ref.project}.{ref.dataset_id}.{ref.table_id}`"
            " must be in the same project as the multimodal dataset. The"
            f" multimodal dataset is in `{project}`, but the BigQuery table is"
            f" in `{ref.project}`."
        )

    ds_ref = bigquery.DatasetReference(
        project=ref.project, dataset_id=ref.dataset_id
    )
    bq_client = bigquery.Client(project=project, credentials=credentials)
    # get_dataset is a blocking network call; keep it off the event loop.
    parent_dataset = await asyncio.to_thread(bq_client.get_dataset, dataset_ref=ds_ref)
    if not _bq_dataset_location_allowed(location, parent_dataset.location):
        raise ValueError(
            f"The BigQuery dataset `{ds_ref.project}.{ds_ref.dataset_id}` must"
            " be in the same location as the multimodal dataset. The multimodal"
            f" dataset is in `{location}`, but the BigQuery dataset is in"
            f" `{parent_dataset.location}`."
        )

    return ".".join((ref.project, ref.dataset_id, ref.table_id))
207+
208+
172209def _create_default_bigquery_dataset_if_not_exists (
173210 * ,
174211 project : str ,
175212 location : str ,
176213 credentials : google .auth .credentials .Credentials ,
177214) -> str :
178- # Loading bigquery lazily to avoid auto-loading it when importing vertexai
179- from google .cloud import bigquery # pylint: disable=g-import-not-at-top
215+ bigquery = _try_import_bigquery ()
180216
181217 bigquery_client = bigquery .Client (project = project , credentials = credentials )
182218 location_str = location .lower ().replace ("-" , "_" )
@@ -189,5 +225,55 @@ def _create_default_bigquery_dataset_if_not_exists(
189225 return f"{ dataset_id .project } .{ dataset_id .dataset_id } "
190226
191227
async def _create_default_bigquery_dataset_if_not_exists_async(
    *,
    project: str,
    location: str,
    credentials: google.auth.credentials.Credentials,
) -> str:
    """Creates the default per-location BigQuery dataset if missing (async).

    The dataset id is derived from the location (lowercased, `-` -> `_`) and
    the default dataset prefix. Creation is idempotent (`exists_ok=True`).

    Returns:
        The `project.dataset` id of the default dataset.
    """
    bigquery = _try_import_bigquery()

    client = bigquery.Client(project=project, credentials=credentials)
    normalized_location = location.lower().replace("-", "_")
    ds_ref = bigquery.DatasetReference(
        project, "_".join((_DEFAULT_BQ_DATASET_PREFIX, normalized_location))
    )
    default_dataset = bigquery.Dataset(dataset_ref=ds_ref)
    default_dataset.location = location
    # create_dataset performs blocking network I/O; run it in a worker thread.
    await asyncio.to_thread(client.create_dataset, default_dataset, exists_ok=True)
    return ".".join((ds_ref.project, ds_ref.dataset_id))
245+
246+
def _generate_target_table_id(dataset_id: str) -> str:
    """Returns a unique `dataset.table` id for a new target table.

    The table name is the default table prefix followed by a random UUID4,
    which makes collisions within the dataset practically impossible.
    """
    # uuid.UUID formats as its canonical string inside an f-string; the
    # original explicit str() call was redundant.
    return f"{dataset_id}.{_DEFAULT_BQ_TABLE_PREFIX}_{uuid.uuid4()}"
249+
250+
def save_dataframe_to_bigquery(
    dataframe: "bigframes.pandas.DataFrame",  # type: ignore # noqa: F821
    target_table_id: str,
    bq_client: "bigquery.Client",  # type: ignore # noqa: F821
) -> None:
    """Saves a bigframes DataFrame into the given BigQuery table.

    `to_gbq` does not support cross-region use cases, so the dataframe is
    first materialized into a temporary table and then copied into the target
    with `copy_table`.

    Args:
        dataframe: The bigframes DataFrame to save.
        target_table_id: Fully qualified id of the destination table.
        bq_client: BigQuery client used for the copy and the cleanup.
    """
    temp_table_id = dataframe.to_gbq()
    try:
        copy_job = bq_client.copy_table(
            sources=temp_table_id,
            destination=target_table_id,
        )
        # Block until the copy completes (raises on failure).
        copy_job.result()
    finally:
        # Always drop the temporary table, even if the copy fails; the
        # original code leaked it on any copy error.
        bq_client.delete_table(temp_table_id)
264+
265+
async def save_dataframe_to_bigquery_async(
    dataframe: "bigframes.pandas.DataFrame",  # type: ignore # noqa: F821
    target_table_id: str,
    bq_client: "bigquery.Client",  # type: ignore # noqa: F821
) -> None:
    """Saves a bigframes DataFrame into the given BigQuery table (async).

    `to_gbq` does not support cross-region use cases, so the dataframe is
    first materialized into a temporary table and then copied into the target
    with `copy_table`. All blocking client calls run in worker threads via
    `asyncio.to_thread` to keep the event loop responsive.

    Args:
        dataframe: The bigframes DataFrame to save.
        target_table_id: Fully qualified id of the destination table.
        bq_client: BigQuery client used for the copy and the cleanup.
    """
    temp_table_id = await asyncio.to_thread(dataframe.to_gbq)
    try:
        copy_job = await asyncio.to_thread(
            bq_client.copy_table,
            sources=temp_table_id,
            destination=target_table_id,
        )
        # Wait for the copy to complete (raises on failure).
        await asyncio.to_thread(copy_job.result)
    finally:
        # Always drop the temporary table, even if the copy fails; the
        # original code leaked it on any copy error.
        await asyncio.to_thread(bq_client.delete_table, temp_table_id)
0 commit comments