Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 5b653d8

Browse files
committed
fix: Adapt to changes in Python 3.14 for scheduler and type hints
1 parent e592d35 commit 5b653d8

2 files changed

Lines changed: 18 additions & 3 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
458458
return self._dispatcher
459459

460460
@property
461-
def leaser(self) -> Optional[leaser.Leaser]:
461+
def leaser(self) -> Optional["leaser.Leaser"]:
462462
"""The leaser helper."""
463463
return self._leaser
464464

google/cloud/pubsub_v1/subscriber/scheduler.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import abc
2222
import concurrent.futures
2323
import queue
24+
import sys
2425
import typing
2526
from typing import Callable, List, Optional
2627
import warnings
@@ -37,7 +38,7 @@ class Scheduler(metaclass=abc.ABCMeta):
3738

3839
@property
3940
@abc.abstractmethod
40-
def queue(self) -> queue.Queue: # pragma: NO COVER
41+
def queue(self) -> "queue.Queue": # pragma: NO COVER
4142
"""Queue: A concurrency-safe queue specific to the underlying
4243
concurrency implementation.
4344
@@ -162,7 +163,21 @@ def shutdown(
162163
work_item = self._executor._work_queue.get(block=False)
163164
if work_item is None: # Exceutor in shutdown mode.
164165
continue
165-
dropped_messages.append(work_item.args[0]) # type: ignore[index]
166+
167+
dropped_message = None
168+
if sys.version_info < (3, 14):
169+
# For Python < 3.14, work_item.args is a tuple of positional arguments.
170+
# The message is expected to be the first argument.
171+
if hasattr(work_item, 'args') and work_item.args:
172+
dropped_message = work_item.args[0] # type: ignore[index]
173+
else:
174+
# For Python >= 3.14, work_item.task is (fn, args, kwargs).
175+
# The message is expected to be the first item in the args tuple (task[1]).
176+
if hasattr(work_item, 'task') and len(work_item.task) == 3 and work_item.task[1]:
177+
dropped_message = work_item.task[1][0]
178+
179+
if dropped_message is not None:
180+
dropped_messages.append(dropped_message)
166181
except queue.Empty:
167182
pass
168183

0 commit comments

Comments
 (0)