Add more powerful cleanup functions #3060
Conversation
Pull Request Overview
This PR renames and enhances resource cleanup APIs to provide deterministic, on-demand cleanup without relying on GC, fixes allocation atomicity, and updates tests accordingly.
- Introduce `withBracketIO`/`withBracketIOM` and `withFinallyIO`/`withFinallyIOM` to replace `cleanupIO`/`cleanupEffectIO`.
- Update tests to use the new `withFinally*` functions.
- Improve the internal implementation with `IORef` + `Map` to support multiple registrations and atomic modifications.
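The `IORef` + `Map` registration scheme can be sketched as a small standalone model; this is illustrative only, not Streamly's actual implementation, and the `register`/`release` names are hypothetical:

```haskell
import Data.IORef
import qualified Data.Map.Strict as Map

main :: IO ()
main = do
    -- The registry pairs a fresh-key counter with a Map of pending
    -- finalizers, so multiple registrations can coexist.
    ref <- newIORef (0 :: Int, Map.empty :: Map.Map Int (IO ()))
    let register fin = atomicModifyIORef' ref $ \(k, m) ->
            ((k + 1, Map.insert k fin m), k)
        -- release atomically swaps out the whole Map and runs every
        -- pending finalizer exactly once, without relying on GC.
        release = do
            fins <- atomicModifyIORef' ref $ \(k, m) -> ((k, Map.empty), m)
            sequence_ (Map.elems fins)
    _ <- register (putStrLn "free A")
    _ <- register (putStrLn "free B")
    release
```

The atomic swap is what makes on-demand cleanup safe: no finalizer can be run twice, and registrations racing with a release either make it into the drained Map or stay pending for the next release.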
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| test/Streamly/Test/Data/Stream/Exception.hs | Swapped cleanupIO/cleanupEffectIO calls to withFinallyIO/withFinallyIOM |
| core/src/Streamly/Internal/Data/Stream/Exception.hs | Removed old cleanup* exports, added withBracket* & withFinally* implementations, updated docs |
Comments suppressed due to low confidence (4)
core/src/Streamly/Internal/Data/Stream/Exception.hs:24
- Removing `cleanupIO` from the exports without a deprecation or alias may break backward compatibility. Consider adding a deprecated alias forwarding calls to `withFinallyIO` before fully removing it.
, bracketIO
core/src/Streamly/Internal/Data/Stream/Exception.hs:25
- Removing `cleanupEffectIO` from the exports without a deprecation or alias may break backward compatibility. Consider adding a deprecated alias forwarding calls to `withFinallyIOM` before fully removing it.
, withBracketIO
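A deprecated forwarding alias of the kind suggested here can be written with GHC's `DEPRECATED` pragma. The toy names below are stand-ins, not Streamly's actual functions:

```haskell
module Main where

-- Stand-in for the new API (e.g. withFinallyIOM); illustrative only.
newName :: IO () -> IO ()
newName = id

-- The old name keeps working but emits a compile-time warning
-- wherever it is used, giving downstream users a migration window.
{-# DEPRECATED oldName "Use newName instead" #-}
oldName :: IO () -> IO ()
oldName = newName

main :: IO ()
main = oldName (putStrLn "still works")
```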
core/src/Streamly/Internal/Data/Stream/Exception.hs:406
- The doc for `withFinallyIOM` references `withBracketIOM`. It should instead mention `withFinallyIOM` to accurately describe when the hooks are run.
-- would run automatically when 'withBracketIOM' ends or if an exception occurs
core/src/Streamly/Internal/Data/Stream/Exception.hs:417
- [nitpick] The suffix `IOM` is ambiguous; consider a more descriptive name such as `withFinallyEffect`, or include `Effect`, to clearly distinguish it from `withFinallyIO`.
withFinallyIOM :: (MonadIO m, MonadCatch m) =>
Force-pushed from b0f6240 to d367bcd
Force-pushed from 53f67c0 to b8f1ae8
So that we can release resources any time. Fix atomicity issue during allocation. Rename the APIs.
* Fix manual freeing of resources; add the missing "free".
* Add "forall" to the types so that different types of resources can be allocated/freed.
* Make allocation thread safe and exception safe.
Force-pushed from 9782b7a to 23f119c
Fixes some issues in Stream.withAllocateIO.
Force-pushed from eb438cc to 24468aa
-- | Resources with Priority1 are freed before Priority2. This is especially
-- introduced to take care of the case where we need to free channels, so that
-- all the workers of the channel are cleanup before we free the resources
release = do
    f <- atomicModifyIORef ref modify
    sequence_ f
-- XXX can we ensure via GC that the resources that we are freeing are all
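The swap-then-run pattern in the quoted `release` makes it naturally idempotent: the atomic swap empties the registry, so a repeated (or racing) release finds nothing left to free. A minimal standalone sketch:

```haskell
import Data.IORef

main :: IO ()
main = do
    ref <- newIORef [putStrLn "freeing"]
    let release = do
            -- Atomically take all finalizers, leaving an empty registry.
            fins <- atomicModifyIORef' ref (\fs -> ([], fs))
            sequence_ fins
    release  -- runs the finalizer
    release  -- finds an empty registry; does nothing
```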
So we don't support parallel release yet.
PS: I'm reviewing the commits one by one. This commit may be obsolete.
-- IMPORTANT: do not use interruptible operations in this
-- critical section. Even putStrLn can make tests fail.
What exactly are interruptible operations?
Operations that allow the thread to sleep?
See Control.Exception in base.
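In GHC terms, interruptible operations are the blocking operations (`takeMVar`, `threadDelay`, most blocking I/O) that can still receive asynchronous exceptions even inside `mask`; see the "Interruptible operations" section of `Control.Exception`. `uninterruptibleMask_` defers async exceptions across those points as well, which is why critical cleanup sections avoid interruptible calls. A small demonstration:

```haskell
import Control.Concurrent
import Control.Exception

main :: IO ()
main = do
    done <- newEmptyMVar
    tid <- forkIO $ uninterruptibleMask_ $ do
        threadDelay 100000        -- interruptible under mask_, but not here
        putMVar done "cleanup finished"
    -- killThread blocks until the exception is delivered, which only
    -- happens after the uninterruptible section completes.
    killThread tid
    takeMVar done >>= putStrLn
```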
-- restoring exceptions makes it non-atomic, tests fail.
-- Can use allowInterrupt in "free" if desired.
-- XXX free them atomically, even if another release executes in
-- parallel.
By this statement, you assume that either this atomic free happens before the parallel release, or the parallel release happens first and this one does not run at all.
-- We run it without an exception mask unlike withAllocateIO in
-- the Streamly.Control.Exception module. We can add it if
-- needed.
res <- atomicModifyIORef' ref modify
So, async exceptions can trigger this.
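Because the quoted code runs without an async-exception mask, an async exception can strike between allocating a resource and recording its finalizer, leaking the resource. A hedged sketch of closing that window with `mask_` (the `allocateRegistered` helper is hypothetical, not Streamly's API):

```haskell
import Control.Exception
import Data.IORef

-- Allocate and register the finalizer in one masked step, so no async
-- exception can arrive between the two actions.
allocateRegistered :: IORef [IO ()] -> IO a -> (a -> IO ()) -> IO a
allocateRegistered ref alloc free = mask_ $ do
    r <- alloc
    atomicModifyIORef' ref (\fs -> (free r : fs, ()))
    return r

main :: IO ()
main = do
    ref <- newIORef []
    _ <- allocateRegistered ref (return "res") (\_ -> putStrLn "freed")
    readIORef ref >>= sequence_
```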
| -- | ||
| setReleaseCb :: AllocateIO -> Config -> Config | ||
| setReleaseCb f cfg = cfg { _release = Just (register f) } | ||
| setReleaseCb f cfg = cfg { _release = Just (registerWith f Priority1) } |
The callback registered in the channel config will always be called first, and then the other callbacks.
The user has to be wary about resource cleanup and decide what should go where.
Only streamly developers need to worry about the priority. For normal users there is a single priority.
-- only one will succeed others will block until it finishes.
--
cleanupChan :: Channel m a -> String -> IO ()
cleanupChan chan reason = do
There will always be a major GC performed before the channel is marked Stopped.
It may not be required in many cases where there isn't any resource allocation.
Is it possible to performMajorGC only when required?
performMajorGC is done only in inspect mode, so that the logs come out properly.
-- We use GC based hooks in Stream.bracketIO' so there could be async threads
-- spawned by GC, releasing resources concurrently with us. For that reason we
-- need to make sure that the "release" in the bracket end action is executed
-- only once in that case.
--
Is it possible to remove GC hooks? Now that we have prompt cleanup why do we still have them?
They are kept to clean up even before the "prompt" release happens; the prompt release can come later than GC.
-- all the workers of the channel are cleanup before we free the resources
-- allocated by the workers of the channel. Otherwise we might free the
-- resources and workers may be trying to use them and start misbehaving.
-- | Resources with 'Priority1' are freed before 'Priority2'. Priority is
Can you also give a small example of how this helps in freeing concurrency channels?
A stream is being evaluated concurrently by multiple workers; each worker allocates some resources and works with them. An interrupt comes and cleanup starts. Suppose we clean up the workers' resources first and then the workers themselves. A worker that is still running might crash or misbehave because its resources have been pulled out from under it. So we need to ensure that we clean up the workers before we clean up their resources.
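The ordering described above can be modeled by keying finalizers on (priority, id) in an ordered Map, so that Priority1 entries (stopping channel workers) drain before Priority2 entries (freeing the resources those workers use). Names are illustrative, not Streamly's actual API:

```haskell
import Data.IORef
import qualified Data.Map.Strict as Map

data Priority = Priority1 | Priority2 deriving (Eq, Ord, Show)

main :: IO ()
main = do
    ref <- newIORef (Map.empty :: Map.Map (Priority, Int) (IO ()))
    let registerWith p k fin =
            atomicModifyIORef' ref (\m -> (Map.insert (p, k) fin m, ()))
        release = do
            m <- atomicModifyIORef' ref (\m -> (Map.empty, m))
            -- Map.elems yields ascending key order: Priority1 first.
            sequence_ (Map.elems m)
    -- Registration order does not matter; release order is by priority.
    registerWith Priority2 0 (putStrLn "free worker resource")
    registerWith Priority1 0 (putStrLn "stop channel workers")
    release
```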
@harendra-kumar Although I have a few questions, I think I understand the motivation behind these changes.