Skip to content

Commit 00ddd57

Browse files
committed
Resolve comments [2]
1 parent c965879 commit 00ddd57

3 files changed

Lines changed: 66 additions & 27 deletions

File tree

sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import time
3232
from typing import Any
3333
from typing import Dict
34-
from typing import Iterable
3534
from typing import List
3635
from typing import Optional
3736
from typing import Tuple
@@ -48,6 +47,7 @@
4847
from apache_beam.runners.runner import PipelineResult
4948
from apache_beam.transforms import window
5049

50+
from google.api_core.exceptions import NotFound
5151
from google.cloud import pubsub_v1
5252
import torch
5353
import PIL.Image as PILImage
@@ -438,6 +438,16 @@ def parse_known_args(argv):
438438
help='BigQuery output table: dataset.table')
439439
parser.add_argument(
440440
'--publish_to_big_query', default='true', choices=['true', 'false'])
441+
parser.add_argument(
442+
'--feeder_start_delay_sec',
443+
type=int,
444+
default=900,
445+
help=(
446+
'Delay before starting the feeder pipeline that reads URIs from GCS '
447+
'and publishes them to Pub/Sub. This delay allows the main streaming '
448+
'pipeline workers to start and scale before data ingestion begins.'
449+
),
450+
)
441451

442452
# Device
443453
parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU'])
@@ -479,13 +489,13 @@ def ensure_pubsub_resources(
479489

480490
try:
481491
publisher.get_topic(request={"topic": full_topic_path})
482-
except Exception:
492+
except NotFound:
483493
publisher.create_topic(name=full_topic_path)
484494

485495
try:
486496
subscriber.get_subscription(
487497
request={"subscription": full_subscription_path})
488-
except Exception:
498+
except NotFound:
489499
subscriber.create_subscription(
490500
name=full_subscription_path, topic=full_topic_path)
491501

@@ -506,14 +516,14 @@ def cleanup_pubsub_resources(
506516
subscriber.delete_subscription(
507517
request={"subscription": full_subscription_path})
508518
print(f"Deleted subscription: {subscription_name}")
509-
except Exception as e:
510-
print(f"Failed to delete subscription: {e}")
519+
except NotFound:
520+
print(f"Subscription already deleted: {subscription_name}")
511521

512522
try:
513523
publisher.delete_topic(request={"topic": full_topic_path})
514524
print(f"Deleted topic: {topic_name}")
515-
except Exception as e:
516-
print(f"Failed to delete topic: {e}")
525+
except NotFound:
526+
print(f"Topic already deleted: {topic_name}")
517527

518528

519529
def override_or_add(args, flag, value):
@@ -572,9 +582,12 @@ def run(
572582
subscription_path=known_args.pubsub_subscription)
573583

574584
# Start feeder thread that reads URIs from GCS and fills Pub/Sub.
585+
# Delay is used to allow the main streaming pipeline workers to start
586+
# and autoscale before the feeder pipeline begins publishing messages.
575587
threading.Thread(
576-
target=lambda:
577-
(time.sleep(900), run_load_pipeline(known_args, pipeline_args)),
588+
target=lambda: (
589+
time.sleep(known_args.feeder_start_delay_sec),
590+
run_load_pipeline(known_args, pipeline_args)),
578591
daemon=True).start()
579592

580593
pipeline_options = PipelineOptions(pipeline_args)

sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import time
3535
from typing import Any
3636
from typing import Dict
37-
from typing import Iterable
3837
from typing import List
3938
from typing import Optional
4039
from typing import Sequence
@@ -52,6 +51,7 @@
5251
from apache_beam.runners.runner import PipelineResult
5352
from apache_beam.transforms import window
5453

54+
from google.api_core.exceptions import NotFound
5555
from google.cloud import pubsub_v1
5656
import torch
5757
import PIL.Image as PILImage
@@ -286,6 +286,16 @@ def parse_known_args(argv):
286286
type=float,
287287
default=None,
288288
help='Elements per second for load pipeline')
289+
parser.add_argument(
290+
'--feeder_start_delay_sec',
291+
type=int,
292+
default=900,
293+
help=(
294+
'Delay before starting the feeder pipeline that reads URIs from GCS '
295+
'and publishes them to Pub/Sub. This delay allows the main streaming '
296+
'pipeline workers to start and scale before data ingestion begins.'
297+
),
298+
)
289299

290300
# Model & inference
291301
parser.add_argument(
@@ -332,13 +342,13 @@ def ensure_pubsub_resources(
332342

333343
try:
334344
publisher.get_topic(request={"topic": full_topic_path})
335-
except Exception:
345+
except NotFound:
336346
publisher.create_topic(name=full_topic_path)
337347

338348
try:
339349
subscriber.get_subscription(
340350
request={"subscription": full_subscription_path})
341-
except Exception:
351+
except NotFound:
342352
subscriber.create_subscription(
343353
name=full_subscription_path, topic=full_topic_path)
344354

@@ -359,14 +369,14 @@ def cleanup_pubsub_resources(
359369
subscriber.delete_subscription(
360370
request={"subscription": full_subscription_path})
361371
print(f"Deleted subscription: {subscription_name}")
362-
except Exception as e:
363-
print(f"Failed to delete subscription: {e}")
372+
except NotFound:
373+
print(f"Subscription already deleted: {subscription_name}")
364374

365375
try:
366376
publisher.delete_topic(request={"topic": full_topic_path})
367377
print(f"Deleted topic: {topic_name}")
368-
except Exception as e:
369-
print(f"Failed to delete topic: {e}")
378+
except NotFound:
379+
print(f"Topic already deleted: {topic_name}")
370380

371381

372382
def override_or_add(args, flag, value):
@@ -446,9 +456,12 @@ def run(
446456
subscription_path=known_args.pubsub_subscription)
447457

448458
# Start feeder thread that reads URIs from GCS and fills Pub/Sub.
459+
# Delay is used to allow the main streaming pipeline workers to start
460+
# and autoscale before the feeder pipeline begins publishing messages.
449461
threading.Thread(
450-
target=lambda:
451-
(time.sleep(900), run_load_pipeline(known_args, pipeline_args)),
462+
target=lambda: (
463+
time.sleep(known_args.feeder_start_delay_sec),
464+
run_load_pipeline(known_args, pipeline_args)),
452465
daemon=True).start()
453466

454467
pipeline_options = PipelineOptions(pipeline_args)

sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import logging
3030
import threading
3131
import time
32-
from typing import Iterable
3332
from typing import Optional
3433
from typing import Tuple
3534

@@ -50,6 +49,7 @@
5049
from apache_beam.transforms import userstate
5150
from apache_beam.transforms import window
5251

52+
from google.api_core.exceptions import NotFound
5353
from google.cloud import pubsub_v1
5454
import PIL.Image as PILImage
5555

@@ -245,6 +245,16 @@ def parse_known_args(argv):
245245
type=float,
246246
default=None,
247247
help='Elements per second for load pipeline')
248+
parser.add_argument(
249+
'--feeder_start_delay_sec',
250+
type=int,
251+
default=900,
252+
help=(
253+
'Delay before starting the feeder pipeline that reads URIs from GCS '
254+
'and publishes them to Pub/Sub. This delay allows the main streaming '
255+
'pipeline workers to start and scale before data ingestion begins.'
256+
),
257+
)
248258

249259
# Model & inference
250260
parser.add_argument(
@@ -289,13 +299,13 @@ def ensure_pubsub_resources(
289299

290300
try:
291301
publisher.get_topic(request={"topic": full_topic_path})
292-
except Exception:
302+
except NotFound:
293303
publisher.create_topic(name=full_topic_path)
294304

295305
try:
296306
subscriber.get_subscription(
297307
request={"subscription": full_subscription_path})
298-
except Exception:
308+
except NotFound:
299309
subscriber.create_subscription(
300310
name=full_subscription_path, topic=full_topic_path)
301311

@@ -316,14 +326,14 @@ def cleanup_pubsub_resources(
316326
subscriber.delete_subscription(
317327
request={"subscription": full_subscription_path})
318328
print(f"Deleted subscription: {subscription_name}")
319-
except Exception as e:
320-
print(f"Failed to delete subscription: {e}")
329+
except NotFound:
330+
print(f"Subscription already deleted: {subscription_name}")
321331

322332
try:
323333
publisher.delete_topic(request={"topic": full_topic_path})
324334
print(f"Deleted topic: {topic_name}")
325-
except Exception as e:
326-
print(f"Failed to delete topic: {e}")
335+
except NotFound:
336+
print(f"Topic already deleted: {topic_name}")
327337

328338

329339
def override_or_add(args, flag, value):
@@ -402,9 +412,12 @@ def run(
402412
subscription_path=known_args.pubsub_subscription)
403413

404414
# Start feeder thread that reads URIs from GCS and fills Pub/Sub.
415+
# Delay is used to allow the main streaming pipeline workers to start
416+
# and autoscale before the feeder pipeline begins publishing messages.
405417
threading.Thread(
406-
target=lambda:
407-
(time.sleep(900), run_load_pipeline(known_args, pipeline_args)),
418+
target=lambda: (
419+
time.sleep(known_args.feeder_start_delay_sec),
420+
run_load_pipeline(known_args, pipeline_args)),
408421
daemon=True).start()
409422

410423
# StandardOptions

0 commit comments

Comments
 (0)