Skip to content
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ We first present an example, next we explain each of the properties. The follow
export default [
{
match: {
// form of element is {subject,predicate,object}
// form of element is {subject,predicate,object,graph}
// predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }
},
callback: {
Expand All @@ -52,6 +52,19 @@ export default [
gracePeriod: 1000,
ignoreFromSelf: true
}
},
{
match: {
// Example using regex matching for URIs
predicate: {
type: "uri",
value: /^http:\/\/example\.com\/ontology\/.+$/
}
},
callback: {
url: "http://ontology-service/.mu/delta",
method: "POST"
}
}
]
```
Expand All @@ -62,6 +75,7 @@ The exported property contains an array of definitions, each linking a match to
- `match.subject`: Matches the subject. Both `type` and `value` may be specified.
- `match.predicate`: Matches the predicate. Both `type` and `value` may be specified.
- `match.object`: Matches the object. Both `type` and `value` may be specified.
- `match.graph`: Matches the graph. Both `type` and `value` may be specified.
- `callback`: The place to inform about a matched delta
- `callback.url`: URL to inform about a match
- `callback.method`: Method to use when informing about a match
Expand All @@ -72,6 +86,7 @@ The exported property contains an array of definitions, each linking a match to
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).
- `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order!
- `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms.
- `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes.

### Modifying quads
#### Normalize datetime
Expand Down
191 changes: 101 additions & 90 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,48 @@
import { app } from 'mu';
import { app, errorHandler } from 'mu';
import services from './config/rules';
import normalizeQuad from './config/normalize-quad';
import bodyParser from 'body-parser';
import dns from 'dns';
import { foldChangeSets } from './folding';
import { sendRequest } from './send-request';
import { sendBundledRequest } from './bundle-requests';
import {
filterChangesetsOnPattern,
tripleMatchesSpec,
filterMatchesForOrigin,
hostnameForEntry
} from './matching';
import {
DEBUG_DELTA_MATCH,
DEBUG_DELTA_SEND,
DEBUG_TRIPLE_MATCHES_SPEC,
LOG_REQUESTS,
LOG_SERVER_CONFIGURATION,
} from './env';

// Log server config if requested
if( process.env["LOG_SERVER_CONFIGURATION"] )
if(LOG_SERVER_CONFIGURATION)
console.log(JSON.stringify( services ));

// Group the configured services by their match pattern (plus the
// sendMatchesOnly flag) so that each distinct pattern only has to be
// evaluated once per incoming delta. Every service also receives a
// sequential `index` in configuration order.
let index = 0;
const groupedServices = services.reduce((acc, service) => {
  // `options` is optional in a rule definition, so it must not be
  // dereferenced unconditionally; a missing flag counts as `false`.
  const sendMatchesOnly = service.options?.sendMatchesOnly || false;
  // Create a unique key for the match pattern
  const matchKey = `${normalizeObject(service.match)}${sendMatchesOnly}`;
  if (!acc[matchKey]) {
    acc[matchKey] = [];
  }
  service.index = index++;
  acc[matchKey].push(service);
  return acc;
}, {});


// Root GET endpoint: answers with a fixed 200 text response so callers can
// see that the service is up.
app.get('/', (req, res) => {
  res.status(200).send("Hello, delta notification is running");
});

app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
if( process.env["LOG_REQUESTS"] ) {
if( LOG_REQUESTS ) {
console.log("Logging request body");
console.log(req.body);
}
Expand All @@ -43,99 +68,85 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
res.status(204).send();
} );

async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
services.map( async (entry, index) => {
entry.index = index;
// for each entity
if( process.env["DEBUG_DELTA_MATCH"] )
console.log(`Checking if we want to send to ${entry.callback.url}`);

const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

let allInserts = [];
let allDeletes = [];

originFilteredChangeSets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );

const changedTriples = [...allInserts, ...allDeletes];
// log delta's ignored because of size
app.use((err, req, res, next) => {
if (err.type === 'entity.too.large') {
console.warn(`Payload too large for ${req.method} ${req.originalUrl}`);
return res.status(413).send('Payload too large');
}

const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );
// Pass other errors to the default handler
next(err);
});

if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);
app.use(errorHandler);

if( someTripleMatchedSpec ) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options && entry.options.gracePeriod ) {
sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId);
} else {
const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets );
sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId );
}
}
async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
let allInserts = [];
let allDeletes = [];
changeSets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );
}

function tripleMatchesSpec( triple, matchSpec ) {
// form of triple is {s, p, o}, same as matchSpec
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`);

for( let key in matchSpec ){
// key is one of s, p, o
const subMatchSpec = matchSpec[key];
const subMatchValue = triple[key];

if( subMatchSpec && !subMatchValue )
return false;

for( let subKey in subMatchSpec )
// we're now matching something like {type: "url", value: "http..."}
if( subMatchSpec[subKey] !== subMatchValue[subKey] )
return false;
}
return true; // no false matches found, let's send a response
}
const changedTriples = [...allInserts, ...allDeletes];

// Iterate over each unique match pattern
for (const matchKey in groupedServices) {
const firstEntry = groupedServices[matchKey][0];
// can use first entry since it's part of grouping
const sendMatchesOnly = firstEntry.options.sendMatchesOnly;
let maybePatternFilteredChangesets = changeSets;
if (sendMatchesOnly) {
maybePatternFilteredChangesets = filterChangesetsOnPattern(changeSets, firstEntry);
}

async function filterMatchesForOrigin( changeSets, entry ) {
if( ! entry.options || !entry.options.ignoreFromSelf ) {
return changeSets;
} else {
try {
const originIpAddress = await getServiceIp( entry );
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
} catch(e) {
console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`);
console.error(e);
return changeSets;
const someTripleMatchedSpec = sendMatchesOnly
? maybePatternFilteredChangesets.length > 0 // makes the assumption that maybePatternFilteredChangesets has no empty change sets
: changedTriples.some((triple) =>
tripleMatchesSpec(triple, firstEntry.match)
);

if( someTripleMatchedSpec ) {
const matchingServices = groupedServices[matchKey];
matchingServices.forEach( async (entry) => {
if( DEBUG_TRIPLE_MATCHES_SPEC )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);
// for each entity
if( DEBUG_DELTA_MATCH )
console.log(`Checking if we want to send to ${entry.callback.url}`);
const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry );

if ( originFilteredChangeSets.length > 0 ) {
if( DEBUG_TRIPLE_MATCHES_SPEC && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} change sets not from ${hostnameForEntry( entry )}`);

// inform matching entities
if( DEBUG_DELTA_SEND )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options?.gracePeriod ) {
sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId);
} else {
const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets );
sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId );
}
}
});
}
}
}

function hostnameForEntry( entry ) {
return (new URL(entry.callback.url)).hostname;
/**
 * Normalizes an object into a deterministic string so that structurally
 * equal objects yield the same string regardless of key order.
 *
 * Keys are sorted at every nesting level of plain objects (and inside
 * arrays). Regular expressions are encoded by their source and flags,
 * because JSON.stringify would otherwise serialize every RegExp as `{}`
 * and make patterns that differ only in their regex indistinguishable.
 *
 * @param {Object} obj - The object to normalize. `null`/`undefined` is
 *   treated like an empty object.
 * @returns {string} A string representation of the normalized object.
 */
function normalizeObject(obj) {
  const canonicalize = (value) => {
    if (value instanceof RegExp) {
      return { $regex: value.source, $flags: value.flags };
    }
    if (Array.isArray(value)) {
      return value.map(canonicalize);
    }
    if (value !== null && typeof value === 'object') {
      const proto = Object.getPrototypeOf(value);
      // Only plain objects are re-keyed; other instances (e.g. Date) are
      // left for JSON.stringify to serialize as it normally would.
      if (proto === Object.prototype || proto === null) {
        return Object.keys(value)
          .sort()
          .reduce((acc, key) => {
            acc[key] = canonicalize(value[key]);
            return acc;
          }, {});
      }
    }
    return value;
  };

  return JSON.stringify(canonicalize(obj ?? {}));
}

async function getServiceIp(entry) {
const hostName = hostnameForEntry( entry );
return new Promise( (resolve, reject) => {
dns.lookup( hostName, { family: 4 }, ( err, address) => {
if( err )
reject( err );
else
resolve( address );
} );
} );
};
5 changes: 3 additions & 2 deletions bundle-requests.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { foldChangeSets } from './folding';
import { sendRequest } from "./send-request.js";
import { DEBUG_DELTA_SEND } from './env';

// map from bundle key to bundle object
const bundles = {};
Expand Down Expand Up @@ -58,13 +59,13 @@ export const sendBundledRequest = (
existingBundle.bundledCallIdTrails.push(muCallIdTrail);
// since an existing bundle exists, we don't need to send it after timeout,
// the existing bundle will send us too
if (process.env["DEBUG_DELTA_SEND"]) {
if (DEBUG_DELTA_SEND) {
console.log(
`Adding to bundle for key ${bundleKey}, now contains ${existingBundle.changeSets.length} change sets`
);
}
} else {
if (process.env["DEBUG_DELTA_SEND"]) {
if (DEBUG_DELTA_SEND) {
console.log(
`Creating bundle for key ${bundleKey}, sending in ${entry.options.gracePeriod}ms`
);
Expand Down
2 changes: 1 addition & 1 deletion config/normalize-quad.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const normalizeDate = process.env["NORMALIZE_DATETIME_IN_QUAD"];
import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from '../env';

export default function(quad) {
if (normalizeDate && quad.object.datatype == 'http://www.w3.org/2001/XMLSchema#dateTime') {
Expand Down
8 changes: 8 additions & 0 deletions env.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Flags read from the process environment once, when this module loads.
// Each export holds the raw environment value: a string when the variable
// is set, `undefined` otherwise.
export const {
  DEBUG_DELTA_FOLD,
  DEBUG_DELTA_MATCH,
  DEBUG_DELTA_NOT_SENDING_EMPTY,
  DEBUG_DELTA_SEND,
  DEBUG_TRIPLE_MATCHES_SPEC,
  LOG_REQUESTS,
  LOG_SERVER_CONFIGURATION,
  NORMALIZE_DATETIME_IN_QUAD,
} = process.env;
3 changes: 2 additions & 1 deletion folding.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DEBUG_DELTA_FOLD } from './env';
/**
* Quads may be folded if
* - a quad is deleted after it has been inserted
Expand Down Expand Up @@ -44,7 +45,7 @@ export function foldChangeSets(entry, changeSets) {
if (foldedInsertQuads.length)
foldedChangeSets.push({ delete: [], insert: foldedInsertQuads });

if (process.env["DEBUG_DELTA_FOLD"])
if (DEBUG_DELTA_FOLD)
console.log(`Folded changeset from:\n ${JSON.stringify(changeSets)}\nto:\n ${JSON.stringify(foldedChangeSets)}`);

return foldedChangeSets;
Expand Down
Loading