Skip to content

Commit de2bb45

Browse files
black-elevenyihuiwen
andauthored
sync server add presigned url params (#1033)
Co-authored-by: yihuiwen <yihuiwen@sensetime.com>
1 parent 708c30c commit de2bb45

4 files changed

Lines changed: 105 additions & 6 deletions

File tree

lightx2v/server/api/tasks/image.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,7 @@ async def _wait_task_and_stream_result(task_id: str, timeout_seconds: int, poll_
3030
if status == TaskStatus.COMPLETED.value:
3131
result_png = task_manager.get_task_result_png(task_id)
3232
if result_png:
33-
return Response(
34-
content=result_png,
35-
media_type="image/png",
36-
headers={"Content-Disposition": 'inline; filename="result.png"'},
37-
)
33+
return result_png
3834
raise HTTPException(status_code=500, detail=f"Task completed but no in-memory image found: {task_id}")
3935

4036
if status == TaskStatus.FAILED.value:
@@ -50,6 +46,39 @@ async def _wait_task_and_stream_result(task_id: str, timeout_seconds: int, poll_
5046
await asyncio.sleep(poll_interval_seconds)
5147

5248

49+
def _build_png_response(result_png: bytes) -> Response:
50+
return Response(
51+
content=result_png,
52+
media_type="image/png",
53+
headers={"Content-Disposition": 'inline; filename="result.png"'},
54+
)
55+
56+
57+
async def _upload_sync_result_if_needed(message: ImageTaskRequest, result_png: bytes):
58+
presigned_url = (getattr(message, "presigned_url", "") or "").strip()
59+
if not presigned_url:
60+
return None
61+
62+
services = get_services()
63+
assert services.file_service is not None, "File service is not initialized"
64+
65+
try:
66+
await services.file_service.upload_to_presigned_url(
67+
presigned_url=presigned_url,
68+
file_content=result_png,
69+
content_type="image/png",
70+
)
71+
except ValueError as e:
72+
raise HTTPException(status_code=502, detail=f"Failed to upload sync result to presigned URL: {str(e)}")
73+
74+
return {
75+
"task_id": message.task_id,
76+
"task_status": "completed",
77+
"uploaded_to_presigned_url": True,
78+
"presigned_url": presigned_url,
79+
}
80+
81+
5382
async def _watch_client_disconnect(request: Request, task_id: str, poll_interval_seconds: float = 0.2) -> bool:
5483
while True:
5584
if await request.is_disconnected():
@@ -105,6 +134,9 @@ async def create_image_task_sync(
105134
if hasattr(message, "image_mask_path") and message.image_mask_path and message.image_mask_path.startswith("http"):
106135
if not await validate_url_async(message.image_mask_path):
107136
raise HTTPException(status_code=400, detail=f"Image mask URL is not accessible: {message.image_mask_path}")
137+
if hasattr(message, "presigned_url") and message.presigned_url:
138+
if not message.presigned_url.startswith(("http://", "https://")):
139+
raise HTTPException(status_code=400, detail=f"Invalid presigned_url: {message.presigned_url}")
108140

109141
message.prefer_memory_result = True
110142
task_id = task_manager.create_task(message)
@@ -124,7 +156,11 @@ async def create_image_task_sync(
124156
await asyncio.gather(wait_task, return_exceptions=True)
125157
raise HTTPException(status_code=499, detail=f"Client disconnected, task {task_id} cancelled")
126158

127-
return wait_task.result()
159+
result_png = wait_task.result()
160+
upload_result = await _upload_sync_result_if_needed(message, result_png)
161+
if upload_result is not None:
162+
return upload_result
163+
return _build_png_response(result_png)
128164

129165
except asyncio.CancelledError:
130166
if task_id:

lightx2v/server/schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class BaseTaskRequest(DisaggOverrideRequest):
3535
image_path: str = Field("", description="Base64 encoded image or URL")
3636
image_mask_path: str = Field("", description="Mask image path (supports URL, base64, or local path)")
3737
save_result_path: str = Field("", description="Save result path (optional, defaults to task_id, suffix auto-detected)")
38+
presigned_url: str = Field("", description="Optional presigned URL for uploading final sync result")
3839
infer_steps: int = Field(5, description="Inference steps")
3940
seed: int = Field(default_factory=generate_random_seed, description="Random seed (auto-generated if not set)")
4041
target_shape: list[int] = Field([], description="Return video or image shape")

lightx2v/server/services/file_service.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,67 @@ async def download_image(self, image_url: str) -> Path:
130130
async def download_audio(self, audio_url: str) -> Path:
131131
return await self.download_media(audio_url, "audio")
132132

133+
async def upload_to_presigned_url(
134+
self,
135+
presigned_url: str,
136+
file_content: bytes,
137+
content_type: str = "application/octet-stream",
138+
max_retries: Optional[int] = None,
139+
) -> None:
140+
if max_retries is None:
141+
max_retries = self.max_retries
142+
143+
parsed_url = urlparse(presigned_url)
144+
if not parsed_url.scheme or not parsed_url.netloc:
145+
raise ValueError(f"Invalid presigned URL format: {presigned_url}")
146+
147+
last_exception = None
148+
retry_delay = self.retry_delay
149+
150+
for attempt in range(max_retries):
151+
try:
152+
client = await self._get_http_client()
153+
response = await client.put(
154+
presigned_url,
155+
content=file_content,
156+
headers={"Content-Type": content_type},
157+
)
158+
if 200 <= response.status_code < 300:
159+
logger.info(f"Successfully uploaded result to presigned URL, status: {response.status_code}")
160+
return
161+
if response.status_code >= 500:
162+
logger.warning(f"Presigned upload server error {response.status_code}, attempt {attempt + 1}/{max_retries}")
163+
last_exception = httpx.HTTPStatusError(
164+
f"Server returned {response.status_code}",
165+
request=response.request,
166+
response=response,
167+
)
168+
else:
169+
raise httpx.HTTPStatusError(
170+
f"Client error {response.status_code}",
171+
request=response.request,
172+
response=response,
173+
)
174+
except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) as e:
175+
logger.warning(f"Connection error uploading to presigned URL, attempt {attempt + 1}/{max_retries}: {str(e)}")
176+
last_exception = e
177+
except httpx.HTTPStatusError as e:
178+
if e.response and e.response.status_code < 500:
179+
raise ValueError(f"Failed to upload to presigned URL: HTTP {e.response.status_code}")
180+
last_exception = e
181+
except Exception as e:
182+
logger.error(f"Unexpected error uploading to presigned URL: {str(e)}")
183+
last_exception = e
184+
185+
if attempt < max_retries - 1:
186+
await asyncio.sleep(retry_delay)
187+
retry_delay = min(retry_delay * 2, self.max_retry_delay)
188+
189+
error_msg = f"All {max_retries} upload attempts failed for presigned URL"
190+
if last_exception:
191+
error_msg += f": {str(last_exception)}"
192+
raise ValueError(error_msg)
193+
133194
def save_uploaded_file(self, file_content: bytes, filename: str) -> Path:
134195
file_extension = Path(filename).suffix
135196
unique_filename = f"{uuid.uuid4()}{file_extension}"

lightx2v/server/services/generation/image.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async def generate_with_stop_event(self, message: Any, stop_event) -> Optional[A
4545
task_data["seed"] = message.seed
4646
prefer_memory_result = bool(getattr(message, "prefer_memory_result", False))
4747
task_data.pop("prefer_memory_result", None)
48+
task_data.pop("presigned_url", None)
4849
task_data["return_result_tensor"] = prefer_memory_result
4950

5051
result = await self.inference_service.submit_task_async(task_data)

0 commit comments

Comments
 (0)