Skip to content
This repository was archived by the owner on Aug 5, 2024. It is now read-only.

Commit 8c3d07e

Browse files
authored
Eventhub (#6)
* added event hub in arm * correctly grabing connection string * add event hub support
1 parent c6d3981 commit 8c3d07e

File tree

11 files changed

+330
-31
lines changed

11 files changed

+330
-31
lines changed

.prettierrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"arrowParens": "avoid",
3-
"semi": "es5",
3+
"semi": false,
44
"tabWidth": 4
55
}

infra/azuredeploy.json

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,30 +42,76 @@
4242
"appInsightsApiVersion": "2020-02-02",
4343
"webAppApiVersion": "2022-03-01",
4444
"keyVaultApiVersion": "2022-07-01",
45+
"eventHubApiVersion": "2022-01-01-preview",
4546
"sku": "Free",
4647
"skuCode": "F1",
4748
"workerSize": "0",
4849
"workerSizeId": "0",
4950
"numberOfWorkers": "1",
5051
"webAppAlwaysOn": false,
5152
"webAppLinuxFxVersion": "NODE|18-lts",
53+
"eventHubSku": "Basic",
54+
"eventHubPartitionCount": 1,
55+
"eventHubMessageRetentionInDays": 1,
5256
"tenantId": "[subscription().tenantId]",
5357
"hostingPlanName": "[concat(variables('prefix'), 'hosting')]",
5458
"webAppName": "[concat(variables('prefix'), 'webapp')]",
5559
"appInsightsName": "[concat(variables('prefix'), 'appinsights')]",
5660
"storageAccountName": "[concat(variables('prefix'), 'storage')]",
5761
"keyVaultName": "[concat(variables('prefix'), 'keys')]",
62+
"eventHubNamespaceName": "[concat(variables('prefix'), 'ns')]",
63+
"eventHubAuthorizationRuleName": "gateway",
64+
"telemetryEventHubName": "telemetry",
5865
"serverFarmId": "[resourceId('Microsoft.Web/serverfarms', variables('hostingPlanName'))]",
5966
"keyVaultId": "[resourceId('Microsoft.KeyVault/vaults', variables('keyVaultName'))]",
6067
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]",
6168
"appInsightsId": "[resourceId('Microsoft.Insights/components', variables('appInsightsName'))]",
6269
"webAppId": "[resourceId('Microsoft.Web/sites', variables('webAppName'))]",
70+
"eventHubNamespaceId": "[resourceId('Microsoft.EventHub/namespaces', variables('eventHubNamespaceName'))]",
71+
"telemetryEventHubId": "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubNamespaceName'), variables('telemetryEventHubName'))]",
72+
"eventHubAuthorizationRuleId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubNamespaceName'), variables('eventHubAuthorizationRuleName'))]",
6373
"storageConnectionStringSecretName": "storageAccountConnectionString",
74+
"eventHubConnectionStringSecretName": "eventHubAccountConnectionString",
6475
"adminToken": "[concat('admin:', parameters('adminPassword'))]",
6576
"passwordsSecretName": "passwords",
6677
"adminConnectionStringSecretName": "adminConnectionString"
6778
},
6879
"resources": [
80+
//event hub
81+
{
82+
"type": "Microsoft.EventHub/namespaces",
83+
"apiVersion": "[variables('eventHubApiVersion')]",
84+
"name": "[variables('eventHubNamespaceName')]",
85+
"location": "[variables('location')]",
86+
"sku": {
87+
"name": "[variables('eventHubSku')]",
88+
"tier": "[variables('eventHubSku')]",
89+
"capacity": 1
90+
},
91+
"properties": {
92+
"isAutoInflateEnabled": false,
93+
"maximumThroughputUnits": 0
94+
}
95+
},
96+
{
97+
"type": "Microsoft.EventHub/namespaces/authorizationRules",
98+
"apiVersion": "[variables('eventHubApiVersion')]",
99+
"name": "[concat(variables('eventHubNamespaceName'), '/', variables('eventHubAuthorizationRuleName'))]",
100+
"properties": {
101+
"rights": ["Send"]
102+
},
103+
"dependsOn": ["[variables('eventHubNamespaceId')]"]
104+
},
105+
{
106+
"type": "Microsoft.EventHub/namespaces/eventhubs",
107+
"apiVersion": "[variables('eventHubApiVersion')]",
108+
"name": "[concat(variables('eventHubNamespaceName'), '/', variables('telemetryEventHubName'))]",
109+
"properties": {
110+
"messageRetentionInDays": "[variables('eventHubMessageRetentionInDays')]",
111+
"partitionCount": "[variables('eventHubPartitionCount')]"
112+
},
113+
"dependsOn": ["[variables('eventHubNamespaceId')]"]
114+
},
69115
// storage
70116
{
71117
"type": "Microsoft.Storage/storageAccounts",
@@ -168,9 +214,7 @@
168214
"type": "Microsoft.KeyVault/vaults/secrets",
169215
"apiVersion": "[variables('keyVaultApiVersion')]",
170216
"name": "[concat(variables('keyVaultName'), '/', variables('adminConnectionStringSecretName'))]",
171-
"dependsOn": [
172-
"[variables('keyVaultId')]"
173-
],
217+
"dependsOn": ["[variables('keyVaultId')]"],
174218
"properties": {
175219
"value": "[concat('WebAppName=', variables('webAppName') ,';AccountName=admin;AccountKey=', parameters('adminPassword'), ';EndPointSuffix=azurewebsites.net')]"
176220
}
@@ -187,6 +231,18 @@
187231
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), variables('storageApiVersion')).keys[0].value,';EndpointSuffix=core.windows.net')]"
188232
}
189233
},
234+
{
235+
"type": "Microsoft.KeyVault/vaults/secrets",
236+
"apiVersion": "[variables('keyVaultApiVersion')]",
237+
"name": "[concat(variables('keyVaultName'), '/', variables('eventHubConnectionStringSecretName'))]",
238+
"dependsOn": [
239+
"[variables('keyVaultId')]",
240+
"[variables('eventHubAuthorizationRuleId')]"
241+
],
242+
"properties": {
243+
"value": "[listKeys(variables('eventHubAuthorizationRuleId'), variables('eventHubApiVersion')).primaryConnectionString]"
244+
}
245+
},
190246
// web
191247
{
192248
"type": "Microsoft.Web/serverfarms",
@@ -220,7 +276,9 @@
220276
},
221277
"dependsOn": [
222278
"[variables('serverFarmId')]",
223-
"[variables('appInsightsId')]"
279+
"[variables('appInsightsId')]",
280+
"[variables('eventHubAuthorizationRuleId')]",
281+
"[variables('telemetryEventHubId')]"
224282
],
225283
"identity": {
226284
"type": "SystemAssigned"
@@ -242,10 +300,18 @@
242300
"name": "DEVS_KEY_VAULT_NAME",
243301
"value": "[variables('keyVaultName')]"
244302
},
303+
{
304+
"name": "DEVS_EVENT_HUB_NAME",
305+
"value": "[variables('eventHubNamespaceName')]"
306+
},
245307
{
246308
"name": "DEVS_STORAGE_CONNECTION_STRING_SECRET",
247309
"value": "[variables('storageConnectionStringSecretName')]"
248310
},
311+
{
312+
"name": "DEVS_EVENT_HUB_CONNECTION_STRING_SECRET",
313+
"value": "[variables('eventHubConnectionStringSecretName')]"
314+
},
249315
{
250316
"name": "DEVS_PASSWORDS_SECRET",
251317
"value": "[variables('passwordsSecretName')]"

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
},
2323
"dependencies": {
2424
"@azure/data-tables": "^13.2.0",
25+
"@azure/event-hubs": "^5.8.0",
2526
"@azure/identity": "^3.1.3",
2627
"@azure/keyvault-secrets": "^4.6.0",
2728
"@azure/storage-blob": "^12.11.0",
@@ -48,4 +49,4 @@
4849
"xmlbuilder2": "^3.0.2",
4950
"zx": "^7.2.0"
5051
}
51-
}
52+
}

src/apidevices.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { DeviceId, DeviceInfo, FromDeviceMessage } from "./schema"
1111
import { wsskConnString } from "./wssk"
1212
import { fullDeviceId, pubToDevice, untilFromDevice } from "./devutil"
1313
import { fwdSockConnSettings } from "./fwdsock"
14+
import { Telemetry } from "./telemetry"
1415

1516
function displayName(info: DeviceInfo) {
1617
const devid = info.rowKey
@@ -269,7 +270,7 @@ async function execTelemetryQuery(
269270
...q0,
270271
})
271272
).map(r => {
272-
const rr: storage.Telemetry = {
273+
const rr: Telemetry = {
273274
ms: r.ms,
274275
brainId: r.brainId,
275276
sensorId: r.sensorId,

src/eventhub.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { EventHubProducerClient } from "@azure/event-hubs"
2+
import { telemetrySinks } from "./telemetry"
3+
import { createSecretClient } from "./vault"
4+
5+
export async function setup() {
6+
const secrets = createSecretClient()
7+
const connectionStringSecretName =
8+
process.env["DEVS_EVENT_HUB_CONNECTION_STRING_SECRET"] ||
9+
"eventHubAccountConnectionString"
10+
const connStrSecret = await secrets.getSecret(connectionStringSecretName)
11+
const connStr = connStrSecret.value
12+
if (!connStr) throw new Error("event hub connection string is empty")
13+
14+
const eventHubName = process.env["DEVS_EVENT_HUB_NAME"]
15+
if (!eventHubName) throw new Error("event hub name is empty")
16+
17+
const producer = new EventHubProducerClient(connStr, eventHubName)
18+
19+
telemetrySinks.push(async (part, entries) => {
20+
// Prepare a batch of three events.
21+
const batch = await producer.createBatch()
22+
entries.forEach(entry => batch.tryAdd({ body: entry }))
23+
await producer.sendBatch(batch)
24+
})
25+
}

src/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import fastifyCors from "@fastify/cors"
66
import websocketPlugin from "@fastify/websocket"
77
import { throwStatus } from "./util"
88
import fastifyStatic from "@fastify/static"
9-
import { readFile } from "fs/promises"
109

1110
import * as storage from "./storage"
11+
import * as eventhub from "./eventhub"
1212
import * as mq from "./mq"
1313
import { wsskInit } from "./wssk"
1414
import { fwdSockInit } from "./fwdsock"
@@ -18,7 +18,8 @@ import { generateOpenApiSpec } from "./swagger/openapi"
1818

1919
async function initAuth(server: FastifyInstance) {
2020
const secrets = createSecretClient()
21-
const passwordSecretName = process.env["DEVS_PASSWORDS_SECRET"] || "passwords"
21+
const passwordSecretName =
22+
process.env["DEVS_PASSWORDS_SECRET"] || "passwords"
2223
const passwordsSecret = await secrets.getSecret(passwordSecretName)
2324
if (!passwordsSecret.value) throw new Error("passwords is empty")
2425

@@ -119,6 +120,7 @@ async function main() {
119120
})
120121

121122
await storage.setup()
123+
await eventhub.setup()
122124
await initAuth(server)
123125
await wsskInit(server)
124126
await fwdSockInit(server)

src/jdbr-binfmt.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Telemetry } from "./storage"
1+
import { Telemetry } from "./telemetry"
22

33
function readU32(s: Buffer, off: number) {
44
return s.readUInt32LE(off)

src/storage.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { randomBytes } from "crypto"
1010
import { promisify } from "util"
1111
import { gunzip, gzip } from "zlib"
1212
import { pubToDevice } from "./devutil"
13+
import { Telemetry, telemetrySinks, TelemetrySource } from "./telemetry"
1314
import { DeviceId, DeviceInfo, DeviceStats, zeroDeviceStats } from "./schema"
1415
import { delay, throwStatus } from "./util"
1516
import { createSecretClient } from "./vault"
@@ -37,8 +38,7 @@ export function webSiteName() {
3738
export function selfUrl() {
3839
// use https://learn.microsoft.com/en-us/azure/app-service/reference-app-settings?tabs=kudu%2Cdotnet
3940
const hostname = process.env["WEBSITE_HOSTNAME"]
40-
if (!hostname)
41-
throw new Error("WEBSITE_HOSTNAME not configured")
41+
if (!hostname) throw new Error("WEBSITE_HOSTNAME not configured")
4242
const protocol = /^(0\.|localhost)/i.test(hostname) ? "http" : "https"
4343
return `${protocol}://${hostname}`
4444
}
@@ -84,6 +84,8 @@ export async function setup() {
8484
await telemetryTable.createTable()
8585
await scriptsBlobs.createIfNotExists()
8686

87+
telemetrySinks.push(insertTelemetry)
88+
8789
if (false) {
8890
await scriptsBlobs.createIfNotExists({ access: "blob" })
8991
await blobClient.setProperties({
@@ -471,22 +473,6 @@ export async function getScriptVersions(scr: ScriptHeader) {
471473
)
472474
}
473475

474-
export interface TelemetrySource {
475-
brainId: string // "161f314155de245b"
476-
sensorId: string // "260f3e4155de245b"
477-
srv: string // "temperature"
478-
srvIdx: number // typically 0
479-
}
480-
481-
export interface Telemetry extends TelemetrySource {
482-
ms: number // milliseconds since epoch aka Date.now()
483-
avg: number // 22.2 C, etc
484-
min?: number
485-
max?: number
486-
nsampl?: number // number of samples
487-
dur?: number // sampling time is from [ms-dur, ms]
488-
}
489-
490476
export type TelemetryQuery = Partial<TelemetrySource> & {
491477
start?: number
492478
stop?: number
@@ -587,7 +573,7 @@ export async function queryTelemetry(part: string, query: TelemetryQuery) {
587573
return res
588574
}
589575

590-
export async function insertTelemetry(part: string, entries: Telemetry[]) {
576+
async function insertTelemetry(part: string, entries: Telemetry[]) {
591577
let acc: TransactionAction[] = []
592578
for (const e of entries) {
593579
const actions = keysOfTelemetry(e).map(

src/telemetry.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
export interface TelemetrySource {
2+
brainId: string // "161f314155de245b"
3+
sensorId: string // "260f3e4155de245b"
4+
srv: string // "temperature"
5+
srvIdx: number // typically 0
6+
}
7+
8+
export interface Telemetry extends TelemetrySource {
9+
ms: number // milliseconds since epoch aka Date.now()
10+
avg: number // 22.2 C, etc
11+
min?: number
12+
max?: number
13+
nsampl?: number // number of samples
14+
dur?: number // sampling time is from [ms-dur, ms]
15+
}
16+
17+
export type TelemetrySink = (
18+
part: string,
19+
entries: Telemetry[]
20+
) => Promise<void>
21+
22+
export const telemetrySinks: TelemetrySink[] = []
23+
24+
export async function insertTelemetry(part: string, entries: Telemetry[]) {
25+
await Promise.all(telemetrySinks.map(sink => sink(part, entries)))
26+
}

src/wssk.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
import { runInBg } from "./util"
1313
import { fullDeviceId, pubFromDevice, subToDevice } from "./devutil"
1414
import { parseJdbrMessage, toTelemetry } from "./jdbr-binfmt"
15+
import { insertTelemetry } from "./telemetry"
1516

1617
const JD_AES_CCM_TAG_BYTES = 4
1718
const JD_AES_CCM_LENGTH_BYTES = 2
@@ -357,7 +358,7 @@ class ConnectedDevice {
357358
Date.now() - 20
358359
)
359360
console.log(telemetry)
360-
await storage.insertTelemetry(this.id.partitionKey, telemetry)
361+
await insertTelemetry(this.id.partitionKey, telemetry)
361362
} catch (e: any) {
362363
this.log.error(`upload-bin: ${e.stack}`)
363364
}

0 commit comments

Comments
 (0)