From dbf246d34488bdd53dd73d441e0107200b3a3196 Mon Sep 17 00:00:00 2001 From: Niels Vandekeybus Date: Thu, 23 Jan 2025 10:28:16 +0100 Subject: [PATCH 01/10] add an option to send only matches to receiving services this greatly reduces the size of delta messages in stacks with large data changes. --- README.md | 1 + app.js | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d6a8dbc..4530cf5 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). - `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order! - `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms. + - `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes. ### Modifying quads #### Normalize datetime diff --git a/app.js b/app.js index 2ba5c84..138185c 100644 --- a/app.js +++ b/app.js @@ -43,6 +43,32 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { res.status(204).send(); } ); + +/** + * Filters the change sets based on a specified pattern. + * + * @param {Array} changeSets - An array of change set objects, + * each containing `insert` and `delete` properties. + * @param {Object} entry - An object containing the matching criteria. + * @param {Array} entry.match - The pattern used to filter the triples + * in the `insert` and `delete` arrays. + * @returns {Array} A new array of change set objects with + * filtered `insert` and `delete` properties. 
+ */ +function filterChangesestOnPattern(changeSets, entry) { + const filteredChangesets = []; + for (const changeSet of changeSets) { + const { insert, delete: deleteSet } = changeSet; + const clonedChangeSet = { + ...changeSet, + insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)) + }; + filteredChangesets.push(clonedChangeSet); + }; + return filteredChangesets; +} + async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ services.map( async (entry, index) => { entry.index = index; @@ -52,7 +78,11 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ const matchSpec = entry.match; - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); + let maybePatternFilteredChangesets = changeSets; + if (entry.options?.sendMatchesOnly) { + maybePatternFilteredChangesets = filterChangesestOnPattern(changeSets, entry); + } + const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); From 19efd3ce8e4007265adff1829c308f28a58a002c Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 15:09:56 +0100 Subject: [PATCH 02/10] optimize filtering this tries to optimize filtering and match detection in two ways: - by grouping on match pattern: this means we only do detection and filtering once for a given match pattern (takes the pattern and sendMatchesOnly into account) - by doing matching before origin filter, since the latter seems more costly --- app.js | 177 ++++++++++++++++++++-------------------------------- matching.js | 79 +++++++++++++++++++++++ 2 files changed, 145 insertions(+), 111 deletions(-) create mode 100644 matching.js diff --git a/app.js b/app.js index 
138185c..40435a9 100644 --- a/app.js +++ b/app.js @@ -2,15 +2,31 @@ import { app } from 'mu'; import services from './config/rules'; import normalizeQuad from './config/normalize-quad'; import bodyParser from 'body-parser'; -import dns from 'dns'; import { foldChangeSets } from './folding'; import { sendRequest } from './send-request'; import { sendBundledRequest } from './bundle-requests'; +import { + filterChangesetsOnPattern, + tripleMatchesSpec, + filterMatchesForOrigin, + hostnameForEntry +} from './matching'; // Log server config if requested if( process.env["LOG_SERVER_CONFIGURATION"] ) console.log(JSON.stringify( services )); +const groupedServices = services.reduce((acc, service) => { + // Create a unique key for the match pattern + const matchKey = `${normalizeObject(service.match)}${service.options.sendMatchesOnly || false}`; + if (!acc[matchKey]) { + acc[matchKey] = []; + } + acc[matchKey].push(service); + return acc; +}, {}); + + app.get( '/', function( req, res ) { res.status(200); res.send("Hello, delta notification is running"); @@ -43,129 +59,68 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { res.status(204).send(); } ); - -/** - * Filters the change sets based on a specified pattern. - * - * @param {Array} changeSets - An array of change set objects, - * each containing `insert` and `delete` properties. - * @param {Object} entry - An object containing the matching criteria. - * @param {Array} entry.match - The pattern used to filter the triples - * in the `insert` and `delete` arrays. - * @returns {Array} A new array of change set objects with - * filtered `insert` and `delete` properties. 
- */ -function filterChangesestOnPattern(changeSets, entry) { - const filteredChangesets = []; - for (const changeSet of changeSets) { - const { insert, delete: deleteSet } = changeSet; - const clonedChangeSet = { - ...changeSet, - insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), - delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)) - }; - filteredChangesets.push(clonedChangeSet); - }; - return filteredChangesets; -} - async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ - services.map( async (entry, index) => { - entry.index = index; - // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) - console.log(`Checking if we want to send to ${entry.callback.url}`); - - const matchSpec = entry.match; - + // Iterate over each unique match pattern + for (const matchKey in groupedServices) { + const firstEntry = groupedServices[matchKey][0]; + // can use first entry since it's part of grouping + const sendMatchesOnly = firstEntry.options.sendMatchesOnly; let maybePatternFilteredChangesets = changeSets; - if (entry.options?.sendMatchesOnly) { - maybePatternFilteredChangesets = filterChangesestOnPattern(changeSets, entry); + if (sendMatchesOnly) { + maybePatternFilteredChangesets = filterChangesetsOnPattern(changeSets, firstEntry); } - const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); let allInserts = []; let allDeletes = []; - - originFilteredChangeSets.forEach( (change) => { + maybePatternFilteredChangesets.forEach( (change) => { allInserts = [...allInserts, ...change.insert]; allDeletes = [...allDeletes, ...change.delete]; } ); - const changedTriples = [...allInserts, ...allDeletes]; - const someTripleMatchedSpec = - changedTriples - .some( (triple) 
=> tripleMatchesSpec( triple, matchSpec ) ); - - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - - if( someTripleMatchedSpec ) { - // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) - console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - - if( entry.options && entry.options.gracePeriod ) { - sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); - } else { - const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); - sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + changedTriples + .some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) ); + const matchingServices = groupedServices[matchKey]; + matchingServices.map( async (entry, index) => { + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + + if( someTripleMatchedSpec ) { + entry.index = index; + // for each entity + if( process.env["DEBUG_DELTA_MATCH"] ) + console.log(`Checking if we want to send to ${entry.callback.url}`); + const matchSpec = entry.match; + const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + + // inform matching entities + if( process.env["DEBUG_DELTA_SEND"] ) + console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + + if( entry.options && entry.options.gracePeriod ) { + sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); + } else { + const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); + sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + } } - } - } ); -} - -function tripleMatchesSpec( triple, matchSpec ) { - // 
form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); - - for( let key in matchSpec ){ - // key is one of s, p, o - const subMatchSpec = matchSpec[key]; - const subMatchValue = triple[key]; - - if( subMatchSpec && !subMatchValue ) - return false; - - for( let subKey in subMatchSpec ) - // we're now matching something like {type: "url", value: "http..."} - if( subMatchSpec[subKey] !== subMatchValue[subKey] ) - return false; - } - return true; // no false matches found, let's send a response -} - -async function filterMatchesForOrigin( changeSets, entry ) { - if( ! entry.options || !entry.options.ignoreFromSelf ) { - return changeSets; - } else { - try { - const originIpAddress = await getServiceIp( entry ); - return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); - } catch(e) { - console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`); - console.error(e); - return changeSets; - } + } ); } } -function hostnameForEntry( entry ) { - return (new URL(entry.callback.url)).hostname; +/** + * Normalizes an object by sorting its keys and converting it to a string. + * + * @param {Object} obj - The object to normalize. + * @returns {string} A string representation of the normalized object. 
+ */ +function normalizeObject(obj) { + return JSON.stringify(Object.keys(obj) + .sort() + .reduce((acc, key) => { + acc[key] = obj[key]; + return acc; + }, {})); } - -async function getServiceIp(entry) { - const hostName = hostnameForEntry( entry ); - return new Promise( (resolve, reject) => { - dns.lookup( hostName, { family: 4 }, ( err, address) => { - if( err ) - reject( err ); - else - resolve( address ); - } ); - } ); -}; diff --git a/matching.js b/matching.js new file mode 100644 index 0000000..d0c8dd7 --- /dev/null +++ b/matching.js @@ -0,0 +1,79 @@ +import dns from 'dns'; + +/** + * Filters the change sets based on a specified pattern. + * + * @param {Array} changeSets - An array of change set objects, + * each containing `insert` and `delete` properties. + * @param {Object} entry - An object containing the matching criteria. + * @param {Object} entry.match - The pattern used to filter the triples + * in the `insert` and `delete` arrays. + * @returns {Array} A new array of change set objects with + * filtered `insert` and `delete` properties. 
+ */ +export function filterChangesetsOnPattern(changeSets, entry) { + const filteredChangesets = []; + for (const changeSet of changeSets) { + const { insert, delete: deleteSet } = changeSet; + const clonedChangeSet = { + ...changeSet, + insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)) + }; + filteredChangesets.push(clonedChangeSet); + }; + return filteredChangesets; +} + +export function tripleMatchesSpec( triple, matchSpec ) { + // form of triple is {s, p, o}, same as matchSpec + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); + + for( let key in matchSpec ){ + // key is one of s, p, o + const subMatchSpec = matchSpec[key]; + const subMatchValue = triple[key]; + + if( subMatchSpec && !subMatchValue ) + return false; + + for( let subKey in subMatchSpec ) + // we're now matching something like {type: "url", value: "http..."} + if( subMatchSpec[subKey] !== subMatchValue[subKey] ) + return false; + } + return true; // no false matches found, let's send a response +} + + +export async function filterMatchesForOrigin( changeSets, entry ) { + if( ! 
entry.options || !entry.options.ignoreFromSelf ) { + return changeSets; + } else { + try { + const originIpAddress = await getServiceIp( entry ); + return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); + } catch(e) { + console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`); + console.error(e); + return changeSets; + } + } +} + +export function hostnameForEntry( entry ) { + return (new URL(entry.callback.url)).hostname; +} + +async function getServiceIp(entry) { + const hostName = hostnameForEntry( entry ); + return new Promise( (resolve, reject) => { + dns.lookup( hostName, { family: 4 }, ( err, address) => { + if( err ) + reject( err ); + else + resolve( address ); + } ); + } ); +}; From 34004ffa9fa3f849aba7ec53ffe64b6a50cdeeca Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 16:32:40 +0100 Subject: [PATCH 03/10] only try sending changesets if there are any left after filtering --- app.js | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/app.js b/app.js index 40435a9..28f969b 100644 --- a/app.js +++ b/app.js @@ -92,18 +92,21 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ console.log(`Checking if we want to send to ${entry.callback.url}`); const matchSpec = entry.match; const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); - - // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) - console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - - if( entry.options && entry.options.gracePeriod ) { - sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); - } else { - const foldedChangeSets 
= foldChangeSets( entry, originFilteredChangeSets ); - sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + + if (originFilteredChangeSets.length > 0) { + if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + + // inform matching entities + if( process.env["DEBUG_DELTA_SEND"] ) + console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + + if( entry.options && entry.options.gracePeriod ) { + sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); + } else { + const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); + sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + } } } } ); From 6285770b7744c6038db81d322a29614cefa0bdb0 Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 17:00:53 +0100 Subject: [PATCH 04/10] ensure unique indexes are set on services this is not the prettiest of solutions, but it does achieve the same thing. 
bonus is it only happens once on startup --- app.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app.js b/app.js index 28f969b..0e23471 100644 --- a/app.js +++ b/app.js @@ -16,12 +16,14 @@ import { if( process.env["LOG_SERVER_CONFIGURATION"] ) console.log(JSON.stringify( services )); +let index = 0; const groupedServices = services.reduce((acc, service) => { // Create a unique key for the match pattern const matchKey = `${normalizeObject(service.match)}${service.options.sendMatchesOnly || false}`; if (!acc[matchKey]) { acc[matchKey] = []; } + service.index = index++; acc[matchKey].push(service); return acc; }, {}); @@ -81,12 +83,11 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ changedTriples .some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) ); const matchingServices = groupedServices[matchKey]; - matchingServices.map( async (entry, index) => { + matchingServices.forEach( async (entry) => { if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) console.log(`Triple matches spec? ${someTripleMatchedSpec}`); if( someTripleMatchedSpec ) { - entry.index = index; // for each entity if( process.env["DEBUG_DELTA_MATCH"] ) console.log(`Checking if we want to send to ${entry.callback.url}`); From 9e2c2e8c7a2e0c7b8eccb51f1219dac06202cc0e Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 17:02:50 +0100 Subject: [PATCH 05/10] only do process.env lookups once it's mentioned in the expressjs best practices (and elsewhere) that process.env should be used sparingly: | If you need to write environment-specific code, you can check the value of NODE_ENV with process.env.NODE_ENV. Be aware that checking the value of any environment variable incurs a performance penalty, and so should be done sparingly. 
https://expressjs.com/th/advanced/best-practice-performance.html#set-node_env-to-production it seems this is because process.env is a getter that processes the ENV array each time --- app.js | 23 +++++++++++++++-------- bundle-requests.js | 5 +++-- config/normalize-quad.js | 2 +- env.js | 8 ++++++++ folding.js | 3 ++- matching.js | 4 ++-- send-request.js | 6 ++++-- 7 files changed, 35 insertions(+), 16 deletions(-) create mode 100644 env.js diff --git a/app.js b/app.js index 0e23471..9c96e77 100644 --- a/app.js +++ b/app.js @@ -11,9 +11,16 @@ import { filterMatchesForOrigin, hostnameForEntry } from './matching'; +import { + DEBUG_DELTA_MATCH, + DEBUG_DELTA_SEND, + DEBUG_TRIPLE_MATCHES_SPEC, + LOG_REQUESTS, + LOG_SERVER_CONFIGURATION, +} from './env'; // Log server config if requested -if( process.env["LOG_SERVER_CONFIGURATION"] ) +if(LOG_SERVER_CONFIGURATION) console.log(JSON.stringify( services )); let index = 0; @@ -35,7 +42,7 @@ app.get( '/', function( req, res ) { } ); app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { - if( process.env["LOG_REQUESTS"] ) { + if( LOG_REQUESTS ) { console.log("Logging request body"); console.log(req.body); } @@ -84,25 +91,25 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ .some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) ); const matchingServices = groupedServices[matchKey]; matchingServices.forEach( async (entry) => { - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if( DEBUG_TRIPLE_MATCHES_SPEC ) console.log(`Triple matches spec? 
${someTripleMatchedSpec}`); if( someTripleMatchedSpec ) { // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) + if( DEBUG_DELTA_MATCH ) console.log(`Checking if we want to send to ${entry.callback.url}`); const matchSpec = entry.match; const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); - if (originFilteredChangeSets.length > 0) { - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) + if ( originFilteredChangeSets.length > 0 ) { + if( DEBUG_TRIPLE_MATCHES_SPEC && entry.options.ignoreFromSelf ) console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) + if( DEBUG_DELTA_SEND ) console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - if( entry.options && entry.options.gracePeriod ) { + if( entry.options?.gracePeriod ) { sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); } else { const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); diff --git a/bundle-requests.js b/bundle-requests.js index 38b1226..233d728 100644 --- a/bundle-requests.js +++ b/bundle-requests.js @@ -1,5 +1,6 @@ import { foldChangeSets } from './folding'; import { sendRequest } from "./send-request.js"; +import { DEBUG_DELTA_SEND } from './env'; // map from bundle key to bundle object const bundles = {}; @@ -58,13 +59,13 @@ export const sendBundledRequest = ( existingBundle.bundledCallIdTrails.push(muCallIdTrail); // since an existing bundle exists, we don't need to send it after timeout, // the existing bundle will send us too - if (process.env["DEBUG_DELTA_SEND"]) { + if (DEBUG_DELTA_SEND) { console.log( `Adding to bundle for key ${bundleKey}, now contains ${existingBundle.changeSets.length} change sets` ); } } else { - if (process.env["DEBUG_DELTA_SEND"]) { + if (DEBUG_DELTA_SEND) { console.log( `Creating bundle for 
key ${bundleKey}, sending in ${entry.options.gracePeriod}ms` ); diff --git a/config/normalize-quad.js b/config/normalize-quad.js index c296860..67a0daa 100644 --- a/config/normalize-quad.js +++ b/config/normalize-quad.js @@ -1,4 +1,4 @@ -const normalizeDate = process.env["NORMALIZE_DATETIME_IN_QUAD"]; +import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from './env'; export default function(quad) { if (normalizeDate && quad.object.datatype == 'http://www.w3.org/2001/XMLSchema#dateTime') { diff --git a/env.js b/env.js new file mode 100644 index 0000000..be7eeac --- /dev/null +++ b/env.js @@ -0,0 +1,8 @@ +export const DEBUG_DELTA_FOLD = process.env["DEBUG_DELTA_FOLD"]; +export const DEBUG_DELTA_MATCH = process.env["DEBUG_DELTA_MATCH"]; +export const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]; +export const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"]; +export const DEBUG_TRIPLE_MATCHES_SPEC = process.env["DEBUG_TRIPLE_MATCHES_SPEC"]; +export const LOG_REQUESTS = process.env["LOG_REQUESTS"]; +export const LOG_SERVER_CONFIGURATION = process.env["LOG_SERVER_CONFIGURATION"]; +export const NORMALIZE_DATETIME_IN_QUAD = process.env["NORMALIZE_DATETIME_IN_QUAD"]; diff --git a/folding.js b/folding.js index d4a8f6e..efe316b 100644 --- a/folding.js +++ b/folding.js @@ -1,3 +1,4 @@ +import { DEBUG_DELTA_FOLD } from './env'; /** * Quads may be folded if * - a quad is deleted after it has been inserted @@ -44,7 +45,7 @@ export function foldChangeSets(entry, changeSets) { if (foldedInsertQuads.length) foldedChangeSets.push({ delete: [], insert: foldedInsertQuads }); - if (process.env["DEBUG_DELTA_FOLD"]) + if (DEBUG_DELTA_FOLD) console.log(`Folded changeset from:\n ${JSON.stringify(changeSets)}\nto:\n ${JSON.stringify(foldedChangeSets)}`); return foldedChangeSets; diff --git a/matching.js b/matching.js index d0c8dd7..620ffbe 100644 --- a/matching.js +++ b/matching.js @@ -1,5 +1,5 @@ import dns from 'dns'; - +import { 
DEBUG_TRIPLE_MATCHES_SPEC } from './env'; /** * Filters the change sets based on a specified pattern. * @@ -27,7 +27,7 @@ export function filterChangesetsOnPattern(changeSets, entry) { export function tripleMatchesSpec( triple, matchSpec ) { // form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if(DEBUG_TRIPLE_MATCHES_SPEC) console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); for( let key in matchSpec ){ diff --git a/send-request.js b/send-request.js index e19a990..3da6c0e 100644 --- a/send-request.js +++ b/send-request.js @@ -2,6 +2,8 @@ import http from "http"; import { uuid } from "mu"; const DEFAULT_RETRY_TIMEOUT = 250; +const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"]; +const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]; function formatChangesetBody(changeSets, options) { if (options.resourceFormat == "v0.0.1") { @@ -115,7 +117,7 @@ export async function sendRequest( // we should send contents body = formatChangesetBody(changeSets, entry.options); } - if (process.env["DEBUG_DELTA_SEND"]) + if (DEBUG_DELTA_SEND) console.log(`Executing send ${method} to ${url}`); try { const keepAliveAgent = new http.Agent({ @@ -140,7 +142,7 @@ export async function sendRequest( console.log(error); } } else { - if (process.env["DEBUG_DELTA_SEND"] || process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]) + if (DEBUG_DELTA_SEND || DEBUG_DELTA_NOT_SENDING_EMPTY) console.log(`Changeset empty. 
Not sending to ${entry.callback.method} ${entry.callback.url}`); } } From 69c219f5efa84f8bb7987c5fbc9a3ee8c6caa5e0 Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 18:43:44 +0100 Subject: [PATCH 06/10] better logging of failed requests --- send-request.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/send-request.js b/send-request.js index 3da6c0e..4e6cb7b 100644 --- a/send-request.js +++ b/send-request.js @@ -139,7 +139,7 @@ export async function sendRequest( retriesLeft ); } catch (error) { - console.log(error); + console.error(`Error sending delta to ${url}`, error); } } else { if (DEBUG_DELTA_SEND || DEBUG_DELTA_NOT_SENDING_EMPTY) From fbfe7040a608b979ca40a96d3ad2e184f100454a Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 Jan 2025 20:07:51 +0100 Subject: [PATCH 07/10] also filter effectiveInsert and effectiveDelete --- matching.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/matching.js b/matching.js index 620ffbe..2b51a3b 100644 --- a/matching.js +++ b/matching.js @@ -14,11 +14,13 @@ import { DEBUG_TRIPLE_MATCHES_SPEC } from './env'; export function filterChangesetsOnPattern(changeSets, entry) { const filteredChangesets = []; for (const changeSet of changeSets) { - const { insert, delete: deleteSet } = changeSet; + const { insert, delete: deleteSet, effectiveInsert, effectiveDelete } = changeSet; const clonedChangeSet = { ...changeSet, insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), - delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)) + delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)), + effectiveInsert: effectiveInsert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + effectiveDelete: effectiveDelete.filter((triple) => tripleMatchesSpec(triple, entry.match)), }; filteredChangesets.push(clonedChangeSet); }; From 921a094cb9e7c47da7fa6febd04afc11618bf054 Mon Sep 17 00:00:00 2001 From: Niels V Date: Thu, 23 
Jan 2025 20:30:07 +0100 Subject: [PATCH 08/10] load env from correct path --- config/normalize-quad.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/normalize-quad.js b/config/normalize-quad.js index 67a0daa..7b51565 100644 --- a/config/normalize-quad.js +++ b/config/normalize-quad.js @@ -1,4 +1,4 @@ -import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from './env'; +import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from '../env'; export default function(quad) { if (normalizeDate && quad.object.datatype == 'http://www.w3.org/2001/XMLSchema#dateTime') { From d77e06dff470e7de87e2499561c2825663dc1357 Mon Sep 17 00:00:00 2001 From: Niels Vandekeybus Date: Thu, 15 May 2025 09:57:04 +0200 Subject: [PATCH 09/10] add error handling --- app.js | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/app.js b/app.js index 9c96e77..7665650 100644 --- a/app.js +++ b/app.js @@ -1,4 +1,4 @@ -import { app } from 'mu'; +import { app, errorHandler } from 'mu'; import services from './config/rules'; import normalizeQuad from './config/normalize-quad'; import bodyParser from 'body-parser'; @@ -68,6 +68,19 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { res.status(204).send(); } ); +// log deltas that are rejected because the request body exceeds the size limit +app.use((err, req, res, next) => { + if (err.type === 'entity.too.large') { + console.warn(`Payload too large for ${req.method} ${req.originalUrl}`); + return res.status(413).send('Payload too large'); + } + + // Pass other errors to the default handler + next(err); +}); + +app.use(errorHandler); + async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ // Iterate over each unique match pattern for (const matchKey in groupedServices) { From a276b32f37d7828042805da662a9030f28fab182 Mon Sep 17 00:00:00 2001 From: Ruben Date: Tue, 23 Sep 2025 12:16:50 +0200 Subject: [PATCH 10/10] optimize filtering for sending only matches to services - avoid recalculating 
changedTriples in every loop - if sendMatchesOnly is true, a changeset might be empty. Do not send a changeset in this case - don't redo checking matches if this was already done because of `sendMatchesOnly` --- app.js | 42 ++++++++++++++++++++++-------------------- matching.js | 10 ++++++++-- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/app.js b/app.js index 7665650..c83d67b 100644 --- a/app.js +++ b/app.js @@ -82,6 +82,14 @@ app.use((err, req, res, next) => { app.use(errorHandler); async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ + let allInserts = []; + let allDeletes = []; + changeSets.forEach( (change) => { + allInserts = [...allInserts, ...change.insert]; + allDeletes = [...allDeletes, ...change.delete]; + } ); + const changedTriples = [...allInserts, ...allDeletes]; + // Iterate over each unique match pattern for (const matchKey in groupedServices) { const firstEntry = groupedServices[matchKey][0]; @@ -92,31 +100,25 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ maybePatternFilteredChangesets = filterChangesetsOnPattern(changeSets, firstEntry); } - let allInserts = []; - let allDeletes = []; - maybePatternFilteredChangesets.forEach( (change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - } ); - const changedTriples = [...allInserts, ...allDeletes]; - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) ); - const matchingServices = groupedServices[matchKey]; - matchingServices.forEach( async (entry) => { - if( DEBUG_TRIPLE_MATCHES_SPEC ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - - if( someTripleMatchedSpec ) { + const someTripleMatchedSpec = sendMatchesOnly + ? 
maybePatternFilteredChangesets.length > 0 // makes the assumption that maybePatternFilteredChangesets has no empty change sets + : changedTriples.some((triple) => + tripleMatchesSpec(triple, firstEntry.match) + ); + + if( someTripleMatchedSpec ) { + const matchingServices = groupedServices[matchKey]; + matchingServices.forEach( async (entry) => { + if( DEBUG_TRIPLE_MATCHES_SPEC ) + console.log(`Triple matches spec? ${someTripleMatchedSpec}`); // for each entity if( DEBUG_DELTA_MATCH ) console.log(`Checking if we want to send to ${entry.callback.url}`); - const matchSpec = entry.match; const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); if ( originFilteredChangeSets.length > 0 ) { if( DEBUG_TRIPLE_MATCHES_SPEC && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + console.log(`There are ${originFilteredChangeSets.length} change sets not from ${hostnameForEntry( entry )}`); // inform matching entities if( DEBUG_DELTA_SEND ) @@ -129,8 +131,8 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); } } - } - } ); + }); + } } } diff --git a/matching.js b/matching.js index 2b51a3b..52fee63 100644 --- a/matching.js +++ b/matching.js @@ -22,8 +22,14 @@ export function filterChangesetsOnPattern(changeSets, entry) { effectiveInsert: effectiveInsert.filter((triple) => tripleMatchesSpec(triple, entry.match)), effectiveDelete: effectiveDelete.filter((triple) => tripleMatchesSpec(triple, entry.match)), }; - filteredChangesets.push(clonedChangeSet); - }; + // do not keep empty change sets + if( clonedChangeSet.insert.length > 0 + || clonedChangeSet.delete.length > 0 + || clonedChangeSet.effectiveInsert.length > 0 + || clonedChangeSet.effectiveDelete.length > 0 ) { + filteredChangesets.push(clonedChangeSet); + } + } return filteredChangesets; }