-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathchunk_readers.jl
More file actions
135 lines (124 loc) · 3.76 KB
/
chunk_readers.jl
File metadata and controls
135 lines (124 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Chunk reader wrapping a `Base.BufferStream`. Its `Base.iterate` method yields
# the buffered bytes one chunk per `'\n'`-delimited line.
struct LineChunkReader <: AbstractChunkReader
# Stream that iteration reads bytes from.
buffered_input::Base.BufferStream
end
# Yield the next line from `iter.buffered_input` as a `Vector{UInt8}`.
#
# Bytes are consumed up to and including the next '\n'; the newline itself is
# not part of the returned chunk. If the stream ends before a newline is seen,
# whatever was read so far is returned. Returns `nothing` once the stream
# reports end-of-file. The iteration state is the reader itself and `_state`
# is not consulted.
function Base.iterate(iter::LineChunkReader, _state=nothing)
    stream = iter.buffered_input
    eof(stream) && return nothing

    line = IOBuffer()
    while !eof(stream)
        b = read(stream, UInt8)
        if b == UInt8('\n')
            break
        end
        write(line, b)
    end
    return (take!(line), iter)
end
# Chunk reader wrapping a `Base.BufferStream`. Its `Base.iterate` method yields
# one JSON value per iteration, re-serialized to bytes.
struct JSONChunkReader <: AbstractChunkReader
# Stream that iteration reads bytes from.
buffered_input::Base.BufferStream
end
# Read the raw bytes of one JSON value from the current position of `io`.
#
# The kind of value is decided by the first byte:
#   * `{` or `[` : bytes are consumed until the matching closing bracket,
#     tracking nesting depth of that bracket kind and ignoring brackets that
#     occur inside string literals (including after a backslash escape).
#   * `"`        : bytes are consumed until the closing unescaped quote.
#   * otherwise  : (number / true / false / null) bytes are consumed until a
#     delimiter is seen; the delimiter is left unread in `io`.
#
# Returns a `Vector{UInt8}` with the bytes read. The result is empty when `io`
# is already at end-of-file or when the first byte is itself a scalar
# delimiter. If the stream ends in the middle of a value, the truncated bytes
# are returned as-is; no validation is performed here.
function _read_json_chunk(io::IO)
    out = IOBuffer()
    # Nothing left to read: hand back an empty chunk rather than letting
    # `peek` raise an EOFError.
    eof(io) && return take!(out)

    first_byte = peek(io, UInt8)
    if first_byte == UInt8('{') || first_byte == UInt8('[')
        close_byte = first_byte == UInt8('{') ? UInt8('}') : UInt8(']')
        depth = 0
        in_string = false
        escaped = false
        while !eof(io)
            byte = read(io, UInt8)
            write(out, byte)
            if escaped
                # The byte after a backslash never opens/closes anything.
                escaped = false
                continue
            end
            if in_string
                if byte == UInt8('\\')
                    escaped = true
                elseif byte == UInt8('"')
                    in_string = false
                end
            else
                if byte == UInt8('"')
                    in_string = true
                elseif byte == first_byte
                    depth += 1
                elseif byte == close_byte
                    depth -= 1
                    depth == 0 && break
                end
            end
        end
    elseif first_byte == UInt8('"')
        escaped = false
        read(io, UInt8) # consume opening quote
        write(out, UInt8('"'))
        while !eof(io)
            byte = read(io, UInt8)
            write(out, byte)
            if escaped
                escaped = false
            elseif byte == UInt8('\\')
                escaped = true
            elseif byte == UInt8('"')
                break
            end
        end
    else
        # number / true / false / null: read until delimiter.
        # Besides whitespace and `,` `]` `}`, the start of a following string,
        # object or array also ends the scalar, so values written back to back
        # (e.g. `12{"b":1}`) are split the same way the other branches do.
        while !eof(io)
            byte = peek(io, UInt8)
            if isspace(Char(byte)) ||
               byte == UInt8(',') || byte == UInt8(']') || byte == UInt8('}') ||
               byte == UInt8('"') || byte == UInt8('[') || byte == UInt8('{')
                break
            end
            write(out, read(io, UInt8))
        end
    end
    return take!(out)
end
# Yield the next JSON value from `iter.buffered_input`.
#
# Leading whitespace is discarded, one value is read with `_read_json_chunk`,
# parsed with `_json_parse`, and serialized again with `JSON.json`; the
# resulting bytes are returned together with the reader as iteration state.
# Returns `nothing` when the stream is at end-of-file (before or after the
# whitespace) or when no bytes could be read for a value. `_state` is not
# consulted.
function Base.iterate(iter::JSONChunkReader, _state=nothing)
    stream = iter.buffered_input
    eof(stream) && return nothing

    # Drop whitespace in front of the next value.
    while !eof(stream) && isspace(Char(peek(stream, UInt8)))
        read(stream, UInt8)
    end
    if eof(stream)
        return nothing
    end

    raw = _read_json_chunk(stream)
    if isempty(raw)
        return nothing
    end

    parsed = _json_parse(String(raw))
    normalized = Vector{UInt8}(codeunits(JSON.json(parsed)))
    return (normalized, iter)
end
# Byte value 0x1E, the separator that `RFC7464ChunkReader` splits its input on.
# Ref: https://www.rfc-editor.org/rfc/rfc7464.html
const RFC7464_RECORD_SEPARATOR = 0x1e
# Chunk reader wrapping a `Base.BufferStream`. Its `Base.iterate` method splits
# the buffered bytes on `RFC7464_RECORD_SEPARATOR`.
struct RFC7464ChunkReader <: AbstractChunkReader
# Stream that iteration reads bytes from.
buffered_input::Base.BufferStream
end
# Yield the next chunk of bytes delimited by `RFC7464_RECORD_SEPARATOR`.
#
# Bytes are accumulated until a separator is read; the separator is not part
# of the chunk. When `_state` is `nothing` (the first call), the accumulated
# bytes are returned at the first separator even if they are empty. On calls
# with any other `_state`, an empty accumulation at a separator is skipped and
# reading continues. If the stream ends without a further separator, whatever
# was accumulated (possibly nothing) is returned. Returns `nothing` when the
# stream is already at end-of-file on entry. The reader itself is returned as
# the iteration state.
function Base.iterate(iter::RFC7464ChunkReader, _state=nothing)
    stream = iter.buffered_input
    eof(stream) && return nothing

    first_call = isnothing(_state)
    record = IOBuffer()
    while !eof(stream)
        b = read(stream, UInt8)
        if b != RFC7464_RECORD_SEPARATOR
            write(record, b)
            continue
        end
        payload = take!(record)
        if first_call || !isempty(payload)
            return (payload, iter)
        end
    end
    return (take!(record), iter)
end