-
Notifications
You must be signed in to change notification settings - Fork 384
Expand file tree
/
Copy pathchunked_data_stream.py
More file actions
104 lines (80 loc) · 2.64 KB
/
chunked_data_stream.py
File metadata and controls
104 lines (80 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import collections
import csv
import io
import json
import splunklib.searchcommands.internals
from splunklib.utils import ensure_binary, ensure_str
class Chunk:
    """One parsed chunk: a version tag, JSON metadata and CSV records.

    Attributes are set directly in the constructor:

    * ``version`` -- the ``version`` argument passed through ``ensure_str``.
    * ``meta`` -- the result of ``json.loads`` on the ``meta`` argument.
    * ``data`` -- a ``csv.DictReader`` over the UTF-8 decoded ``data``
      argument, read with splunklib's ``CsvDialect``.
    """

    def __init__(self, version, meta, data):
        """Parse the three raw parts of a chunk.

        :param version: version token of the chunk header.
        :param meta: JSON document accepted by ``json.loads``.
        :param data: CSV payload; must provide ``decode("utf-8")``.
        """
        self.version = ensure_str(version)
        self.meta = json.loads(meta)

        csv_dialect = splunklib.searchcommands.internals.CsvDialect
        decoded_payload = data.decode("utf-8")
        self.data = csv.DictReader(
            io.StringIO(decoded_payload),
            dialect=csv_dialect,
        )
class ChunkedDataStreamIter(collections.abc.Iterator):
    """Iterator that pulls chunks from an object exposing ``read_chunk()``.

    Iteration ends when ``read_chunk()`` raises ``EOFError``.
    """

    def __init__(self, chunk_stream):
        """Remember the chunk source to read from.

        :param chunk_stream: object providing a ``read_chunk()`` method.
        """
        self.chunk_stream = chunk_stream

    def __next__(self):
        """Iterator protocol hook; delegates to :meth:`next`."""
        return self.next()

    def next(self):
        """Return the next chunk, or raise ``StopIteration`` at end of input."""
        try:
            chunk = self.chunk_stream.read_chunk()
        except EOFError:
            # Translate the stream's end-of-input signal into the iterator one.
            raise StopIteration
        else:
            return chunk
class ChunkedDataStream(collections.abc.Iterable):
    """Iterable view of a binary stream made of length-prefixed chunks.

    Each chunk starts with a header line of three comma-separated fields
    (version, metadata length, data length), followed by that many bytes
    of metadata and then that many bytes of data.
    """

    def __init__(self, stream):
        """Wrap ``stream``, which must return ``bytes`` from ``read()``.

        :param stream: object providing ``read(n)`` and ``readline()``.
        """
        # A zero-length read reveals whether the stream is binary or text
        # without consuming any input.
        probe = stream.read(0)
        assert isinstance(probe, bytes)
        self.stream = stream

    def __iter__(self):
        """Return a ``ChunkedDataStreamIter`` bound to this stream."""
        return ChunkedDataStreamIter(self)

    def read_chunk(self):
        """Read and return the next ``Chunk`` from the stream.

        Lines that are empty after stripping whitespace are skipped before
        the header.

        :raises EOFError: when ``readline()`` returns an empty result.
        :raises ValueError: when the header does not split into exactly
            three comma-separated fields, or a length is not an integer.
        """
        while True:
            header_line = self.stream.readline()
            if len(header_line) == 0:
                raise EOFError
            if header_line.strip() != b"":
                break  # found a non-blank header line

        fields = header_line.rstrip().split(b",")
        version, meta_length, data_length = fields

        # Metadata precedes data on the wire, so the read order matters.
        meta_payload = self.stream.read(int(meta_length))
        data_payload = self.stream.read(int(data_length))
        return Chunk(version, meta_payload, data_payload)
def build_chunk(keyval, data=None):
    """Assemble the raw bytes of one chunk.

    The result is the header line ``chunked 1.0,<meta length>,<data length>``
    followed by the JSON-encoded ``keyval`` and then the data payload
    produced by ``_build_data_csv(data)``.

    :param keyval: JSON-serializable metadata for the chunk.
    :param data: payload forwarded to ``_build_data_csv``; defaults to ``None``.
    :return: the chunk as ``bytes``.
    """
    meta_payload = ensure_binary(json.dumps(keyval))
    data_payload = _build_data_csv(data)
    header_line = b"chunked 1.0,%d,%d\n" % (len(meta_payload), len(data_payload))
    return b"".join((header_line, meta_payload, data_payload))
def build_empty_searchinfo():
    """Return a fresh placeholder ``searchinfo`` dictionary.

    Times are ``0``, string fields are empty, ``args`` is a new empty
    list, and ``splunk_version`` is the fixed string ``"42.3.4"``.
    """
    return dict(
        earliest_time=0,
        latest_time=0,
        search="",
        dispatch_dir="",
        sid="",
        args=[],
        splunk_version="42.3.4",
    )
def build_getinfo_chunk():
    """Return a chunk whose metadata requests the ``getinfo`` action.

    The metadata sets ``preview`` to ``False`` and carries the placeholder
    dictionary from ``build_empty_searchinfo()``; no data payload is passed.
    """
    getinfo_meta = {
        "action": "getinfo",
        "preview": False,
        "searchinfo": build_empty_searchinfo(),
    }
    return build_chunk(getinfo_meta)
def build_data_chunk(data, finished=True):
    """Return a chunk whose metadata requests the ``execute`` action.

    :param data: payload handed to ``build_chunk`` as the chunk data.
    :param finished: value stored under the ``finished`` metadata key.
    """
    execute_meta = {"action": "execute", "finished": finished}
    return build_chunk(execute_meta, data)
def _build_data_csv(data):
    """Serialize ``data`` into the bytes of a chunk's CSV payload.

    :param data: ``None`` (no payload), ``bytes`` (used verbatim), or an
        iterable of mappings, one per CSV row. One-shot iterables such as
        generators are supported.
    :return: ``b""`` for ``None``, ``data`` itself for ``bytes``, otherwise
        the CSV text (header row plus one row per mapping, written with
        splunklib's ``CsvDialect``) passed through ``ensure_binary``.
    """
    if data is None:
        return b""
    if isinstance(data, bytes):
        return data

    # The rows are walked twice (header discovery, then writing). Materialize
    # them once so a generator does not come out as a header with no rows.
    rows = list(data)

    # A dict keeps keys in first-seen order, so the column order is the same
    # on every run; a set's order changes with string hash randomization.
    fieldnames = {}
    for row in rows:
        fieldnames.update(dict.fromkeys(row.keys()))

    csvout = io.StringIO()
    writer = csv.DictWriter(
        csvout,
        list(fieldnames),
        dialect=splunklib.searchcommands.internals.CsvDialect,
    )
    writer.writeheader()
    writer.writerows(rows)
    return ensure_binary(csvout.getvalue())