Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions components/management-controller/src/mc-main.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

"use strict";

import * as k8s from '@kubernetes/client-node';
import yaml from 'yaml';
import fs from 'node:fs';
import rhea from 'rhea';
import * as bbLinks from './backbone-links.js';
import * as externalVans from './external-vans.js';
import * as certs from './certs.js';
Expand Down Expand Up @@ -53,12 +49,12 @@ if (STANDALONE_NS) {
//
export async function Main() {
try {
await kube.Start(k8s, fs, yaml, STANDALONE_NS);
await kube.Start(STANDALONE_NS);
await db.Start();
await config.Start();
await certs.Start();
await prune.Start();
await amqp.Start(rhea);
await amqp.Start();
await apiserver.Start(!!STANDALONE_NS);
await bbLinks.Start(CONTROLLER);
await externalVans.Start();
Expand Down
8 changes: 2 additions & 6 deletions components/site-controller/src/sc-main.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

"use strict";

import * as k8s from '@kubernetes/client-node';
import yaml from 'yaml';
import fs from 'node:fs';
import rhea from 'rhea';
import * as kube from '@skupperx/modules/kube'
import * as amqp from '@skupperx/modules/amqp'
import * as apiserver from './sc-apiserver.js'
Expand Down Expand Up @@ -53,8 +49,8 @@ if (STANDALONE_NAMESPACE) {
//
export async function Main() {
try {
await kube.Start(k8s, fs, yaml, STANDALONE_NAMESPACE);
await amqp.Start(rhea);
await kube.Start(STANDALONE_NAMESPACE);
await amqp.Start();

//
// Start the API server early so we don't cause readiness-probe problems.
Expand Down
53 changes: 26 additions & 27 deletions modules/src/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
under the License.
*/

import rhea from 'rhea';
import { Log } from "./log.js"

var container
var nextCid = 1
var nextMessageId = 1
var inFlight = {} // { cid : handler }
let nextCid = 1
let nextMessageId = 1
const inFlight = {} // { cid : handler }

const DEFAULT_TIMEOUT_SECONDS = 5

const rhea_handlers = function () {
container.options.enable_sasl_external = true
rhea.options.enable_sasl_external = true

container.on("connection_open", function (context) {
rhea.on("connection_open", function (context) {
const conn = context.connection.skxConn
Log(`AMQP Connection '${conn.logName}' is open`)
})

container.on("receiver_open", function (context) {
let conn = context.connection.skxConn
rhea.on("receiver_open", function (context) {
const conn = context.connection.skxConn
if (context.receiver == conn.replyReceiver) {
const firstTime = conn.replyTo == undefined
conn.replyTo = context.receiver.source.address
Expand All @@ -52,15 +52,15 @@ const rhea_handlers = function () {
})
}
} else {
let rx = context.receiver.skxReceiver
const rx = context.receiver.skxReceiver
if (rx && rx.onAddress) {
rx.onAddress(rx.context, context.receiver.source.address)
}
}
})

container.on("sendable", function (context) {
let conn = context.connection.skxConn
rhea.on("sendable", function (context) {
const conn = context.connection.skxConn
conn.senders.forEach((sender) => {
if (sender.amqpSender == context.sender) {
if (!sender.notified) {
Expand All @@ -75,11 +75,11 @@ const rhea_handlers = function () {
})
})

container.on("message", function (context) {
let conn = context.connection.skxConn
let message = context.message
let cid = message.correlation_id
var handler
rhea.on("message", function (context) {
const conn = context.connection.skxConn
const message = context.message
const cid = message.correlation_id
let handler
if (context.receiver == conn.replyReceiver) {
if (cid) {
handler = inFlight[cid]
Expand Down Expand Up @@ -120,8 +120,8 @@ export function OpenConnection(
cert = undefined,
key = undefined,
) {
let conn = {
amqpConnection: container.connect({
const conn = {
amqpConnection: rhea.connect({
host: host,
hostname: host,
transport: transport,
Expand Down Expand Up @@ -160,7 +160,7 @@ export function OpenSender(
//
// This is the synchronous version of the function
//
let sender = {
const sender = {
conn: conn,
amqpSender: conn.amqpConnection.open_sender(address),
onSendable: onSendable,
Expand All @@ -179,7 +179,7 @@ export function OpenSender(
// This is the asynchronous version of the function which does not resolve until the sender is sendable
//
return new Promise((resolve, reject) => {
let sender = {
const sender = {
conn: conn,
amqpSender: null,
onSendable: null,
Expand All @@ -201,7 +201,7 @@ export function OpenSender(
}

export function OpenReceiver(conn, address, onMessage, context = undefined) {
let receiver = {
const receiver = {
amqpReceiver: conn.amqpConnection.open_receiver(address),
onMessage: onMessage,
onAddress: null,
Expand All @@ -220,7 +220,7 @@ export function OpenDynamicReceiver(
onAddress,
context = undefined,
) {
let receiver = {
const receiver = {
amqpReceiver: conn.amqpConnection.open_receiver({
source: { dynamic: true },
}),
Expand All @@ -238,7 +238,7 @@ export function OpenDynamicReceiver(
export function SendMessage(sender, messageBody, ap = {}, destination = null) {
const messageId = nextMessageId
nextMessageId++
let message = {
const message = {
message_id: messageId,
reply_to: sender.conn.replyTo,
body: messageBody,
Expand All @@ -260,7 +260,7 @@ export function Request(
return new Promise((resolve, reject) => {
const cid = nextCid
const msgId = nextMessageId
let timer = setTimeout(() => {
const timer = setTimeout(() => {
delete inFlight[cid]
reject(Error("AMQP request/response timeout"))
}, timeoutSeconds * 1000)
Expand All @@ -270,7 +270,7 @@ export function Request(
clearTimeout(timer)
resolve([response.application_properties, response.body])
}
let message = {
const message = {
message_id: msgId,
reply_to: sender.conn.replyTo,
correlation_id: cid,
Expand All @@ -284,8 +284,7 @@ export function Request(
})
}

export async function Start(rhea) {
export async function Start() {
Log("[AMQP module started]")
container = rhea
rhea_handlers()
}
Loading