Skip to content

Commit ca58e77

Browse files
authored
Merge pull request #318 from albe/copilot/refactor-codebase-maintainability
Reduce cyclomatic complexity in raw-stream hotspots and consolidate utilities under `src/utils`
2 parents cae6036 + e796281 commit ca58e77

20 files changed

Lines changed: 293 additions & 225 deletions

.github/workflows/npm-publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
"package/src/Storage.js"
6363
"package/src/Partition.js"
6464
"package/src/Index.js"
65-
"package/src/metadataUtil.js"
65+
"package/src/utils/metadataUtil.js"
6666
)
6767
for f in "${REQUIRED[@]}"; do
6868
echo "$FILES" | grep -qF "$f" || { echo "ERROR: required file '$f' is missing from the package"; exit 1; }

package.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,8 @@
4343
"src/Partition/*.js",
4444
"src/Storage/*.js",
4545
"src/WatchesFile.js",
46-
"src/util.js",
47-
"src/fsUtil.js",
48-
"src/metadataUtil.js",
49-
"index.js"
46+
"index.js",
47+
"src/utils/*.js"
5048
],
5149
"license": "MIT",
5250
"maintainers": [

src/Consumer.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import stream from 'stream';
22
import fs from 'fs';
33
import path from 'path';
4-
import { assert } from './util.js';
5-
import { ensureDirectory } from './fsUtil.js';
4+
import { assert } from './utils/util.js';
5+
import { ensureDirectory } from './utils/fsUtil.js';
66
import Storage from './Storage/ReadableStorage.js';
77
const MAX_CATCHUP_BATCH = 10;
88

src/EventStore.js

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import events from 'events';
66
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
77
import Index from './Index.js';
88
import Consumer from './Consumer.js';
9-
import { assert, getPropertyAtPath } from './util.js';
10-
import { ensureDirectory, scanForFiles } from './fsUtil.js';
11-
import { buildTypeMatcherFn } from './metadataUtil.js';
9+
import { assert, getPropertyAtPath } from './utils/util.js';
10+
import { ensureDirectory, scanForFiles } from './utils/fsUtil.js';
11+
import { buildTypeMatcherFn } from './utils/metadataUtil.js';
1212

1313
const ExpectedVersion = {
1414
Any: -1,
@@ -20,6 +20,7 @@ const ExpectedVersion = {
2020
*/
2121
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
2222
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/;
23+
const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']);
2324

2425
class OptimisticConcurrencyError extends Error {}
2526

@@ -240,11 +241,8 @@ class EventStore extends events.EventEmitter {
240241
* @returns {this}
241242
*/
242243
on(event, listener) {
243-
if (event === 'preCommit' || event === 'preRead') {
244-
if (event === 'preCommit') {
245-
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
246-
}
247-
this.storage.on(event, listener);
244+
if (this.isStorageHookEvent(event)) {
245+
this.delegateStorageHookEvent('on', event, listener);
248246
return this;
249247
}
250248
return super.on(event, listener);
@@ -265,11 +263,8 @@ class EventStore extends events.EventEmitter {
265263
* @returns {this}
266264
*/
267265
once(event, listener) {
268-
if (event === 'preCommit' || event === 'preRead') {
269-
if (event === 'preCommit') {
270-
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
271-
}
272-
this.storage.once(event, listener);
266+
if (this.isStorageHookEvent(event)) {
267+
this.delegateStorageHookEvent('once', event, listener);
273268
return this;
274269
}
275270
return super.once(event, listener);
@@ -284,13 +279,24 @@ class EventStore extends events.EventEmitter {
284279
* @returns {this}
285280
*/
286281
off(event, listener) {
287-
if (event === 'preCommit' || event === 'preRead') {
282+
if (this.isStorageHookEvent(event)) {
288283
this.storage.off(event, listener);
289284
return this;
290285
}
291286
return super.off(event, listener);
292287
}
293288

289+
isStorageHookEvent(event) {
290+
return STORAGE_HOOK_EVENTS.has(event);
291+
}
292+
293+
delegateStorageHookEvent(method, event, listener) {
294+
if (event === 'preCommit') {
295+
assert(!(this.storage instanceof ReadOnlyStorage), 'The storage was opened in read-only mode. Can not register a preCommit handler on it.');
296+
}
297+
this.storage[method](event, listener);
298+
}
299+
294300
/**
295301
* @inheritDoc
296302
*/
@@ -429,6 +435,20 @@ class EventStore extends events.EventEmitter {
429435
return type;
430436
}
431437

438+
getExistingQueryTypes(types) {
439+
const queryTypes = [];
440+
for (const type of types) {
441+
if (type in this.streams) {
442+
queryTypes.push(type);
443+
continue;
444+
}
445+
if (!this.typeAccessor) {
446+
throw new Error(`Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
447+
}
448+
}
449+
return queryTypes;
450+
}
451+
432452
/**
433453
* Commit a list of events for the given stream name, which is expected to be at the given version.
434454
* Note that the events committed may still appear in other streams too - the given stream name is only
@@ -543,20 +563,7 @@ class EventStore extends events.EventEmitter {
543563
*/
544564
query(types, matcher = null, minRevision = 1, raw = false) {
545565
assert(Array.isArray(types) && types.length > 0, 'Must specify a non-empty array of event types for query.');
546-
547-
const queryTypes = [];
548-
for (const type of types) {
549-
if (!(type in this.streams)) {
550-
// No typeAccessor: the stream was never created; we cannot know whether events of
551-
// this type exist in the store, so throw to avoid an unintentional full-store scan.
552-
assert(!!this.typeAccessor, `Type stream "${type}" does not exist. Create it with createEventStream() first, or configure typeAccessor to have type streams created automatically on commit.`);
553-
// typeAccessor is configured: type streams are created on commit, so a missing
554-
// stream simply means no event of this type has been committed yet — treat as empty.
555-
continue;
556-
}
557-
queryTypes.push(type);
558-
}
559-
566+
const queryTypes = this.getExistingQueryTypes(types);
560567
const condition = new CommitCondition(types, matcher, this.storage.length, raw);
561568
const stream = this.fromStreams('_query_' + types.join('_'), queryTypes, minRevision, -1, matcher, raw);
562569
return { stream, condition };
@@ -574,10 +581,7 @@ class EventStore extends events.EventEmitter {
574581
* @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
575582
*/
576583
getEventStream(streamName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
577-
if (typeof predicate === 'boolean' && raw === false) {
578-
raw = predicate;
579-
predicate = null;
580-
}
584+
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
581585
if (!(streamName in this.streams)) {
582586
return false;
583587
}
@@ -596,10 +600,7 @@ class EventStore extends events.EventEmitter {
596600
* @returns {EventStream} The event stream.
597601
*/
598602
getAllEvents(minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
599-
if (typeof predicate === 'boolean' && raw === false) {
600-
raw = predicate;
601-
predicate = null;
602-
}
603+
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
603604
return this.getEventStream('_all', minRevision, maxRevision, predicate, raw);
604605
}
605606

@@ -616,10 +617,7 @@ class EventStore extends events.EventEmitter {
616617
* @throws {Error} if any of the streams doesn't exist.
617618
*/
618619
fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
619-
if (typeof predicate === 'boolean' && raw === false) {
620-
raw = predicate;
621-
predicate = null;
622-
}
620+
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
623621
assert(streamNames instanceof Array, 'Must specify an array of stream names.');
624622

625623
if (streamNames.length === 0) {
@@ -658,10 +656,7 @@ class EventStore extends events.EventEmitter {
658656
* @throws {Error} If no stream for this category exists.
659657
*/
660658
getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1, predicate = null, raw = false) {
661-
if (typeof predicate === 'boolean' && raw === false) {
662-
raw = predicate;
663-
predicate = null;
664-
}
659+
({ predicate, raw } = normalizePredicateRaw(predicate, raw));
665660
if (categoryName in this.streams) {
666661
return this.getEventStream(categoryName, minRevision, maxRevision, predicate, raw);
667662
}
@@ -844,6 +839,13 @@ function parseStreamFromIndexName(indexName) {
844839
return indexName;
845840
}
846841

842+
function normalizePredicateRaw(predicate, raw) {
843+
if (typeof predicate === 'boolean' && raw === false) {
844+
return { predicate: null, raw: predicate };
845+
}
846+
return { predicate, raw };
847+
}
848+
847849
EventStore.Storage = Storage;
848850
EventStore.Index = Index;
849851

src/EventStream.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import stream from 'stream';
2-
import { assert } from './util.js';
3-
import { buildRawBufferMatcher, matches } from './metadataUtil.js';
2+
import { assert } from './utils/util.js';
3+
import { buildRawBufferMatcher, matches } from './utils/metadataUtil.js';
44

55
const NDJSON_NEWLINE = Buffer.from('\n');
66

src/Index/ReadableIndex.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import fs from 'fs';
22
import path from 'path';
33
import events from 'events';
44
import Entry, { assertValidEntryClass } from '../IndexEntry.js';
5-
import { assert, wrapAndCheck, binarySearch } from '../util.js';
5+
import { assert, wrapAndCheck, binarySearch } from '../utils/util.js';
66

77
// node-event-store-index V01
88
const HEADER_MAGIC = "nesidx01";

src/Index/WritableIndex.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import fs from 'fs';
22
import ReadableIndex, { Entry, CorruptedIndexError, HEADER_MAGIC } from './ReadableIndex.js';
3-
import { assert, assertEqual } from '../util.js';
4-
import { buildMetadataHeader } from '../metadataUtil.js';
5-
import { ensureDirectory } from '../fsUtil.js';
3+
import { assert, assertEqual } from '../utils/util.js';
4+
import { buildMetadataHeader } from '../utils/metadataUtil.js';
5+
import { ensureDirectory } from '../utils/fsUtil.js';
66

77
/**
88
* An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position

src/IndexMatcher.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { getPropertyAtPath } from './util.js';
2-
import { matches } from './metadataUtil.js';
1+
import { getPropertyAtPath } from './utils/util.js';
2+
import { matches } from './utils/metadataUtil.js';
33

44
/**
55
* @typedef {object|function(object):boolean} Matcher

src/JoinEventStream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import EventStream from './EventStream.js';
2-
import {assert, kWayMerge} from './util.js';
2+
import { assert, kWayMerge } from './utils/util.js';
33

44
/** Reusable sentinel used for missing or empty per-stream iterators. */
55
const emptyIterator = Object.freeze({ next() { return { done: true }; } });

src/Partition/ReadablePartition.js

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import fs from 'fs';
22
import path from 'path';
33
import events from 'events';
4-
import { assert, alignTo, hash, binarySearch } from '../util.js';
4+
import { assert, alignTo, hash, binarySearch } from '../utils/util.js';
55

66

77

@@ -209,6 +209,30 @@ class ReadablePartition extends events.EventEmitter {
209209
return ({ dataSize, sequenceNumber, time64 });
210210
}
211211

212+
resolveIterationPosition(position) {
213+
return position < 0 ? this.size + position + 1 : position;
214+
}
215+
216+
selectReader(position, size, backwardsHint) {
217+
if (size > 0 && backwardsHint) {
218+
const bufferOffset = DOCUMENT_HEADER_SIZE + size;
219+
const reader = this.prepareReadBufferBackwards(position + bufferOffset, bufferOffset);
220+
return { reader, bufferOffset };
221+
}
222+
return { reader: this.prepareReadBuffer(position), bufferOffset: 0 };
223+
}
224+
225+
assignHeaderOutput(headerOut, header) {
226+
if (headerOut === null) {
227+
return;
228+
}
229+
headerOut.dataSize = header.dataSize;
230+
headerOut.sequenceNumber = header.sequenceNumber;
231+
// Denormalize time64 relative to this partition's epoch so callers can compare
232+
// timestamps across partitions without needing to know the epoch value.
233+
headerOut.time64 = this.metadata.epoch + header.time64;
234+
}
235+
212236
/**
213237
* Read the data from the given position.
214238
*
@@ -227,10 +251,7 @@ class ReadablePartition extends events.EventEmitter {
227251
assert(this.fd, 'Partition is not opened.');
228252
assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);
229253

230-
const bufferOffset = size > 0 && backwardsHint ? DOCUMENT_HEADER_SIZE + size : 0;
231-
const reader = size > 0 && backwardsHint
232-
? this.prepareReadBufferBackwards(position + bufferOffset, bufferOffset)
233-
: this.prepareReadBuffer(position);
254+
const { reader, bufferOffset } = this.selectReader(position, size, backwardsHint);
234255
if (reader.length < DOCUMENT_HEADER_SIZE) {
235256
return false;
236257
}
@@ -242,13 +263,7 @@ class ReadablePartition extends events.EventEmitter {
242263
let dataPosition = reader.cursor + DOCUMENT_HEADER_SIZE;
243264
const header = this.readDocumentHeader(reader.buffer, reader.cursor, position, size);
244265
const dataSize = header.dataSize;
245-
if (headerOut !== null) {
246-
headerOut.dataSize = header.dataSize;
247-
headerOut.sequenceNumber = header.sequenceNumber;
248-
// Denormalize time64 relative to this partition's epoch so callers can compare
249-
// timestamps across partitions without needing to know the epoch value.
250-
headerOut.time64 = this.metadata.epoch + header.time64;
251-
}
266+
this.assignHeaderOutput(headerOut, header);
252267

253268
const writeSize = this.documentWriteSize(dataSize);
254269
assert(position + writeSize <= this.size, `Invalid document at position ${position}. This may be caused by an unfinished write.`, CorruptFileError);
@@ -437,7 +452,7 @@ class ReadablePartition extends events.EventEmitter {
437452
* @returns {Generator<Buffer>} A generator that returns all documents in this partition.
438453
*/
439454
*readAll(after = 0, headerOut = null) {
440-
let position = after < 0 ? this.size + after + 1 : after;
455+
let position = this.resolveIterationPosition(after);
441456
const internalHeader = headerOut !== null ? headerOut : {};
442457
let data;
443458
while ((data = this.readFrom(position, 0, internalHeader)) !== false) {
@@ -457,7 +472,7 @@ class ReadablePartition extends events.EventEmitter {
457472
* @returns {Generator<Buffer>} A generator that returns all documents in this partition in reverse order.
458473
*/
459474
*readAllBackwards(before = -1, headerOut = null) {
460-
let position = before < 0 ? this.size + before + 1 : before;
475+
let position = this.resolveIterationPosition(before);
461476
const internalHeader = headerOut !== null ? headerOut : {};
462477
while ((position = this.findDocumentPositionBefore(position)) !== false) {
463478
const data = this.readFrom(position, 0, internalHeader, true);

0 commit comments

Comments
 (0)