Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@chainlink/external-adapter-framework",
"version": "2.8.0",
"version": "2.8.1",
Comment thread
mmcallister-cll marked this conversation as resolved.
Outdated
"main": "dist/index.js",
"license": "MIT",
"repository": "git://github.com/smartcontractkit/ea-framework-js.git",
Expand Down
38 changes: 29 additions & 9 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export class WebSocketTransport<
currentUrl = ''
lastMessageReceivedAt = 0
connectionOpenedAt = 0
streamHandlerInvocationsWithNoConnection = 0

constructor(private config: WebSocketTransportConfig<T>) {
super()
Expand Down Expand Up @@ -328,11 +329,6 @@ export class WebSocketTransport<
return
}

// We want to check if the URL we calculate is different from the one currently connected.
// This is because some providers handle subscriptions on the URLs and not through messages.
const urlFromConfig = await this.config.url(context, subscriptions.desired)
const urlChanged = this.currentUrl !== urlFromConfig

// We want to check that if we have a connection, it hasn't gone stale. That is,
// since opening it, have we had any activity from the provider.
const now = Date.now()
Expand All @@ -351,14 +347,29 @@ export class WebSocketTransport<
timeSinceLastActivity: ${timeSinceLastActivity} |
subscriptionUnresponsiveTtl: ${context.adapterSettings.WS_SUBSCRIPTION_UNRESPONSIVE_TTL} |
connectionUnresponsive: ${connectionUnresponsive} |
`)
`)

// The var connectionUnresponsive checks whether the time since last activity on
// _any_ successful open connection (or 0 if we haven't had one yet) has exceeded
// WS_SUBSCRIPTION_UNRESPONSIVE_TTL. There is interplay with WS_SUBSCRIPTION_TTL
// to determine minimum TTL of an open connection given no explicit connection errors.
if (connectionUnresponsive) {
this.streamHandlerInvocationsWithNoConnection += 1
Comment thread
dskloetc marked this conversation as resolved.
logger.trace(`The connection is unresponsive, incremented streamHandlerIterationsWithNoConnection = ${this.streamHandlerInvocationsWithNoConnection}`)
}

// We want to check if the URL we calculate is different from the one currently connected.
// This is because some providers handle subscriptions on the URLs and not through messages.
// Subclasses may also implement alternate URL handling logic,
// eg: toggling through multiple possible URLs in case of failure.
const { urlChanged, url } = await this.determineUrlChange(context, subscriptions)
Comment thread
alejoberardino marked this conversation as resolved.
Outdated

// Check if we should close the current connection
if (!connectionClosed && (urlChanged || connectionUnresponsive)) {
if (urlChanged) {
censorLogs(() =>
logger.debug(
`Websocket url has changed from ${this.currentUrl} to ${urlFromConfig}, closing connection...`,
`Websocket url has changed from ${this.currentUrl} to ${url}, closing connection...`,
Comment thread
dskloetc marked this conversation as resolved.
Outdated
),
)
} else {
Expand Down Expand Up @@ -397,7 +408,7 @@ export class WebSocketTransport<
logger.debug('No established connection and new subscriptions available, connecting to WS')
const options =
this.config.options && (await this.config.options(context, subscriptions.desired))
this.currentUrl = urlFromConfig
this.currentUrl = url
// Need to write this now, otherwise there could be messages sent with values before the open handler finishes
this.providerDataStreamEstablished = Date.now()

Expand All @@ -408,7 +419,7 @@ export class WebSocketTransport<
subscriptions.new = subscriptions.desired

// Connect to the provider
this.wsConnection = await this.establishWsConnection(context, urlFromConfig, options)
this.wsConnection = await this.establishWsConnection(context, url, options)

// Now that we successfully opened the connection, we can reset the variables
connectionClosed = false
Expand Down Expand Up @@ -451,6 +462,15 @@ export class WebSocketTransport<
return
}

async determineUrlChange(
Comment thread
mmcallister-cll marked this conversation as resolved.
Outdated
Comment thread
mmcallister-cll marked this conversation as resolved.
Outdated
context: EndpointContext<T>,
subscriptions: SubscriptionDeltas<TypeFromDefinition<T["Parameters"]>>
): Promise<{ urlChanged: boolean; url: string }> {
const url = await this.config.url(context, subscriptions.desired)
const urlChanged = this.currentUrl !== url
return { urlChanged, url }
}

private rejectionHandler<E>(
rejectionFn: (reason?: unknown) => void,
handler: (event: E) => Promise<void>,
Expand Down
Loading