1414
1515"""Context manager for Cloud Spanner batched writes."""
1616import functools
17+ from datetime import datetime
18+ from typing import List
1719
18- from google .cloud .spanner_v1 import CommitRequest
20+ from google .cloud .spanner_v1 import CommitRequest , CommitResponse
1921from google .cloud .spanner_v1 import Mutation
2022from google .cloud .spanner_v1 import TransactionOptions
2123from google .cloud .spanner_v1 import BatchWriteRequest
@@ -47,13 +49,17 @@ class _BatchBase(_SessionWrapper):
4749 :param session: the session used to perform the commit
4850 """
4951
50- transaction_tag = None
51- _read_only = False
52-
5352 def __init__ (self , session ):
5453 super (_BatchBase , self ).__init__ (session )
55- self ._mutations = []
5654
55+ self ._mutations : List [Mutation ] = []
56+ self .transaction_tag : str = None
57+
58+ self .committed : datetime = None
59+ """Timestamp at which the batch was successfully committed."""
60+ self .commit_stats : CommitResponse .CommitStats = None
61+
62+ # TODO multiplexed - cleanup
5763 def _check_state (self ):
5864 """Helper for :meth:`commit` et al.
5965
@@ -148,10 +154,7 @@ def delete(self, table, keyset):
148154class Batch (_BatchBase ):
149155 """Accumulate mutations for transmission during :meth:`commit`."""
150156
151- committed = None
152- commit_stats = None
153- """Timestamp at which the batch was successfully committed."""
154-
157+ # TODO multiplexed - cleanup
155158 def _check_state (self ):
156159 """Helper for :meth:`commit` et al.
157160
@@ -163,6 +166,7 @@ def _check_state(self):
163166 if self .committed is not None :
164167 raise ValueError ("Batch already committed" )
165168
169+ # TODO multiplexed - cleanup kwargs
166170 def commit (
167171 self ,
168172 return_commit_stats = False ,
@@ -205,7 +209,10 @@ def commit(
205209 :rtype: datetime
206210 :returns: timestamp of the committed changes.
207211 """
212+
213+ # TODO multiplexed - cleanup
208214 self ._check_state ()
215+
209216 database = self ._session ._database
210217 api = database .spanner_api
211218 metadata = _metadata_with_prefix (database .name )
@@ -282,6 +289,8 @@ def wrapped_method(*args, **kwargs):
282289
283290 def __enter__ (self ):
284291 """Begin ``with`` block."""
292+
293+ # TODO multiplexed - cleanup
285294 self ._check_state ()
286295
287296 return self
@@ -317,19 +326,18 @@ class MutationGroups(_SessionWrapper):
317326 :param session: the session used to perform the commit
318327 """
319328
320- committed = None
321-
322329 def __init__ (self , session ):
323330 super (MutationGroups , self ).__init__ (session )
324- self ._mutation_groups = []
331+ self ._mutation_groups : List [MutationGroup ] = []
332+ self .committed : bool = False
325333
326334 def _check_state (self ):
327335 """Checks if the object's state is valid for making API requests.
328336
329337 :raises: :exc:`ValueError` if the object's state is invalid for making
330338 API requests.
331339 """
332- if self .committed is not None :
340+ if self .committed :
333341 raise ValueError ("MutationGroups already committed" )
334342
335343 def group (self ):
@@ -358,10 +366,14 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
358366 :rtype: :class:`Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]`
359367 :returns: a sequence of responses for each batch.
360368 """
369+
370+ # TODO multiplexed - cleanup
361371 self ._check_state ()
362372
363- database = self ._session ._database
373+ session = self ._session
374+ database = session ._database
364375 api = database .spanner_api
376+
365377 metadata = _metadata_with_prefix (database .name )
366378 if database ._route_to_leader_enabled :
367379 metadata .append (
@@ -374,7 +386,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
374386 request_options = RequestOptions (request_options )
375387
376388 request = BatchWriteRequest (
377- session = self . _session .name ,
389+ session = session .name ,
378390 mutation_groups = self ._mutation_groups ,
379391 request_options = request_options ,
380392 exclude_txn_from_change_streams = exclude_txn_from_change_streams ,
@@ -409,6 +421,7 @@ def wrapped_method(*args, **kwargs):
409421 InternalServerError : _check_rst_stream_error ,
410422 },
411423 )
424+
412425 self .committed = True
413426 return response
414427
0 commit comments