Skip to content

Commit 0bc7852

Browse files
nikitagrover19Nikita Grover
andauthored
Fix WriteToPubSub to pass ordering_key to publish() method (#37345)
* Fix WriteToPubSub to pass ordering_key to publish() method Fixes #36201 * Update pubsub_test.py * Update pubsub_test.py * Trigger CI rerun * Retry CI (flake) * Apply yapf formatting * Address review comments: use message_to_proto_str and skip ordering key test on Dataflow * Add Dataflow warning in WriteToPubSub.expand() for ordering key support * Update pubsub_integration_test.py * Update PR and modification in beam_PostCommit_Python.json Updated PR number and modification count. * Update pubsub_integration_test.py * Update pubsub_integration_test.py * Fix PubSub error * Update pubsub_integration_test.py * Fix formatting: remove trailing whitespace * Fix ordering key integration test: retry pull loop, fix indentation * Fix import order: google imports before apache_beam (isort) * Fix import ordering in pubsub_integration_test.py * Simplify publish kwargs, conditionally enable message ordering, add retry loop in integration test * Use with_attributes to initialize with_ordering and apply in PublisherClient setup * Fix PubSub tests: enable message ordering in PublisherClient to exercise full ordering flow * Rename to publish_with_ordering_key, gate Dataflow warning, fix test assertions --------- Co-authored-by: Nikita Grover <nikiatgrover10864@gmail.com>
1 parent 2c53494 commit 0bc7852

7 files changed

Lines changed: 219 additions & 17 deletions

File tree

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"pr": "38069",
3+
"pr": "37345",
44
"modification": 49
5-
}
5+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public static class Configuration {
5353
private String topic;
5454
private @Nullable String idAttribute;
5555
private @Nullable String timestampAttribute;
56+
private boolean publishWithOrderingKey = false;
5657

5758
public void setTopic(String topic) {
5859
this.topic = topic;
@@ -65,6 +66,10 @@ public void setIdLabel(@Nullable String idAttribute) {
6566
public void setTimestampAttribute(@Nullable String timestampAttribute) {
6667
this.timestampAttribute = timestampAttribute;
6768
}
69+
70+
public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) {
71+
this.publishWithOrderingKey = publishWithOrderingKey != null && publishWithOrderingKey;
72+
}
6873
}
6974

7075
public static class WriteBuilder
@@ -85,6 +90,7 @@ public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration config
8590
if (config.timestampAttribute != null) {
8691
writeBuilder.setTimestampAttribute(config.timestampAttribute);
8792
}
93+
writeBuilder.setPublishWithOrderingKey(config.publishWithOrderingKey);
8894
writeBuilder.setDynamicDestinations(false);
8995
return writeBuilder.build();
9096
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws IOException {
17271727
this.pubsubClient =
17281728
getPubsubClientFactory()
17291729
.newClient(
1730-
getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class));
1730+
getTimestampAttribute(),
1731+
null,
1732+
c.getPipelineOptions().as(PubsubOptions.class),
1733+
Write.this.getPubsubRootUrl());
17311734
}
17321735

17331736
@ProcessElement

sdks/python/apache_beam/io/external/gcp/pubsub.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def expand(self, pbegin):
117117
# this is not implemented yet on the Java side:
118118
# ('with_attributes', bool),
119119
('timestamp_attribute', typing.Optional[str]),
120+
('publish_with_ordering_key', bool),
120121
])
121122

122123

@@ -135,6 +136,7 @@ def __init__(
135136
with_attributes=False,
136137
id_label=None,
137138
timestamp_attribute=None,
139+
publish_with_ordering_key=False,
138140
expansion_service=None):
139141
"""Initializes ``WriteToPubSub``.
140142
@@ -150,18 +152,23 @@ def __init__(
150152
in a ReadFromPubSub PTransform to deduplicate messages.
151153
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
152154
message with the given name and the message's publish time as the value.
155+
publish_with_ordering_key: If True, enables ordering key support when
156+
publishing messages. The ordering key must be set on each
157+
PubsubMessage via the ``ordering_key`` attribute.
153158
"""
154159
self.params = WriteToPubsubSchema(
155160
topic=topic,
156161
id_label=id_label,
157162
# with_attributes=with_attributes,
158-
timestamp_attribute=timestamp_attribute)
163+
timestamp_attribute=timestamp_attribute,
164+
publish_with_ordering_key=publish_with_ordering_key)
159165
self.expansion_service = expansion_service
160166
self.with_attributes = with_attributes
161167

162168
def expand(self, pvalue):
163169
if self.with_attributes:
164-
pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
170+
pcoll = pvalue | 'ToProto' >> Map(
171+
pubsub.WriteToPubSub.message_to_proto_str)
165172
else:
166173
pcoll = pvalue | 'ToProto' >> Map(
167174
lambda x: pubsub.PubsubMessage(x, {})._to_proto_str())

sdks/python/apache_beam/io/gcp/pubsub.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
"""
3232

3333
# pytype: skip-file
34-
34+
import logging
3535
import re
36+
import time
3637
from typing import Any
3738
from typing import NamedTuple
3839
from typing import Optional
@@ -388,7 +389,8 @@ def __init__(
388389
topic: str,
389390
with_attributes: bool = False,
390391
id_label: Optional[str] = None,
391-
timestamp_attribute: Optional[str] = None) -> None:
392+
timestamp_attribute: Optional[str] = None,
393+
publish_with_ordering_key: bool = False) -> None:
392394
"""Initializes ``WriteToPubSub``.
393395
394396
Args:
@@ -404,9 +406,13 @@ def __init__(
404406
in a ReadFromPubSub PTransform to deduplicate messages.
405407
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
406408
message with the given name and the message's publish time as the value.
409+
publish_with_ordering_key: If True, enables message ordering on the
410+
PublisherClient. Messages with an ordering_key will be delivered
411+
in order. Requires messages to have ordering_key set.
407412
"""
408413
super().__init__()
409414
self.with_attributes = with_attributes
415+
self.publish_with_ordering_key = publish_with_ordering_key
410416
self.id_label = id_label
411417
self.timestamp_attribute = timestamp_attribute
412418
self.project, self.topic_name = parse_topic(topic)
@@ -430,7 +436,16 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:
430436
def expand(self, pcoll):
431437
# Store pipeline options for use in DoFn
432438
self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
433-
439+
# Warn Dataflow users to use the XLang path for ordering key support,
440+
# since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
441+
runner = self.pipeline_options.get_all_options().get(
442+
'runner', '') if self.pipeline_options else ''
443+
if 'Dataflow' in str(runner) and self.publish_with_ordering_key:
444+
logging.warning(
445+
'WriteToPubSub ordering_key support is not available on Dataflow '
446+
'via this transform. Use the XLang WriteToPubSub path instead: '
447+
'apache_beam.io.external.gcp.pubsub.WriteToPubSub with '
448+
'publish_with_ordering_key=True.')
434449
if self.with_attributes:
435450
pcoll = pcoll | 'ToProtobufX' >> ParDo(
436451
_AddMetricsAndMap(
@@ -457,6 +472,9 @@ def display_data(self):
457472
True, label='With Attributes').drop_if_none(),
458473
'timestamp_attribute': DisplayDataItem(
459474
self.timestamp_attribute, label='Timestamp Attribute'),
475+
'publish_with_ordering_key': DisplayDataItem(
476+
self.publish_with_ordering_key,
477+
label='Publish With Ordering Key').drop_if_none(),
460478
}
461479

462480

@@ -563,6 +581,7 @@ def __init__(self, transform):
563581
self.id_label = transform.id_label
564582
self.timestamp_attribute = transform.timestamp_attribute
565583
self.with_attributes = transform.with_attributes
584+
self.with_ordering = transform.publish_with_ordering_key
566585

567586
# TODO(https://github.com/apache/beam/issues/18939): Add support for
568587
# id_label and timestamp_attribute.
@@ -597,7 +616,7 @@ def __init__(self, transform):
597616
output_labels_supported = False
598617

599618
# Log debug information for troubleshooting
600-
import logging
619+
601620
runner_info = getattr(
602621
pipeline_options, 'runner',
603622
'None') if pipeline_options else 'No options'
@@ -628,7 +647,13 @@ def __init__(self, transform):
628647

629648
def setup(self):
630649
from google.cloud import pubsub
631-
self._pub_client = pubsub.PublisherClient()
650+
if self.with_ordering:
651+
self._pub_client = pubsub.PublisherClient(
652+
publisher_options=pubsub.types.PublisherOptions(
653+
enable_message_ordering=True,
654+
))
655+
else:
656+
self._pub_client = pubsub.PublisherClient()
632657
self._topic = self._pub_client.topic_path(
633658
self.project, self.short_topic_name)
634659

@@ -647,21 +672,20 @@ def _flush(self):
647672
if not self._buffer:
648673
return
649674

650-
import time
651-
652675
# The elements in buffer are serialized protobuf bytes from the previous
653676
# transforms. We need to deserialize them to extract data and attributes.
654677
futures = []
655678
for elem in self._buffer:
656679
# Deserialize the protobuf to get the original PubsubMessage
657680
pubsub_msg = PubsubMessage._from_proto_str(elem)
658681

659-
# Publish with the correct data and attributes
682+
# Publish with the correct data, attributes, and ordering_key
683+
kwargs = {}
660684
if self.with_attributes and pubsub_msg.attributes:
661-
future = self._pub_client.publish(
662-
self._topic, pubsub_msg.data, **pubsub_msg.attributes)
663-
else:
664-
future = self._pub_client.publish(self._topic, pubsub_msg.data)
685+
kwargs.update(pubsub_msg.attributes)
686+
if pubsub_msg.ordering_key:
687+
kwargs['ordering_key'] = pubsub_msg.ordering_key
688+
future = self._pub_client.publish(self._topic, pubsub_msg.data, **kwargs)
665689

666690
futures.append(future)
667691

sdks/python/apache_beam/io/gcp/pubsub_integration_test.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,96 @@ def test_batch_write_with_attributes(self):
305305
"""Test WriteToPubSub in batch mode with attributes."""
306306
self._test_batch_write(with_attributes=True)
307307

308+
@pytest.mark.it_postcommit
309+
def test_batch_write_with_ordering_key(self):
310+
"""Test WriteToPubSub in batch mode with ordering keys.
311+
312+
Dataflow's Native Pub/Sub Sink does not support ordering_key
313+
(see https://github.com/apache/beam/issues/36201), so this test
314+
only applies to runners using Beam's Python WriteToPubSub Sink.
315+
Dataflow users should use the XLang WriteToPubSub path instead
316+
(apache_beam.io.external.gcp.pubsub.WriteToPubSub with
317+
publish_with_ordering_key=True).
318+
"""
319+
if self.runner_name == 'TestDataflowRunner':
320+
self.skipTest(
321+
'Dataflow Native PubSub Sink does not support ordering_key '
322+
'(see https://github.com/apache/beam/issues/36201).')
323+
from google.pubsub_v1.types import Subscription
324+
325+
from apache_beam.options.pipeline_options import PipelineOptions
326+
from apache_beam.options.pipeline_options import StandardOptions
327+
from apache_beam.transforms import Create
328+
329+
ordering_topic = self.pub_client.create_topic(
330+
name=self.pub_client.topic_path(
331+
self.project, 'psit_topic_ordering' + self.uuid))
332+
ordering_sub = self.sub_client.create_subscription(
333+
request=Subscription(
334+
name=self.sub_client.subscription_path(
335+
self.project, 'psit_sub_ordering' + self.uuid),
336+
topic=ordering_topic.name,
337+
enable_message_ordering=True,
338+
))
339+
time.sleep(10)
340+
341+
try:
342+
test_messages = [
343+
PubsubMessage(
344+
b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
345+
PubsubMessage(
346+
b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
347+
PubsubMessage(
348+
b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
349+
]
350+
351+
pipeline_options = PipelineOptions()
352+
pipeline_options.view_as(StandardOptions).streaming = False
353+
354+
with TestPipeline(options=pipeline_options) as p:
355+
messages = p | 'CreateMessages' >> Create(test_messages)
356+
_ = messages | 'WriteToPubSub' >> WriteToPubSub(
357+
ordering_topic.name,
358+
with_attributes=True,
359+
publish_with_ordering_key=True)
360+
361+
time.sleep(10)
362+
363+
# Retry pulling to handle PubSub delivery delays
364+
received_messages = []
365+
deadline = time.time() + 60 # wait up to 60 seconds
366+
while time.time() < deadline:
367+
response = self.sub_client.pull(
368+
request={
369+
'subscription': ordering_sub.name,
370+
'max_messages': 10,
371+
})
372+
received_messages.extend(response.received_messages)
373+
if len(received_messages) >= len(test_messages):
374+
break
375+
time.sleep(5)
376+
377+
self.assertEqual(len(received_messages), len(test_messages))
378+
379+
received_map = {
380+
msg.message.data: msg.message
381+
for msg in received_messages
382+
}
383+
self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
384+
self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
385+
self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
386+
387+
ack_ids = [msg.ack_id for msg in received_messages]
388+
self.sub_client.acknowledge(
389+
request={
390+
'subscription': ordering_sub.name,
391+
'ack_ids': ack_ids,
392+
})
393+
finally:
394+
self.sub_client.delete_subscription(
395+
request={'subscription': ordering_sub.name})
396+
self.pub_client.delete_topic(request={'topic': ordering_topic.name})
397+
308398

309399
if __name__ == '__main__':
310400
logging.getLogger().setLevel(logging.DEBUG)

sdks/python/apache_beam/io/gcp/pubsub_test.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ def test_display_data(self):
467467
DisplayDataItemMatcher('id_label', 'id'),
468468
DisplayDataItemMatcher('with_attributes', True),
469469
DisplayDataItemMatcher('timestamp_attribute', 'time'),
470+
DisplayDataItemMatcher('publish_with_ordering_key', False),
470471
]
471472

472473
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
@@ -1098,6 +1099,77 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
10981099
Lineage.query(p.result.metrics(), Lineage.SINK),
10991100
set(["pubsub:topic:fakeprj.a_topic"]))
11001101

1102+
def test_write_messages_with_ordering_key(self, mock_pubsub):
1103+
"""Test WriteToPubSub with ordering_key in messages."""
1104+
data = b'data'
1105+
ordering_key = 'order-123'
1106+
attributes = {'key': 'value'}
1107+
payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
1108+
1109+
options = PipelineOptions([])
1110+
options.view_as(StandardOptions).streaming = True
1111+
with TestPipeline(options=options) as p:
1112+
_ = (
1113+
p
1114+
| Create(payloads)
1115+
| WriteToPubSub(
1116+
'projects/fakeprj/topics/a_topic',
1117+
with_attributes=True,
1118+
publish_with_ordering_key=True))
1119+
1120+
# Verify that publish was called with ordering_key
1121+
mock_pubsub.return_value.publish.assert_called()
1122+
call_args = mock_pubsub.return_value.publish.call_args
1123+
1124+
# Check that ordering_key was passed as a keyword argument
1125+
self.assertIn('ordering_key', call_args.kwargs)
1126+
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
1127+
1128+
def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
1129+
"""Test WriteToPubSub with ordering_key but no attributes."""
1130+
data = b'data'
1131+
ordering_key = 'order-456'
1132+
payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
1133+
1134+
options = PipelineOptions([])
1135+
options.view_as(StandardOptions).streaming = True
1136+
with TestPipeline(options=options) as p:
1137+
_ = (
1138+
p
1139+
| Create(payloads)
1140+
| WriteToPubSub(
1141+
'projects/fakeprj/topics/a_topic',
1142+
with_attributes=True,
1143+
publish_with_ordering_key=True))
1144+
1145+
# Verify that publish was called with ordering_key
1146+
mock_pubsub.return_value.publish.assert_called()
1147+
call_args = mock_pubsub.return_value.publish.call_args
1148+
1149+
# Check that ordering_key was passed
1150+
self.assertIn('ordering_key', call_args.kwargs)
1151+
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
1152+
1153+
def test_write_messages_without_ordering_key(self, mock_pubsub):
1154+
"""Test WriteToPubSub without ordering_key (backward compatibility)."""
1155+
data = b'data'
1156+
attributes = {'key': 'value'}
1157+
payloads = [PubsubMessage(data, attributes)] # No ordering_key
1158+
1159+
options = PipelineOptions([])
1160+
options.view_as(StandardOptions).streaming = True
1161+
with TestPipeline(options=options) as p:
1162+
_ = (
1163+
p
1164+
| Create(payloads)
1165+
| WriteToPubSub(
1166+
'projects/fakeprj/topics/a_topic', with_attributes=True))
1167+
1168+
# Verify that publish was called
1169+
mock_pubsub.return_value.publish.assert_called()
1170+
call_args = mock_pubsub.return_value.publish.call_args
1171+
self.assertNotIn('ordering_key', call_args.kwargs)
1172+
11011173

11021174
if __name__ == '__main__':
11031175
logging.getLogger().setLevel(logging.INFO)

0 commit comments

Comments
 (0)