Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).
- `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order!
- `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms.
- `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes.

### Modifying quads
#### Normalize datetime
Expand Down
32 changes: 31 additions & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
res.status(204).send();
} );


/**
* Filters the change sets based on a specified pattern.
*
* @param {Array<Object>} changeSets - An array of change set objects,
* each containing `insert` and `delete` properties.
* @param {Object} entry - An object containing the matching criteria.
* @param {Array} entry.match - The pattern used to filter the triples
* in the `insert` and `delete` arrays.
* @returns {Array<Object>} A new array of change set objects with
* filtered `insert` and `delete` properties.
*/
function filterChangesestOnPattern(changeSets, entry) {
const filteredChangesets = [];
for (const changeSet of changeSets) {
const { insert, delete: deleteSet } = changeSet;
const clonedChangeSet = {
...changeSet,
insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)),
delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match))
};
filteredChangesets.push(clonedChangeSet);
};
return filteredChangesets;
}

async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
services.map( async (entry, index) => {
entry.index = index;
Expand All @@ -52,7 +78,11 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){

const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
let maybePatternFilteredChangesets = changeSets;
if (entry.options?.sendMatchesOnly) {
maybePatternFilteredChangesets = filterChangesestOnPattern(changeSets, entry);
}
const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

Expand Down