-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathserver.py
More file actions
147 lines (113 loc) · 4.28 KB
/
server.py
File metadata and controls
147 lines (113 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from dataclasses import dataclass, field
import aiofiles
from blacksheep import Application, FromForm, FileBuffer, Request, FromFiles
from blacksheep.server.openapi.v3 import OpenAPIHandler
from openapidocs.v3 import Info
app = Application(show_error_details=True)
docs = OpenAPIHandler(info=Info(title="Example API", version="0.0.1"))
docs.bind_app(app)
app.serve_files("static", discovery=True, fallback_document="index.html")
async def _write_chunks(source, dest_path: str, chunk_size: int = 65536) -> None:
async with aiofiles.open(dest_path, "wb") as out_file:
while chunk := source.read(chunk_size):
await out_file.write(chunk)
@dataclass
class InputExampleOptimal:
username: str
avatar: FileBuffer
documents: list[FileBuffer] = field(default_factory=list)
subscribe: bool = False
@app.router.post("/from-files")
async def from_files(files: FromFiles):
for file in files.value:
await file.save_to(f".out/{file.file_name.decode()}")
return "OK"
@app.router.post("/upload")
async def create_profile(data: FromForm[InputExampleOptimal]):
# This is just an example!
value = data.value
assert isinstance(value, InputExampleOptimal)
assert isinstance(value.subscribe, bool)
avatar = value.avatar
await avatar.save_to(f".out/{avatar.file_name}")
if value.documents:
for document in value.documents:
await document.save_to(f".out/{document.file_name}")
return {
"status": "created",
"username": value.username,
"subscribe": value.subscribe,
"avatar_filename": avatar.file_name,
"documents_count": len(value.documents) if value.documents else 0
}
@app.router.post("/upload-text")
async def upload_text(request: Request):
"""
Test multipart/form-data field post with long text.
"""
file_size = 0
async for part in request.multipart_stream():
print(part.name)
if part.file_name:
file_size = await part.save_to(f".out/{part.file_name}")
else:
file_size = await part.save_to(f".out/document.txt")
return {
"status": "uploaded",
"file_size": file_size
}
@app.router.post("/upload-files-form-method")
async def upload_files_form_method_1(request: Request):
data = await request.form()
avatar = data['avatar'][0]
await _write_chunks(avatar.file, f".out/{avatar.file_name.decode()}")
for document in data["documents"]:
await _write_chunks(document.file, f".out/{document.file_name.decode()}")
return "OK"
@app.router.post("/upload-files-multipart-method")
async def upload_files_form_method_2(request: Request):
parts = await request.multipart()
for part in parts:
if part.file_name:
await _write_chunks(part.file, f".out/{part.file_name.decode()}")
else:
await _write_chunks(part.file, f".out/{part.name.decode()}")
return "OK"
@app.router.post("/upload-files")
async def upload_files_streaming(request: Request):
"""
Handle file uploads using streaming multipart parsing with StreamingFormPart.
This is more memory-efficient for large files as it doesn't load
the entire request body into memory at once.
"""
uploaded_files = []
file_count = 0
total_size = 0
# Stream multipart data without loading everything into memory
async for part in request.multipart_stream():
if part.file_name:
# This is a file upload - use StreamingFormPart's save_to method
file_name = part.file_name
file_path = f".out/{file_name}"
# Stream the file directly to disk
file_size = await part.save_to(file_path)
total_size += file_size
file_count += 1
uploaded_files.append({
"field_name": part.name,
"file_name": file_name,
"content_type": part.content_type if part.content_type else "unknown",
"size_bytes": file_size
})
else:
# This is a regular form field (non-file)
...
return {
"status": "uploaded",
"files_count": file_count,
"total_size_bytes": total_size,
"files": uploaded_files
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, port=44555)