Skip to content

Commit 0655459

Browse files
committed
feat(filter): add basic filtering function to streams
1 parent 6109e51 commit 0655459

7 files changed

Lines changed: 227 additions & 17 deletions

File tree

src/filter.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import type { SDFRecord } from "./parser";
2+
3+
/** Predicate over a parsed SDF record: returns `true` to keep the record and `false` to drop it. */
export type FilterFn = (record: SDFRecord) => boolean;
4+
5+
/**
 * One range constraint applied to a single named property of a record.
 */
export type FilterRule = {
  /** Name of the property (a key of `record.properties`) that the rule inspects. */
  property: string;
  /** How the raw property string is interpreted before it is compared. */
  treatAs: "number" | "string" | "date";
  /** Lower bound of the accepted range. */
  min: number;
  /** Upper bound of the accepted range. */
  max: number;
};
11+
12+
/**
 * Matches a string that is, in its entirety, a decimal number (optional sign,
 * fraction and exponent) or a signed `Infinity`.
 */
const NUMERIC_LITERAL = /^[+-]?(?:Infinity|(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)$/;

/**
 * Decides whether a record satisfies every rule in `rules`.
 *
 * - A rule whose property is absent from the record is ignored (the record is not dropped for it).
 * - For `treatAs: "number"` the whole (trimmed) value must be a decimal number; values such as
 *   `""`, `"abc"` or `"12abc"` cause the record to be dropped. The parsed number must lie in the
 *   closed interval `[rule.min, rule.max]`.
 * - `"string"` and `"date"` rules are not implemented yet and never drop a record.
 *
 * @param record - the parsed record whose `properties` are inspected
 * @param rules - the rules to apply; an empty list keeps every record
 * @returns `true` when the record should be kept, `false` when it should be dropped
 */
export const filterRecord = (record: SDFRecord, rules: FilterRule[]): boolean => {
  for (const rule of rules) {
    const value = record.properties[rule.property];
    // the rule does not apply to a record that lacks the property, so move on to the next rule
    if (value === undefined) continue;

    // handle each type of filter
    switch (rule.treatAs) {
      case "number": {
        // Number.parseFloat would silently accept a numeric prefix ("12abc" -> 12), so the
        // complete string is validated first; this also rejects empty and blank values
        const text = value.trim();
        if (!NUMERIC_LITERAL.test(text)) return false;
        const numberValue = Number(text);
        // keep records within the range, inclusive of the bounds, i.e. [min, max]
        if (numberValue < rule.min || numberValue > rule.max) return false;
        break;
      }
      // TODO: To implement
      case "string":
      case "date":
        continue;
    }
  }
  return true;
};

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
export * from "./node-stream";
1+
export * from "./filter";
2+
export * from "./node-stream";
23
export * from "./parser";
34
export * from "./web-stream";

src/node-stream.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Transform, type TransformCallback, type TransformOptions } from "node:stream";
22

3+
import type { FilterFn } from "./filter";
4+
import type { SDFRecord } from "./parser";
35
import { parseSdPart } from "./parser";
46
import { splitLines } from "./utils";
57

@@ -41,16 +43,18 @@ const countRecords = (buffer: string) => buffer.match(/\${4}.*/g)?.length ?? 0;
4143
*/
4244
export class NodeSDFTransformer extends Transform {
4345
constructor(
46+
private filter: FilterFn = () => true,
4447
options?: TransformOptions,
48+
// these shouldn't be in the constructor signature, but how else can they be assigned to `this` without ts complaining?
4549
private buffer = "",
50+
private record: SDFRecord | undefined = undefined,
4651
) {
4752
super({ ...options, readableObjectMode: true, writableObjectMode: true });
48-
this.buffer = buffer;
4953

5054
this.push("[");
5155
}
5256

53-
private parse() {
57+
private parse(): SDFRecord {
5458
const recordEndIndex = this.buffer.indexOf(RECORD_SEPARATOR);
5559
const recordText = this.buffer.slice(0, recordEndIndex);
5660

@@ -66,20 +70,30 @@ export class NodeSDFTransformer extends Transform {
6670

6771
this.buffer += data.replace(/\r\n/g, "\n");
6872

69-
while (countRecords(this.buffer) > 1) {
73+
while (countRecords(this.buffer) > 0) {
7074
const record = this.parse();
71-
const json = JSON.stringify(record);
72-
this.push(json + ",");
75+
if (this.filter(record)) {
76+
if (this.record) {
77+
const json = JSON.stringify(this.record);
78+
this.push(json + ",");
79+
this.record = record;
80+
} else {
81+
this.record = record;
82+
}
83+
}
7384
}
7485

7586
callback();
7687
}
7788

7889
_flush(callback: TransformCallback) {
79-
// Handle the remaining record in the buffer
80-
const record = this.parse();
81-
const json = JSON.stringify(record);
82-
this.push(json);
90+
const record = this.record;
91+
92+
if (record) {
93+
const json = JSON.stringify(record);
94+
this.push(json);
95+
}
96+
8397
this.push("]");
8498

8599
callback();

src/parser.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
export type SDFRecord = {
44
molFile: string | undefined;
5-
properties: Record<string, string | undefined>;
5+
// mapping from property name to property value
6+
// `undefined` isn't strictly needed in the value type here, since it would only show that a property may be missing;
7+
// the tsconfig `noUncheckedIndexedAccess` option should cover that instead
8+
properties: Record<string, string>;
69
};
710

811
export const parseSdPart = (recordLines: string[]) => {
@@ -19,12 +22,12 @@ export const parseSdPart = (recordLines: string[]) => {
1922
if (record.molFile) {
2023
const matchHeader = line.match(patternHeader);
2124
if (matchHeader?.length == 2) {
22-
key = matchHeader[1];
25+
key = matchHeader[1] as string;
2326
} else {
2427
const hasKeyAndValue = key === "" || line === "";
2528
if (!hasKeyAndValue) {
2629
const oldValue = record.properties[key];
27-
record.properties[key] = oldValue ? `${oldValue};${line}` : line;
30+
record.properties[key] = oldValue ? `${oldValue}\r\n${line}` : line;
2831
}
2932
}
3033
} else {

src/web-stream.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { FilterFn } from "./filter";
12
import { parseSdPart, type SDFRecord } from "./parser";
23
import { splitLines } from "./utils";
34

@@ -19,7 +20,7 @@ const RECORD_SEPARATOR = "$$$$";
1920
```
2021
* @returns instance of `TransformStream`
2122
*/
22-
export const createSDFTransformer = () => {
23+
export const createSDFTransformer = (filter: FilterFn = () => true) => {
2324
let content = ""; // accumulator for the content of the current record
2425

2526
// TransformStream to be used with stream.pipeThrough()
@@ -35,7 +36,9 @@ export const createSDFTransformer = () => {
3536
const record = parseSdPart(recordLines);
3637
content = content.slice(recordEndIndex + RECORD_SEPARATOR.length + 1);
3738

38-
controller.enqueue(record);
39+
if (filter(record)) {
40+
controller.enqueue(record);
41+
}
3942
}
4043
},
4144
});

tests/stream.spec.ts

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { Transform, Writable } from "node:stream";
22
import njfetch from "node-fetch";
33
import { describe, expect, it } from "vitest";
44

5-
import type { SDFRecord } from "..";
5+
import { filterRecord, type FilterRule } from "../src/filter";
66
import { NodeSDFTransformer } from "../src/node-stream";
7+
import type { SDFRecord } from "../src/parser";
78
import { createSDFTransformer as createWebSDFTransformer, parser } from "..";
89

910
const webStreamParser = async (stream: ReadableStream<string>) => {
@@ -112,3 +113,154 @@ describe("NodeJS stream", async () => {
112113
expect(streamedRecords).toEqual(records);
113114
});
114115
});
116+
117+
describe("Filter with treatAs number", () => {
  // Fixture with one property per scenario: empty, non-numeric, positive and negative values
  const record: SDFRecord = {
    molFile: "",
    properties: { property1: "", property2: "abc", property3: "123", property4: "-123" },
  };

  // Runs filterRecord on the fixture with a single numeric range rule for `property`
  const applyNumberRule = (property: string, min: number, max: number): boolean => {
    const filterRules: FilterRule[] = [{ property, treatAs: "number", min, max }];
    return filterRecord(record, filterRules);
  };

  it("Filter drops empty string if treated as a number", () => {
    expect(applyNumberRule("property1", 0, Infinity)).toBe(false);
  });

  it("Filter drops non-numbers if treated as a number", () => {
    expect(applyNumberRule("property2", 0, Infinity)).toBe(false);
  });

  it("Filter keeps numbers when using infinite bounds", () => {
    expect(applyNumberRule("property3", -Infinity, Infinity)).toBe(true);
  });

  it("Filter keeps numbers when using semi-infinite bounds", () => {
    expect(applyNumberRule("property3", 0, Infinity)).toBe(true);
  });

  it("Filter drops negative numbers when out of bounds", () => {
    expect(applyNumberRule("property4", 0, Infinity)).toBe(false);
  });
});
198+
199+
/**
 * Requests the `poses.sdf` fixture over HTTP and returns the response body.
 *
 * @throws Error("No stream") when the response has no body
 */
const getAStream = async () => {
  const { body } = await njfetch(
    "https://github.com/InformaticsMatters/sdf-parser/raw/master/tests/data/poses.sdf",
  );

  if (body) return body;

  throw new Error("No stream");
};
209+
210+
describe("NodeJS stream with filter", async () => {
  // Pipes the fixture through a NodeSDFTransformer that applies one numeric range rule,
  // and hands the resulting stream to consumeStream
  const streamWithRange = async (property: string, min: number, max: number) => {
    const stream = await getAStream();
    const filterRules: FilterRule[] = [{ property, treatAs: "number", min, max }];
    const filter = (record: SDFRecord) => filterRecord(record, filterRules);

    return consumeStream(stream.pipe(decoderTransform()).pipe(new NodeSDFTransformer(filter)));
  };

  it("Filter drops all records", async () => {
    const streamedRecords = await streamWithRange("TransFSScore", -Infinity, 0);
    expect(streamedRecords).toHaveLength(0);
  });

  it("Filter keeps all records", async () => {
    const streamedRecords = await streamWithRange("FeatureStein", -Infinity, Infinity);
    expect(streamedRecords).toHaveLength(268);
  });

  it("Filter keeps the right number of records", async () => {
    const streamedRecords = await streamWithRange("TransFSScore", 0.2, 0.3);
    expect(streamedRecords).toHaveLength(53);
  });
});

tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"moduleDetection": "force",
1212
// Strictness
1313
"strict": true,
14-
"noUncheckedIndexedAccess": false,
14+
"noUncheckedIndexedAccess": true,
1515
// If transpiling with TS
1616
// "moduleResolution": "NodeNext",
1717
// "module": "NodeNext",

0 commit comments

Comments
 (0)