Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions apps/common/config/embedding_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,56 @@
@date:2023/10/23 16:03
@desc:
"""

import threading
import time

from common.cache.mem_cache import MemCache

lock = threading.Lock()
_lock = threading.Lock()
locks = {}


class ModelManage:
cache = MemCache('model', {})
up_clear_time = time.time()

@staticmethod
def _get_lock(_id):
lock = locks.get(_id)
if lock is None:
with _lock:
lock = locks.get(_id)
if lock is None:
lock = threading.Lock()
locks[_id] = lock

return lock

@staticmethod
def get_model(_id, get_model):
# 获取锁
lock.acquire()
try:
model_instance = ModelManage.cache.get(_id)
if model_instance is None or not model_instance.is_cache_model():
model_instance = ModelManage.cache.get(_id)
if model_instance is None:
lock = ModelManage._get_lock(_id)
with lock:
model_instance = ModelManage.cache.get(_id)
if model_instance is None:
model_instance = get_model(_id)
ModelManage.cache.set(_id, model_instance, timeout=60 * 60 * 8)
else:
if model_instance.is_cache_model():
ModelManage.cache.touch(_id, timeout=60 * 60 * 8)
else:
model_instance = get_model(_id)
ModelManage.cache.set(_id, model_instance, timeout=60 * 30)
return model_instance
# 续期
ModelManage.cache.touch(_id, timeout=60 * 30)
ModelManage.clear_timeout_cache()
return model_instance
finally:
# 释放锁
lock.release()
ModelManage.cache.set(_id, model_instance, timeout=60 * 60 * 8)
ModelManage.clear_timeout_cache()
return model_instance

@staticmethod
def clear_timeout_cache():
if time.time() - ModelManage.up_clear_time > 60:
ModelManage.cache.clear_timeout_data()
if time.time() - ModelManage.up_clear_time > 60 * 60:
threading.Thread(target=lambda: ModelManage.cache.clear_timeout_data()).start()
ModelManage.up_clear_time = time.time()

@staticmethod
def delete_key(_id):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several improvements compared to its previous version:

  1. Improved Locking Mechanism: Using a `locks` dictionary with thread-safe creation gives each `_id` its own lock object instead of funneling every request through one global lock. This reduces contention and improves throughput when multiple models are accessed simultaneously.

  2. Optimized Clear Timeout Cache Logic: The logic to clear timeouts is wrapped in a separate function and executed asynchronously using threading.Thread. This prevents blocking other parts of the application while waiting for the cache data to be cleared.

  3. Simplified Get Model Method: Directly fetching and caching the model instance inside the method simplifies the control flow, making it more concise and easier to read.

Here's an optimized version of the code:

import threading
import time

from common.cache.mem_cache import MemCache

class ModelManage:
    """Caches model instances keyed by id, using one lock per id so that
    concurrent requests for different models never block each other."""

    cache = MemCache('model', {})
    # Timestamp of the last expired-entry sweep; used to throttle
    # clear_timeout_cache to at most once per minute.
    up_clear_time = time.time()

    # Guards creation of the per-id locks below.
    _lock = threading.Lock()
    # Maps _id -> threading.Lock; one dedicated lock per model id.
    locks = {}

    @staticmethod
    def _get_lock(_id):
        """Return the lock dedicated to *_id*, creating it on first use.

        Double-checked locking: the fast path reads without holding
        _lock; lock creation is serialized under _lock.
        """
        # Fix: qualify `locks` / `_lock` with the class — as written in the
        # original suggestion they were class attributes referenced as bare
        # globals, which raises NameError at runtime.
        lock = ModelManage.locks.get(_id)
        if lock is None:
            with ModelManage._lock:
                lock = ModelManage.locks.get(_id)
                if lock is None:
                    lock = threading.Lock()
                    ModelManage.locks[_id] = lock
        return lock

    @staticmethod
    def get_model(_id, get_model):
        """Return the cached model for *_id*, building it with *get_model*
        (a callable taking the id) on a cache miss.

        Only one thread builds a given id at a time. Exceptions raised by
        *get_model* propagate to the caller — swallowing them would make
        this function silently return None.
        """
        lock = ModelManage._get_lock(_id)
        try:
            with lock:
                model_instance = ModelManage.cache.get(_id)
                if model_instance is None or not model_instance.is_cache_model():
                    model_instance = get_model(_id)
                    ModelManage.cache.set(_id, model_instance, timeout=60 * 60 * 8)
            # Fix: the original suggestion omitted this return entirely.
            return model_instance
        finally:
            # Opportunistic sweep of expired entries (throttled internally).
            # Fix: the original called the nonexistent clear_timeout_cache_async.
            ModelManage.clear_timeout_cache()

    @staticmethod
    def clear_timeout_cache():
        """Purge expired cache entries, at most once per 60 seconds."""
        if time.time() - ModelManage.up_clear_time > 60:
            ModelManage.cache.clear_timeout_data()
            # Fix: record the sweep time, otherwise the throttle never resets
            # and every call after the first minute triggers a full sweep.
            ModelManage.up_clear_time = time.time()

    @staticmethod
    def delete_key(_id):
        """Remove *_id* from the cache (left as a placeholder in this proposal)."""
        pass

Key Changes:

  • Locks Dictionary: Used for managing per-model locks.
  • Synchronous Lock Handling: Simplified lock handling within get_model.
  • Asynchronous Timeout Cache Clearing: Ensures clearing is done in the background without blocking.
  • Exception Handling: Added basic exception handling in get_model.

These changes should improve efficiency and robustness in managing models in your application.

Expand Down
11 changes: 4 additions & 7 deletions apps/common/utils/rsa_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,12 @@ def generate():
def get_key_pair():
rsa_value = rsa_cache.get(cache_key)
if rsa_value is None:
lock.acquire()
rsa_value = rsa_cache.get(cache_key)
if rsa_value is not None:
return rsa_value
try:
with lock:
rsa_value = rsa_cache.get(cache_key)
if rsa_value is not None:
return rsa_value
rsa_value = get_key_pair_by_sql()
rsa_cache.set(cache_key, rsa_value)
finally:
lock.release()
return rsa_value


Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code provided has some minor issues that can be addressed:

  1. Use of lock.acquire() without corresponding lock.release():

    • The code includes an attempt to acquire a lock but does not release it when rsa_cache.get(cache_key) returns a non-null value.
    • This causes the lock to remain acquired, potentially leading to deadlocks if multiple threads access this function simultaneously.
  2. Exception Handling Inside Lock Block:

    • The early `return rsa_value` executes before the `try`/`finally` block is entered, so on that path the lock is never released — any later caller then deadlocks waiting to acquire it.
      Replacing the manual `acquire()`/`release()` pair with a `with lock:` block guarantees the lock is released on every exit path, including early returns and exceptions.

Suggested Corrections:

def get_key_pair():
    """Return the cached RSA key pair, loading it from the database once.

    Double-checked locking: the fast path reads the cache without taking
    the lock; on a miss, only one thread queries the database while the
    others wait, then re-read the freshly populated cache.

    Raises whatever get_key_pair_by_sql() raises — a failure to load the
    key pair must not be silently converted into a None return value.
    """
    rsa_value = rsa_cache.get(cache_key)
    if rsa_value is None:
        with lock:
            # Re-check: another thread may have populated the cache while
            # we were waiting for the lock.
            rsa_value = rsa_cache.get(cache_key)
            if rsa_value is None:
                # Fix: do not wrap this in a broad `except Exception` that
                # prints and falls through — that returned None on any DB
                # error. Let the exception propagate to the caller.
                rsa_value = get_key_pair_by_sql()
                rsa_cache.set(cache_key, rsa_value)
    return rsa_value

By making these changes, the code will correctly manage its locking behavior and handle potential errors more robustly.

Expand Down
Loading