From 66f2948ea5089c06bfc74779570555e469c3c218 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 13:01:49 +0200 Subject: [PATCH 1/4] add version v0.0.2 with effective inserts and deletions (requires other mu-authorization version! --- app.js | 314 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 200 insertions(+), 114 deletions(-) diff --git a/app.js b/app.js index 97c710a..eaa46e7 100644 --- a/app.js +++ b/app.js @@ -5,208 +5,294 @@ import bodyParser from 'body-parser'; import dns from 'dns'; // Also parse application/json as json -app.use( bodyParser.json( { - type: function(req) { - return /^application\/json/.test( req.get('content-type') ); +app.use(bodyParser.json({ + type: function (req) { + return /^application\/json/.test(req.get('content-type')); }, limit: '500mb' -} ) ); +})); // Log server config if requested -if( process.env["LOG_SERVER_CONFIGURATION"] ) - console.log(JSON.stringify( services )); +if (process.env["LOG_SERVER_CONFIGURATION"]) + console.log(JSON.stringify(services)); -app.get( '/', function( req, res ) { +app.get('/', function (req, res) { res.status(200); res.send("Hello, delta notification is running"); -} ); +}); -app.post( '/', function( req, res ) { - if( process.env["LOG_REQUESTS"] ) { +app.post('/', function (req, res) { + if (process.env["LOG_REQUESTS"]) { console.log("Logging request body"); console.log(req.body); } + console.log(req.body); + console.log(req.get('mu-call-id-trail')); + const changeSets = req.body.changeSets; - const originalMuCallIdTrail = JSON.parse( req.get('mu-call-id-trail') || "[]" ); + // const originalMuCallIdTrail = JSON.parse(req.get('mu-call-id-trail') || "[]"); const originalMuCallId = req.get('mu-call-id'); - const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] ); + // const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]); - changeSets.forEach( (change) => { - change.insert = change.insert || []; - change.delete = change.delete || []; - } ); + changeSets.forEach((change) => { + change.effectiveInserts = change.effectiveInserts || []; + change.effectiveDeletes = change.effectiveDeletes || []; + change.inserts = change.inserts || []; + change.deletes = change.deletes || []; + }); // inform watchers - informWatchers( changeSets, res, muCallIdTrail ); + informWatchers(changeSets, res, originalMuCallId); // push relevant data to interested actors res.status(204).send(); -} ); +}); + +// { +// changeSets: +// [{ +// origin: '172.28.0.1', +// mu_call_id_trail: '[]', +// insert: [Array], +// delete: [Array], +// index: 1629274666268, +// authorization_groups: +// '[{"variables":[],"name":"public"},{"variables":[],"name":"user-lookup"},{"variables":[],"name":"clean"}]' +// }] +// } + +function getMatchOnEffective(entry) { + if (!(entry.options && "matchOnEffective" in entry.options)) return false; // DEFAULT + return entry.options.matchOnEffective; +} -async function informWatchers( changeSets, res, muCallIdTrail ){ - services.map( async (entry) => { +function getRequestPerMuCallIdTrail(entry) { + if (!(entry.options && "requestPerMuCallIdTrail" in entry.options)) return true; // DEFAULT + return entry.options.requestPerMuCallIdTrail; +} + +async function informWatchers(changeSets, res, originalMuCallId) { + services.map(async (entry) => { // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) + if (process.env["DEBUG_DELTA_MATCH"]) console.log(`Checking if we want to send to ${entry.callback.url}`); const matchSpec = entry.match; - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + const originFilteredChangeSets = await filterMatchesForOrigin(changeSets, entry); + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry(entry)}`); let allInserts = []; let allDeletes = []; - originFilteredChangeSets.forEach( (change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - } ); + if (getMatchOnEffective(entry)) { + originFilteredChangeSets.forEach((change) => { + allInserts = [...allInserts, ...change.effectiveInserts]; + allDeletes = [...allDeletes, ...change.effectiveDeletes]; + }); + } else { + originFilteredChangeSets.forEach((change) => { + allInserts = [...allInserts, ...change.inserts]; + allDeletes = [...allDeletes, ...change.deletes]; + }); + } const changedTriples = [...allInserts, ...allDeletes]; const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); + changedTriples + .some((triple) => tripleMatchesSpec(triple, matchSpec)); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - if( someTripleMatchedSpec ) { + if (someTripleMatchedSpec) { // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) + if (process.env["DEBUG_DELTA_SEND"]) console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - if( entry.options && entry.options.gracePeriod ) { + if (entry.options && entry.options.gracePeriod) { setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); + () => sendRequest(entry, originFilteredChangeSets, originalMuCallId), + entry.options.gracePeriod); } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + sendRequest(entry, originFilteredChangeSets, originalMuCallId); } } - } ); + }); } -function tripleMatchesSpec( triple, matchSpec ) { +function tripleMatchesSpec(triple, matchSpec) { // form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); - for( let key in matchSpec ){ + for (let key in matchSpec) { // key is one of s, p, o const subMatchSpec = matchSpec[key]; const subMatchValue = triple[key]; - if( subMatchSpec && !subMatchValue ) + if (subMatchSpec && !subMatchValue) return false; - for( let subKey in subMatchSpec ) + for (let subKey in subMatchSpec) // we're now matching something like {type: "url", value: "http..."} - if( subMatchSpec[subKey] !== subMatchValue[subKey] ) + if (subMatchSpec[subKey] !== subMatchValue[subKey]) return false; } return true; // no false matches found, let's send a response } -function formatChangesetBody( changeSets, options ) { - if( options.resourceFormat == "v0.0.1" ) { - return JSON.stringify( - changeSets.map( (change) => { - return { - inserts: change.insert, - deletes: change.delete - }; - } ) ); - } - if( options.resourceFormat == "v0.0.0-genesis" ) { - // [{delta: {inserts, deletes}] - const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" }); - const newFormat = JSON.parse( formatChangesetBody( changeSets, newOptions ) ); - return JSON.stringify({ - // graph: Not available - delta: { - inserts: newFormat - .flatMap( ({inserts}) => inserts) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ), - deletes: newFormat - .flatMap( ({deletes}) => deletes) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ) - } - }); - } else { - throw `Unknown resource format ${options.resourceFormat}`; +function formatChangesetBody(changeSets, options) { + switch (options.resourceFormat) { + case "v0.0.2": + return formatV002(changeSets, options); + case "v0.0.1": + return formatV001(changeSets, options); + case "v0.0.0-genesis": + return formatV000Genesis(changeSets, options); + default: + throw `Unknown resource format ${options.resourceFormat}`; } } -async function sendRequest( entry, changeSets, muCallIdTrail ) { - let requestObject; // will contain request information + +function formatV002(changeSets, options) { + return JSON.stringify( + changeSets.map((change) => { + return { + inserts: change.inserts, + deletes: change.deletes, + effectiveInserts: change.effectiveInserts, + effectiveDeletes: change.effectiveDeletes, + index: change.index + }; + })); +} + +function formatV001(changeSets, options) { + return JSON.stringify( + changeSets.map((change) => { + return { + inserts: change.inserts, + deletes: change.deletes + }; + })); +} + +function formatV000Genesis(changeSets, options) { + const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" }); + const newFormat = JSON.parse(formatV001(changeSets, newOptions)); + return JSON.stringify({ + // graph: Not available + delta: { + inserts: newFormat + .flatMap(({ inserts }) => inserts) + .map(({ subject, predicate, object }) => + ({ s: subject.value, p: predicate.value, o: object.value })), + deletes: newFormat + .flatMap(({ deletes }) => deletes) + .map(({ subject, predicate, object }) => + ({ s: subject.value, p: predicate.value, o: object.value })) + } + }); +} + +function createMuCallIdTrail(trail, originalMuCallId) { + const originalMuCallIdTrail = JSON.parse(trail); + const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]); + return muCallIdTrail; +} + +async function sendRequest(entry, changeSets, originalMuCallId) { + const requestObjects = []; // will contain request information + + let changesPerMuCallIdTrail = {}; + + if (getRequestPerMuCallIdTrail(entry)) { + for (let change of changeSets) { + const trail = change.muCallIdTrail || '[]'; + if (!changesPerMuCallIdTrail[trail]) changesPerMuCallIdTrail[trail] = []; + changesPerMuCallIdTrail[trail].push(change); + } + } else { + // Generic purposes, just one element to loop over + changesPerMuCallIdTrail[changeSets[0].muCallIdTrail || '[]'] = changeSets; + } // construct the requestObject const method = entry.callback.method; const url = entry.callback.url; - const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": changeSets[0].allowedGroups, "mu-call-id-trail": muCallIdTrail, "mu-call-id": uuid() }; - if( entry.options && entry.options.resourceFormat ) { - // we should send contents - const body = formatChangesetBody( changeSets, entry.options ); + for (let muCallIdTrail in changesPerMuCallIdTrail) { + const full_trail = createMuCallIdTrail(muCallIdTrail, originalMuCallId); - // TODO: we now assume the mu-auth-allowed-groups will be the same - // for each changeSet. that's a simplification and we should not - // depend on it. + if (entry.options && entry.options.resourceFormat) { + const current_changes = changesPerMuCallIdTrail[muCallIdTrail]; - requestObject = { - url, method, - headers, - body: body - }; - } else { - // we should only inform - requestObject = { url, method, headers }; + const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": current_changes[0].allowedGroups, "mu-call-id-trail": full_trail, "mu-call-id": uuid() }; + // we should send contents + const body = formatChangesetBody(current_changes, entry.options); + + // TODO: we now assume the mu-auth-allowed-groups will be the same + // for each changeSet. that's a simplification and we should not + // depend on it. + + requestObjects.push({ + url, method, + headers, + body: body + }); + } else { + // we should only inform + requestObjects.push({ url, method, headers }); + } } - if( process.env["DEBUG_DELTA_SEND"] ) + + if (process.env["DEBUG_DELTA_SEND"]) console.log(`Executing send ${method} to ${url}`); - request( requestObject, function( error, response, body ) { - if( error ) { - console.log(`Could not send request ${method} ${url}`); - console.log(error); - console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send - } + for (let requestObject of requestObjects) { + request(requestObject, function (error, response, body) { + if (error) { + console.log(`Could not send request ${method} ${url}`); + console.log(error); + console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send + } - if( response ) { - // console.log( body ); - } - }); + if (response) { + // console.log( body ); + } + }); + } } -async function filterMatchesForOrigin( changeSets, entry ) { - if( ! entry.options || !entry.options.ignoreFromSelf ) { +async function filterMatchesForOrigin(changeSets, entry) { + if (!entry.options || !entry.options.ignoreFromSelf) { return changeSets; } else { - const originIpAddress = await getServiceIp( entry ); - return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); + const originIpAddress = await getServiceIp(entry); + return changeSets.filter((changeSet) => changeSet.origin != originIpAddress); } } -function hostnameForEntry( entry ) { +function hostnameForEntry(entry) { return (new URL(entry.callback.url)).hostname; } async function getServiceIp(entry) { - const hostName = hostnameForEntry( entry ); - return new Promise( (resolve, reject) => { - dns.lookup( hostName, { family: 4 }, ( err, address) => { - if( err ) - reject( err ); + const hostName = hostnameForEntry(entry); + return new Promise((resolve, reject) => { + dns.lookup(hostName, { family: 4 }, (err, address) => { + if (err) + reject(err); else - resolve( address ); - } ); - } ); + resolve(address); + }); + }); }; From b1fd0d6b3633c36d5520d5a41afa01b3a5af7f0b Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 13:11:39 +0200 Subject: [PATCH 2/4] update README.md --- README.md | 33 ++++++++++++++++++++++++++++++--- app.js | 28 ++++++---------------------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index cb44e1f..d9b5b6c 100644 --- a/README.md +++ b/README.md @@ -69,14 +69,41 @@ The exported property contains an array of definitions, each linking a match to - `options.resourceFormat`: Version format describing the format of the contents. Keys may be added to this format, but they may not be removed. Filter the properties as needed. - `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future. - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). + - `options.matchOnEffective`: Only match with effective inserts and deletions, not all inserts and deletions. Default: false. + - `options.requestPerMuCallIdTrail`: Execute one request per muCallIdTrail. mu-authorization batches multiple requests, which might have different idTrails. ## Delta formats The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat. +#### v0.0.2 + +v0.0.2 is the last format added. It makes a distinction between all inserts and deletions and effective inserts and deletions. An inserts is effective if the triple is not yet in the triplestore. A deletion is effective if the triple was present in the triplestore. + +This version uses an updated mu-authorization that batches inserts and deletions together to improve performance. This imposes the need for an `index` field. This `index` is incremental starting from Unix time in milliseconds. + +```json + [ + { "inserts": [{"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" }, + "object": { "type": "uri", "value": "https://schema.org/Project" }}, + {"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://purl.org/dc/terms/modified" }, + "object": { "type": "literal", "value": "https://schema.org/Project", "datatype": "http://www.w3.org/2001/XMLSchema#dateTime"}}], + "deletes": [], + "effectiveInserts": [{"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" }, + "object": { "type": "uri", "value": "https://schema.org/Project" }}], + "effectiveDeletions": [], + "index" :1629367246152 } + ] +``` + + + ### v0.0.1 -v0.0.1 is the latest format of the delta messages. It may be extended with authorization rights etc. in the future. The value encoding follows the [json-sparql spec RDF term encoding](https://www.w3.org/TR/sparql11-results-json/#select-encode-terms). For example: +v0.0.1 may be extended with authorization rights etc. in the future. The value encoding follows the [json-sparql spec RDF term encoding](https://www.w3.org/TR/sparql11-results-json/#select-encode-terms). For example: ```json [ @@ -95,7 +122,7 @@ v0.0.1 is the latest format of the delta messages. It may be extended with autho Genesis format as described by the initial Delta service PoC. It looks like: ```json - { + { "delta": { "inserts": [{"s": "http://mu.semte.ch/", "p": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", @@ -115,7 +142,7 @@ Debugging can be enabled in the service by setting environment variables. The f - `DEBUG_DELTA_SEND`: Logs all delta messages that are being sent to clients - `DEBUG_DELTA_MATCH`: Logs a check for each target block, indicating a check will occur - `DEBUG_TRIPLE_MATCHES_SPEC`: Extensive logging for triples matching a given specification. Handy when requests are unexpectedly not sent. - + ## Extending You are encouraged to help figure out how to best extend this service. Fork this repository. Run an experiment. Open an issue or PR describing your experiment. Feel free to open up an issue if you would like to discuss a possible extension. diff --git a/app.js b/app.js index eaa46e7..d846d96 100644 --- a/app.js +++ b/app.js @@ -27,9 +27,6 @@ app.post('/', function (req, res) { console.log(req.body); } - console.log(req.body); - console.log(req.get('mu-call-id-trail')); - const changeSets = req.body.changeSets; // const originalMuCallIdTrail = JSON.parse(req.get('mu-call-id-trail') || "[]"); @@ -50,19 +47,6 @@ app.post('/', function (req, res) { res.status(204).send(); }); -// { -// changeSets: -// [{ -// origin: '172.28.0.1', -// mu_call_id_trail: '[]', -// insert: [Array], -// delete: [Array], -// index: 1629274666268, -// authorization_groups: -// '[{"variables":[],"name":"public"},{"variables":[],"name":"user-lookup"},{"variables":[],"name":"clean"}]' -// }] -// } - function getMatchOnEffective(entry) { if (!(entry.options && "matchOnEffective" in entry.options)) return false; // DEFAULT return entry.options.matchOnEffective; @@ -95,8 +79,8 @@ async function informWatchers(changeSets, res, originalMuCallId) { }); } else { originFilteredChangeSets.forEach((change) => { - allInserts = [...allInserts, ...change.inserts]; - allDeletes = [...allDeletes, ...change.deletes]; + allInserts = [...allInserts, ...change.insert]; + allDeletes = [...allDeletes, ...change.delete]; }); } @@ -165,8 +149,8 @@ function formatV002(changeSets, options) { return JSON.stringify( changeSets.map((change) => { return { - inserts: change.inserts, - deletes: change.deletes, + inserts: change.insert, + deletes: change.delete, effectiveInserts: change.effectiveInserts, effectiveDeletes: change.effectiveDeletes, index: change.index @@ -178,8 +162,8 @@ function formatV001(changeSets, options) { return JSON.stringify( changeSets.map((change) => { return { - inserts: change.inserts, - deletes: change.deletes + inserts: change.insert, + deletes: change.delete }; })); } From 176ca3519024e9c9d2953e6df19be252dc00139f Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 15:46:17 +0200 Subject: [PATCH 3/4] optimise triple matched spec --- app.js | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/app.js b/app.js index d846d96..8b419b9 100644 --- a/app.js +++ b/app.js @@ -69,26 +69,17 @@ async function informWatchers(changeSets, res, originalMuCallId) { if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf) console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry(entry)}`); - let allInserts = []; - let allDeletes = []; - if (getMatchOnEffective(entry)) { - originFilteredChangeSets.forEach((change) => { - allInserts = [...allInserts, ...change.effectiveInserts]; - allDeletes = [...allDeletes, ...change.effectiveDeletes]; - }); - } else { - originFilteredChangeSets.forEach((change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - }); - } + const triple_matches_f = (triple) => tripleMatchesSpec(triple, matchSpec); - const changedTriples = [...allInserts, ...allDeletes]; + const someTripleMatchedSpec = getMatchOnEffective(entry) ? changeSets.some((change) => + change.effectiveInserts.some(triple_matches_f) || + change.effectiveDeletes.some(triple_matches_f) + ) : changeSets.some((change) => + change.inserts.some(triple_matches_f) || + change.deletes.some(triple_matches_f) + ); - const someTripleMatchedSpec = - changedTriples - .some((triple) => tripleMatchesSpec(triple, matchSpec)); if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) console.log(`Triple matches spec? ${someTripleMatchedSpec}`); From e29d5102625337721f8b4fa3d1e71cb6e7677605 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Fri, 20 Aug 2021 15:54:27 +0200 Subject: [PATCH 4/4] squish bug --- app.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/app.js b/app.js index 8b419b9..75b3b5d 100644 --- a/app.js +++ b/app.js @@ -36,8 +36,8 @@ app.post('/', function (req, res) { changeSets.forEach((change) => { change.effectiveInserts = change.effectiveInserts || []; change.effectiveDeletes = change.effectiveDeletes || []; - change.inserts = change.inserts || []; - change.deletes = change.deletes || []; + change.insert = change.insert || []; + change.delete = change.delete || []; }); // inform watchers @@ -72,12 +72,12 @@ async function informWatchers(changeSets, res, originalMuCallId) { const triple_matches_f = (triple) => tripleMatchesSpec(triple, matchSpec); - const someTripleMatchedSpec = getMatchOnEffective(entry) ? changeSets.some((change) => + const someTripleMatchedSpec = getMatchOnEffective(entry) ? originFilteredChangeSets.some((change) => change.effectiveInserts.some(triple_matches_f) || change.effectiveDeletes.some(triple_matches_f) - ) : changeSets.some((change) => - change.inserts.some(triple_matches_f) || - change.deletes.some(triple_matches_f) + ) : originFilteredChangeSets.some((change) => + change.insert.some(triple_matches_f) || + change.delete.some(triple_matches_f) );