Skip to content

Commit 02c2759

Browse files
authored
Merge pull request #316 from albe/copilot/add-event-storage-http-api-layer
Raw buffer streaming, backwards iteration fixes, Consumer progress event, and EventStore.consumers registry
2 parents 00f693f + 389d09f commit 02c2759

29 files changed

Lines changed: 1762 additions & 377 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ node_modules
1313
/site
1414
/data
1515
*.tgz
16+
/event-storage-ui
17+
/event-storage-http

AGENTS.md

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# AGENTS.md
2+
3+
## Principles (in priority order)
4+
5+
1. **Clean API surface** — usage for common cases should be straightforward and well-documented. Avoid breaking changes unless they significantly improve usability.
6+
2. **Understandable code** — cyclomatic complexity per method should stay around or below current levels. Higher-level methods should delegate to clearly-named helpers so the logic reads like pseudo-code. Use utility functions (e.g. `kWayMerge`, binary search) for generic algorithms.
7+
3. **Performance** — maintain good performance across all code paths. The Index and Partition layers are most performance-sensitive and may prefer performance over readability at the margin. Elsewhere, prefer simplicity; simpler code is often faster.
8+
9+
**Keep this file up-to-date**: after any code review that surfaces new architectural insights or a new principle, compact this file and integrate what was learned.
10+
11+
## Architecture
12+
13+
```
14+
EventStore → Storage → Partition (append-only data files)
15+
→ Index → per-stream index files
16+
→ EventStream / JoinEventStream (iterators over indexes)
17+
→ Consumer (durable position tracking)
18+
```
19+
20+
- **Storage/Index/Partition** each follow a 3-tier class hierarchy under `src/<Component>/`: `Readable*``ReadOnly*``Writable*`. The facade file `src/<Component>.js` re-exports the Writable + ReadOnly variants.
21+
- **EventStore** (`src/EventStore.js`) is the main entry point — owns a `Storage` instance, manages stream indexes in a `streams/` subdirectory, and provides `commit()` / `getEventStream()` / `query()` / `createConsumer()`.
22+
- **DCB concurrency**: `query()` returns a `CommitCondition` capturing the global log position + type/matcher filter. Passing it as `expectedVersion` to `commit()` rejects only when matching events were appended since the query.
23+
- Streams are named indexes over a shared storage file; events are partitioned by `event.stream`. Category queries use `<category>-<id>` naming convention.
24+
25+
## Lifecycle & I/O Patterns
26+
27+
- **Constructor stays sync** — all file I/O is deferred to `open()`.
28+
- **`open()` is the entry point for I/O**: scanning partitions, scanning index files, acquiring locks, and opening file descriptors.
29+
- **`'opened'` event** — emitted by Storage once the first async scan + primary index open completes. EventStore listens to `'opened'` to emit its own `'ready'`.
30+
- **`'index-created'` event** — emitted by Storage during `scanFiles()` for each existing secondary index file found. EventStore uses this to register streams without its own directory scan.
31+
- **Secondary indexes open on demand** — only the primary index is opened eagerly; secondary indexes are opened lazily on first access.
32+
- **`initialized` three-state**: `null` = not started, `false` = scan in progress, `true` = scan done. Re-opens after `close()` are synchronous.
33+
- **`open(callback)` hook** — fires after `openIndexes()` and before `'opened'`. Used by `WritableStorage` for torn-write repair.
34+
- **LOCK_RECLAIM in `open()`** — orphaned lock removal lives in `WritableStorage.open()`, directly before `lock()`; torn-write repair runs via the `open(callback)` hook.
35+
- **EventStore `initialize()`** — register `storage.on('index-created', ...)` *before* calling `storage.open()`.
36+
37+
## Key Commands
38+
39+
```bash
40+
npm test # runs mocha tests with c8 coverage (test/*.spec.js)
41+
```
42+
43+
No build step; source is plain ESM consumed directly. No linter configured.
44+
45+
## Code Conventions
46+
47+
- **ESM only** — all files use `import`/`export`.
48+
- Test files use `expect.js` (not chai/jest) with `mocha`. Each spec mirrors `src/` naming: `test/<Component>.spec.js`.
49+
- Tests create temp data in `test/data/` and clean with `fs-extra.emptyDirSync` in `beforeEach`.
50+
- Errors are custom classes exported alongside main classes (e.g. `OptimisticConcurrencyError`, `StorageLockedError`).
51+
- The `index.js` barrel re-exports only the public API — keep it in sync when adding exports.
52+
- **No underscore-prefixed names**; use descriptive public names even for internal helpers.
53+
- **Expressive names over comments** — prefer clear method names. Doc blocks only for the *why*, not the *how*.
54+
55+
## File Layout
56+
57+
| Path | Purpose |
58+
|------|---------|
59+
| `src/EventStore.js` | Core: commit, query (DCB), streams, consumers |
60+
| `src/Storage/Writable*.js` | Append-only file storage with locking |
61+
| `src/Partition/Writable*.js` | Binary partition files with headers/metadata |
62+
| `src/Index/Writable*.js` | Persisted stream indexes |
63+
| `src/Consumer.js` | Durable consumer with position tracking and `progress` event |
64+
| `src/EventStream.js` | Iterator over a single stream; `predicate === true` activates raw (NDJSON buffer) mode |
65+
| `src/JoinEventStream.js` | Merges multiple streams; ordering uses epoch-denormalized `time64` + `sequenceNumber` tiebreaker from binary header |
66+
| `src/Clock.js` | Monotonic microsecond clock |
67+
| `src/IndexEntry.js` | Index record serialization |
68+
| `src/IndexMatcher.js` | O(1) discriminant-based matcher classification |
69+
| `src/PartitionPool.js` | LRU-evicting pool for open partition handles |
70+
| `src/Watcher.js` / `src/WatchesFile.js` | Ref-counting directory watcher and mixin |
71+
| `src/fsUtil.js` | `ensureDirectory`, `scanForFiles` |
72+
| `src/metadataUtil.js` | Metadata matching for access control hooks |
73+
| `bench/` | Benchmarks (not part of test suite) |
74+
| `stress-test/` | Crash-safety / recovery validation |
75+
76+
## Testing Notes
77+
78+
- Always call `eventstore.close()` in `afterEach` to release file locks.
79+
- Commits are async with callback: `eventstore.commit(stream, events, [expectedVersion], callback)`.
80+
- `EventStore` emits `'ready'` after opening — wrap test logic inside that event.
81+
- Prefer `once` over `on` for one-shot lifecycle events (`'opened'`, `'ready'`, `'index-created'`).

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,27 @@ eventstore.on('ready', () => {
7575

7676
---
7777

78+
## HTTP API
79+
80+
To expose an event store over HTTP, see the companion package **[event-storage-http](https://github.com/albe/node-event-storage-http)**:
81+
82+
```bash
83+
npm install event-storage-http
84+
```
85+
86+
```javascript
87+
import EventStore from 'event-storage';
88+
import { createEventStoreHttpServer } from 'event-storage-http';
89+
90+
const eventStore = new EventStore('my-store', { storageDirectory: './data' });
91+
const server = createEventStoreHttpServer(eventStore);
92+
server.listen(3000);
93+
```
94+
95+
The package exposes NDJSON stream endpoints, durable consumer management, and an `HttpEventStream` client helper for consuming event streams over fetch.
96+
97+
---
98+
7899
## Documentation
79100

80101
The full documentation is hosted at **<https://node-event-storage.readthedocs.io/en/latest/>** and covers:

bench/bench-read-scenarios.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ function countAll(iter) {
2121
return n;
2222
}
2323

24+
function countAllExact(iter, expected, label) {
25+
const count = countAll(iter);
26+
if (count !== expected) {
27+
throw new Error(`${label}: expected ${expected} items but got ${count}`);
28+
}
29+
return count;
30+
}
31+
2432
function populateStore(EventStore, directory) {
2533
return new Promise((resolve, reject) => {
2634
fs.emptyDirSync(directory);
@@ -57,12 +65,16 @@ populateStore(Stable, 'data/stable')
5765
Suite
5866
.add('1 - forward full scan [stable]', () => countAll(stableStore.getAllEvents()))
5967
.add('1 - forward full scan [latest]', () => countAll(latestStore.getAllEvents()))
68+
.add('1 - forward full scan [latest(raw)]', () => countAllExact(latestStore.getAllEvents(1, -1, true), EVENTS, 'latest raw'))
6069
.add('2 - backwards full scan [stable]', () => countAll(stableStore.getAllEvents(-1, 1)))
6170
.add('2 - backwards full scan [latest]', () => countAll(latestStore.getAllEvents(-1, 1)))
71+
.add('2 - backwards full scan [latest(raw)]', () => countAll(latestStore.getAllEvents(-1, 1, true)))
6272
.add('3 - join stream [stable]', () => countAll(stableStore.fromStreams('join', [STREAM_1, STREAM_2])))
6373
.add('3 - join stream [latest]', () => countAll(latestStore.fromStreams('join', [STREAM_1, STREAM_2])))
74+
.add('3 - join stream [latest(raw)]', () => countAll(latestStore.fromStreams('join', [STREAM_1, STREAM_2], 0, -1, true)))
6475
.add('4 - range scan [stable]', () => countAll(stableStore.getAllEvents(third, twoThirds)))
6576
.add('4 - range scan [latest]', () => countAll(latestStore.getAllEvents(third, twoThirds)))
77+
.add('4 - range scan [latest(raw)]', () => countAll(latestStore.getAllEvents(third, twoThirds, true)))
6678
.run();
6779
})
6880
.catch((e) => { console.error(e); process.exit(1); });

bench/package-lock.json

Lines changed: 22 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"dependencies": {
99
"beautify-benchmark": "^0.2.4",
1010
"benchmark": "^2.1.4",
11-
"event-storage": "^0.7.2",
11+
"event-storage": "^1.0",
1212
"fs-extra": "^11.1.1"
1313
}
1414
}

docs/api.md

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,30 +77,33 @@ Return the current version (event count) of a stream, or `-1` if the stream does
7777

7878
---
7979

80-
#### `getEventStream(streamName, [minRevision], [maxRevision])` ✅ Stable
80+
#### `getEventStream(streamName, [minRevision], [maxRevision], [predicate], [raw])` ✅ Stable
8181

8282
```javascript
83-
eventstore.getEventStream(streamName [, minRevision [, maxRevision]]) → EventStream | false
83+
eventstore.getEventStream(streamName [, minRevision [, maxRevision [, predicate [, raw]]]]) → EventStream | false
8484
```
8585

86-
Return an `EventStream` for the named stream, or `false` if no such stream exists. `minRevision` and `maxRevision` are 1-based and inclusive; negative values count from the end.
86+
Return an `EventStream` for the named stream, or `false` if no such stream exists. `minRevision` and `maxRevision` are 1-based and inclusive; negative values count from the end.
87+
88+
- `predicate` supports function and object matchers.
89+
- `raw=true` returns newline-delimited JSON `Buffer` chunks (NDJSON) for direct network streaming.
8790

8891
---
8992

90-
#### `getAllEvents([minRevision], [maxRevision])` ✅ Stable
93+
#### `getAllEvents([minRevision], [maxRevision], [predicate], [raw])` ✅ Stable
9194

9295
```javascript
93-
eventstore.getAllEvents([minRevision [, maxRevision]]) → EventStream
96+
eventstore.getAllEvents([minRevision [, maxRevision [, predicate [, raw]]]]) → EventStream
9497
```
9598

9699
Return an `EventStream` covering every event in the store across all streams. Equivalent to `getEventStream('_all', ...)`.
97100

98101
---
99102

100-
#### `getEventStreamForCategory(categoryName, [minRevision], [maxRevision])` ✅ Stable
103+
#### `getEventStreamForCategory(categoryName, [minRevision], [maxRevision], [predicate], [raw])` ✅ Stable
101104

102105
```javascript
103-
eventstore.getEventStreamForCategory(categoryName [, minRevision [, maxRevision]]) → EventStream
106+
eventstore.getEventStreamForCategory(categoryName [, minRevision [, maxRevision [, predicate [, raw]]]]) → EventStream
104107
```
105108

106109
Return a joined `EventStream` for all streams whose names begin with
@@ -211,10 +214,10 @@ Remove a previously registered listener.
211214

212215
---
213216

214-
#### `fromStreams(streamName, streamNames, [minRevision], [maxRevision])`
217+
#### `fromStreams(streamName, streamNames, [minRevision], [maxRevision], [predicate], [raw])`
215218

216219
```javascript
217-
eventstore.fromStreams(streamName, streamNames [, minRevision [, maxRevision]]) → EventStream
220+
eventstore.fromStreams(streamName, streamNames [, minRevision [, maxRevision [, predicate [, raw]]]]) → EventStream
218221
```
219222

220223
Create a virtual `EventStream` by joining the listed streams in sequence-number order.
@@ -291,12 +294,12 @@ Asynchronously scan all consumer state files and return their identifiers.
291294
import { EventStream } from 'event-storage';
292295
```
293296

294-
`EventStream` extends Node's `stream.Readable` (in `objectMode`).
297+
`EventStream` extends Node's `stream.Readable`.
295298

296299
### Constructor
297300

298301
```javascript
299-
new EventStream(name, eventStore, [minRevision], [maxRevision])
302+
new EventStream(name, eventStore, [minRevision], [maxRevision], [predicate], [raw])
300303
```
301304

302305
| Parameter | Type | Default | Description |
@@ -305,6 +308,8 @@ new EventStream(name, eventStore, [minRevision], [maxRevision])
305308
| `eventStore` | `EventStore` || The owning `EventStore`. |
306309
| `minRevision` | `number` | `1` | First event revision to include (1-based, inclusive). Negative values count from the end. |
307310
| `maxRevision` | `number` | `-1` | Last event revision to include (inclusive). `-1` means the last event. |
311+
| `predicate` | `function\|object\|null` | `null` | Optional matcher. In object mode functions receive `(payload, metadata)`. In raw mode functions receive `(buffer)`. |
312+
| `raw` | `boolean` | `false` | Emit NDJSON `Buffer` chunks instead of payload objects. |
308313

309314
In practice, `EventStream` instances are obtained from `EventStore` methods rather than constructed directly.
310315

@@ -492,6 +497,28 @@ Return the next event object from the iterator, or `false` when the stream is ex
492497

493498
---
494499

500+
### Raw Streaming (NDJSON)
501+
502+
Use raw mode when you want to stream events directly over sockets/HTTP without deserializing and re-serializing every event in userland.
503+
504+
```javascript
505+
const stream = eventstore.getEventStream('orders', 1, -1, { payload: { type: 'OrderPlaced' } }, true);
506+
stream.pipe(httpResponse);
507+
```
508+
509+
Matcher behavior differs by mode:
510+
511+
- **Object mode (`raw=false`)**
512+
- Function matcher: `(payload, metadata) => boolean`
513+
- Object matcher: evaluated against `{ stream, payload, metadata }`
514+
- **Raw mode (`raw=true`)**
515+
- Function matcher: `(buffer) => boolean` where `buffer` is one compact JSON document
516+
- Object matcher: byte-level raw matcher over compact JSON bytes (lazy-built at first stream consumption)
517+
518+
The raw object matcher requires the default compact JSON serializer format. If you use a custom serializer (including pretty-printed or transformed JSON), object matchers in raw mode are not guaranteed to work; use a function matcher in that case.
519+
520+
---
521+
495522
## Storage
496523

497524
`Storage` is the low-level append-only document store used internally by `EventStore`. It can be used directly for advanced use cases.

docs/dcb.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const { stream, condition } = store.query(
3636

3737
The optional `matcher` narrows the boundary to exactly the events that would affect the decision — unrelated events of the same type (e.g. a different order) never cause spurious conflicts.
3838

39+
`query()` also supports raw mode (`query(types, matcher, minRevision, true)`), but raw streaming itself is a general stream-reading feature, not DCB-specific. See [Event Streams -> Reading Streams](streams.md#reading-streams) for the full raw-mode semantics and matcher behavior.
40+
3941
### Step 2 — Build the decision model
4042

4143
```javascript

docs/streams.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,34 @@ stream.forEach((event, metadata, streamName) => {
7373
});
7474
```
7575

76+
### Raw Mode (NDJSON for Network Streaming)
77+
78+
Use raw mode when you want to stream events directly over HTTP/TCP without deserializing and re-serializing each document in userland.
79+
80+
```javascript
81+
const stream = eventstore.getEventStream(
82+
'user-123',
83+
1,
84+
-1,
85+
{ payload: { type: 'UserRegistered' } },
86+
true
87+
);
88+
89+
// Each chunk is one compact JSON document plus "\n"
90+
stream.pipe(httpResponse);
91+
```
92+
93+
Raw mode is available on all stream-reading APIs (`getEventStream`, `getAllEvents`, `fromStreams`, `getEventStreamForCategory`) and also on `query()`.
94+
95+
Matcher semantics differ by mode:
96+
97+
| Mode | Function matcher | Object matcher |
98+
|------|------------------|----------------|
99+
| `raw=false` (default) | `(payload, metadata) => boolean` | Matched against `{ stream, payload, metadata }` |
100+
| `raw=true` | `(buffer) => boolean` | Byte-level matcher against compact JSON bytes |
101+
102+
Important: raw object matchers require the default compact JSON serializer format. If you use a custom serializer, use a raw function matcher instead.
103+
76104
### Revision Ranges
77105

78106
Retrieve only a slice of the stream by passing `minRevision` and `maxRevision`:

0 commit comments

Comments
 (0)