Skip to content

Commit 4e0e38c

Browse files
acrocacicoyle
andauthored
Add support to bulk pubsub (#915)
* Add support to bulk pubsub Signed-off-by: Albert Callarisa <albert@diagrid.io> * lint Signed-off-by: Albert Callarisa <albert@diagrid.io> * install dapr last Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix tox dependencies install Signed-off-by: Albert Callarisa <albert@diagrid.io> * add callback logic Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * Fix bulk pubsub unit tests Signed-off-by: Albert Callarisa <albert@diagrid.io> * lint Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix type Signed-off-by: Albert Callarisa <albert@diagrid.io> * Add tests for some edge cases Signed-off-by: Albert Callarisa <albert@diagrid.io> * Add tests to cover more missed scenarios Signed-off-by: Albert Callarisa <albert@diagrid.io> --------- Signed-off-by: Albert Callarisa <albert@diagrid.io> Signed-off-by: Cassandra Coyle <cassie@diagrid.io> Co-authored-by: Cassandra Coyle <cassie@diagrid.io>
1 parent cb20efd commit 4e0e38c

11 files changed

Lines changed: 736 additions & 32 deletions

File tree

dapr/aio/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from google.protobuf.any_pb2 import Any as GrpcAny
2828
from google.protobuf.empty_pb2 import Empty as GrpcEmpty
2929
from google.protobuf.message import Message as GrpcMessage
30+
from grpc import StatusCode # type: ignore
3031
from grpc.aio import ( # type: ignore
3132
AioRpcError,
3233
StreamStreamClientInterceptor,
@@ -69,6 +70,8 @@
6970
)
7071
from dapr.clients.grpc._response import (
7172
BindingResponse,
73+
BulkPublishResponse,
74+
BulkPublishResponseFailedEntry,
7275
BulkStateItem,
7376
BulkStatesResponse,
7477
ConfigurationResponse,
@@ -484,6 +487,96 @@ async def publish_event(
484487

485488
return DaprResponse(await call.initial_metadata())
486489

490+
async def publish_events(
491+
self,
492+
pubsub_name: str,
493+
topic_name: str,
494+
data: Sequence[Union[bytes, str]],
495+
publish_metadata: Dict[str, str] = {},
496+
data_content_type: Optional[str] = None,
497+
) -> BulkPublishResponse:
498+
"""Bulk publish multiple events to a given topic.
499+
This publishes multiple events to a specified topic and pubsub component.
500+
Each event can be bytes or str. The str data is encoded into bytes with
501+
default charset of utf-8.
502+
503+
The example publishes multiple string events to a topic:
504+
505+
from dapr.aio.clients import DaprClient
506+
async with DaprClient() as d:
507+
resp = await d.publish_events(
508+
pubsub_name='pubsub_1',
509+
topic_name='TOPIC_A',
510+
data=['message1', 'message2', 'message3'],
511+
data_content_type='text/plain',
512+
)
513+
# resp.failed_entries includes any entries that failed to publish.
514+
515+
Args:
516+
pubsub_name (str): the name of the pubsub component
517+
topic_name (str): the topic name to publish to
518+
data (Sequence[Union[bytes, str]]): sequence of events to publish;
519+
each event must be bytes or str
520+
publish_metadata (Dict[str, str], optional): Dapr metadata for the
521+
bulk publish request
522+
data_content_type (str, optional): content type of the event data
523+
524+
Returns:
525+
:class:`BulkPublishResponse` with any failed entries
526+
"""
527+
entries = []
528+
for event in data:
529+
entry_id = str(uuid.uuid4())
530+
if isinstance(event, bytes):
531+
event_data = event
532+
content_type = data_content_type or 'application/octet-stream'
533+
elif isinstance(event, str):
534+
event_data = event.encode('utf-8')
535+
content_type = data_content_type or 'text/plain'
536+
else:
537+
raise ValueError(f'invalid type for event {type(event)}')
538+
539+
entries.append(
540+
api_v1.BulkPublishRequestEntry(
541+
entry_id=entry_id,
542+
event=event_data,
543+
content_type=content_type,
544+
)
545+
)
546+
547+
req = api_v1.BulkPublishRequest(
548+
pubsub_name=pubsub_name,
549+
topic=topic_name,
550+
entries=entries,
551+
metadata=publish_metadata,
552+
)
553+
554+
try:
555+
call = self._stub.BulkPublishEvent(req)
556+
response = await call
557+
except AioRpcError as err:
558+
if err.code() == StatusCode.UNIMPLEMENTED:
559+
try:
560+
call = self._stub.BulkPublishEventAlpha1(req)
561+
response = await call
562+
except AioRpcError as err2:
563+
raise DaprGrpcError(err2) from err2
564+
else:
565+
raise DaprGrpcError(err) from err
566+
567+
failed_entries = [
568+
BulkPublishResponseFailedEntry(
569+
entry_id=entry.entry_id,
570+
error=entry.error,
571+
)
572+
for entry in response.failedEntries
573+
]
574+
575+
return BulkPublishResponse(
576+
failed_entries=failed_entries,
577+
headers=await call.initial_metadata(),
578+
)
579+
487580
async def subscribe(
488581
self,
489582
pubsub_name: str,

dapr/clients/grpc/_response.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,64 @@ def _read_subscribe_config(
723723
pass
724724

725725

726+
class BulkPublishResponseFailedEntry:
727+
"""A failed entry from the bulk publish response.
728+
729+
Attributes:
730+
entry_id (str): the entry ID that failed.
731+
error (str): the error message for the failure.
732+
"""
733+
734+
def __init__(self, entry_id: str, error: str):
735+
"""Initializes BulkPublishResponseFailedEntry.
736+
737+
Args:
738+
entry_id (str): the entry ID that failed.
739+
error (str): the error message for the failure.
740+
"""
741+
self._entry_id = entry_id
742+
self._error = error
743+
744+
@property
745+
def entry_id(self) -> str:
746+
"""Gets the entry ID."""
747+
return self._entry_id
748+
749+
@property
750+
def error(self) -> str:
751+
"""Gets the error message."""
752+
return self._error
753+
754+
755+
class BulkPublishResponse(DaprResponse):
756+
"""The response of publish_events (bulk publish) API.
757+
758+
This inherits from DaprResponse
759+
760+
Attributes:
761+
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed to publish.
762+
"""
763+
764+
def __init__(
765+
self,
766+
failed_entries: List[BulkPublishResponseFailedEntry] = [],
767+
headers: MetadataTuple = (),
768+
):
769+
"""Initializes BulkPublishResponse from :obj:`runtime_v1.BulkPublishResponse`.
770+
771+
Args:
772+
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed.
773+
headers (Tuple, optional): the headers from Dapr gRPC response.
774+
"""
775+
super(BulkPublishResponse, self).__init__(headers)
776+
self._failed_entries = failed_entries
777+
778+
@property
779+
def failed_entries(self) -> List[BulkPublishResponseFailedEntry]:
780+
"""Gets the failed entries."""
781+
return self._failed_entries
782+
783+
726784
class TopicEventResponseStatus(Enum):
727785
# success is the default behavior: message is acknowledged and not retried
728786
success = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS

dapr/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from google.protobuf.struct_pb2 import Struct as GrpcStruct
3232
from grpc import ( # type: ignore
3333
RpcError,
34+
StatusCode,
3435
StreamStreamClientInterceptor,
3536
StreamUnaryClientInterceptor,
3637
UnaryStreamClientInterceptor,
@@ -60,6 +61,8 @@
6061
)
6162
from dapr.clients.grpc._response import (
6263
BindingResponse,
64+
BulkPublishResponse,
65+
BulkPublishResponseFailedEntry,
6366
BulkStateItem,
6467
BulkStatesResponse,
6568
ConfigurationResponse,
@@ -487,6 +490,96 @@ def publish_event(
487490

488491
return DaprResponse(call.initial_metadata())
489492

493+
def publish_events(
494+
self,
495+
pubsub_name: str,
496+
topic_name: str,
497+
data: Sequence[Union[bytes, str]],
498+
publish_metadata: Dict[str, str] = {},
499+
data_content_type: Optional[str] = None,
500+
) -> BulkPublishResponse:
501+
"""Bulk publish multiple events to a given topic.
502+
This publishes multiple events to a specified topic and pubsub component.
503+
Each event can be bytes or str. The str data is encoded into bytes with
504+
default charset of utf-8.
505+
506+
The example publishes multiple string events to a topic:
507+
508+
from dapr.clients import DaprClient
509+
with DaprClient() as d:
510+
resp = d.publish_events(
511+
pubsub_name='pubsub_1',
512+
topic_name='TOPIC_A',
513+
data=['message1', 'message2', 'message3'],
514+
data_content_type='text/plain',
515+
)
516+
# resp.failed_entries includes any entries that failed to publish.
517+
518+
Args:
519+
pubsub_name (str): the name of the pubsub component
520+
topic_name (str): the topic name to publish to
521+
data (Sequence[Union[bytes, str]]): sequence of events to publish;
522+
each event must be bytes or str
523+
publish_metadata (Dict[str, str], optional): Dapr metadata for the
524+
bulk publish request
525+
data_content_type (str, optional): content type of the event data
526+
527+
Returns:
528+
:class:`BulkPublishResponse` with any failed entries
529+
"""
530+
entries = []
531+
for event in data:
532+
entry_id = str(uuid.uuid4())
533+
if isinstance(event, bytes):
534+
event_data = event
535+
content_type = data_content_type or 'application/octet-stream'
536+
elif isinstance(event, str):
537+
event_data = event.encode('utf-8')
538+
content_type = data_content_type or 'text/plain'
539+
else:
540+
raise ValueError(f'invalid type for event {type(event)}')
541+
542+
entries.append(
543+
api_v1.BulkPublishRequestEntry(
544+
entry_id=entry_id,
545+
event=event_data,
546+
content_type=content_type,
547+
)
548+
)
549+
550+
req = api_v1.BulkPublishRequest(
551+
pubsub_name=pubsub_name,
552+
topic=topic_name,
553+
entries=entries,
554+
metadata=publish_metadata,
555+
)
556+
557+
try:
558+
response, call = self.retry_policy.run_rpc(self._stub.BulkPublishEvent.with_call, req)
559+
except RpcError as err:
560+
if err.code() == StatusCode.UNIMPLEMENTED:
561+
try:
562+
response, call = self.retry_policy.run_rpc(
563+
self._stub.BulkPublishEventAlpha1.with_call, req
564+
)
565+
except RpcError as err2:
566+
raise DaprGrpcError(err2) from err2
567+
else:
568+
raise DaprGrpcError(err) from err
569+
570+
failed_entries = [
571+
BulkPublishResponseFailedEntry(
572+
entry_id=entry.entry_id,
573+
error=entry.error,
574+
)
575+
for entry in response.failedEntries
576+
]
577+
578+
return BulkPublishResponse(
579+
failed_entries=failed_entries,
580+
headers=call.initial_metadata(),
581+
)
582+
490583
def subscribe(
491584
self,
492585
pubsub_name: str,

examples/pubsub-simple/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Example - Publish and subscribe to messages
22

3-
This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality.
4-
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
3+
This example utilizes a publisher and a subscriber to show the pubsub pattern, it also shows `PublishEvent`, `PublishEvents` (bulk), `OnTopicEvent`, `GetTopicSubscriptions`, and `TopicEventResponse` functionality.
4+
It creates a publisher and calls the `publish_event` and `publish_events` methods in the `DaprClient`.
55
It will create a gRPC subscriber and bind the `OnTopicEvent` method, which gets triggered after a message is published to the subscribed topic.
66
The subscriber will tell dapr to retry delivery of the first message it receives, logging that the message will be retried, and printing it at least once to standard output.
77

@@ -39,6 +39,9 @@ expected_stdout_lines:
3939
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
4040
- '== APP == Subscriber received: TOPIC_CE'
4141
- '== APP == Subscriber received a json cloud event: id=8, message="hello world", content_type="application/json"'
42+
- '== APP == Subscriber received: id=20, message="bulk event 1", content_type="application/json"'
43+
- '== APP == Subscriber received: id=21, message="bulk event 2", content_type="application/json"'
44+
- '== APP == Subscriber received: id=22, message="bulk event 3", content_type="application/json"'
4245
- '== APP == Subscriber received: TOPIC_CE'
4346
- '== APP == Subscriber received plain text cloud event: hello world, content_type="text/plain"'
4447
@@ -68,6 +71,7 @@ expected_stdout_lines:
6871
- "== APP == {'id': 6, 'message': 'hello world'}"
6972
- "== APP == {'id': 7, 'message': 'hello world'}"
7073
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-8', 'data': {'id': 8, 'message': 'hello world'}, 'datacontenttype': 'application/json'}"
74+
- "== APP == Bulk published 3 events. Failed entries: 0"
7175
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-10', 'data': 'hello world', 'datacontenttype': 'text/plain'}"
7276
background: true
7377
sleep: 15

examples/pubsub-simple/publisher.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,31 @@
9191

9292
time.sleep(0.5)
9393

94+
# Bulk publish multiple events at once using publish_events
95+
bulk_events = [
96+
json.dumps({'id': 20, 'message': 'bulk event 1'}),
97+
json.dumps({'id': 21, 'message': 'bulk event 2'}),
98+
json.dumps({'id': 22, 'message': 'bulk event 3'}),
99+
]
100+
101+
resp = d.publish_events(
102+
pubsub_name='pubsub',
103+
topic_name='TOPIC_A',
104+
data=bulk_events,
105+
data_content_type='application/json',
106+
)
107+
108+
print(
109+
f'Bulk published {len(bulk_events)} events. Failed entries: {len(resp.failed_entries)}',
110+
flush=True,
111+
)
112+
113+
if resp.failed_entries:
114+
for entry in resp.failed_entries:
115+
print(f' Failed entry_id={entry.entry_id}: {entry.error}', flush=True)
116+
117+
time.sleep(0.5)
118+
94119
# Send a cloud event with plain text data
95120
id = 10
96121
cloud_event = {

0 commit comments

Comments
 (0)