Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@
*/
lmstEpoch: Date.UTC(2020, 2, 18, 0, 0, 0),

/*
* subscriptionMCWSFilterDelay: delay in milliseconds for combining filters for the same subscription
* endpoint connection. Smaller value = quicker display of realtime data (ex, 10ms in a
* low latency environment), higher value = avoids potentially creating and subsequently tearing down new websocket connections if filter changes are happening faster than server response times
* (ex, 100ms+ in a high latency environment)
*/
subscriptionMCWSFilterDelay: 100,

/**
* timeSystems: specify the time systems to use.
* Options are 'scet', 'ert', 'sclk', 'msl.sol' and 'lmst'.
Expand Down
5 changes: 0 additions & 5 deletions src/realtime/MCWSDataProductStreamProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import MCWSStreamProvider from './MCWSStreamProvider';
* @memberof {vista/telemetry}
*/
class MCWSDataProductStreamProvider extends MCWSStreamProvider {
constructor(openmct, vistaTime, options) {
super(openmct, vistaTime);
this.options = options;
}

getUrl(domainObject) {
return domainObject.telemetry?.dataProductStreamUrl;
}
Expand Down
8 changes: 0 additions & 8 deletions src/realtime/MCWSFrameEventStreamProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import MCWSStreamProvider from './MCWSStreamProvider';
* @memberof {vista/telemetry}
*/
class MCWSFrameEventStreamProvider extends MCWSStreamProvider {
/**
* @param {Object} openmct The Open MCT API
* @param {Object} vistaTime The Vista time API
*/
constructor(openmct, vistaTime) {
super(openmct, vistaTime);
}

/**
* Get the URL for streaming data for this domain object
* @param {Object} domainObject The domain object
Expand Down
8 changes: 6 additions & 2 deletions src/realtime/MCWSStreamProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ import GlobalStaleness from 'services/globalStaleness/globalStaleness';
*/

class MCWSStreamProvider {
constructor(openmct, vistaTime) {
constructor(openmct, vistaTime, options) {
this.openmct = openmct;
this.vistaTime = function () {
return vistaTime;
};
this.options = options;

this.sessions = sessionService();
this.filterService = filterService();

this.subscriptions = {};
this.requests = {};

this.subscriptionMCWSFilterDelay = options?.time?.subscriptionMCWSFilterDelay;
}

processGlobalStaleness(data, latestTimestamp) {
Expand Down Expand Up @@ -214,7 +217,8 @@ class MCWSStreamProvider {
key: this.getKey(domainObject),
property: this.getProperty(domainObject),
mcwsVersion: domainObject.telemetry.mcwsVersion,
extraFilterTerms: options?.filters ? this.serializeFilters(options.filters) : undefined
extraFilterTerms: options?.filters ? this.serializeFilters(options.filters) : undefined,
subscriptionMCWSFilterDelay: this.subscriptionMCWSFilterDelay
};

function unsubscribe() {
Expand Down
10 changes: 6 additions & 4 deletions src/realtime/MCWSStreamWorkerScript.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
* @param {Object} extraFilterTerms additional filter terms
* @param {Object} globalFilters global filters to apply
*/
constructor(url, property, topic, extraFilterTerms, globalFilters) {
constructor(url, property, topic, extraFilterTerms, globalFilters, subscriptionMCWSFilterDelay) {
this.url = url;
this.topic = topic;
this.subscribers = {};
this.property = property;
this.extraFilterTerms = extraFilterTerms;
this.globalFilters = globalFilters;
this.subscriptionMCWSFilterDelay = subscriptionMCWSFilterDelay ?? 100;
Comment thread
jvigliotta marked this conversation as resolved.
}

/**
Expand Down Expand Up @@ -140,7 +141,7 @@
this.pending = setTimeout(() => {
this.pending = undefined;
this.reconnect();
}, 100);
}, this.subscriptionMCWSFilterDelay);
}

/**
Expand Down Expand Up @@ -244,7 +245,7 @@
* @param {MCWSStreamSubscription} subscription the subscription to obtain
*/
subscribe(subscription) {
const { url, key, property, extraFilterTerms } = subscription;
const { url, key, property, extraFilterTerms, subscriptionMCWSFilterDelay } = subscription;
const cacheKey = this.generateCacheKey(url, property, extraFilterTerms);

if (!this.connections[cacheKey]) {
Expand All @@ -253,7 +254,8 @@
property,
this.activeTopic,
extraFilterTerms,
this.activeGlobalFilters
this.activeGlobalFilters,
subscriptionMCWSFilterDelay
);
}

Expand Down
18 changes: 9 additions & 9 deletions src/realtime/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ function RealtimeTelemetryPlugin(vistaTime, options) {
return function install(openmct) {
filterService(openmct, options.globalFilters);

openmct.telemetry.addProvider(new MCWSChannelStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSEVRStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSEVRLevelStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSCommandStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSPacketSummaryEventProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSChannelStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSEVRStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSEVRLevelStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSCommandStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSPacketSummaryEventProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSDataProductStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSMessageStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSFrameSummaryStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSFrameEventStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSAlarmMessageStreamProvider(openmct, vistaTime));
openmct.telemetry.addProvider(new MCWSMessageStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSFrameSummaryStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSFrameEventStreamProvider(openmct, vistaTime, options));
openmct.telemetry.addProvider(new MCWSAlarmMessageStreamProvider(openmct, vistaTime, options));
}
}

Expand Down