Skip to content

Commit 2a2d4f8

Browse files
authored
Merge pull request #717 from square/jwilson.0523.lock_free
Make the segment pool lock-free on the JVM
2 parents 0d5a2d0 + fae6650 commit 2a2d4f8

6 files changed

Lines changed: 194 additions & 53 deletions

File tree

okio/src/commonMain/kotlin/okio/SegmentPool.kt

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,19 @@
1515
*/
1616
package okio
1717

18-
import kotlin.native.concurrent.ThreadLocal
19-
2018
/**
2119
* A collection of unused segments, necessary to avoid GC churn and zero-fill.
2220
* This pool is a thread-safe static singleton.
2321
*/
24-
@ThreadLocal
25-
internal object SegmentPool {
26-
/** The maximum number of bytes to pool. */
27-
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
28-
const val MAX_SIZE = 64 * 1024L // 64 KiB.
29-
30-
/** Singly-linked list of segments. */
31-
var next: Segment? = null
32-
33-
/** Total bytes in this pool. */
34-
var byteCount = 0L
22+
internal expect object SegmentPool {
23+
val MAX_SIZE: Long
3524

36-
fun take(): Segment {
37-
synchronized(this) {
38-
next?.let { result ->
39-
next = result.next
40-
result.next = null
41-
byteCount -= Segment.SIZE
42-
return result
43-
}
44-
}
45-
return Segment() // Pool is empty. Don't zero-fill while holding a lock.
46-
}
25+
/** For testing only. Returns a snapshot of the number of bytes currently in the pool. */
26+
val byteCount: Long
4727

48-
fun recycle(segment: Segment) {
49-
require(segment.next == null && segment.prev == null)
50-
if (segment.shared) return // This segment cannot be recycled.
28+
/** Return a segment for the caller's use. */
29+
fun take(): Segment
5130

52-
synchronized(this) {
53-
if (byteCount + Segment.SIZE > MAX_SIZE) return // Pool is full.
54-
byteCount += Segment.SIZE
55-
segment.next = next
56-
segment.limit = 0
57-
segment.pos = segment.limit
58-
next = segment
59-
}
60-
}
31+
/** Recycle a segment that the caller no longer needs. */
32+
fun recycle(segment: Segment)
6133
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (C) 2014 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package okio
17+
18+
internal actual object SegmentPool {
19+
actual val MAX_SIZE: Long = 0L
20+
21+
actual val byteCount: Long = 0L
22+
23+
actual fun take(): Segment = Segment()
24+
25+
actual fun recycle(segment: Segment) {
26+
}
27+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (C) 2014 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package okio
17+
18+
import okio.SegmentPool.LOCK
19+
import okio.SegmentPool.MAX_SIZE
20+
import okio.SegmentPool.recycle
21+
import okio.SegmentPool.take
22+
import java.util.concurrent.atomic.AtomicLong
23+
import java.util.concurrent.atomic.AtomicReference
24+
25+
/**
26+
* This class pools segments in a lock-free singly-linked stack. Though this code is lock-free it
27+
* does use a sentinel [LOCK] value to defend against races.
28+
*
29+
* When popping, a caller swaps the stack's next pointer with the [LOCK] sentinel. If the stack was
30+
* not already locked, the caller replaces the head node with its successor.
31+
*
32+
* When pushing, a caller swaps the head with a new node whose successor is the replaced head.
33+
*
34+
* If operations conflict, segments are not pushed into the stack. A [recycle] call that loses a
35+
* race will not add to the pool, and a [take] call that loses a race will not take from the pool.
36+
* Under significant contention this pool will have fewer hits and the VM will do more GC and zero
37+
* filling of arrays.
38+
*
39+
* Note that the [MAX_SIZE] may be exceeded if multiple calls to [recycle] race. Exceeding the
40+
* target pool size by a few segments doesn't harm performance, and imperfect enforcement is less
41+
* code.
42+
*/
43+
internal actual object SegmentPool {
44+
/** The maximum number of bytes to pool. */
45+
// TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
46+
actual val MAX_SIZE = 64 * 1024L // 64 KiB.
47+
48+
/** A sentinel segment to indicate that the linked list is currently being modified. */
49+
private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)
50+
51+
/** Singly-linked list of segments. */
52+
private var firstRef = AtomicReference<Segment?>()
53+
54+
/** Total bytes in this pool. */
55+
private var atomicByteCount = AtomicLong()
56+
57+
actual val byteCount: Long
58+
get() = atomicByteCount.get()
59+
60+
@JvmStatic
61+
actual fun take(): Segment {
62+
val first = firstRef.getAndSet(LOCK)
63+
when {
64+
first === LOCK -> {
65+
// We didn't acquire the lock. Don't take a pooled segment.
66+
return Segment()
67+
}
68+
first == null -> {
69+
// We acquired the lock but the pool was empty. Unlock and return a new segment.
70+
firstRef.set(null)
71+
return Segment()
72+
}
73+
else -> {
74+
// We acquired the lock and the pool was not empty. Pop the first element and return it.
75+
firstRef.set(first.next)
76+
first.next = null
77+
atomicByteCount.addAndGet(-Segment.SIZE.toLong())
78+
return first
79+
}
80+
}
81+
}
82+
83+
@JvmStatic
84+
actual fun recycle(segment: Segment) {
85+
require(segment.next == null && segment.prev == null)
86+
if (segment.shared) return // This segment cannot be recycled.
87+
if (atomicByteCount.get() >= MAX_SIZE) return // Pool is full.
88+
89+
val first = firstRef.get()
90+
if (first === LOCK) return // A take() is currently in progress.
91+
92+
segment.next = first
93+
segment.limit = 0
94+
segment.pos = 0
95+
96+
if (firstRef.compareAndSet(first, segment)) {
97+
// We successfully recycled this segment. Adjust the pool size.
98+
atomicByteCount.addAndGet(Segment.SIZE.toLong())
99+
} else {
100+
// We raced another operation. Don't recycle this segment.
101+
segment.next = null
102+
}
103+
}
104+
}

okio/src/jvmTest/kotlin/okio/SegmentSharingTest.kt

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ package okio
1818
import okio.ByteString.Companion.encodeUtf8
1919
import okio.TestUtil.assertEquivalent
2020
import okio.TestUtil.bufferWithSegments
21+
import okio.TestUtil.takeAllPoolSegments
2122
import org.junit.Test
2223
import kotlin.test.assertEquals
23-
import kotlin.test.assertNull
24+
import kotlin.test.assertTrue
2425
import kotlin.test.fail
2526

2627
/** Tests behavior optimized by sharing segments between buffers and byte strings. */
@@ -74,13 +75,11 @@ class SegmentSharingTest {
7475
val snapshot = buffer.snapshot()
7576
assertEquals(xs + ys + zs, snapshot.utf8())
7677

77-
// While locking the pool, confirm that clearing the buffer doesn't release its segments.
78-
synchronized(SegmentPool) {
79-
SegmentPool.next = null
80-
SegmentPool.byteCount = 0L
81-
buffer.clear()
82-
assertNull(SegmentPool.next)
83-
}
78+
// Confirm that clearing the buffer doesn't release its segments.
79+
val bufferHead = buffer.head
80+
takeAllPoolSegments() // Make room for new segments.
81+
buffer.clear()
82+
assertTrue(bufferHead !in takeAllPoolSegments())
8483
}
8584

8685
/**
@@ -92,14 +91,15 @@ class SegmentSharingTest {
9291
val clone = buffer.clone()
9392

9493
// While locking the pool, confirm that clearing the buffer doesn't release its segments.
95-
synchronized(SegmentPool) {
96-
SegmentPool.next = null
97-
SegmentPool.byteCount = 0L
98-
buffer.clear()
99-
assertNull(SegmentPool.next)
100-
clone.clear()
101-
assertNull(SegmentPool.next)
102-
}
94+
val bufferHead = buffer.head!!
95+
takeAllPoolSegments() // Make room for new segments.
96+
buffer.clear()
97+
assertTrue(bufferHead !in takeAllPoolSegments())
98+
99+
val cloneHead = clone.head!!
100+
takeAllPoolSegments() // Make room for new segments.
101+
clone.clear()
102+
assertTrue(cloneHead !in takeAllPoolSegments())
103103
}
104104

105105
@Test fun snapshotJavaSerialization() {

okio/src/jvmTest/kotlin/okio/TestUtil.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ import kotlin.test.assertTrue
2727

2828
object TestUtil {
2929
// Necessary to make an internal member visible to Java.
30-
const val SEGMENT_POOL_MAX_SIZE = SegmentPool.MAX_SIZE
30+
@JvmField val SEGMENT_POOL_MAX_SIZE = SegmentPool.MAX_SIZE
3131
const val SEGMENT_SIZE = Segment.SIZE
3232
const val REPLACEMENT_CODE_POINT: Int = okio.REPLACEMENT_CODE_POINT
33+
3334
@JvmStatic fun segmentPoolByteCount() = SegmentPool.byteCount
3435

3536
@JvmStatic
@@ -247,6 +248,16 @@ object TestUtil {
247248
return buffer.snapshot()
248249
}
249250

251+
/** Remove all segments from the pool and return them as a list. */
252+
@JvmStatic
253+
internal fun takeAllPoolSegments(): List<Segment> {
254+
val result = mutableListOf<Segment>()
255+
while (SegmentPool.byteCount > 0) {
256+
result += SegmentPool.take()
257+
}
258+
return result
259+
}
260+
250261
/** Returns a copy of `buffer` with no segments with `original`. */
251262
@JvmStatic
252263
fun deepCopy(original: Buffer): Buffer {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (C) 2014 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package okio
17+
18+
internal actual object SegmentPool {
19+
actual val MAX_SIZE: Long = 0L
20+
21+
actual val byteCount: Long = 0L
22+
23+
actual fun take(): Segment = Segment()
24+
25+
actual fun recycle(segment: Segment) {
26+
}
27+
}

0 commit comments

Comments
 (0)