Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
65b648e
docs: A comment for field `code` in message `.google.pubsub.v1.JavaSc…
gcf-owl-bot[bot] Feb 4, 2025
a75d128
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 4, 2025
ea4ad05
feat: Add REST Interceptors which support reading metadata
gcf-owl-bot[bot] Feb 6, 2025
8d49d85
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 6, 2025
e0a1eb7
chore: Update gapic-generator-python to v1.22.1
gcf-owl-bot[bot] Feb 12, 2025
5e6cac2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 12, 2025
db88e66
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 12, 2025
eac0023
Merge branch 'owl-bot-copy' of https://github.com/googleapis/python-p…
gcf-owl-bot[bot] Feb 12, 2025
b031319
chore: Update gapic-generator-python to v1.23.2
gcf-owl-bot[bot] Mar 1, 2025
66a5745
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 1, 2025
8ed4ef3
build(python): remove unused generator option `add-iam-methods`
gcf-owl-bot[bot] Mar 2, 2025
e7a9979
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 2, 2025
c6c4fbd
chore: Update gapic-generator-python to v1.23.3
gcf-owl-bot[bot] Mar 3, 2025
32b0657
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 3, 2025
fb004cd
Merge branch 'main' into owl-bot-copy
ohmayr Mar 6, 2025
a26ba39
feat: deprecate `enabled` field for message transforms and add `disab…
gcf-owl-bot[bot] Mar 6, 2025
3ec56b3
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 6, 2025
41aa8ac
fix: Allow Protobuf 6.x
gcf-owl-bot[bot] Mar 10, 2025
dca9d56
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 10, 2025
0cafaf9
Merge branch 'main' into owl-bot-copy
parthea Mar 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 136 additions & 55 deletions google/pubsub_v1/services/publisher/async_client.py

Large diffs are not rendered by default.

286 changes: 200 additions & 86 deletions google/pubsub_v1/services/publisher/client.py

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions google/pubsub_v1/services/publisher/pagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -80,8 +80,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -140,7 +142,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -154,8 +156,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -218,7 +222,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -232,8 +236,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -292,7 +298,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -306,8 +312,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -370,7 +378,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -384,8 +392,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down Expand Up @@ -444,7 +454,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -458,8 +468,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down
118 changes: 102 additions & 16 deletions google/pubsub_v1/services/publisher/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
import warnings
from typing import Callable, Dict, Optional, Sequence, Tuple, Union

Expand All @@ -21,15 +24,93 @@
import google.auth # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message

import grpc # type: ignore
import proto # type: ignore

from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore
from google.pubsub_v1.types import pubsub
from .base import PublisherTransport, DEFAULT_CLIENT_INFO

try:
from google.api_core import client_logging # type: ignore

CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
except ImportError: # pragma: NO COVER
CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
std_logging.DEBUG
)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)

response = continuation(client_call_details, request)
if logging_enabled: # pragma: NO COVER
response_metadata = response.trailing_metadata()
# Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
metadata = (
dict([(k, str(v)) for k, v in response_metadata])
if response_metadata
else None
)
result = response.result()
if isinstance(result, proto.Message):
response_payload = type(result).to_json(result)
elif isinstance(result, google.protobuf.message.Message):
response_payload = MessageToJson(result)
else:
response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
grpc_response = {
"payload": response_payload,
"metadata": metadata,
"status": "OK",
}
_LOGGER.debug(
f"Received response for {client_call_details.method}.",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"response": grpc_response,
"metadata": grpc_response["metadata"],
},
)
return response


class PublisherGrpcTransport(PublisherTransport):
"""gRPC backend transport for Publisher.
Expand Down Expand Up @@ -186,7 +267,12 @@ def __init__(
],
)

# Wrap messages. This must be done after self._grpc_channel exists
self._interceptor = _LoggingClientInterceptor()
self._logged_channel = grpc.intercept_channel(
self._grpc_channel, self._interceptor
)

# Wrap messages. This must be done after self._logged_channel exists
self._prep_wrapped_messages(client_info)

@classmethod
Expand Down Expand Up @@ -260,7 +346,7 @@ def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "create_topic" not in self._stubs:
self._stubs["create_topic"] = self.grpc_channel.unary_unary(
self._stubs["create_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/CreateTopic",
request_serializer=pubsub.Topic.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -286,7 +372,7 @@ def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "update_topic" not in self._stubs:
self._stubs["update_topic"] = self.grpc_channel.unary_unary(
self._stubs["update_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/UpdateTopic",
request_serializer=pubsub.UpdateTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -311,7 +397,7 @@ def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "publish" not in self._stubs:
self._stubs["publish"] = self.grpc_channel.unary_unary(
self._stubs["publish"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/Publish",
request_serializer=pubsub.PublishRequest.serialize,
response_deserializer=pubsub.PublishResponse.deserialize,
Expand All @@ -335,7 +421,7 @@ def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_topic" not in self._stubs:
self._stubs["get_topic"] = self.grpc_channel.unary_unary(
self._stubs["get_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/GetTopic",
request_serializer=pubsub.GetTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -361,7 +447,7 @@ def list_topics(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topics" not in self._stubs:
self._stubs["list_topics"] = self.grpc_channel.unary_unary(
self._stubs["list_topics"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopics",
request_serializer=pubsub.ListTopicsRequest.serialize,
response_deserializer=pubsub.ListTopicsResponse.deserialize,
Expand Down Expand Up @@ -390,7 +476,7 @@ def list_topic_subscriptions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_subscriptions" not in self._stubs:
self._stubs["list_topic_subscriptions"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSubscriptions",
request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
Expand Down Expand Up @@ -423,7 +509,7 @@ def list_topic_snapshots(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_snapshots" not in self._stubs:
self._stubs["list_topic_snapshots"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSnapshots",
request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
Expand Down Expand Up @@ -452,7 +538,7 @@ def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "delete_topic" not in self._stubs:
self._stubs["delete_topic"] = self.grpc_channel.unary_unary(
self._stubs["delete_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DeleteTopic",
request_serializer=pubsub.DeleteTopicRequest.serialize,
response_deserializer=empty_pb2.Empty.FromString,
Expand Down Expand Up @@ -484,13 +570,16 @@ def detach_subscription(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "detach_subscription" not in self._stubs:
self._stubs["detach_subscription"] = self.grpc_channel.unary_unary(
self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DetachSubscription",
request_serializer=pubsub.DetachSubscriptionRequest.serialize,
response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
)
return self._stubs["detach_subscription"]

def close(self):
self._logged_channel.close()

@property
def set_iam_policy(
self,
Expand All @@ -509,7 +598,7 @@ def set_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "set_iam_policy" not in self._stubs:
self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/SetIamPolicy",
request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand All @@ -535,7 +624,7 @@ def get_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_iam_policy" not in self._stubs:
self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/GetIamPolicy",
request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand Down Expand Up @@ -564,16 +653,13 @@ def test_iam_permissions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "test_iam_permissions" not in self._stubs:
self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/TestIamPermissions",
request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
)
return self._stubs["test_iam_permissions"]

def close(self):
self.grpc_channel.close()

@property
def kind(self) -> str:
return "grpc"
Expand Down
Loading