1+ package org.operatorfoundation.transmission
2+
3+ import java.util.concurrent.locks.ReentrantLock
4+ import kotlin.concurrent.withLock
5+
6+ /* *
7+ * A bidirectional in-memory pipe connecting two endpoints.
8+ * Data written to one end can be read from the other end.
9+ */
10+ class Pipe {
11+ private val bufferAtoB = RingBuffer (4096 )
12+ private val bufferBtoA = RingBuffer (4096 )
13+
14+ val endA = PipeEnd (bufferBtoA, bufferAtoB)
15+ val endB = PipeEnd (bufferAtoB, bufferBtoA)
16+ }
17+
18+ /* *
19+ * One end of a bidirectional pipe.
20+ */
21+ class PipeEnd internal constructor(
22+ private val readBuffer : RingBuffer ,
23+ private val writeBuffer : RingBuffer
24+ ) : Connection {
25+
26+ fun tryReadOne (): Int {
27+ val byte = readBuffer.get()
28+ return byte?.toInt()?.and (0xFF ) ? : - 1
29+ }
30+
31+ fun readOne (): Byte {
32+ return readBuffer.get() ? : (- 1 ).toByte()
33+ }
34+
35+ override fun read (size : Int ): ByteArray? {
36+ val result = mutableListOf<Byte >()
37+
38+ for (i in 0 until size) {
39+ val byte = readBuffer.get() ? : break
40+ result.add(byte)
41+ }
42+
43+ return if (result.isEmpty()) null else result.toByteArray()
44+ }
45+
46+ override fun unsafeRead (size : Int ): ByteArray? {
47+ return read(size)
48+ }
49+
50+ override fun readMaxSize (maxSize : Int ): ByteArray? {
51+ return read(maxSize)
52+ }
53+
54+ override fun readWithLengthPrefix (prefixSizeInBits : Int ): ByteArray? {
55+ throw UnsupportedOperationException (" Not implemented for Pipe" )
56+ }
57+
58+ override fun write (string : String ): Boolean {
59+ return write(string.toByteArray())
60+ }
61+
62+ override fun write (data : ByteArray ): Boolean {
63+ for (byte in data) {
64+ if (! writeBuffer.put(byte)) {
65+ return false
66+ }
67+ }
68+ return true
69+ }
70+
71+ override fun writeWithLengthPrefix (data : ByteArray , prefixSizeInBits : Int ): Boolean {
72+ throw UnsupportedOperationException (" Not implemented for Pipe" )
73+ }
74+
75+ override fun close () {
76+ // No-op for in-memory pipe
77+ }
78+
79+ fun availableForReading (): Boolean {
80+ return readBuffer.count() > 0
81+ }
82+
83+ fun available (): Int {
84+ return readBuffer.count()
85+ }
86+
87+ fun writeSpace (): Int {
88+ return readBuffer.free()
89+ }
90+ }
91+
92+ /* *
93+ * Thread-safe ring buffer for byte storage.
94+ */
95+ internal class RingBuffer (private val capacity : Int ) {
96+ private val buffer = ByteArray (capacity)
97+ private var head = 0 // Write position
98+ private var tail = 0 // Read position
99+ private var size = 0 // Current number of elements
100+ private val lock = ReentrantLock ()
101+
102+ fun put (value : Byte ): Boolean = lock.withLock {
103+ if (size >= capacity) {
104+ return false // Buffer full
105+ }
106+
107+ buffer[head] = value
108+ head = (head + 1 ) % capacity
109+ size++
110+ return true
111+ }
112+
113+ fun get (): Byte? = lock.withLock {
114+ if (size == 0 ) {
115+ return null // Buffer empty
116+ }
117+
118+ val value = buffer[tail]
119+ tail = (tail + 1 ) % capacity
120+ size--
121+ return value
122+ }
123+
124+ fun count (): Int = lock.withLock {
125+ return size
126+ }
127+
128+ fun free (): Int = lock.withLock {
129+ return capacity - size
130+ }
131+ }
0 commit comments