Skip to content

Commit 3ae64b4

Browse files
Copilot and SilianZ committed
Fix async/await issues in S3 storage and adjust dashboard polling interval
Co-authored-by: SilianZ <113701655+SilianZ@users.noreply.github.com>
1 parent 032a52a commit 3ae64b4

2 files changed

Lines changed: 57 additions & 41 deletions

File tree

core/storages/s3.py

Lines changed: 56 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -91,11 +91,16 @@ async def check(self) -> None:
9191
async def getMissingFiles(self, files: FileList, pbar: tqdm) -> FileList:
9292
s3_files = {}
9393
try:
94-
paginator = self.client.get_paginator("list_objects_v2")
95-
# Use asyncio.to_thread to run synchronous paginator in thread pool
96-
for page in paginator.paginate(Bucket=self.bucket):
97-
for obj in page.get("Contents", []):
98-
s3_files[obj["Key"]] = obj["Size"]
94+
# Run synchronous paginator in thread pool to avoid blocking event loop
95+
def list_s3_files():
96+
files_dict = {}
97+
paginator = self.client.get_paginator("list_objects_v2")
98+
for page in paginator.paginate(Bucket=self.bucket):
99+
for obj in page.get("Contents", []):
100+
files_dict[obj["Key"]] = obj["Size"]
101+
return files_dict
102+
103+
s3_files = await asyncio.to_thread(list_s3_files)
99104
except ClientError as e:
100105
logger.terror("storage.error.s3.get_s3_files", e=e)
101106

@@ -111,21 +116,21 @@ async def getMissingFiles(self, files: FileList, pbar: tqdm) -> FileList:
111116
async def express(self, hash: str, counter: dict) -> Union[web.Response, web.FileResponse]:
112117
file_key = f"{hash[:2]}/{hash}"
113118
try:
114-
# Generate a presigned URL for the file
115-
response = await asyncio.to_thread(
116-
self.client.head_object,
117-
Bucket=self.bucket,
118-
Key=file_key
119-
)
120-
file_size = response["ContentLength"]
119+
# Run S3 operations in thread pool to avoid blocking event loop
120+
def get_presigned_url():
121+
# Check if file exists and get size
122+
response = self.client.head_object(Bucket=self.bucket, Key=file_key)
123+
file_size = response["ContentLength"]
124+
125+
# Generate presigned URL for downloading the file
126+
url = self.client.generate_presigned_url(
127+
"get_object",
128+
Params={"Bucket": self.bucket, "Key": file_key},
129+
ExpiresIn=3600 # URL valid for 1 hour
130+
)
131+
return url, file_size
121132

122-
# Generate presigned URL for downloading the file
123-
url = await asyncio.to_thread(
124-
self.client.generate_presigned_url,
125-
"get_object",
126-
Params={"Bucket": self.bucket, "Key": file_key},
127-
ExpiresIn=3600 # URL valid for 1 hour
128-
)
133+
url, file_size = await asyncio.to_thread(get_presigned_url)
129134

130135
# Return redirect to presigned URL
131136
response = web.HTTPFound(url)
@@ -152,13 +157,17 @@ async def recycleFiles(self, files: FileList) -> None:
152157
for file in files.files
153158
}
154159

155-
# List all files in S3 bucket
160+
# List all files in S3 bucket - run in thread pool to avoid blocking
156161
try:
157-
paginator = self.client.get_paginator("list_objects_v2")
158-
all_s3_keys = []
159-
for page in paginator.paginate(Bucket=self.bucket):
160-
for obj in page.get("Contents", []):
161-
all_s3_keys.append(obj["Key"])
162+
def list_all_s3_keys():
163+
keys = []
164+
paginator = self.client.get_paginator("list_objects_v2")
165+
for page in paginator.paginate(Bucket=self.bucket):
166+
for obj in page.get("Contents", []):
167+
keys.append(obj["Key"])
168+
return keys
169+
170+
all_s3_keys = await asyncio.to_thread(list_all_s3_keys)
162171
except ClientError as e:
163172
logger.terror("storage.error.s3.recycle.list", e=e)
164173
return
@@ -190,22 +199,29 @@ async def recycleFiles(self, files: FileList) -> None:
190199
for i in range(0, len(delete_files), 1000):
191200
batch = delete_files[i:i+1000]
192201
try:
193-
# Get sizes before deleting
194-
for key in batch:
195-
try:
196-
response = self.client.head_object(Bucket=self.bucket, Key=key)
197-
total_size += response["ContentLength"]
198-
except Exception:
199-
pass
202+
# Run batch operations in thread pool
203+
def process_batch():
204+
size = 0
205+
# Get sizes before deleting
206+
for key in batch:
207+
try:
208+
response = self.client.head_object(Bucket=self.bucket, Key=key)
209+
size += response["ContentLength"]
210+
except Exception:
211+
pass
212+
213+
# Delete batch
214+
self.client.delete_objects(
215+
Bucket=self.bucket,
216+
Delete={
217+
"Objects": [{"Key": key} for key in batch],
218+
"Quiet": True
219+
}
220+
)
221+
return size
200222

201-
# Delete batch
202-
self.client.delete_objects(
203-
Bucket=self.bucket,
204-
Delete={
205-
"Objects": [{"Key": key} for key in batch],
206-
"Quiet": True
207-
}
208-
)
223+
batch_size = await asyncio.to_thread(process_batch)
224+
total_size += batch_size
209225
pbar.update(len(batch))
210226
except ClientError as e:
211227
logger.terror("storage.error.s3.recycle", e=e)

dashboard/src/views/HomeView.vue

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ import { ref } from 'vue'
88
import { watch } from 'vue'
99
1010
const { data, refresh, loading, error } = useRequest((): Promise<StatsRes> => fetchStat(), {
11-
pollingInterval: 5000, // Poll every 5 seconds for real-time updates
11+
pollingInterval: 10000, // Poll every 10 seconds for real-time updates without excessive server load
1212
refreshOnWindowFocus: true // Refresh when user returns to tab
1313
})
1414

0 commit comments

Comments
 (0)