Skip to content

Commit 69409f8

Browse files
committed
handle upload-link-flags with database
1 parent bf55d28 commit 69409f8

1 file changed

Lines changed: 61 additions & 91 deletions

File tree

Lines changed: 61 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,121 +1,91 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
import time
4-
import json
53
import stat
4+
from collections import defaultdict
65
from datetime import datetime, timedelta
7-
from threading import Thread, Event
6+
from threading import Thread
87

98
from apscheduler.schedulers.blocking import BlockingScheduler
109

1110
from seaserv import seafile_api
1211

13-
from dtable_events.app.event_redis import RedisClient, redis_cache
1412
from dtable_events.db import init_db_session_class
15-
from dtable_events.utils import uuid_str_to_36_chars, uuid_str_to_32_chars
13+
from dtable_events.utils import uuid_str_to_36_chars
1614

1715
logger = logging.getLogger(__name__)
1816

1917

2018
class DTableUploadLinkHandler(Thread):
2119
def __init__(self, config):
2220
Thread.__init__(self)
23-
self._finished = Event()
24-
self._redis_client = RedisClient(config)
21+
self.session_class = init_db_session_class(config)
2522
self.interval_hours = 6
26-
self.cache_timeout = 12 * 60 * 60
2723

28-
def get_cache_key(self, hour):
29-
return f'public_form_upload_link:{hour}'
30-
31-
def handle_upload_link(self, event_data):
32-
dtable_uuid = uuid_str_to_32_chars(event_data.get('dtable_uuid'))
33-
repo_id = event_data.get('repo_id')
34-
parent_dir = event_data.get('parent_dir') or ''
35-
if 'public/forms' not in parent_dir:
36-
return
37-
cache_key = self.get_cache_key((datetime.now().hour+1)%24)
38-
dtable_uuids_cache = redis_cache.get(cache_key)
39-
if dtable_uuids_cache:
40-
try:
41-
dtable_uuids = json.loads(dtable_uuids_cache)
42-
except Exception as e:
43-
logger.warning('key: %s cache invalid', cache_key)
44-
dtable_uuids = [(dtable_uuid, repo_id)]
45-
else:
46-
flag = True
47-
for tmp_dtable_uuid, _ in dtable_uuids:
48-
if dtable_uuid == tmp_dtable_uuid:
49-
flag = False
50-
break
51-
if flag:
52-
dtable_uuids.append((dtable_uuid, repo_id))
53-
else:
54-
dtable_uuids = [(dtable_uuid, repo_id)]
55-
redis_cache.set(cache_key, json.dumps(dtable_uuids), self.cache_timeout)
56-
57-
def listen_redis(self):
58-
logger.info('Starting handle dtable upload link...')
59-
subscriber = self._redis_client.get_subscriber('upload-link')
60-
while not self._finished.is_set():
24+
def handle_flags(self, session):
25+
now = datetime.now()
26+
flag_time = (now - timedelta(hours=self.interval_hours)).replace(minute=0, second=0, microsecond=0)
27+
offset, limit = 0, 1000
28+
while True:
29+
sql = "SELECT dtable_uuid, repo_id FROM dtable_form_upload_link_flags WHERE flag_time=:flag_time LIMIT :offset, :limit"
6130
try:
62-
message = subscriber.get_message()
63-
if message is not None:
64-
data = json.loads(message['data'])
65-
try:
66-
self.handle_upload_link(data)
67-
except Exception as e:
68-
logger.error('Handle dtable upload link: %s' % e)
69-
else:
70-
time.sleep(0.5)
31+
results = list(session.execute(sql, {'flag_time': flag_time, 'offset': offset, 'limit': limit}))
7132
except Exception as e:
72-
logger.error('Failed get message from redis: %s' % e)
73-
subscriber = self._redis_client.get_subscriber('upload-link')
33+
logger.error('query upload flags flag_time: %s error: %s', flag_time, e)
34+
break
35+
logger.debug('flag_time: %s offset: %s limit: %s query results: %s', flag_time, offset, limit, len(results))
36+
repo_id_dtable_uuids_dict = defaultdict(list)
37+
for dtable_uuid, repo_id in results:
38+
repo_id_dtable_uuids_dict[repo_id].append(dtable_uuid)
39+
for repo_id, dtable_uuids in repo_id_dtable_uuids_dict.items():
40+
logger.debug('repo: %s dtable_uuids: %s', repo_id, len(dtable_uuids))
41+
try:
42+
repo = seafile_api.get_repo(repo_id)
43+
if not repo:
44+
continue
45+
for dtable_uuid in dtable_uuids:
46+
public_forms_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}/public/forms'
47+
dir_id = seafile_api.get_dir_id_by_path(repo_id, public_forms_path)
48+
if not dir_id:
49+
continue
50+
f_offset, f_limit= 0, 1000
51+
to_delete_files = []
52+
while True:
53+
dirents = seafile_api.list_dir_by_path(repo_id, public_forms_path, f_offset, f_limit)
54+
if not dirents:
55+
break
56+
for dirent in dirents:
57+
if stat.S_ISDIR(dirent.mode):
58+
continue
59+
if (now.timestamp() - datetime.fromtimestamp(dirent.mtime)) > self.interval_hours * 60 * 60:
60+
to_delete_files.append(dirent.obj_name)
61+
if len(dirents) < f_limit:
62+
break
63+
f_offset += f_limit
64+
logger.debug('repo: %s dtable: %s to delete files: %s', repo_id, dtable_uuid, len(to_delete_files))
65+
for file in to_delete_files:
66+
seafile_api.del_file(repo_id, public_forms_path, file, '')
67+
except Exception as e:
68+
logger.exception('repo: %s handle upload flags error: %s', repo_id, e)
69+
offset += limit
70+
sql = "DELETE FROM dtable_form_upload_link_flags WHERE flag_time <= :flag_time"
71+
try:
72+
session.execute(sql, {'flag_time': flag_time})
73+
session.commit()
74+
except Exception as e:
75+
logger.error('delete upload flags old data flag time: %s error: %s', flag_time, e)
7476

75-
def handle_form_timeout_images(self):
77+
def run(self):
78+
logger.info('start to handle upload flags...')
7679
sched = BlockingScheduler()
7780

7881
@sched.scheduled_job('cron', day_of_week='*', hour='*')
7982
def handle():
80-
handle_hour = (datetime.now() - timedelta(hours=self.interval_hours)).hour
81-
cache_key = self.get_cache_key(handle_hour)
82-
dtable_uuids_cache = redis_cache.get(cache_key)
83-
if not dtable_uuids_cache:
84-
return
83+
session = self.session_class()
8584
try:
86-
dtable_uuids = json.loads(dtable_uuids_cache)
85+
self.handle_flags(session)
8786
except Exception as e:
88-
logger.warning('cache: %s loads dtable uuids error: %s', cache_key, e)
89-
return
90-
now_timestamp = datetime.now().timestamp()
91-
logger.debug('start to check key: %s dtable_uuids: %s ', cache_key, len(dtable_uuids))
92-
for dtable_uuid, repo_id in dtable_uuids:
93-
try:
94-
public_forms_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}/public/forms'
95-
logger.debug('start to scan repo: %s dtable: %s path: %s', repo_id, dtable_uuid, public_forms_path)
96-
dirents = []
97-
limit, offset = 1000, 0
98-
while True:
99-
step_dirents = seafile_api.list_dir_by_path(repo_id, public_forms_path, offset, limit)
100-
if not step_dirents:
101-
break
102-
dirents.extend(step_dirents)
103-
if len(step_dirents) < limit:
104-
break
105-
offset += limit
106-
logger.debug('repo: %s dtable: %s path: %s dirents: %s', repo_id, dtable_uuid, public_forms_path, len(dirents))
107-
for dirent in dirents:
108-
if stat.S_ISDIR(dirent.mode):
109-
continue
110-
if dirent.mtime and now_timestamp - dirent.mtime > self.interval_hours * 60 * 60:
111-
seafile_api.del_file(repo_id, public_forms_path, json.dumps([dirent.obj_name]), '')
112-
except Exception as e:
113-
logger.exception('scan repo: %s dtable: %s path: %s error: %s', repo_id, dtable_uuid, public_forms_path, e)
114-
redis_cache.delete(cache_key)
87+
logger.exception('handle upload flags error: %s', e)
88+
finally:
89+
session.close()
11590

11691
sched.start()
117-
118-
def run(self):
119-
logger.info('start to listen upload-link redis and handle form timeout images...')
120-
Thread(target=self.listen_redis, daemon=True).start()
121-
Thread(target=self.handle_form_timeout_images, daemon=True).start()

0 commit comments

Comments
 (0)