forked from JuliaPy/PythonCall.jl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGC.jl
More file actions
204 lines (179 loc) · 5.18 KB
/
GC.jl
File metadata and controls
204 lines (179 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
module PythonCall.GC
Garbage collection of Python objects.
See `disable` and `enable`.
"""
module GC
using ..C: C
# `ENABLED`: whether or not python GC is enabled, or paused to process later
const ENABLED = Threads.Atomic{Bool}(true)
# this event allows us to `wait` in a task until GC is re-enabled
# we have both this and `ENABLED` since there is no `isready(::Event)`
# for us to do a non-blocking check. Instead we must keep the event being triggered
# in-sync with `ENABLED[]`.
# We therefore modify both in `enable()` and `disable()` and nowhere else.
const ENABLED_EVENT = Threads.Event()
# this is the queue to process pointers for GC (`C.Py_DecRef`)
const QUEUE = Channel{C.PyPtr}(Inf)
# this is the task which performs GC from thread 1
const GC_TASK = Ref{Task}()
# This we use in testing to know when our GC is running
const GC_FINISHED = Threads.Condition()
# This is used for basic profiling
const SECONDS_SPENT_IN_GC = Threads.Atomic{Float64}()
const LOGGING_ENABLED = Ref{Bool}(false)
"""
PythonCall.GC.enable_logging(enable=true)
Enables printed logging (similar to Julia's `GC.enable_logging`).
"""
function enable_logging(enable=true)
LOGGING_ENABLED[] = enable
return nothing
end
"""
PythonCall.GC.disable()
Disable the PythonCall garbage collector. This should generally not be required.
"""
function disable()
ENABLED[] = false
reset(ENABLED_EVENT)
return
end
"""
PythonCall.GC.enable()
Re-enable the PythonCall garbage collector. This should generally not be required.
"""
function enable()
ENABLED[] = true
notify(ENABLED_EVENT)
return
end
# This is called within a finalizer so we must not task switch
# (so no printing nor blocking on Julia-side locks)
function enqueue_wrapper(f, g)
t = @elapsed begin
if C.CTX.is_initialized
# Eager path: if we are already on thread 1,
# we eagerly decrement
handled = false
if ENABLED[] && Threads.threadid() == 1
# temporarily disable thread migration to be sure
# we call `C.Py_DecRef` from thread 1
old_sticky = current_task().sticky
if !old_sticky
current_task().sticky = true
end
if Threads.threadid() == 1
f()
handled = true
end
if !old_sticky
current_task().sticky = old_sticky
end
end
if !handled
g()
end
end
end
Threads.atomic_add!(SECONDS_SPENT_IN_GC, t)
return
end
function enqueue(ptr::C.PyPtr)
# if we are on thread 1:
f = () -> begin
C.with_gil(false) do
if ptr != C.PyNULL
C.Py_DecRef(ptr)
end
end
end
# otherwise:
g = () -> begin
if ptr != C.PyNULL
put!(QUEUE, ptr)
end
end
enqueue_wrapper(f, g)
end
function enqueue_all(ptrs)
# if we are on thread 1:
f = () -> begin
C.with_gil(false) do
for ptr in ptrs
if ptr != C.PyNULL
C.Py_DecRef(ptr)
end
end
end
end
# otherwise:
g = () -> begin
for ptr in ptrs
if ptr != C.PyNULL
put!(QUEUE, ptr)
end
end
end
enqueue_wrapper(f, g)
end
# must only be called from thread 1 by the task in `GC_TASK[]`
function unsafe_process_queue!()
n = 0
if !isempty(QUEUE)
t = @elapsed C.with_gil(false) do
while !isempty(QUEUE) && ENABLED[]
# This should never block, since there should
# only be one consumer
# (we would like to not block while holding the GIL)
ptr = take!(QUEUE)
if ptr != C.PyNULL
C.Py_DecRef(ptr)
n += 1
end
end
end
if LOGGING_ENABLED[]
Base.time_print(stdout, t; msg="Python GC ($n items)")
println(stdout)
end
else
t = 0.0
end
return t
end
function gc_loop()
while true
if ENABLED[] && !isempty(QUEUE)
t = unsafe_process_queue!()
Threads.atomic_add!(SECONDS_SPENT_IN_GC, t)
# just for testing purposes
Base.@lock GC_FINISHED notify(GC_FINISHED)
end
# wait until there is both something to process
# and GC is `enabled`
wait(QUEUE)
wait(ENABLED_EVENT)
end
end
function launch_gc_task()
if isassigned(GC_TASK) && Base.istaskstarted(GC_TASK[]) && !Base.istaskdone(GC_TASK[])
throw(ConcurrencyViolationError("PythonCall GC task already running!"))
end
task = Task(gc_loop)
task.sticky = VERSION >= v"1.7" # disallow task migration which was introduced in 1.7
# ensure the task runs from thread 1
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, 0)
schedule(task)
if isdefined(Base, :errormonitor)
Base.errormonitor(task)
end
GC_TASK[] = task
task
end
function __init__()
launch_gc_task()
enable() # start enabled
nothing
end
end # module GC