Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions docs/supported-databases/bigquery.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
BigQuery
========

Integration with `Google BigQuery <https://cloud.google.com/bigquery>`_ using the `BigQuery Emulator <https://github.com/goccy/bigquery-emulator>`_

This integration uses the official `Google Cloud BigQuery Python Client <https://cloud.google.com/python/docs/reference/bigquery/latest>`_ for testing against the BigQuery Emulator. The emulator is a third-party project that provides a local development environment that mimics the behavior of BigQuery, allowing you to test your application without connecting to the actual service.
Integration with `Google BigQuery <https://cloud.google.com/bigquery>`_ using the
`BigQuery Emulator <https://github.com/goccy/bigquery-emulator>`_.

Installation
------------
Expand All @@ -12,37 +11,39 @@ Installation

pip install pytest-databases[bigquery]

The ``bigquery`` extra is kept as a compatibility group. Install the BigQuery client
that your application already uses.

Usage Example
-------------

.. code-block:: python

import pytest
from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery

from pytest_databases.docker.bigquery import BigQueryService

pytest_plugins = ["pytest_databases.docker.bigquery"]


def test(bigquery_service: BigQueryService) -> None:
client = bigquery.Client(
project=bigquery_service.project,
client_options=bigquery_service.client_options,
credentials=bigquery_service.credentials,
client_options=ClientOptions(api_endpoint=bigquery_service.endpoint),
credentials=AnonymousCredentials(),
)

job = client.query(query="SELECT 1 as one")
resp = list(job.result())
assert resp[0].one == 1

def test(bigquery_client: bigquery.Client) -> None:
assert isinstance(bigquery_client, bigquery.Client)
job = client.query(query="SELECT 1 AS one")
rows = list(job.result())
assert rows[0].one == 1

Available Fixtures
------------------

* ``bigquery_image``: The Docker image to use for BigQuery.
* ``bigquery_service``: A fixture that provides a BigQuery service.
* ``bigquery_client``: A fixture that provides a BigQuery client.
* ``bigquery_image``: The Docker image to use for the BigQuery emulator.
* ``bigquery_service``: A fixture that provides a running BigQuery emulator service.

Service API
-----------
Expand Down
6 changes: 1 addition & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Source = "https://github.com/litestar-org/pytest-databases"

[project.optional-dependencies]
azure-storage = ["azure-storage-blob"]
bigquery = ["google-cloud-bigquery"]
bigquery = []
cockroachdb = ["psycopg"]
dragonfly = ["redis"]
elasticsearch7 = ["elasticsearch7"]
Expand Down Expand Up @@ -195,10 +195,6 @@ warn_unused_ignores = true
disable_error_code = "attr-defined"
module = "pytest_databases.docker.spanner"

[[tool.mypy.overrides]]
disable_error_code = "attr-defined"
module = "pytest_databases.docker.bigquery"

[[tool.mypy.overrides]]
disable_error_code = "attr-defined"
disallow_untyped_decorators = false
Expand Down
62 changes: 34 additions & 28 deletions src/pytest_databases/docker/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from __future__ import annotations

import json
import urllib.error
import urllib.request
from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import pytest
from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials, Credentials
from google.cloud import bigquery

from pytest_databases.helpers import get_xdist_worker_num
from pytest_databases.types import ServiceContainer, XdistIsolationLevel
Expand Down Expand Up @@ -36,15 +36,29 @@ def platform() -> str:
class BigQueryService(ServiceContainer):
project: str
dataset: str
credentials: Credentials

@property
def endpoint(self) -> str:
return f"http://{self.host}:{self.port}"

@property
def client_options(self) -> ClientOptions:
return ClientOptions(api_endpoint=self.endpoint)

def _query_bigquery_emulator(
host: str,
port: int,
project: str,
sql: str,
*,
timeout: float = 2.0,
) -> dict[str, Any]:
body = json.dumps({"query": sql, "useLegacySql": False}).encode("utf-8")
request = urllib.request.Request(
f"http://{host}:{port}/bigquery/v2/projects/{project}/queries",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout) as response: # noqa: S310
return json.loads(response.read().decode("utf-8"))


@pytest.fixture(scope="session")
Expand All @@ -64,17 +78,19 @@ def bigquery_service(

def check(_service: ServiceContainer) -> bool:
try:
client = bigquery.Client(
project=project,
client_options=ClientOptions(api_endpoint=f"http://{_service.host}:{_service.port}"),
credentials=AnonymousCredentials(),
payload = _query_bigquery_emulator(
_service.host,
_service.port,
project,
"SELECT 1 as one",
)

job = client.query(query="SELECT 1 as one")

resp = list(job.result())
return resp[0].one == 1
except Exception: # noqa: BLE001
except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, TimeoutError, OSError):
return False
if payload.get("jobComplete") is not True:
return False
try:
return payload["rows"][0]["f"][0]["v"] == "1"
except (KeyError, IndexError, TypeError):
return False

with docker_service.run(
Expand All @@ -97,14 +113,4 @@ def check(_service: ServiceContainer) -> bool:
container=service.container,
project=project,
dataset=dataset,
credentials=AnonymousCredentials(),
)


@pytest.fixture(scope="session")
def bigquery_client(bigquery_service: BigQueryService) -> Generator[bigquery.Client, None, None]:
yield bigquery.Client(
project=bigquery_service.project,
client_options=bigquery_service.client_options,
credentials=bigquery_service.credentials,
)
89 changes: 63 additions & 26 deletions tests/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,56 @@
import pytest


def test_service_fixture(pytester: pytest.Pytester) -> None:
def test_plugin_imports_without_google_cloud_bigquery(pytester: pytest.Pytester) -> None:
pytester.makepyfile("""
from google.cloud import bigquery

pytest_plugins = ["pytest_databases.docker.bigquery"]

def test(bigquery_service) -> None:
client = bigquery.Client(
project=bigquery_service.project,
client_options=bigquery_service.client_options,
credentials=bigquery_service.credentials,
)

job = client.query(query="SELECT 1 as one")

resp = list(job.result())
assert resp[0].one == 1
import builtins

def test_import() -> None:
original_import = builtins.__import__
blocked = {
"google.cloud.bigquery",
"google.api_core.client_options",
"google.auth.credentials",
}

def blocked_import(name, globals=None, locals=None, fromlist=(), level=0):
if name in blocked:
raise ModuleNotFoundError(name)
return original_import(name, globals, locals, fromlist, level)

builtins.__import__ = blocked_import
try:
import pytest_databases.docker.bigquery
finally:
builtins.__import__ = original_import
""")

result = pytester.runpytest_subprocess("-p", "pytest_databases")
result = pytester.runpytest_subprocess("-p", "pytest_databases", "-vv")
result.assert_outcomes(passed=1)


def test_client_fixture(pytester: pytest.Pytester) -> None:
def test_service_fixture(pytester: pytest.Pytester) -> None:
pytester.makepyfile("""
from google.cloud import bigquery
import json
import urllib.request

pytest_plugins = ["pytest_databases.docker.bigquery"]

def test(bigquery_client) -> None:
assert isinstance(bigquery_client, bigquery.Client)
def run_bigquery(service, sql):
body = json.dumps({"query": sql, "useLegacySql": False}).encode("utf-8")
request = urllib.request.Request(
f"{service.endpoint}/bigquery/v2/projects/{service.project}/queries",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=10) as response:
return json.loads(response.read().decode("utf-8"))

def test(bigquery_service) -> None:
payload = run_bigquery(bigquery_service, "SELECT 1 as one")
assert payload.get("jobComplete") is True
assert payload["rows"][0]["f"][0]["v"] == "1"
""")

result = pytester.runpytest_subprocess("-p", "pytest_databases")
Expand All @@ -45,15 +64,33 @@ def test(bigquery_client) -> None:

def test_xdist(pytester: pytest.Pytester) -> None:
pytester.makepyfile("""
from google.cloud import bigquery
import json
import urllib.request

pytest_plugins = ["pytest_databases.docker.bigquery"]

def test_one(bigquery_client, bigquery_service) -> None:
bigquery_client.query(f"CREATE TABLE `{bigquery_service.dataset}.test` AS select 1 as the_value")
def run_bigquery(service, sql):
body = json.dumps({"query": sql, "useLegacySql": False}).encode("utf-8")
request = urllib.request.Request(
f"{service.endpoint}/bigquery/v2/projects/{service.project}/queries",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=30) as response:
return json.loads(response.read().decode("utf-8"))

def test_one(bigquery_service) -> None:
run_bigquery(
bigquery_service,
f"CREATE TABLE `{bigquery_service.dataset}.test_one` AS SELECT 1 AS the_value",
)

def test_two(bigquery_client, bigquery_service) -> None:
bigquery_client.query(f"CREATE TABLE `{bigquery_service.dataset}.test` AS select 1 as the_value")
def test_two(bigquery_service) -> None:
run_bigquery(
bigquery_service,
f"CREATE TABLE `{bigquery_service.dataset}.test_two` AS SELECT 1 AS the_value",
)
""")

result = pytester.runpytest_subprocess("-p", "pytest_databases", "-n", "2")
Expand Down
Loading