Skip to content

Commit 96b5ba0

Browse files
authored
refactor: improve upload service (#758)
* upload to tmpdir * add tests * fix usort * fix
1 parent 1626cd3 commit 96b5ba0

4 files changed

Lines changed: 115 additions & 59 deletions

File tree

mapillary_tools/upload_api_v4.py

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
else:
1414
from typing_extensions import override
1515

16+
import tempfile
17+
1618
import requests
1719

1820
from .api_v4 import request_get, request_post, REQUESTS_TIMEOUT
@@ -30,14 +32,14 @@
3032

3133

3234
class UploadService:
35+
"""
36+
Upload byte streams to the Upload Service.
37+
"""
38+
3339
user_access_token: str
3440
session_key: str
3541

36-
def __init__(
37-
self,
38-
user_access_token: str,
39-
session_key: str,
40-
):
42+
def __init__(self, user_access_token: str, session_key: str):
4143
self.user_access_token = user_access_token
4244
self.session_key = session_key
4345

@@ -46,11 +48,7 @@ def fetch_offset(self) -> int:
4648
"Authorization": f"OAuth {self.user_access_token}",
4749
}
4850
url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
49-
resp = request_get(
50-
url,
51-
headers=headers,
52-
timeout=REQUESTS_TIMEOUT,
53-
)
51+
resp = request_get(url, headers=headers, timeout=REQUESTS_TIMEOUT)
5452
resp.raise_for_status()
5553
data = resp.json()
5654
return data["offset"]
@@ -59,18 +57,53 @@ def fetch_offset(self) -> int:
5957
def chunkize_byte_stream(
    cls, stream: T.IO[bytes], chunk_size: int
) -> T.Generator[bytes, None, None]:
    """
    Lazily cut a readable byte stream into pieces of at most ``chunk_size`` bytes.

    The stream is read from its current position until ``read`` returns an
    empty result; the last piece may be shorter than ``chunk_size``.
    Because this is a generator, the ``ValueError`` for a non-positive
    ``chunk_size`` is raised when iteration starts, not at call time.

    >>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 1))
    [b'f', b'o', b'o']

    >>> list(UploadService.chunkize_byte_stream(io.BytesIO(b"foo"), 10))
    [b'foo']
    """

    if chunk_size <= 0:
        raise ValueError("Expect positive chunk size")

    # Prime the loop with the first read, then keep reading after each yield.
    piece = stream.read(chunk_size)
    while piece:
        yield piece
        piece = stream.read(chunk_size)
6978

79+
@classmethod
7080
def shift_chunks(
71-
self, chunks: T.Iterable[bytes], offset: int
81+
cls, chunks: T.Iterable[bytes], offset: int
7282
) -> T.Generator[bytes, None, None]:
73-
assert offset >= 0, f"Expect non-negative offset but got {offset}"
83+
"""
84+
Shift the chunks by the offset.
85+
86+
>>> list(UploadService.shift_chunks([b"foo", b"bar"], 0))
87+
[b'foo', b'bar']
88+
89+
>>> list(UploadService.shift_chunks([b"foo", b"bar"], 1))
90+
[b'oo', b'bar']
91+
92+
>>> list(UploadService.shift_chunks([b"foo", b"bar"], 3))
93+
[b'bar']
94+
95+
>>> list(UploadService.shift_chunks([b"foo", b"bar"], 6))
96+
[]
97+
98+
>>> list(UploadService.shift_chunks([b"foo", b"bar"], 7))
99+
[]
100+
101+
>>> list(UploadService.shift_chunks([], 0))
102+
[]
103+
"""
104+
105+
if offset < 0:
106+
raise ValueError(f"Expect non-negative offset but got {offset}")
74107

75108
for chunk in chunks:
76109
if offset:
@@ -103,12 +136,10 @@ def upload_chunks(
103136
return self.upload_shifted_chunks(shifted_chunks, offset)
104137

105138
def upload_shifted_chunks(
106-
self,
107-
shifted_chunks: T.Iterable[bytes],
108-
offset: int,
139+
self, shifted_chunks: T.Iterable[bytes], offset: int
109140
) -> str:
110141
"""
111-
Upload the chunks that must already be shifted by the offset (e.g. fp.seek(begin_offset, io.SEEK_SET))
142+
Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET))
112143
"""
113144

114145
headers = {
@@ -118,10 +149,7 @@ def upload_shifted_chunks(
118149
}
119150
url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}"
120151
resp = request_post(
121-
url,
122-
headers=headers,
123-
data=shifted_chunks,
124-
timeout=UPLOAD_REQUESTS_TIMEOUT,
152+
url, headers=headers, data=shifted_chunks, timeout=UPLOAD_REQUESTS_TIMEOUT
125153
)
126154

127155
resp.raise_for_status()
@@ -137,18 +165,35 @@ def upload_shifted_chunks(
137165

138166
# A mock class for testing only
139167
class FakeUploadService(UploadService):
140-
def __init__(self, *args, **kwargs):
168+
"""
169+
A mock upload service that simulates the upload process for testing purposes.
170+
It writes the uploaded data to a file in a temporary directory and generates a fake file handle.
171+
"""
172+
173+
FILE_HANDLE_DIR: str = "file_handles"
174+
175+
def __init__(
    self,
    upload_path: Path | None = None,
    transient_error_ratio: float = 0.0,
    *args,
    **kwargs,
):
    """
    Configure where the fake service stores data and how often it fails.

    upload_path: directory that receives the uploaded data. When None, a
        "mapillary_public_uploads" directory under ``tempfile.gettempdir()``
        is used.
    transient_error_ratio: stored as-is; the upload and offset methods
        compare ``random.random()`` against it to decide whether to raise
        a simulated ``requests.ConnectionError``.
    *args, **kwargs: forwarded unchanged to ``UploadService.__init__``.

    NOTE: ``upload_path`` and ``transient_error_ratio`` are declared before
    ``*args``, so positional arguments bind to them first. Pass
    ``user_access_token`` and ``session_key`` by keyword.
    """
    super().__init__(*args, **kwargs)
    self._transient_error_ratio = transient_error_ratio
    self._upload_path = (
        Path(tempfile.gettempdir()) / "mapillary_public_uploads"
        if upload_path is None
        else upload_path
    )
189+
190+
@property
def upload_path(self) -> Path:
    """Read-only access to the directory this fake service writes uploads into."""
    return self._upload_path
146193

147194
@override
148195
def upload_shifted_chunks(
149-
self,
150-
shifted_chunks: T.Iterable[bytes],
151-
offset: int,
196+
self, shifted_chunks: T.Iterable[bytes], offset: int
152197
) -> str:
153198
expected_offset = self.fetch_offset()
154199
if offset != expected_offset:
@@ -160,17 +205,17 @@ def upload_shifted_chunks(
160205
filename = self._upload_path.joinpath(self.session_key)
161206
with filename.open("ab") as fp:
162207
for chunk in shifted_chunks:
163-
if random.random() <= self._error_ratio:
208+
if random.random() <= self._transient_error_ratio:
164209
raise requests.ConnectionError(
165-
f"TEST ONLY: Failed to upload with error ratio {self._error_ratio}"
210+
f"TEST ONLY: Failed to upload with error ratio {self._transient_error_ratio}"
166211
)
167212
fp.write(chunk)
168-
if random.random() <= self._error_ratio:
213+
if random.random() <= self._transient_error_ratio:
169214
raise requests.ConnectionError(
170-
f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
215+
f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}"
171216
)
172217

173-
file_handle_dir = self._upload_path.joinpath("file_handles")
218+
file_handle_dir = self._upload_path.joinpath(self.FILE_HANDLE_DIR)
174219
file_handle_path = file_handle_dir.joinpath(self.session_key)
175220
if not file_handle_path.exists():
176221
os.makedirs(file_handle_dir, exist_ok=True)
@@ -181,12 +226,12 @@ def upload_shifted_chunks(
181226

182227
@override
183228
def fetch_offset(self) -> int:
184-
if random.random() <= self._error_ratio:
229+
if random.random() <= self._transient_error_ratio:
185230
raise requests.ConnectionError(
186-
f"TEST ONLY: Partially uploaded with error ratio {self._error_ratio}"
231+
f"TEST ONLY: Partially uploaded with error ratio {self._transient_error_ratio}"
187232
)
188-
filename = os.path.join(self._upload_path, self.session_key)
189-
if not os.path.exists(filename):
233+
filename = self._upload_path.joinpath(self.session_key)
234+
if not filename.exists():
190235
return 0
191236
with open(filename, "rb") as fp:
192237
fp.seek(0, io.SEEK_END)

mapillary_tools/uploader.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,9 +528,15 @@ def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadServic
528528
upload_service: upload_api_v4.UploadService
529529

530530
if self.dry_run:
531+
upload_path = os.getenv("MAPILLARY_UPLOAD_ENDPOINT")
531532
upload_service = upload_api_v4.FakeUploadService(
532533
user_access_token=self.user_items["user_upload_token"],
533534
session_key=session_key,
535+
upload_path=Path(upload_path) if upload_path is not None else None,
536+
)
537+
LOG.info(
538+
"Dry run mode enabled. Data will be uploaded to %s",
539+
upload_service.upload_path.joinpath(session_key),
534540
)
535541
else:
536542
upload_service = upload_api_v4.UploadService(

tests/integration/fixtures.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import py.path
1515
import pytest
1616

17-
from mapillary_tools import utils
17+
from mapillary_tools import upload_api_v4, utils
1818

1919
EXECUTABLE = os.getenv(
2020
"MAPILLARY_TOOLS__TESTS_EXECUTABLE", "python3 -m mapillary_tools.commands"
@@ -58,7 +58,7 @@ def setup_data(tmpdir: py.path.local):
5858
@pytest.fixture
5959
def setup_upload(tmpdir: py.path.local):
6060
upload_dir = tmpdir.mkdir("mapillary_public_uploads")
61-
os.environ["MAPILLARY_UPLOAD_PATH"] = str(upload_dir)
61+
os.environ["MAPILLARY_UPLOAD_ENDPOINT"] = str(upload_dir)
6262
os.environ["MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED"] = "YES"
6363
os.environ["MAPILLARY_TOOLS_PROMPT_DISABLED"] = "YES"
6464
os.environ["MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN"] = "YES"
@@ -67,7 +67,7 @@ def setup_upload(tmpdir: py.path.local):
6767
yield upload_dir
6868
if tmpdir.check():
6969
tmpdir.remove(ignore_errors=True)
70-
os.environ.pop("MAPILLARY_UPLOAD_PATH", None)
70+
os.environ.pop("MAPILLARY_UPLOAD_ENDPOINT", None)
7171
os.environ.pop("MAPILLARY_UPLOAD_HISTORY_PATH", None)
7272
os.environ.pop("MAPILLARY_TOOLS__AUTH_VERIFICATION_DISABLED", None)
7373
os.environ.pop("MAPILLARY_TOOLS_PROMPT_DISABLED", None)
@@ -239,11 +239,11 @@ def load_descs(descs) -> list:
239239

240240

241241
def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]:
242-
FILE_HANDLE_DIRNAME = "file_handles"
243-
244242
session_by_file_handle: dict[str, str] = {}
245-
if upload_folder.joinpath(FILE_HANDLE_DIRNAME).exists():
246-
for session_path in upload_folder.joinpath(FILE_HANDLE_DIRNAME).iterdir():
243+
if upload_folder.joinpath(upload_api_v4.FakeUploadService.FILE_HANDLE_DIR).exists():
244+
for session_path in upload_folder.joinpath(
245+
upload_api_v4.FakeUploadService.FILE_HANDLE_DIR
246+
).iterdir():
247247
file_handle = session_path.read_text()
248248
session_by_file_handle[file_handle] = session_path.name
249249

@@ -267,7 +267,7 @@ def extract_all_uploaded_descs(upload_folder: Path) -> list[list[dict]]:
267267
sequences.append(validate_and_extract_zip(file))
268268
elif file.suffix == ".mp4":
269269
sequences.append(validate_and_extract_camm(file))
270-
elif file.name == FILE_HANDLE_DIRNAME:
270+
elif file.name == upload_api_v4.FakeUploadService.FILE_HANDLE_DIR:
271271
# Already processed above
272272
pass
273273

tests/unit/test_upload_api_v4.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,55 @@
11
import io
2+
from pathlib import Path
23

34
import py
45

56
from mapillary_tools import upload_api_v4
67

7-
from ..integration.fixtures import setup_upload
88

9-
10-
def test_upload(tmpdir: py.path.local):
    """
    Upload a small payload one byte per chunk and check the stored bytes.

    The fake service appends uploaded chunks to ``<upload_path>/<session_key>``,
    so the file under ``tmpdir`` must equal the payload. Uploading the same
    payload a second time must leave that file unchanged.
    """
    # Disable simulated failures through the constructor instead of
    # overwriting the private attribute after construction.
    upload_service = upload_api_v4.FakeUploadService(
        user_access_token="TEST",
        session_key="FOOBAR.txt",
        upload_path=Path(tmpdir),
        transient_error_ratio=0.0,
    )
    content = b"double_foobar"
    cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1)
    assert isinstance(cluster_id, str), cluster_id
    assert tmpdir.join("FOOBAR.txt").read_binary() == content

    # reupload should not affect the file
    upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1)
    assert tmpdir.join("FOOBAR.txt").read_binary() == content
2425

2526

26-
def test_upload_big_chunksize(tmpdir: py.path.local):
    """
    Upload a small payload with a chunk size larger than the payload itself.

    The whole payload travels as a single chunk. The file the fake service
    writes under ``tmpdir`` must equal the payload, and uploading the same
    payload a second time must leave that file unchanged.
    """
    # Disable simulated failures through the constructor instead of
    # overwriting the private attribute after construction.
    upload_service = upload_api_v4.FakeUploadService(
        user_access_token="TEST",
        session_key="FOOBAR.txt",
        upload_path=Path(tmpdir),
        transient_error_ratio=0.0,
    )
    content = b"double_foobar"
    cluster_id = upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000)
    assert isinstance(cluster_id, str), cluster_id
    assert tmpdir.join("FOOBAR.txt").read_binary() == content

    # reupload should not affect the file
    upload_service.upload_byte_stream(io.BytesIO(content), chunk_size=1000)
    assert tmpdir.join("FOOBAR.txt").read_binary() == content
4043

4144

42-
def test_upload_chunks(setup_upload: py.path.local):
45+
def test_upload_chunks(tmpdir: py.path.local):
4346
upload_service = upload_api_v4.FakeUploadService(
4447
user_access_token="TEST",
4548
session_key="FOOBAR2.txt",
49+
upload_path=Path(tmpdir),
50+
transient_error_ratio=0.02,
4651
)
47-
upload_service._error_ratio = 0
52+
upload_service._transient_error_ratio = 0
4853

4954
def _gen_chunks():
5055
yield b"foo"
@@ -55,8 +60,8 @@ def _gen_chunks():
5560
cluster_id = upload_service.upload_chunks(_gen_chunks())
5661

5762
assert isinstance(cluster_id, str), cluster_id
58-
assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
63+
assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar"
5964

6065
# reupload should not affect the file
6166
upload_service.upload_chunks(_gen_chunks())
62-
assert (setup_upload.join("FOOBAR2.txt").read_binary()) == b"foobar"
67+
assert (tmpdir.join("FOOBAR2.txt").read_binary()) == b"foobar"

0 commit comments

Comments
 (0)