Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37345",
"modification": 49
"modification": 50
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37360",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 6
"modification": 7
}
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/dataframe/io_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import pytest

import apache_beam.io.gcp.bigquery
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
Expand All @@ -34,8 +33,11 @@

try:
from google.api_core.exceptions import GoogleAPICallError

import apache_beam.io.gcp.bigquery
except ImportError:
GoogleAPICallError = None
bigquery = None


@unittest.skipIf(
Expand Down
27 changes: 14 additions & 13 deletions sdks/python/apache_beam/dataframe/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,22 @@
from parameterized import parameterized

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.dataframe import convert
from apache_beam.dataframe import io
from apache_beam.io import fileio
from apache_beam.io import restriction_trackers
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

try:
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery as gcp_bigquery

import apache_beam.io.gcp.bigquery
except ImportError:
GoogleAPICallError = None
gcp_bigquery = None

# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
Expand Down Expand Up @@ -490,19 +492,18 @@ def test_double_write(self):
class ReadGbqTransformTests(unittest.TestCase):
@mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_direct_read(self, get_table):
try:
bigquery.TableFieldSchema
except AttributeError:
raise ValueError('Please install GCP Dependencies.')
if gcp_bigquery is None:
raise unittest.SkipTest('GCP dependencies are not installed')
fields = [
bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
gcp_bigquery.SchemaField(
name='stn', field_type='DOUBLE', mode="NULLABLE"),
gcp_bigquery.SchemaField(
name='temp', field_type='FLOAT64', mode="REPEATED"),
gcp_bigquery.SchemaField(
name='count', field_type='INTEGER', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
table = apache_beam.io.gcp.internal.clients.bigquery. \
bigquery_v2_messages.Table(
schema=schema)
table = mock.Mock()
table.schema = fields
get_table.return_value = table

with self.assertRaisesRegex(ValueError,
Expand Down
84 changes: 30 additions & 54 deletions sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,60 +44,36 @@ def run(argv=None):

with beam.Pipeline(argv=pipeline_args) as p:

from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position

table_schema = bigquery.TableSchema()

# Fields that use standard types.
kind_schema = bigquery.TableFieldSchema()
kind_schema.name = 'kind'
kind_schema.type = 'string'
kind_schema.mode = 'nullable'
table_schema.fields.append(kind_schema)

full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

age_schema = bigquery.TableFieldSchema()
age_schema.name = 'age'
age_schema.type = 'integer'
age_schema.mode = 'nullable'
table_schema.fields.append(age_schema)

gender_schema = bigquery.TableFieldSchema()
gender_schema.name = 'gender'
gender_schema.type = 'string'
gender_schema.mode = 'nullable'
table_schema.fields.append(gender_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)
table_schema.fields.append(phone_number_schema)

# A repeated field.
children_schema = bigquery.TableFieldSchema()
children_schema.name = 'children'
children_schema.type = 'string'
children_schema.mode = 'repeated'
table_schema.fields.append(children_schema)
# pylint: disable=wrong-import-order, wrong-import-position

table_schema = {
'fields': [{
'name': 'kind', 'type': 'STRING', 'mode': 'NULLABLE'
}, {
'name': 'fullName', 'type': 'STRING', 'mode': 'REQUIRED'
}, {
'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'
}, {
'name': 'gender', 'type': 'STRING', 'mode': 'NULLABLE'
},
{
'name': 'phoneNumber',
'type': 'RECORD',
'mode': 'NULLABLE',
'fields': [{
'name': 'areaCode',
'type': 'INTEGER',
'mode': 'NULLABLE'
},
{
'name': 'number',
'type': 'INTEGER',
'mode': 'NULLABLE'
}]
}, {
'name': 'children', 'type': 'STRING', 'mode': 'REPEATED'
}]
}

def create_random_record(record_id):
return {
Expand Down
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,11 @@ def model_bigqueryio(
# [END model_bigqueryio_table_spec_without_project]

# [START model_bigqueryio_table_spec_object]
from apache_beam.io.gcp.internal.clients import bigquery
from google.cloud import bigquery

table_spec = bigquery.TableReference(
projectId='clouddataflow-readonly',
datasetId='samples',
tableId='weather_stations')
bigquery.DatasetReference('clouddataflow-readonly', 'samples'),
'weather_stations')
# [END model_bigqueryio_table_spec_object]

# [START model_bigqueryio_data_types]
Expand Down
50 changes: 17 additions & 33 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@

from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
from google.api_core import exceptions
from google.cloud import bigquery
except ImportError:
pass
import unittest
raise unittest.SkipTest('GCP dependencies are not installed')

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -101,36 +102,24 @@ def setUp(self):
self.output_table = "%s.output_table" % (self.dataset_id)

def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
try:
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
self.bigquery_client.client.delete_dataset(
f"{self.project}.{self.dataset_id}",
delete_contents=True,
not_found_ok=True)
except exceptions.GoogleAPIError:
_LOGGER.debug('Failed to clean up dataset %s' % self.dataset_id)

def _setup_new_types_env(self):
table_schema = bigquery.TableSchema()
table_field = bigquery.TableFieldSchema()
table_field.name = 'bytes'
table_field.type = 'BYTES'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'date'
table_field.type = 'DATE'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'time'
table_field.type = 'TIME'
table_schema.fields.append(table_field)
table_schema = [
bigquery.SchemaField('bytes', 'BYTES'),
bigquery.SchemaField('date', 'DATE'),
bigquery.SchemaField('time', 'TIME'),
]
table = bigquery.Table(
tableReference=bigquery.TableReference(
projectId=self.project,
datasetId=self.dataset_id,
tableId=NEW_TYPES_INPUT_TABLE),
f"{self.project}.{self.dataset_id}.{NEW_TYPES_INPUT_TABLE}",
schema=table_schema)
request = bigquery.BigqueryTablesInsertRequest(
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)
self.bigquery_client.client.create_table(table)

# Call get_table so that we wait until the table is visible.
_ = self.bigquery_client.get_table(
Expand All @@ -151,12 +140,7 @@ def _setup_new_types_env(self):
'date': '2000-01-01',
'time': '00:00:00'
}]
# the API Tools bigquery client expects byte values to be base-64 encoded
# TODO https://github.com/apache/beam/issues/19073: upgrade to
# google-cloud-bigquery which does not require handling the encoding in
# beam
for row in table_data:
row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')

passed, errors = self.bigquery_client.insert_rows(
self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
Expand Down
Loading
Loading