Skip to content

Add more powerful cleanup functions#3060

Merged
harendra-kumar merged 14 commits intomasterfrom
prompt-cleanup1
Jul 7, 2025
Merged

Add more powerful cleanup functions#3060
harendra-kumar merged 14 commits intomasterfrom
prompt-cleanup1

Conversation

@harendra-kumar
Copy link
Copy Markdown
Member

So that we can release resources any time.
Fix atomicity issue during allocation.
Rename the APIs.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR renames and enhances resource cleanup APIs to provide deterministic, on-demand cleanup without relying on GC, fixes allocation atomicity, and updates tests accordingly.

  • Introduce withBracketIO/withBracketIOM and withFinallyIO/withFinallyIOM to replace cleanupIO/cleanupEffectIO.
  • Update tests to use the new withFinally* functions.
  • Improve internal implementation with IORef+Map for multiple registrations and atomic modifications.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
test/Streamly/Test/Data/Stream/Exception.hs Swapped cleanupIO/cleanupEffectIO calls to withFinallyIO/withFinallyIOM
core/src/Streamly/Internal/Data/Stream/Exception.hs Removed old cleanup* exports, added withBracket* & withFinally* implementations, updated docs
Comments suppressed due to low confidence (4)

core/src/Streamly/Internal/Data/Stream/Exception.hs:24

  • Removing cleanupIO from the export without a deprecation or alias may break backward compatibility. Consider adding a deprecated alias forwarding calls to withFinallyIO before fully removing it.
    , bracketIO

core/src/Streamly/Internal/Data/Stream/Exception.hs:25

  • Removing cleanupEffectIO from the export without a deprecation or alias may break backward compatibility. Consider adding a deprecated alias forwarding calls to withFinallyIOM before fully removing it.
    , withBracketIO

core/src/Streamly/Internal/Data/Stream/Exception.hs:406

  • The doc for withFinallyIOM references withBracketIOM. It should instead mention withFinallyIOM to accurately describe when the hooks are run.
-- would run automatically when 'withBracketIOM' ends or if an exception occurs

core/src/Streamly/Internal/Data/Stream/Exception.hs:417

  • [nitpick] The suffix ‘IOM’ is ambiguous; consider a more descriptive name such as withFinallyEffect or including Effect to clearly distinguish it from withFinallyIO.
withFinallyIOM :: (MonadIO m, MonadCatch m) =>

@composewell composewell deleted a comment from Copilot AI Jun 25, 2025
Comment thread core/src/Streamly/Internal/Data/Stream/Exception.hs Outdated
So that we can release resources any time.
Fix atomicity issue during allocation.
Rename the APIs.
* Fix manual freeing of resources, add the missing "free"
* Add "forall" to types so that different types of resources can be
  alloced/freed.
* Make allocation thread safe and exception safe
@harendra-kumar harendra-kumar force-pushed the prompt-cleanup1 branch 2 times, most recently from 9782b7a to 23f119c Compare July 6, 2025 08:39

-- | Resources with Priority1 are freed before Priority2. This is especially
-- introduced to take care of the case where we need to free channels, so that
-- all the workers of the channel are cleanup before we free the resources
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanup -> cleaned up

release = do
f <- atomicModifyIORef ref modify
sequence_ f
-- XXX can we ensure via GC that the resources that we are freeing are all
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't support parallel release yet.
PS: I'm reviewing the commits one by one. This commit may be obsolete.

Comment on lines +249 to +250
-- IMPORTANT: do not use interruptible operations in this
-- critical section. Even putStrLn can make tests fail.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly are interruptible operations?
Operations that allow the thread to sleep?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See Control.Exception in base.

Comment on lines +256 to +257
-- restoring exceptions makes it non-atomic, tests fail.
-- Can use allowInterrupt in "free" if desired.
Copy link
Copy Markdown
Member

@adithyaov adithyaov Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unable to follow this.

Comment on lines +657 to +658
-- XXX free them atomically, even if another release executes in
-- parallel.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By this statement, you assume that either this atomic free will happen first before the parallel release or the parallel release will happen first, and this won't.

Comment on lines +678 to +681
-- We run it without an exception mask unlike withAllocateIO in
-- the Streamly.Control.Exception module. We can add it if
-- needed.
res <- atomicModifyIORef' ref modify
Copy link
Copy Markdown
Member

@adithyaov adithyaov Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, async exceptions can trigger this.

--
setReleaseCb :: AllocateIO -> Config -> Config
setReleaseCb f cfg = cfg { _release = Just (register f) }
setReleaseCb f cfg = cfg { _release = Just (registerWith f Priority1) }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback registered in the channel config will always be called first, and then the other callbacks.
The user has to be wary about resource cleanup and decide what should go where.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only streamly developers need to worry about the priority. For normal users there is one priority.

-- only one will succeed others will block until it finishes.
--
cleanupChan :: Channel m a -> String -> IO ()
cleanupChan chan reason = do
Copy link
Copy Markdown
Member

@adithyaov adithyaov Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will always be a major GC performed before the channel is marked Stopped.
It may not be required in many cases where there isn't any resource allocation.
Is it possible to performMajorGC only when required?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

performMajorGC is done only in inspect mode so that logs come properly.

Comment on lines +165 to +169
-- We use GC based hooks in Stream.bracketIO' so there could be async threads
-- spawned by GC, releasing resources concurrently with us. For that reason we
-- need to make sure that the "release" in the bracket end action is executed
-- only once in that case.
--
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove GC hooks? Now that we have prompt cleanup why do we still have them?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to cleanup even before the "prompt" release happens. prompt release can be later than GC.

-- all the workers of the channel are cleanup before we free the resources
-- allocated by the workers of the channel. Otherwise we might free the
-- resources and workers may be trying to use them and start misbehaving.
-- | Resources with 'Priority1' are freed before 'Priority2'. Priority is
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also give a small example on how this helps in freeing concurrency channels?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A stream is being evaluated concurrently by multiple workers, each worker allocates some resource and is working with those resources. An interrupt comes and cleanup starts. Assume, we cleanup the resources of the workers first and then the worker. Now the worker is still working so it might crash or misbehave of because of the resource slipping from under it. So we need ensure that we call the cleanup of the workers before we cleanup their resources.

@adithyaov
Copy link
Copy Markdown
Member

@harendra-kumar Although I have a few questions, I think I understand the motivation behind these changes.

@harendra-kumar harendra-kumar merged commit 24468aa into master Jul 7, 2025
19 of 22 checks passed
@harendra-kumar harendra-kumar deleted the prompt-cleanup1 branch July 9, 2025 20:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants