Skip to content

Commit 67407bf

Browse files
committed
Generate TUS assembly upload helper
1 parent 41484c2 commit 67407bf

3 files changed

Lines changed: 156 additions & 101 deletions

File tree

examples/api2-devdock-tus-assembly/main.py

Lines changed: 20 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@
77

88
import json
99
import os
10-
from io import BytesIO
1110
from pathlib import Path
12-
from urllib.parse import quote
1311

1412
from transloadit.client import Transloadit
15-
from tusclient import client as tus
1613

1714

1815
def required_env(name):
@@ -35,45 +32,6 @@ def load_scenario():
3532
return json.load(scenario_file)
3633

3734

38-
def read_path(value, path_parts, label):
39-
current = value
40-
for part in path_parts:
41-
if isinstance(current, list) and isinstance(part, int):
42-
if part >= len(current):
43-
fail(f"{label} path {path_parts!r} index {part} is out of range")
44-
current = current[part]
45-
continue
46-
47-
if isinstance(current, dict) and isinstance(part, str):
48-
if part not in current:
49-
fail(f"{label} path {path_parts!r} is missing key {part!r}")
50-
current = current[part]
51-
continue
52-
53-
fail(f"{label} path {path_parts!r} cannot read {part!r} from {current!r}")
54-
55-
return current
56-
57-
58-
def resolve_value(value_spec, context, label):
59-
if "value" in value_spec:
60-
return value_spec["value"]
61-
62-
source = value_spec.get("source")
63-
if not isinstance(source, dict):
64-
fail(f"{label} value spec has no literal value or source")
65-
66-
root = source.get("root")
67-
if root not in context:
68-
fail(f"{label} value source root {root!r} is unavailable")
69-
70-
path_parts = source.get("path") or []
71-
if not isinstance(path_parts, list):
72-
fail(f"{label} value source path must be a list")
73-
74-
return read_path(context[root], path_parts, label)
75-
76-
7735
def response_data(response, operation):
7836
data = response.data
7937
if not isinstance(data, dict):
@@ -100,19 +58,13 @@ def feature_step(scenario, collection_name, feature_id, kind):
10058
fail(f"scenario has no {collection_name} step for feature {feature_id!r}")
10159

10260

103-
def create_assembly(client, scenario):
61+
def file_count(scenario):
10462
feature = feature_step(scenario, "preparations", "createTusAssembly", "feature-call")
10563
input_values = list(feature["input"].values())
10664
if len(input_values) != 1:
10765
fail(f"{feature['featureId']} expected exactly one input value")
10866

109-
response = client.create_tus_assembly(input_values[0])
110-
data = response_data(response, feature["featureId"])
111-
for required_path in feature["requiredResponsePaths"]:
112-
value = read_path(data, required_path, feature["featureId"])
113-
if value is None or value == "":
114-
fail(f"{feature['featureId']} returned empty value at {required_path!r}")
115-
return data
67+
return input_values[0]
11668

11769

11870
def scenario_bytes(scenario):
@@ -124,53 +76,14 @@ def scenario_bytes(scenario):
12476
return source["value"].encode("utf-8")
12577

12678

127-
def upload_metadata(scenario, create_response):
128-
context = {"createResponse": create_response, "scenario": scenario}
129-
metadata = {}
130-
for field in scenario["upload"]["metadata"]:
131-
metadata[field["name"]] = str(resolve_value(field["value"], context, field["name"]))
132-
return metadata
133-
134-
135-
def upload_with_tus(scenario, create_response):
136-
context = {"createResponse": create_response, "scenario": scenario}
137-
endpoint_url = str(resolve_value(scenario["upload"]["tusUrl"], context, "tusUrl"))
138-
content = scenario_bytes(scenario)
139-
chunk_size = len(content) if scenario["upload"]["chunkSize"] == "full-file" else None
140-
if chunk_size is None:
141-
fail(f"unsupported chunk size policy {scenario['upload']['chunkSize']!r}")
142-
143-
uploader = tus.TusClient(endpoint_url).uploader(
144-
file_stream=BytesIO(content),
145-
chunk_size=chunk_size,
146-
metadata=upload_metadata(scenario, create_response),
147-
retries=scenario["upload"]["retries"],
148-
)
149-
uploader.upload()
150-
if not uploader.url:
151-
fail("TUS upload did not expose an upload URL")
152-
if uploader.offset != len(content):
153-
fail(f"TUS upload offset {uploader.offset}, expected {len(content)}")
154-
return uploader.url
155-
156-
157-
def render_path_template(template_config, context, label):
158-
rendered = template_config["template"]
159-
for name, value_spec in template_config["replacements"].items():
160-
value = resolve_value(value_spec, context, f"{label}.{name}")
161-
rendered = rendered.replace("{" + name + "}", quote(str(value), safe=""))
162-
163-
if "{" in rendered or "}" in rendered:
164-
fail(f"{label} still has unresolved placeholders: {rendered}")
165-
166-
return rendered
167-
168-
169-
def wait_for_assembly(client, scenario, create_response):
170-
feature = feature_step(scenario, "observations", "waitForAssembly", "feature-poll")
171-
context = {"createResponse": create_response, "scenario": scenario}
172-
wait_input = render_path_template(feature["input"], context, feature["featureId"])
173-
return response_data(client.wait_for_assembly(wait_input), feature["featureId"])
79+
def upload_config(scenario):
80+
upload = scenario["upload"]
81+
return {
82+
"content": scenario_bytes(scenario),
83+
"fieldname": upload["fieldName"],
84+
"filename": upload["fileName"],
85+
"user_meta": upload.get("userMeta") or {},
86+
}
17487

17588

17689
def write_result(create_response, status, upload_url):
@@ -200,10 +113,16 @@ def main():
200113
service=endpoint,
201114
)
202115

203-
create_response = create_assembly(client, scenario)
204-
upload_url = upload_with_tus(scenario, create_response)
205-
status = wait_for_assembly(client, scenario, create_response)
206-
write_result(create_response, status, upload_url)
116+
upload = upload_config(scenario)
117+
completed_assembly, upload_url = client.upload_tus_assembly(
118+
file_count(scenario),
119+
upload["content"],
120+
upload["fieldname"],
121+
upload["filename"],
122+
upload["user_meta"],
123+
)
124+
status = response_data(completed_assembly, "uploadTusAssembly")
125+
write_result(status, status, upload_url)
207126

208127
print(f"Python Transloadit SDK devdock scenario {scenario['scenarioId']} passed for {endpoint}")
209128

transloadit/async_client.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,74 @@ async def create_tus_assembly(self, file_count: int):
267267

268268
return assembly
269269

270+
async def upload_tus_assembly(self, file_count: int, content: bytes, fieldname: str, filename: str, user_meta: Optional[dict] = None):
271+
"""
272+
Create a TUS-ready Assembly, upload one file with the TUS protocol, and wait for the Assembly to finish.
273+
"""
274+
createdAssembly = await self.create_tus_assembly(file_count)
275+
276+
import base64
277+
from urllib.parse import urljoin
278+
279+
endpointUrl = createdAssembly.data.get("tus_url")
280+
if not endpointUrl:
281+
raise RuntimeError("TUS singleUploadLifecycle needs input.endpointUrl")
282+
283+
metadataMap = {}
284+
if user_meta:
285+
metadataMap.update({str(key): str(value) for key, value in user_meta.items()})
286+
metadataMap["assembly_url"] = str(createdAssembly.data.get("assembly_url"))
287+
metadataMap["fieldname"] = str(fieldname)
288+
metadataMap["filename"] = str(filename)
289+
290+
session = await self.request._ensure_session()
291+
292+
createHeaders = {}
293+
createHeaders["Tus-Resumable"] = "1.0.0"
294+
createHeaders["Upload-Length"] = str(len(content))
295+
createMetadataParts = []
296+
for key, value in metadataMap.items():
297+
encoded_value = base64.b64encode(str(value).encode("utf-8")).decode("ascii")
298+
createMetadataParts.append(f"{key} {encoded_value}")
299+
createHeaders["Upload-Metadata"] = ",".join(createMetadataParts)
300+
async with session.request(
301+
"POST",
302+
endpointUrl,
303+
data=b"",
304+
headers=createHeaders,
305+
timeout=self.request._timeout(),
306+
) as createResponse:
307+
if createResponse.status != 201:
308+
raise RuntimeError(f"TUS create returned HTTP {createResponse.status}, expected 201")
309+
uploadUrlLocation = createResponse.headers.get("Location")
310+
if not uploadUrlLocation:
311+
raise RuntimeError("TUS create did not return a Location header")
312+
uploadUrlText = urljoin(endpointUrl, uploadUrlLocation)
313+
314+
uploadHeaders = {}
315+
uploadHeaders["Tus-Resumable"] = "1.0.0"
316+
uploadHeaders["Upload-Offset"] = "0"
317+
uploadHeaders["Content-Type"] = "application/offset+octet-stream"
318+
async with session.request(
319+
"PATCH",
320+
uploadUrlText,
321+
data=content,
322+
headers=uploadHeaders,
323+
timeout=self.request._timeout(),
324+
) as uploadResponse:
325+
if uploadResponse.status != 204:
326+
raise RuntimeError(f"TUS upload returned HTTP {uploadResponse.status}, expected 204")
327+
try:
328+
remote_offset = int(uploadResponse.headers.get("Upload-Offset", ""))
329+
except ValueError as error:
330+
raise RuntimeError("TUS upload returned an invalid Upload-Offset header") from error
331+
if remote_offset != len(content):
332+
raise RuntimeError(f"TUS upload offset {remote_offset}, expected {len(content)}")
333+
334+
completedAssembly = await self.wait_for_assembly(createdAssembly.data.get("assembly_ssl_url"))
335+
336+
return completedAssembly, uploadUrlText
337+
270338
async def wait_for_assembly(self, assembly_url: str):
271339
"""
272340
Wait for an Assembly to finish uploading and executing.

transloadit/client.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,74 @@ def create_tus_assembly(self, file_count: int):
275275

276276
return assembly
277277

278+
def upload_tus_assembly(self, file_count: int, content: bytes, fieldname: str, filename: str, user_meta: Optional[dict] = None):
279+
"""
280+
Create a TUS-ready Assembly, upload one file with the TUS protocol, and wait for the Assembly to finish.
281+
"""
282+
createdAssembly = self.create_tus_assembly(file_count)
283+
284+
import base64
285+
from urllib.parse import urljoin
286+
287+
import requests
288+
289+
endpointUrl = createdAssembly.data.get("tus_url")
290+
if not endpointUrl:
291+
raise RuntimeError("TUS singleUploadLifecycle needs input.endpointUrl")
292+
293+
metadataMap = {}
294+
if user_meta:
295+
metadataMap.update({str(key): str(value) for key, value in user_meta.items()})
296+
metadataMap["assembly_url"] = str(createdAssembly.data.get("assembly_url"))
297+
metadataMap["fieldname"] = str(fieldname)
298+
metadataMap["filename"] = str(filename)
299+
300+
createHeaders = {}
301+
createHeaders["Tus-Resumable"] = "1.0.0"
302+
createHeaders["Upload-Length"] = str(len(content))
303+
createMetadataParts = []
304+
for key, value in metadataMap.items():
305+
encoded_value = base64.b64encode(str(value).encode("utf-8")).decode("ascii")
306+
createMetadataParts.append(f"{key} {encoded_value}")
307+
createHeaders["Upload-Metadata"] = ",".join(createMetadataParts)
308+
createResponse = requests.request(
309+
"POST",
310+
endpointUrl,
311+
data=b"",
312+
headers=createHeaders,
313+
timeout=request.TIMEOUT,
314+
)
315+
if createResponse.status_code != 201:
316+
raise RuntimeError(f"TUS create returned HTTP {createResponse.status_code}, expected 201")
317+
uploadUrlLocation = createResponse.headers.get("Location")
318+
if not uploadUrlLocation:
319+
raise RuntimeError("TUS create did not return a Location header")
320+
uploadUrlText = urljoin(endpointUrl, uploadUrlLocation)
321+
322+
uploadHeaders = {}
323+
uploadHeaders["Tus-Resumable"] = "1.0.0"
324+
uploadHeaders["Upload-Offset"] = "0"
325+
uploadHeaders["Content-Type"] = "application/offset+octet-stream"
326+
uploadResponse = requests.request(
327+
"PATCH",
328+
uploadUrlText,
329+
data=content,
330+
headers=uploadHeaders,
331+
timeout=request.TIMEOUT,
332+
)
333+
if uploadResponse.status_code != 204:
334+
raise RuntimeError(f"TUS upload returned HTTP {uploadResponse.status_code}, expected 204")
335+
try:
336+
remote_offset = int(uploadResponse.headers.get("Upload-Offset", ""))
337+
except ValueError as error:
338+
raise RuntimeError("TUS upload returned an invalid Upload-Offset header") from error
339+
if remote_offset != len(content):
340+
raise RuntimeError(f"TUS upload offset {remote_offset}, expected {len(content)}")
341+
342+
completedAssembly = self.wait_for_assembly(createdAssembly.data.get("assembly_ssl_url"))
343+
344+
return completedAssembly, uploadUrlText
345+
278346
def wait_for_assembly(self, assembly_url: str):
279347
"""
280348
Wait for an Assembly to finish uploading and executing.

0 commit comments

Comments
 (0)