77# http://www.opensource.org/licenses/MIT-license
88# Copyright (c) 2016, Shay Palachy <shaypal5@gmail.com>
99
10+ import asyncio
1011import inspect
1112import os
1213import threading
@@ -56,6 +57,14 @@ def _function_thread(core, key, func, args, kwds):
5657 print (f"Function call failed with the following exception:\n { exc } " )
5758
5859
60+ async def _function_thread_async (core , key , func , args , kwds ):
61+ try :
62+ func_res = await func (* args , ** kwds )
63+ core .set_entry (key , func_res )
64+ except BaseException as exc :
65+ print (f"Function call failed with the following exception:\n { exc } " )
66+
67+
5968def _calc_entry (core , key , func , args , kwds , printer = lambda * _ : None ) -> Optional [Any ]:
6069 core .mark_entry_being_calculated (key )
6170 try :
@@ -68,6 +77,18 @@ def _calc_entry(core, key, func, args, kwds, printer=lambda *_: None) -> Optiona
6877 core .mark_entry_not_calculated (key )
6978
7079
80+ async def _calc_entry_async (core , key , func , args , kwds , printer = lambda * _ : None ) -> Optional [Any ]:
81+ core .mark_entry_being_calculated (key )
82+ try :
83+ func_res = await func (* args , ** kwds )
84+ stored = core .set_entry (key , func_res )
85+ if not stored :
86+ printer ("Result exceeds entry_size_limit; not cached" )
87+ return func_res
88+ finally :
89+ core .mark_entry_not_calculated (key )
90+
91+
7192def _convert_args_kwargs (func , _is_method : bool , args : tuple , kwds : dict ) -> dict :
7293 """Convert mix of positional and keyword arguments to aggregated kwargs."""
7394 # unwrap if the function is functools.partial
@@ -390,13 +411,108 @@ def _call(*args, max_age: Optional[timedelta] = None, **kwds):
390411 _print ("No entry found. No current calc. Calling like a boss." )
391412 return _calc_entry (core , key , func , args , kwds , _print )
392413
414+ async def _call_async (* args , max_age : Optional [timedelta ] = None , ** kwds ):
415+ # NOTE: For async functions, wait_for_calc_timeout is not honored.
416+ # Instead of blocking the event loop waiting for concurrent
417+ # calculations, async functions will recalculate in parallel.
418+ # This avoids deadlocks and maintains async efficiency.
419+ nonlocal allow_none , last_cleanup
420+ _allow_none = _update_with_defaults (allow_none , "allow_none" , kwds )
421+ # print('Inside async wrapper for {}.'.format(func.__name__))
422+ ignore_cache = _pop_kwds_with_deprecation (kwds , "ignore_cache" , False )
423+ overwrite_cache = _pop_kwds_with_deprecation (kwds , "overwrite_cache" , False )
424+ verbose = _pop_kwds_with_deprecation (kwds , "verbose_cache" , False )
425+ ignore_cache = kwds .pop ("cachier__skip_cache" , ignore_cache )
426+ overwrite_cache = kwds .pop ("cachier__overwrite_cache" , overwrite_cache )
427+ verbose = kwds .pop ("cachier__verbose" , verbose )
428+ _stale_after = _update_with_defaults (stale_after , "stale_after" , kwds )
429+ _next_time = _update_with_defaults (next_time , "next_time" , kwds )
430+ _cleanup_flag = _update_with_defaults (cleanup_stale , "cleanup_stale" , kwds )
431+ _cleanup_interval_val = _update_with_defaults (cleanup_interval , "cleanup_interval" , kwds )
432+ # merge args expanded as kwargs and the original kwds
433+ kwargs = _convert_args_kwargs (func , _is_method = core .func_is_method , args = args , kwds = kwds )
434+
435+ if _cleanup_flag :
436+ now = datetime .now ()
437+ with cleanup_lock :
438+ if now - last_cleanup >= _cleanup_interval_val :
439+ last_cleanup = now
440+ _get_executor ().submit (core .delete_stale_entries , _stale_after )
441+
442+ _print = print if verbose else lambda x : None
443+
444+ # Check current global caching state dynamically
445+ from .config import _global_params
446+
447+ if ignore_cache or not _global_params .caching_enabled :
448+ return await func (args [0 ], ** kwargs ) if core .func_is_method else await func (** kwargs )
449+ key , entry = core .get_entry ((), kwargs )
450+ if overwrite_cache :
451+ result = await _calc_entry_async (core , key , func , args , kwds , _print )
452+ return result
453+ if entry is None or (not entry ._completed and not entry ._processing ):
454+ _print ("No entry found. No current calc. Calling like a boss." )
455+ result = await _calc_entry_async (core , key , func , args , kwds , _print )
456+ return result
457+ _print ("Entry found." )
458+ if _allow_none or entry .value is not None :
459+ _print ("Cached result found." )
460+ now = datetime .now ()
461+ max_allowed_age = _stale_after
462+ nonneg_max_age = True
463+ if max_age is not None :
464+ if max_age < ZERO_TIMEDELTA :
465+ _print ("max_age is negative. Cached result considered stale." )
466+ nonneg_max_age = False
467+ else :
468+ assert max_age is not None # noqa: S101
469+ max_allowed_age = min (_stale_after , max_age )
470+ # note: if max_age < 0, we always consider a value stale
471+ if nonneg_max_age and (now - entry .time <= max_allowed_age ):
472+ _print ("And it is fresh!" )
473+ return entry .value
474+ _print ("But it is stale... :(" )
475+ if _next_time :
476+ _print ("Async calc and return stale" )
477+ # Mark entry as being calculated then immediately unmark
478+ # This matches sync behavior and ensures entry exists
479+ # Background task will update cache when complete
480+ core .mark_entry_being_calculated (key )
481+ # Use asyncio.create_task for background execution
482+ asyncio .create_task (_function_thread_async (core , key , func , args , kwds ))
483+ core .mark_entry_not_calculated (key )
484+ return entry .value
485+ _print ("Calling decorated function and waiting" )
486+ result = await _calc_entry_async (core , key , func , args , kwds , _print )
487+ return result
488+ if entry ._processing :
489+ msg = "No value but being calculated. Recalculating"
490+ _print (f"{ msg } (async - no wait)." )
491+ # For async, don't wait - just recalculate
492+ # This avoids blocking the event loop
493+ result = await _calc_entry_async (core , key , func , args , kwds , _print )
494+ return result
495+ _print ("No entry found. No current calc. Calling like a boss." )
496+ return await _calc_entry_async (core , key , func , args , kwds , _print )
497+
393498 # MAINTAINER NOTE: The main function wrapper is now a standard function
394499 # that passes *args and **kwargs to _call. This ensures that user
395500 # arguments are not shifted, and max_age is only settable via keyword
396501 # argument.
397- @wraps (func )
398- def func_wrapper (* args , ** kwargs ):
399- return _call (* args , ** kwargs )
502+ # For async functions, we create an async wrapper that calls
503+ # _call_async.
504+ is_coroutine = inspect .iscoroutinefunction (func )
505+
506+ if is_coroutine :
507+
508+ @wraps (func )
509+ async def func_wrapper (* args , ** kwargs ):
510+ return await _call_async (* args , ** kwargs )
511+ else :
512+
513+ @wraps (func )
514+ def func_wrapper (* args , ** kwargs ):
515+ return _call (* args , ** kwargs )
400516
401517 def _clear_cache ():
402518 """Clear the cache."""
0 commit comments