Skip to content

Commit fa785b1

Browse files
authored
Merge pull request #674 from AbbyGi/add-replay
Add replay script
2 parents 20af977 + 71cb9e8 commit fa785b1

4 files changed

Lines changed: 199 additions & 10 deletions

File tree

.github/workflows/testing.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ jobs:
1717
strategy:
1818
matrix:
1919
python-version: [3.7, 3.8, 3.9]
20-
intake-version: ["master", "latest"]
2120
fail-fast: false
2221
steps:
2322

@@ -39,14 +38,6 @@ jobs:
3938
python -m pip install -r requirements-test.txt
4039
python -m pip list
4140
42-
- name: Install intake mater for some builds.
43-
if: ${{ matrix.intake-version == 'master' }}
44-
shell: bash -l {0}
45-
run: |
46-
set -vxeuo pipefail
47-
python -m pip install git+https://github.com/intake/intake.git@master
48-
python -m pip list
49-
5041
- name: Test with pytest
5142
shell: bash -l {0}
5243
run: |

continuous_integration/scripts/install.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ python -m pip install --upgrade pip setuptools wheel numpy
99
# Versioneer uses the most recent git tag to generate __version__, which appears
1010
# in the published documentation.
1111
git fetch --tags
12-
python -m pip install .
12+
python -m pip install .[all]
1313
python -m pip list

databroker/replay/replay.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import collections
5+
import sys
6+
import time
7+
8+
from event_model import unpack_event_page, unpack_datum_page
9+
10+
11+
def log(*args, **kwargs):
    """
    Print a message, writing to stderr unless a ``file`` is given.

    Accepts the same positional and keyword arguments as ``print`` and
    returns whatever ``print`` returns.
    """
    if "file" not in kwargs:
        kwargs["file"] = sys.stderr
    return print(*args, **kwargs)
14+
15+
16+
def replay(gen, callback, burst=False, delay=0):
    """
    Emit documents to a callback with realistic time spacing.

    Parameters
    ----------
    gen: iterable
        Expected to yield (name, doc) pairs, beginning with a "start"
        document whose ``doc["time"]`` is the time the run started.
    callback: callable
        Expected signature: callback(name, doc)
    burst: bool, optional
        Forwarded to the per-document emitter; when True the simulated
        timing is turned off. Default is False.
    delay: float, optional
        Number of seconds forwarded to the per-document emitter as the
        pause around each emitted document. Default is 0.

    Raises
    ------
    ValueError
        If ``gen`` is empty or does not begin with a "start" document.
    """
    cache = collections.deque()
    # Accept any iterable (list, generator, ...), not only iterators.
    documents = iter(gen)
    try:
        name, doc = next(documents)
    except StopIteration:
        raise ValueError("Expected gen to start with a RunStart document") from None
    if name != "start":
        raise ValueError("Expected gen to start with a RunStart document")
    # Compute time difference between now and the time that this run started.
    offset = time.time() - doc["time"]
    callback(name, doc)
    for name, doc in documents:
        if name == "event_page":
            # Expand this EventPage into Events.
            for event in unpack_event_page(doc):
                _process_document("event", event, cache, offset, callback, burst, delay)
        elif name == "datum_page":
            # Expand this DatumPage into Datums.
            for datum in unpack_datum_page(doc):
                _process_document("datum", datum, cache, offset, callback, burst, delay)
        else:
            _process_document(name, doc, cache, offset, callback, burst, delay)
45+
46+
47+
_DOCUMENTS_WITHOUT_A_TIME = {"datum", "datum_page", "resource"}
48+
49+
50+
def _process_document(name, doc, cache, offset, callback, burst, delay):
51+
if name in _DOCUMENTS_WITHOUT_A_TIME:
52+
# The bluesky RunEngine emits these documents immediately
53+
# before emitting an Event, which does have a time. Lacking
54+
# more specific timing info, we'll cache these and then emit
55+
# them in a burst before the next document with an associated time.
56+
cache.append((name, doc))
57+
else:
58+
if not burst:
59+
delay = max(0, offset - (time.time() - doc["time"]))
60+
time.sleep(delay)
61+
while cache:
62+
# Emit any cached documents without a time in a burst.
63+
time.sleep(delay)
64+
callback(*cache.popleft())
65+
# Emit this document.
66+
time.sleep(delay)
67+
callback(name, doc)
68+
69+
70+
def main(argv=None):
    """
    Command-line entry point: replay Runs from a catalog to a publisher.

    Parameters
    ----------
    argv: list of str, optional
        Command-line arguments. If None, argparse reads them from
        ``sys.argv``.

    Raises
    ------
    ValueError
        If neither ``--zmq`` nor both ``--kafka-bootstrap-servers`` and
        ``--kafka-topic`` are given, or if a ``--query`` string cannot be
        evaluated.
    SystemExit
        On argparse usage errors, when the query yields no results, or
        when the ``--uids`` input contains no uids.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--zmq", help="0MQ address")
    parser.add_argument(
        "--kafka-bootstrap-servers",
        help="Kafka servers, comma-separated string, e.g. "
        "kafka1.nsls2.bnl.gov:9092,kafka2.nsls2.bnl.gov:9092,kafka3.nsls2.bnl.gov:9092",
    )
    parser.add_argument(
        "--kafka-topic", help="Kafka topic, e.g. bmm.bluesky.runengine.documents"
    )
    parser.add_argument("--kafka-key", help="Kafka key")
    catalog_group = parser.add_argument_group(
        description=("Which catalog should we look for Runs in?")
    ).add_mutually_exclusive_group()
    catalog_group.add_argument("--catalog", help="Databroker catalog name")
    catalog_group.add_argument("--msgpack", help="Directory of msgpack files")
    filter_group = parser.add_argument_group(
        description="Which Runs should we replay? Must specify one of these:"
    ).add_mutually_exclusive_group()
    filter_group.add_argument(
        "--all", action="store_true", help="Replay every Run in this Catalog."
    )
    filter_group.add_argument(
        "-q",
        "--query",
        type=str,
        action="append",
        help=(
            "MongoDB-style query or databroker.queries Query. "
            "Narrow results by chaining multiple queries like "
            "-q \"TimeRange(since='2020')\" "
            "-q \"{'sample': 'Au'}\""
        ),
    )
    filter_group.add_argument(
        "--uids",
        type=argparse.FileType("r"),
        action="append",
        help=("Newline-separated (partial) uids. Lines starting with # are skipped."),
    )
    parser.add_argument(
        "--burst",
        action="store_true",
        help=(
            "Set this to turn off simulated timing and push documents "
            "through as fast as possible."
        ),
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=0,
        help=("Add a constant delay between documents."),
    )
    args = parser.parse_args(argv)

    # Choose where documents are published. Third-party imports are kept
    # local so only the selected transport needs to be installed.
    if args.zmq:
        from bluesky.callbacks.zmq import Publisher

        publisher = Publisher(args.zmq)
    elif args.kafka_bootstrap_servers and args.kafka_topic:
        # The option is --kafka-topic, so argparse stores it as kafka_topic.
        from bluesky_kafka import Publisher

        bootstrap_servers = [s.strip() for s in args.kafka_bootstrap_servers.split(",")]
        publisher = Publisher(args.kafka_topic, bootstrap_servers, args.kafka_key)
    else:
        raise ValueError("Either --zmq or --kafka... parameter is required.")

    # Choose where Runs are read from.
    if args.msgpack:
        from databroker._drivers.msgpack import BlueskyMsgpackCatalog

        catalog = BlueskyMsgpackCatalog([args.msgpack])
    elif args.catalog:
        import databroker

        catalog = databroker.catalog[args.catalog]
    else:
        # Without this, ``catalog`` would be unbound further down.
        parser.error(
            "Must specify where to look for Runs via --catalog ... or "
            "--msgpack ..."
        )

    # Choose which Runs to replay.
    if args.query or args.all:
        import databroker.queries

        queries = []
        if args.query:
            ns = vars(databroker.queries)
            for raw_query in args.query:
                # Parse string like "{'scan_id': 123}" to dict.
                # NOTE(review): eval executes arbitrary Python from the
                # command line; only run this with trusted arguments.
                try:
                    query = eval(raw_query, ns)
                except Exception as err:
                    raise ValueError(f"Could not parse query {raw_query}.") from err
                queries.append(query)
        results = catalog
        for query in queries:
            results = results.search(query)
        if not results:
            print(f"Query {queries} yielded no results. Exiting.")
            # This can be a result of bash mangling the query because the
            # caller forgot to escape the $'s.
            print(
                "If your query had dollar signs in it, remember to "
                "escape them like \\$."
            )
            sys.exit(1)
    elif args.uids:
        # Skip blank lines and commented lines. Strip before testing so
        # whitespace-only lines and indented comments are skipped too.
        uids = []
        for uid_file in args.uids:
            for line in uid_file.read().splitlines():
                entry = line.strip()
                if entry and not entry.startswith("#"):
                    uids.append(entry)
        if not uids:
            print("Found empty input for --uids. Exiting")
            sys.exit(1)
        results = {uid: catalog[uid] for uid in uids}
    else:
        parser.error(
            "Must specify which Runs to replay via --query ... or "
            "--uids ... or --all."
        )

    log(f"Replaying {len(results)} runs.")
    for uid, run in results.items():
        log(f"Replaying run {run}")
        replay(run.documents(fill="no"), publisher, burst=args.burst, delay=args.delay)
        log(f"Done with run {run}")
    log("Complete. Exiting.")
194+
195+
196+
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()

requirements-server.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ event-model
55
fastapi
66
mongomock
77
msgpack
8+
pims
89
pydantic
910
pymongo
1011
pytz

0 commit comments

Comments
 (0)