Skip to content

Commit f8c3b4d

Browse files
author
Pamparampam
committed
Parallel async streaming
1 parent 5f740ed commit f8c3b4d

9 files changed

Lines changed: 192 additions & 19 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@
33
/out/
44
/files/
55
/dist/
6+
/.test-commands
7+
/tester2.py
8+
/.xtest/

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ If resume ZipFly instance has different files than pause ZipFly instance there w
167167
> [!CAUTION]
168168
> You mustn't reuse `GenFile` instances.
169169
170+
171+
## Parallel async streaming
172+
173+
If your `GenFile`'s rely on network requests to fetch data, network latency can limit throughput
174+
below the available bandwidth. To address this, I introduce `async_stream_parallel`.
175+
176+
```python
177+
zipFly = ZipFly(files)
178+
zipFly.async_stream_parallel(prefetch_files=20, max_chunks_per_file=2)
179+
```
180+
170181
### Other
171182
Python is not optimized for async I/O operations, thus to speed up the async streaming the chunk_size is changed to 4MB, you can override this by passing `chunksize` as argument to LocalFile.
172183

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
66

77
[project]
88
name = "zipFly64"
9-
version = "1.2.3"
9+
version = "1.3.1"
1010
description = "Stream zip64 archives on the fly."
1111
readme = "README.md"
1212
authors = [{ name = "Pamparampampam" }]

src/zipFly/BaseFile.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
import time
22
from abc import ABC, abstractmethod
33
from collections.abc import AsyncGenerator, Generator
4+
from typing import Optional
45

56
from . import consts
67
from .Compressor import Compressor
78

89

910
class BaseFile(ABC):
10-
def __init__(self, name: str, compression_method: int):
11+
def __init__(self, name: str, compression_method: int = consts.NO_COMPRESSION):
1112
self.__used = False
1213
self.__compressed_size = 0
1314
self.__offset = 0 # Offset to local file header
1415
self.__crc = 0
15-
self.__compression_method = compression_method or consts.NO_COMPRESSION
16+
self.__compression_method = compression_method
1617
self.__flags = 0b00001000 # flag about using data descriptor is always on
1718
self.__byte_offset_mode = False
1819
if name == "":

src/zipFly/Compressor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
import zlib
22

3+
from src.zipFly import consts
4+
35

46
class Compressor:
57
def __init__(self, file: 'BaseFile'):
68
self.file = file
7-
8-
if file.compression_method == 0:
9+
if file.compression_method == consts.NO_COMPRESSION: # no compression
910
self.process = self._process_through
1011
self.tail = self._no_tail
11-
elif file.compression_method == 8: # deflate compression
12+
elif file.compression_method == consts.COMPRESSION_DEFLATE: # deflate compression
1213
self.compr = zlib.compressobj(5, zlib.DEFLATED, -15)
1314
self.process = self._process_deflate
1415
self.tail = self._tail_deflate
1516
else:
16-
raise KeyError("Unknown compression method in compressor")
17+
raise KeyError(f"Unknown compression method in compressor: {file.compression_method}")
1718

1819
# no compression
1920
def _process_through(self, chunk):

src/zipFly/FilePrefetcher.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import asyncio
2+
from typing import List, Optional, AsyncGenerator
3+
4+
class FilePrefetcher:
5+
"""Encapsulates a list of files, a sliding prefetch window, and streaming queues."""
6+
7+
def __init__(self, files: List, prefetch_files: int = 20, queue_maxsize: int = 2):
8+
self.files = files
9+
self.prefetch_files = prefetch_files
10+
self.queue_maxsize = queue_maxsize
11+
12+
self.n = len(files)
13+
self.prefetchers: List[Optional[_SingleFilePrefetch]] = [None] * self.n
14+
15+
self.inflight = 0
16+
self.next_to_start = 0
17+
18+
async def _start_prefetch(self, idx: int):
19+
pf = _SingleFilePrefetch(self.files[idx], self.queue_maxsize)
20+
self.prefetchers[idx] = pf
21+
await pf.start()
22+
self.inflight += 1
23+
self.next_to_start = max(self.next_to_start, idx + 1)
24+
25+
async def ensure_prefetch(self, idx: int):
26+
"""Ensure the prefetcher for file `idx` is started, refill window as needed."""
27+
if self.prefetchers[idx] is None:
28+
await self._start_prefetch(idx)
29+
30+
# Refill window
31+
while self.next_to_start < self.n and self.inflight < self.prefetch_files:
32+
await self._start_prefetch(self.next_to_start)
33+
34+
async def stream_file_data(self, idx: int) -> AsyncGenerator[bytes, None]:
35+
"""Yields chunks of a single file in order, managing prefetch completion."""
36+
pf = self.prefetchers[idx]
37+
while True:
38+
chunk = await pf.queue.get()
39+
if chunk is None:
40+
if pf.task:
41+
await pf.task
42+
pf.task = None
43+
self.inflight -= 1
44+
45+
# keep window full
46+
while self.next_to_start < self.n and self.inflight < self.prefetch_files:
47+
await self._start_prefetch(self.next_to_start)
48+
break
49+
yield chunk
50+
51+
52+
class _SingleFilePrefetch:
53+
"""Handles a single file's async queue and task."""
54+
55+
def __init__(self, file, queue_maxsize: int = 2):
56+
self.file = file
57+
self.queue = asyncio.Queue(maxsize=queue_maxsize)
58+
self.task: Optional[asyncio.Task] = None
59+
60+
async def start(self):
61+
self.task = asyncio.create_task(self._prefetch())
62+
63+
async def _prefetch(self):
64+
agen = self.file.async_generate_processed_file_data()
65+
try:
66+
async for chunk in agen:
67+
await self.queue.put(chunk)
68+
except (GeneratorExit, asyncio.CancelledError):
69+
await agen.aclose()
70+
raise
71+
except Exception:
72+
await self.queue.put(None)
73+
raise
74+
else:
75+
await self.queue.put(None)

src/zipFly/ZipFly.py

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
from typing import Generator, AsyncGenerator, Union
1+
import asyncio
2+
from typing import Generator, AsyncGenerator, Union, Optional
23

34
from . import consts
45
from .BaseFile import BaseFile
6+
from .FilePrefetcher import FilePrefetcher
57
from .ZipBase import ZipBase
68
import copy
79
import types
810

11+
912
def process_file_names(files) -> list[BaseFile]:
1013
"""Renames duplicated file names"""
1114
seen_names = set()
@@ -185,7 +188,7 @@ def _make_end_structures(self) -> Generator[bytes, None, None]:
185188
yield self._apply_remaining_offset(self._make_end_of_cdir_record())
186189

187190
async def _async_stream_single_file(self, file: BaseFile) -> AsyncGenerator[bytes, None]:
188-
"""This function streams a single file, it also applies running_offset is needed"""
191+
"""This function streams a single file, it also applies remaining_offset if needed"""
189192

190193
yield self._apply_remaining_offset(self._make_local_file_header(file))
191194

@@ -225,6 +228,51 @@ async def async_stream(self) -> AsyncGenerator[bytes, None]:
225228

226229
# self._cleanup()
227230

231+
async def async_stream_parallel(self, prefetch_files: int = 20, max_chunks_per_file: int = 2):
232+
"""
233+
Stream files in parallel.
234+
- prefetch_files: number of files' DATA to read ahead concurrently
235+
- queue_maxsize: per-file buffered DATA chunks (backpressure)
236+
"""
237+
self._check_if_can_stream()
238+
start_idx, remaining_offset = self._find_starting_file()
239+
self._remaining_offset = remaining_offset
240+
self._set_offset(self._byte_offset - remaining_offset)
241+
242+
if start_idx is not None:
243+
244+
files = self._files[start_idx:]
245+
prefetch_mgr = FilePrefetcher(files, prefetch_files, max_chunks_per_file)
246+
247+
for i, file in enumerate(files):
248+
await prefetch_mgr.ensure_prefetch(i)
249+
250+
# 1) Local File Header
251+
file.set_offset(self._get_offset())
252+
header = self._make_local_file_header(file)
253+
header = self._apply_remaining_offset(header)
254+
self._add_offset(len(header))
255+
if header:
256+
yield header
257+
258+
# 2) Stream DATA
259+
async for chunk in prefetch_mgr.stream_file_data(i):
260+
out = self._apply_remaining_offset(chunk)
261+
if out:
262+
self._add_offset(len(out))
263+
yield out
264+
265+
# 3) Data Descriptor
266+
dd = self._make_data_descriptor(file)
267+
dd = self._apply_remaining_offset(dd)
268+
self._add_offset(len(dd))
269+
if dd:
270+
yield dd
271+
272+
# 4) Central directory & end records
273+
for chunk in self._make_end_structures():
274+
yield chunk
275+
228276
def stream(self) -> Generator[bytes, None, None]:
229277
self._check_if_can_stream()
230278

@@ -243,8 +291,6 @@ def stream(self) -> Generator[bytes, None, None]:
243291
for chunk in self._make_end_structures():
244292
yield chunk
245293

246-
# self._cleanup()
247-
248294
def _check_if_can_stream(self):
249295
if self.__used:
250296
raise RuntimeError("Do not re-use zipFly instances. Recreate it.")
@@ -266,10 +312,3 @@ def _apply_remaining_offset(self, data):
266312
self._add_offset(self._remaining_offset)
267313
self._remaining_offset = 0 # Offset is fully applied
268314
return result
269-
270-
# def _cleanup(self):
271-
# pass
272-
# """Clean all data after streaming"""
273-
# super()._cleanup()
274-
# self._remaining_offset = 0
275-
# # self.__used = False

tests/test_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,9 @@ async def sized_zeros_generator_async(size: int) -> AsyncGenerator[bytes]:
5555
"""Yield zeros up to a certain size."""
5656
for zeros in sized_zeros_generator(size):
5757
yield zeros
58+
59+
def generate_data_async(data: bytes, repeat: int):
60+
async def generator():
61+
for _ in range(repeat):
62+
yield data
63+
return generator

tests/test_zipfly.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,46 @@
1212

1313
from src.zipFly import GenFile, LocalFile, ZipFly, consts
1414
from src.zipFly.EmptyFolder import EmptyFolder
15-
from tests.test_utils import lorem_ipsum_generator, lorem_ipsum, single_archive_size, lorem_ipsum_generator_async, multifile_archive_size
15+
from tests.test_utils import lorem_ipsum_generator, lorem_ipsum, single_archive_size, lorem_ipsum_generator_async, multifile_archive_size, generate_data_async
1616

1717

18+
@pytest.mark.asyncio
19+
async def test_GenFile_multiple_files_async(tmp_path):
20+
"""Test async ZIP with multiple small files (~10KB each)."""
21+
n = 10
22+
chunk = b"x" * 1024 # 1KB
23+
chunks_per_file = 10
24+
expected_content = chunk * chunks_per_file
25+
26+
files = []
27+
for i in range(n):
28+
generator_func = generate_data_async(chunk, chunks_per_file)
29+
file = GenFile(
30+
name=f"file_{i}.txt",
31+
generator=generator_func(),
32+
modification_time=time.time(),
33+
compression_method=consts.COMPRESSION_DEFLATE,
34+
)
35+
files.append(file)
36+
37+
zip_fly = ZipFly(files)
38+
zip_path = tmp_path / "multi_file_async.zip"
39+
40+
with zip_path.open("wb") as fp:
41+
async for zip_chunk in zip_fly.async_stream():
42+
fp.write(zip_chunk)
43+
44+
with zipfile.ZipFile(zip_path) as zfp:
45+
namelist = zfp.namelist()
46+
assert len(namelist) == n
47+
48+
for i in range(n):
49+
fname = f"file_{i}.txt"
50+
assert fname in namelist
51+
with zfp.open(fname) as tfp:
52+
data = tfp.read()
53+
assert data == expected_content
54+
1855
def test_GenFile_COMPRESSION_DEFLATE(tmp_path):
1956
"""Test GenFile with compression."""
2057
file1 = GenFile(

0 commit comments

Comments
 (0)