99import copy
1010import json
1111from datetime import datetime
12- from typing import (
13- Any , Dict , Iterator , List , Optional , Union ,
14- TYPE_CHECKING
15- )
12+ from typing import Any , Dict , Iterator , List , Optional , Union , TYPE_CHECKING
1613from typing_extensions import Self
1714
1815from azure .core .exceptions import HttpResponseError
3532
3633class Segment :
3734 def __init__ (
38- self , client : Union ["ContainerClient" , "AsyncContainerClient" ],
35+ self ,
36+ client : Union ["ContainerClient" , "AsyncContainerClient" ],
3937 segment_path : str ,
4038 page_size : int ,
41- segment_cursor : Optional [Dict [str , Any ]] = None
39+ segment_cursor : Optional [Dict [str , Any ]] = None ,
4240 ) -> None :
4341 self .client = client
4442 self .segment_path = segment_path
@@ -91,8 +89,9 @@ def _initialize(self, segment_cursor=None):
9189 self .shards .append (Shard (self .client , shard_path ))
9290 else :
9391 start_shard_path = segment_cursor ["CurrentShardPath" ]
94- shard_cursors = {shard_cursor ["CurrentChunkPath" ][:- 10 ]: shard_cursor
95- for shard_cursor in segment_cursor ["ShardCursors" ]}
92+ shard_cursors = {
93+ shard_cursor ["CurrentChunkPath" ][:- 10 ]: shard_cursor for shard_cursor in segment_cursor ["ShardCursors" ]
94+ }
9695
9796 if shard_paths :
9897 # Initialize all shards using the shard cursors
@@ -129,30 +128,33 @@ class ChangeFeedPaged(PageIterator):
129128 """The current page of listed results."""
130129
131130 def __init__ (
132- self , container_client : Union ["ContainerClient" , "AsyncContainerClient" ],
131+ self ,
132+ container_client : Union ["ContainerClient" , "AsyncContainerClient" ],
133133 results_per_page : Optional [int ] = None ,
134134 start_time : Optional [datetime ] = None ,
135135 end_time : Optional [datetime ] = None ,
136- continuation_token : Optional [str ] = None
136+ continuation_token : Optional [str ] = None ,
137137 ) -> None :
138138 if (start_time or end_time ) and continuation_token :
139139 raise ValueError ("start_time/end_time and continuation_token shouldn't be specified at the same time" )
140140 super (ChangeFeedPaged , self ).__init__ (
141- get_next = self ._get_next_cf ,
142- extract_data = self ._extract_data_cb ,
143- continuation_token = continuation_token or ""
141+ get_next = self ._get_next_cf , extract_data = self ._extract_data_cb , continuation_token = continuation_token or ""
144142 )
145143 dict_continuation_token = json .loads (continuation_token ) if continuation_token else None
146144
147- if dict_continuation_token and (container_client .primary_hostname != dict_continuation_token ["UrlHost" ]): # pylint: disable=unsubscriptable-object
145+ if dict_continuation_token and (container_client .primary_hostname != dict_continuation_token ["UrlHost" ]):
148146 raise ValueError ("The token is not for the current storage account." )
149- if dict_continuation_token and (dict_continuation_token ["CursorVersion" ] != 1 ): # pylint: disable=unsubscriptable-object
147+ if dict_continuation_token and (dict_continuation_token ["CursorVersion" ] != 1 ):
150148 raise ValueError ("The CursorVersion is not supported by the current SDK." )
151149 self .results_per_page = results_per_page or 5000
152150 self .current_page = None
153- self ._change_feed = ChangeFeed (container_client , self .results_per_page , start_time = start_time ,
154- end_time = end_time ,
155- cf_cursor = dict_continuation_token )
151+ self ._change_feed = ChangeFeed (
152+ container_client ,
153+ self .results_per_page ,
154+ start_time = start_time ,
155+ end_time = end_time ,
156+ cf_cursor = dict_continuation_token ,
157+ )
156158
157159 def _get_next_cf (self , continuation_token ): # pylint:disable=inconsistent-return-statements, unused-argument
158160 try :
@@ -174,11 +176,12 @@ def _extract_data_cb(self, event_list):
174176
175177class ChangeFeed :
176178 def __init__ (
177- self , client : Union ["ContainerClient" , "AsyncContainerClient" ],
179+ self ,
180+ client : Union ["ContainerClient" , "AsyncContainerClient" ],
178181 page_size : int ,
179182 start_time : Optional [datetime ] = None ,
180183 end_time : Optional [datetime ] = None ,
181- cf_cursor : Optional [Dict [str , Any ]] = None
184+ cf_cursor : Optional [Dict [str , Any ]] = None ,
182185 ) -> None :
183186 self .client = client
184187 self .page_size = page_size
@@ -189,16 +192,19 @@ def __init__(
189192 # the end time is in str format
190193 end_time_in_cursor = cf_cursor ["EndTime" ] if cf_cursor else None
191194 # convert the end time in str format to a datetime object
192- end_time_in_cursor_obj = \
195+ end_time_in_cursor_obj = (
193196 datetime .strptime (end_time_in_cursor , "%Y-%m-%dT%H:%M:%S+00:00" ) if end_time_in_cursor else None
197+ )
194198 # self.end_time is in datetime format
195199 self .end_time = end_time or end_time_in_cursor_obj
196200
197201 cur_segment_cursor = cf_cursor ["CurrentSegmentCursor" ] if cf_cursor else None
198202
199- self .cursor = {"CursorVersion" : 1 ,
200- "EndTime" : self .end_time .strftime ("%Y-%m-%dT%H:%M:%S+00:00" ) if self .end_time else "" ,
201- "UrlHost" : self .client .primary_hostname }
203+ self .cursor = {
204+ "CursorVersion" : 1 ,
205+ "EndTime" : self .end_time .strftime ("%Y-%m-%dT%H:%M:%S+00:00" ) if self .end_time else "" ,
206+ "UrlHost" : self .client .primary_hostname ,
207+ }
202208 self ._initialize (cur_segment_cursor = cur_segment_cursor )
203209
204210 def __iter__ (self ) -> Self :
@@ -257,9 +263,8 @@ def _initialize(self, cur_segment_cursor=None):
257263 next_segment_path = next (self ._segment_paths_generator )
258264
259265 self .current_segment = self ._get_next_segment (
260- next_segment_path ,
261- self .page_size ,
262- segment_cursor = cur_segment_cursor if cur_segment_cursor else None )
266+ next_segment_path , self .page_size , segment_cursor = cur_segment_cursor if cur_segment_cursor else None
267+ )
263268
264269 def _get_next_segment (self , segment_path , page_size , segment_cursor = None ):
265270 if segment_path :
@@ -297,23 +302,24 @@ def _parse_datetime_from_segment_path(segment_path):
297302
298303 def _is_earlier_than_start_time (self , segment_path ):
299304 segment_date = self ._parse_datetime_from_segment_path (segment_path )
300- opaque_start_date = datetime (self .start_time .year , self .start_time .month ,
301- self .start_time .day , self .start_time .hour )
305+ opaque_start_date = datetime (
306+ self .start_time .year , self .start_time .month , self .start_time .day , self .start_time .hour
307+ )
302308
303309 return segment_date < opaque_start_date
304310
305311 def _is_later_than_end_time (self , segment_path ):
306312 segment_date = self ._parse_datetime_from_segment_path (segment_path )
307- opaque_end_date = datetime (self .end_time .year , self .end_time .month ,
308- self .end_time .day , self .end_time .hour )
313+ opaque_end_date = datetime (self .end_time .year , self .end_time .month , self .end_time .day , self .end_time .hour )
309314 return segment_date > opaque_end_date
310315
311316
312317class Shard :
313318 def __init__ (
314- self , client : Union ["ContainerClient" , "AsyncContainerClient" ],
319+ self ,
320+ client : Union ["ContainerClient" , "AsyncContainerClient" ],
315321 shard_path : str ,
316- shard_cursor : Optional [Dict [str , Any ]] = None
322+ shard_cursor : Optional [Dict [str , Any ]] = None ,
317323 ) -> None :
318324 self .client = client
319325 self .shard_path = shard_path
@@ -348,8 +354,9 @@ def _initialize(self, shard_cursor=None):
348354
349355 # move cursor to the expected chunk
350356 if shard_cursor :
351- while self .unprocessed_chunk_path_props and \
352- self .unprocessed_chunk_path_props [0 ].name != shard_cursor .get ("CurrentChunkPath" ):
357+ while self .unprocessed_chunk_path_props and self .unprocessed_chunk_path_props [0 ].name != shard_cursor .get (
358+ "CurrentChunkPath"
359+ ):
353360 self .unprocessed_chunk_path_props .popleft ()
354361 self .current_chunk = self ._get_next_chunk (chunk_cursor = shard_cursor )
355362 else :
@@ -365,10 +372,7 @@ def _get_next_chunk(self, chunk_cursor=None):
365372class ChangeFeedStreamer :
366373 """File-like streaming iterator."""
367374
368- def __init__ (
369- self , blob_client : "BlobClient" ,
370- chunk_file_start : int = 0
371- ) -> None :
375+ def __init__ (self , blob_client : "BlobClient" , chunk_file_start : int = 0 ) -> None :
372376 self ._chunk_file_start = chunk_file_start or 0 # this value will never be updated
373377 self ._download_offset = self ._chunk_file_start # range start of the next download
374378 self .object_position = self ._chunk_file_start # track the most recently read sync marker position
@@ -378,8 +382,9 @@ def __init__(
378382 self ._buf_start = self ._chunk_file_start # the start position of the chunk file to buffer
379383 self ._chunk_size_snapshot = blob_client .get_blob_properties ().size
380384 length = self ._chunk_size_snapshot - self ._chunk_file_start
381- self ._iterator = blob_client .download_blob (offset = self ._chunk_file_start ,
382- length = length ).chunks () if length > 0 else iter ([])
385+ self ._iterator = (
386+ blob_client .download_blob (offset = self ._chunk_file_start , length = length ).chunks () if length > 0 else iter ([])
387+ )
383388
384389 def __len__ (self ) -> int :
385390 return self ._download_offset
@@ -430,7 +435,7 @@ def read(self, size: int) -> bytes:
430435 if relative_start < 0 :
431436 raise ValueError ("Buffer has dumped too much data" )
432437 relative_end = relative_start + size
433- data = self ._buf [relative_start : relative_end ]
438+ data = self ._buf [relative_start :relative_end ]
434439
435440 # dump the extra data in buffer
436441 # buffer start--------------------16bytes----current read position
@@ -449,9 +454,10 @@ def set_object_index(self, event_index: int) -> None:
449454
450455class Chunk :
451456 def __init__ (
452- self , client : Union ["ContainerClient" , "AsyncContainerClient" ],
457+ self ,
458+ client : Union ["ContainerClient" , "AsyncContainerClient" ],
453459 chunk_path : str ,
454- chunk_cursor : Optional [Dict [str , Any ]] = None
460+ chunk_cursor : Optional [Dict [str , Any ]] = None ,
455461 ) -> None :
456462 self .client = client
457463 self .chunk_path = chunk_path
0 commit comments