Skip to content

Memory leak with when using local Observable.subscribe with System.Collections.Concurrent collections? #10315

@Thorium

Description

@Thorium

I'm asking here, as StackOverflow didn't tell anything useful, the cause is rather Observable.subscribe than ConcurrentCollection.
I had an operation (submit updates to database) and wanted to record what other events happen during the same time.
So I had a method with a local ConcurrentDictionary and an observable subscribe filling that.
It looks to me that out even if I dispose the observable the collection is not GC-collected. The server did leak memory like 500MB per day, which is not acceptable for a long running server.

After changing the ConcurrentDictionary to a list (with manual locking), the memory leak is gone. But concurrent collections should be the way to go, not manual locking.

I'll try to simulate the issue with a script:

open System
open System.Collections.Concurrent

// Whatever observable stream, I'll took a one from:
// https://fsharpforfunandprofit.com/posts/concurrency-reactive/
let createTimerAndObservable timerInterval =
    let timer = new System.Timers.Timer(float timerInterval)
    timer.AutoReset <- true
    let observable = timer.Elapsed  
    let task = async {
        timer.Start()
        do! Async.Sleep 5000
        timer.Stop()
        }
    (task,observable)
let basicTimer2 , timerEventStream = createTimerAndObservable 1000

/// Create local ConcurrentDictionary
/// only for listening events at that time
/// Then dispose the whole thing
let myFunc() =
    async {
        let tmpItems = ConcurrentDictionary<_,_>()
        use o =
            timerEventStream 
            |> Observable.subscribe (fun x -> 
                let now = DateTime.Now
                printfn "tick %A" now
                tmpItems.GetOrAdd(now, x) |> ignore)
        // Small operation during which I record the events
        do! Async.Sleep 2000
        // Disposing should allow tmpItems to be GC-collected after this method exits.
        o.Dispose()
        tmpItems.Clear()
    } |> Async.StartAsTask

let startMem = GC.GetTotalMemory(true);
myFunc() 
let endMem = GC.GetTotalMemory(true);
Async.RunSynchronously basicTimer2
printfn "Memory usage: %O" (endMem - startMem)

Environment is .NET Framework full 4.7.1, Windows 10.
// FSharp.Core, Version=4.7.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a
[assembly: AssemblyFileVersion("4.700.19.40208")]

Metadata

Metadata

Assignees

No one assigned

    Labels

    Area-LibraryIssues for FSharp.Core not covered elsewhere

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions