-
-
Notifications
You must be signed in to change notification settings - Fork 955
Expand file tree
/
Copy pathstreamtolines.go
More file actions
119 lines (106 loc) · 2.91 KB
/
streamtolines.go
File metadata and controls
119 lines (106 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package utilfn
import (
"bytes"
"context"
"io"
"time"
)
// LineOutput is one item delivered on a line channel such as the one returned by
// StreamToLinesChan: it carries either a line of text or an error.
type LineOutput struct {
// Line is the text of the line, without the trailing newline.
Line string
// Error, when non-nil, reports a failure instead of a line.
Error error
}
// lineBuf holds the line-assembly state that streamToLines_processBuf carries from
// one chunk of input to the next.
type lineBuf struct {
// buf accumulates the bytes of the current line that has not yet seen its newline.
buf []byte
// inLongLine is true while the current line has exceeded maxLineLength; the rest of
// that line is discarded up to and including its newline.
inLongLine bool
}
// maxLineLength is the longest line, in bytes and excluding the newline, that is passed to a
// line callback; longer lines are dropped (64 MiB).
// It needs to be large enough to read the largest RPC packet:
// there are some legacy file transfer packets that can send up to 32m (base64 encoded).
const maxLineLength = 64 * 1024 * 1024
// ReadLineWithTimeout waits up to timeout for the next item on ch.
//
// It returns the item's Line, or the item's Error when that is non-nil.
// If ch has been closed and drained it returns io.EOF, so that end-of-stream
// cannot be mistaken for an empty line. If nothing arrives within timeout it
// returns context.DeadlineExceeded.
func ReadLineWithTimeout(ch chan LineOutput, timeout time.Duration) (string, error) {
	// An explicit timer (rather than time.After) lets us release it as soon as
	// a line arrives instead of leaving it pending until the timeout elapses.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case output, ok := <-ch:
		if !ok {
			// A closed channel yields the zero LineOutput; report it as
			// end-of-stream instead of returning ("", nil).
			return "", io.EOF
		}
		if output.Error != nil {
			return "", output.Error
		}
		return output.Line, nil
	case <-timer.C:
		return "", context.DeadlineExceeded
	}
}
// streamToLines_processBuf splits one chunk of input into lines.
//
// Every newline-terminated line found in readBuf (prefixed by whatever partial
// line lineBuf already holds) is passed to lineFn without its newline. Bytes
// after the last newline are kept in lineBuf for the next call. A line longer
// than maxLineLength is never delivered: its bytes are discarded and
// lineBuf.inLongLine stays set until that line's newline has been consumed.
func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]byte)) {
	rest := readBuf
	for len(rest) != 0 {
		end := bytes.IndexByte(rest, '\n')
		if end < 0 {
			// No terminator in this chunk: either keep buffering or, once the
			// limit is exceeded, throw the partial line away.
			overflow := lineBuf.inLongLine || len(lineBuf.buf)+len(rest) > maxLineLength
			if overflow {
				lineBuf.buf = nil
				lineBuf.inLongLine = true
			} else {
				lineBuf.buf = append(lineBuf.buf, rest...)
			}
			return
		}

		tail := rest[:end]
		rest = rest[end+1:]
		if fits := len(lineBuf.buf)+len(tail) <= maxLineLength; fits && !lineBuf.inLongLine {
			lineFn(append(lineBuf.buf, tail...))
		}
		// The newline ends the current line whether or not it was delivered.
		lineBuf.buf = nil
		lineBuf.inLongLine = false
	}
}
// StreamToLines reads input until Read returns an error, passing each
// newline-terminated line (without the newline) to lineFn.
//
// Data returned together with an error is still processed before the error is
// returned. The returned error is whatever Read reported, including io.EOF at
// the normal end of input. Bytes after the final newline are not delivered.
// If readCallback is non-nil it is invoked after every Read that did not fail.
func StreamToLines(input io.Reader, lineFn func([]byte), readCallback func()) error {
	var (
		state lineBuf
		chunk = make([]byte, 64*1024)
	)
	for {
		n, readErr := input.Read(chunk)
		// Consume the bytes first: a Read may return data alongside an error.
		streamToLines_processBuf(&state, chunk[:n], lineFn)
		switch {
		case readErr != nil:
			return readErr
		case readCallback != nil:
			readCallback()
		}
	}
}
// StreamToLinesChan starts a goroutine that reads input with StreamToLines and
// sends each line, without its trailing newline, on the returned channel.
//
// If reading stops with an error other than io.EOF, that error is sent as a
// final item with Error set. The channel is closed when the goroutine finishes.
// The channel is unbuffered, so the goroutine blocks on every send until the
// caller receives.
func StreamToLinesChan(input io.Reader) chan LineOutput {
	out := make(chan LineOutput)
	emit := func(line []byte) {
		out <- LineOutput{Line: string(line)}
	}
	go func() {
		defer close(out)
		if err := StreamToLines(input, emit, nil); err != nil && err != io.EOF {
			out <- LineOutput{Error: err}
		}
	}()
	return out
}
// LineWriter is an io.Writer that processes data line-by-line via a callback.
// Lines do not include the trailing newline. Lines longer than maxLineLength are dropped.
// Data after the last newline stays buffered until more data arrives or Flush is called.
type LineWriter struct {
// lineBuf is the partial-line state carried from one Write call to the next.
lineBuf lineBuf
// lineFn is the callback invoked once for each complete line.
lineFn func([]byte)
}
// NewLineWriter returns a LineWriter with empty buffering state that passes
// each line it assembles to lineFn.
func NewLineWriter(lineFn func([]byte)) *LineWriter {
	lw := new(LineWriter)
	lw.lineFn = lineFn
	return lw
}
// Write implements io.Writer. It runs p through the line splitter, which calls
// the callback for each complete line, and always reports all of p as written
// with a nil error.
func (lw *LineWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	streamToLines_processBuf(&lw.lineBuf, p, lw.lineFn)
	return n, nil
}
// Flush passes any buffered, unterminated data to the callback as a final line
// and clears the buffer. It does nothing when the buffer is empty or when the
// current line is being discarded for exceeding the length limit.
// Call it once the input stream is complete (e.g., at EOF).
func (lw *LineWriter) Flush() {
	state := &lw.lineBuf
	if state.inLongLine || len(state.buf) == 0 {
		return
	}
	lw.lineFn(state.buf)
	state.buf = nil
}