From 768fd8b458198245888f68d0d967cb52329c06da Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Mon, 2 Oct 2023 16:19:25 -0300 Subject: [PATCH 01/23] Add OpenTelemetry Support for Observability Added support of OpenTelemetry to the broker package. This will help in diagnostic tracing and querying of metrics within a distributed system which is beneficial for observing performance bottlenecks, slow network requests and other issues. It's a part of observability feature development for improving service reliability and performance monitoring. 3 new files were created under `packages/broker/src/telemetry/setup/` directory for setting up the OpenTelemetry functionality. The existing cli index file has been updated to call the setup. New observability dependencies have been added in the package.json file and jest configuration file was updated accordingly for testing requirements. --- packages/broker/jest.config.js | 11 ++- packages/broker/package.json | 11 +++ packages/broker/src/cli/index.ts | 2 + .../src/telemetry/globalTelemetryObjects.ts | 4 ++ .../broker/src/telemetry/setup/setupSdk.ts | 70 +++++++++++++++++++ .../src/telemetry/setup/startOpenTelemetry.ts | 18 +++++ 6 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 packages/broker/src/telemetry/globalTelemetryObjects.ts create mode 100644 packages/broker/src/telemetry/setup/setupSdk.ts create mode 100644 packages/broker/src/telemetry/setup/startOpenTelemetry.ts diff --git a/packages/broker/jest.config.js b/packages/broker/jest.config.js index 6ba9653f..54d672c7 100644 --- a/packages/broker/jest.config.js +++ b/packages/broker/jest.config.js @@ -1,7 +1,11 @@ -/** @type {import('ts-jest').JestConfigWithTsJest} */ +// we must use require here because the OTelEnvironment needs some env vars +const { config } = require('dotenv'); +config(); + +/** @type {import("ts-jest").JestConfigWithTsJest} */ module.exports = { preset: 'ts-jest/presets/js-with-ts', - testEnvironment: 'node', + testEnvironment: './test/OTelEnvironment.ts', clearMocks: true, // can't use prettier 3 with jest prettierPath: require.resolve('prettier-2'), @@ -10,5 +14,6 @@ module.exports = { tsconfig: 'tsconfig.jest.json', }, }, - setupFilesAfterEnv: ['jest-extended/all', 'dotenv/config'], + setupFiles: ['dotenv/config'], + setupFilesAfterEnv: ['jest-extended/all'], }; diff --git a/packages/broker/package.json b/packages/broker/package.json index d81d462c..e3fe21a2 100644 --- a/packages/broker/package.json +++ b/packages/broker/package.json @@ -43,6 +43,16 @@ "@streamr/network-tracker": "8.1", "@streamr/protocol": "^8.1.0", "@streamr/utils": "^8.1.0", + "@opentelemetry/api": "^1.6.0", + "@opentelemetry/context-async-hooks": "^1.17.0", + "@opentelemetry/auto-instrumentations-node": "^0.39.2", + "@opentelemetry/sdk-node": "^0.43.0", + "@opentelemetry/exporter-metrics-otlp-grpc": "^0.43.0", + "@opentelemetry/sdk-metrics": "^1.17.0", + "@opentelemetry/exporter-trace-otlp-grpc": "^0.43.0", + "@opentelemetry/instrumentation-http": "^0.43.0", + "@opentelemetry/semantic-conventions": "^1.17.0", + "@opentelemetry/resources": "^1.17.0", "ajv": "8.8.2", "ajv-formats": "^2.1.1", "axios": "^1.4.0", @@ -76,6 +86,7 @@ "@types/heap": "^0.2.31", "@types/inquirer": "^8.1.3", "@types/jest": "^29.5.0", + "jest-environment-node": "^29.6.2", "@types/lodash": "^4.14.191", "@types/merge2": "^1.4.0", "@types/node-fetch": "^2.6.2", diff --git a/packages/broker/src/cli/index.ts b/packages/broker/src/cli/index.ts index 6a3881ce..8ca3fdf8 100644 --- a/packages/broker/src/cli/index.ts +++ b/packages/broker/src/cli/index.ts @@ -1,3 +1,5 @@ +import '../telemetry/setup/startOpenTelemetry'; + export * from './initCommand'; export * from './joinCommand'; export * from './startCommand'; diff --git a/packages/broker/src/telemetry/globalTelemetryObjects.ts b/packages/broker/src/telemetry/globalTelemetryObjects.ts new file mode 100644 index 00000000..9509336f --- /dev/null +++ b/packages/broker/src/telemetry/globalTelemetryObjects.ts @@ -0,0 +1,4 @@ +import { metrics, trace } from '@opentelemetry/api'; + +export const globalTelemetryObjects = trace.getTracer('logstore-broker'); +export const meter = metrics.getMeter('logstore-broker'); diff --git a/packages/broker/src/telemetry/setup/setupSdk.ts b/packages/broker/src/telemetry/setup/setupSdk.ts new file mode 100644 index 00000000..518fda23 --- /dev/null +++ b/packages/broker/src/telemetry/setup/setupSdk.ts @@ -0,0 +1,70 @@ +import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; +import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc'; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'; +import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; +import { Resource } from '@opentelemetry/resources'; +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; +import { NodeSDK } from '@opentelemetry/sdk-node'; +import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; +import process from 'process'; + +import pkg from '../../../package.json'; + +const otelTracingExport = process.env.TRACING_URL; +const otelMetricsExport = process.env.METRICS_URL; + +const exporterOptions = { + url: otelTracingExport, +} satisfies ConstructorParameters[0]; + +export const traceExporter = otelTracingExport + ? new OTLPTraceExporter(exporterOptions) + : undefined; + +export const metricReader = otelMetricsExport + ? new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporter({ + url: otelMetricsExport, + }), + exportIntervalMillis: 30_000, + }) + : undefined; + +const isLocal = process.env.IS_LOCAL === 'true'; + +const contextManager = new AsyncHooksContextManager(); +contextManager.enable(); + +// get name from package.json +const serviceName = pkg.name; + +export const sdk: NodeSDK = new NodeSDK({ + traceExporter, + metricReader: metricReader, + contextManager: contextManager, + instrumentations: [ + getNodeAutoInstrumentations({ + '@opentelemetry/instrumentation-fs': { + enabled: false, + }, + }), + new HttpInstrumentation({ + enabled: true, + }), + ], + resource: new Resource({ + [SemanticResourceAttributes.SERVICE_NAME]: isLocal + ? `${serviceName} (local)` + : serviceName, + }), +}); + +export const enabledObservabilityFeatures = { + tracing: !!traceExporter, + metrics: !!metricReader, +}; + +export const isAnyObservabilityFeatureEnabled = Object.values( + enabledObservabilityFeatures +).some((value) => value); diff --git a/packages/broker/src/telemetry/setup/startOpenTelemetry.ts b/packages/broker/src/telemetry/setup/startOpenTelemetry.ts new file mode 100644 index 00000000..4168a67a --- /dev/null +++ b/packages/broker/src/telemetry/setup/startOpenTelemetry.ts @@ -0,0 +1,18 @@ +import process from 'process'; + +import { isAnyObservabilityFeatureEnabled, sdk } from './setupSdk'; + +// initialize the SDK and register with the OpenTelemetry API +// this enables the API to record telemetry + +if (isAnyObservabilityFeatureEnabled) { + sdk.start(); + // gracefully shut down the SDK on process exit + process.on('SIGTERM', () => { + sdk + .shutdown() + .then(() => console.log('Tracing terminated')) + .catch((error: Error) => console.log('Error terminating tracing', error)) + .finally(() => process.exit(0)); + }); +} From 6bef3aa1dfcc51b46222fe2ca6519b572736f969 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Mon, 2 Oct 2023 16:24:53 -0300 Subject: [PATCH 02/23] Add OpenTelemetry support and infrastucture for it This commit adds support for OpenTelemetry across the development network. Added METRICS_URL and TRACING_URL for all brokers to enable visibility into performance metrics and tracing of broker interactions. Furthermore, a Tempo local backend for traces is set up along with Prometheus for metrics and Grafana for visualization. Opentelemetry collector configuration was also added for data ingestion and forwarding. This will improve our ability to debug performance issues and understand network interactions in more granular detail. --- dev-network/assets/broker/.env.broker-1 | 2 + dev-network/assets/broker/.env.broker-2 | 2 + dev-network/assets/broker/.env.broker-3 | 2 + .../otel-collector-config.yaml | 23 ++++ dev-network/assets/prometheus/prometheus.yml | 5 + dev-network/assets/tempo/tempo.yaml | 34 +++++ dev-network/docker-compose.yml | 118 ++++++++++++++++++ 7 files changed, 186 insertions(+) create mode 100644 dev-network/assets/opentelemetry-collector/otel-collector-config.yaml create mode 100644 dev-network/assets/prometheus/prometheus.yml create mode 100644 dev-network/assets/tempo/tempo.yaml diff --git a/dev-network/assets/broker/.env.broker-1 b/dev-network/assets/broker/.env.broker-1 index 86b8e214..e1afbe65 100644 --- a/dev-network/assets/broker/.env.broker-1 +++ b/dev-network/assets/broker/.env.broker-1 @@ -3,5 +3,7 @@ LOG_LEVEL=debug # node-fetch tweak to call arweave.net with self-signed certificate # https://stackoverflow.com/a/52479399 NODE_TLS_REJECT_UNAUTHORIZED=0 +METRICS_URL=http://opentelemetry-collector:4317 +TRACING_URL=http://opentelemetry-collector:4317 BROKER_METADATA='{ "http": "http://logstore-broker-1:7771" }' diff --git a/dev-network/assets/broker/.env.broker-2 b/dev-network/assets/broker/.env.broker-2 index 781bebe5..b9d28883 100644 --- a/dev-network/assets/broker/.env.broker-2 +++ b/dev-network/assets/broker/.env.broker-2 @@ -3,5 +3,7 @@ LOG_LEVEL=debug # node-fetch tweak to call arweave.net with self-signed certificate # https://stackoverflow.com/a/52479399 NODE_TLS_REJECT_UNAUTHORIZED=0 +METRICS_URL=http://opentelemetry-collector:4317 +TRACING_URL=http://opentelemetry-collector:4317 BROKER_METADATA='{ "http": "http://logstore-broker-2:7772" }' diff --git a/dev-network/assets/broker/.env.broker-3 b/dev-network/assets/broker/.env.broker-3 index 83b2024a..f2190285 100644 --- a/dev-network/assets/broker/.env.broker-3 +++ b/dev-network/assets/broker/.env.broker-3 @@ -3,5 +3,7 @@ LOG_LEVEL=debug # node-fetch tweak to call arweave.net with self-signed certificate # https://stackoverflow.com/a/52479399 NODE_TLS_REJECT_UNAUTHORIZED=0 +METRICS_URL=http://opentelemetry-collector:4317 +TRACING_URL=http://opentelemetry-collector:4317 BROKER_METADATA='{ "http": "http://logstore-broker-3:7773" }' diff --git a/dev-network/assets/opentelemetry-collector/otel-collector-config.yaml b/dev-network/assets/opentelemetry-collector/otel-collector-config.yaml new file mode 100644 index 00000000..53cb208f --- /dev/null +++ b/dev-network/assets/opentelemetry-collector/otel-collector-config.yaml @@ -0,0 +1,23 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: ":4317" + +processors: + batch: + +exporters: + prometheus: + endpoint: "0.0.0.0:8889" + otlphttp: + endpoint: http://tempo:4318 + +service: + pipelines: + metrics: + receivers: [ otlp ] + exporters: [ prometheus ] + traces: + receivers: [ otlp ] + exporters: [ otlphttp ] diff --git a/dev-network/assets/prometheus/prometheus.yml b/dev-network/assets/prometheus/prometheus.yml new file mode 100644 index 00000000..04712d35 --- /dev/null +++ b/dev-network/assets/prometheus/prometheus.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 15s + static_configs: + - targets: ['opentelemetry-collector:8889'] diff --git a/dev-network/assets/tempo/tempo.yaml b/dev-network/assets/tempo/tempo.yaml new file mode 100644 index 00000000..883f7253 --- /dev/null +++ b/dev-network/assets/tempo/tempo.yaml @@ -0,0 +1,34 @@ +auth_enabled: false # Authentication. TODO + +server: + http_listen_port: 3200 # Port for HTTP API + +ingester: + lifecycler: + ring: + kvstore: + store: inmemory # In-memory ring store. For production, consider using etcd or Consul. + replication_factor: 1 # Number of the replicas for the ingested data. + final_sleep: 0s # Time to sleep between shutdown and exiting the ingester. + +distributor: + receivers: # Supported protocols for tracing ingestion. + jaeger: + protocols: + grpc: + thrift_binary: + thrift_compact: + zipkin: + otlp: + protocols: + grpc: + http: + +storage: + trace: + backend: local # local storage backend. For production, we might want to use cloud + wal: + path: /tmp/tempo/wal # Write-ahead log (WAL) directory path. Make sure to mount this directory in Docker. + local: + path: /tmp/tempo/traces # Traces storage directory path. Make sure to mount this directory in Docker. + diff --git a/dev-network/docker-compose.yml b/dev-network/docker-compose.yml index be938187..b64dffc9 100644 --- a/dev-network/docker-compose.yml +++ b/dev-network/docker-compose.yml @@ -339,6 +339,121 @@ services: logstore-deploy-contracts: condition: service_completed_successfully + logstore-observer: + container_name: logstore-observer + hostname: logstore-observer + image: logstore-observer + build: + context: ../ + dockerfile: ./dev-network/Dockerfile.observer + restart: always + networks: + - streamr-network + env_file: + - ./assets/observer/.env.observer + volumes: + - type: bind + source: ./assets/observer/start-in-docker.sh + target: /usr/local/bin/start-in-docker + - type: bind + source: ./assets/observer/observer.json + target: /home/node/.logstore/config/observer.json + depends_on: + logstore-base: + condition: service_completed_successfully + logstore-deploy-subgraph: + condition: service_completed_successfully + logstore-deploy-contracts: + condition: service_completed_successfully + + # logstore-vector: + # container_name: logstore-vector + # hostname: logstore-vector + # build: + # context: ../ + # dockerfile: ./dev-network/Dockerfile.vector + # args: + # - NODE_ID=${NODE_ID:?NODE_ID is required} + # - INFISICAL_TOKEN=${INFISICAL_TOKEN_VECTOR:?INFISICAL_TOKEN_VECTOR is required} + # restart: always + # ports: + # - '8686:8686' + # volumes: + # - './assets/vector/vector.toml:/etc/vector/vector.toml' + # - '/var/run/docker.sock:/var/run/docker.sock' + + logstore-grafana: + image: grafana/grafana:latest + container_name: logstore-grafana + hostname: logstore-grafana + restart: always + networks: + - streamr-network + ports: + - '3300:3000' + volumes: + - grafana-storage:/var/lib/grafana + depends_on: + - logstore-prometheus + + opentelemetry-collector: + image: otel/opentelemetry-collector:0.85.0 + container_name: opentelemetry-collector + hostname: opentelemetry-collector + restart: always + ports: + - '4317:4317' + networks: + - streamr-network + volumes: + - ./assets/opentelemetry-collector/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro + command: ['--config=/etc/otel-collector-config.yaml'] + + logstore-prometheus: + image: prom/prometheus:v2.47.0 + container_name: logstore-prometheus + hostname: logstore-prometheus + restart: always + networks: + - streamr-network + command: + [ + '--config.file=/etc/prometheus/prometheus.yml', + '--storage.tsdb.path=/prometheus', + '--web.enable-admin-api', + ] + ports: + - '9090:9090' + volumes: + - ./assets/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - type: volume + source: data-logstore-prometheus + target: /prometheus + volume: + nocopy: false + depends_on: + - opentelemetry-collector + + tempo: + image: grafana/tempo:main-6c7e07d # Use the latest version + container_name: tempo + hostname: tempo + restart: always + networks: + - streamr-network + ports: + - '3200:3200' # Tempo's default port + command: ['-config.file=/etc/tempo/tempo.yaml'] + volumes: + - ./assets/tempo/tempo.yaml:/etc/tempo/tempo.yaml:ro + - type: volume + source: data-tempo + target: /tmp/tempo + volume: + nocopy: false + depends_on: + - opentelemetry-collector + networks: streamr-network: name: streamr-docker-dev_streamr-network @@ -349,3 +464,6 @@ volumes: data-heartbeat: data-logstore-deploy-contracts: data-logstore-deploy-subgraph: + data-logstore-prometheus: + grafana-storage: + data-tempo: From 7d51ede1ba62ecc9a4aca14e0c976bf106964e76 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:39:32 -0300 Subject: [PATCH 03/23] Enable declaration file generation in tsconfig Permits using broker in other packages --- packages/broker/tsconfig.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/broker/tsconfig.json b/packages/broker/tsconfig.json index 65f95821..664ccfad 100644 --- a/packages/broker/tsconfig.json +++ b/packages/broker/tsconfig.json @@ -44,7 +44,7 @@ // "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */ /* Emit */ - // "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */ + "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */ // "declarationMap": true, /* Create sourcemaps for d.ts files. */ // "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */ "sourceMap": true /* Create source map files for emitted JavaScript files. */, From 2b7317115d875a24f900378b855316d43f0d0790 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:41:36 -0300 Subject: [PATCH 04/23] Add telemetry metrics to broker service This new feature introduces metrics tracking to the broker service. With this implementation, we can now trace and observe system activities accurately. Several utilities were added for middleware handling (middlewares.ts and activeSpanDecorator.ts) and context management (context/index.ts and helpers.ts). Newly created metric counters include monitoring bytes read/written, number of messages read/written, HTTP queries performed, total stored messages/bytes and more. --- packages/broker/src/helpers/middlewares.ts | 11 +++ .../broker/src/telemetry/context/helpers.ts | 37 ++++++++ .../broker/src/telemetry/context/index.ts | 17 ++++ .../telemetry/metrics/cassandraCounters.ts | 88 ++++++++++++++++++ .../telemetry/metrics/generalNodeActions.ts | 92 +++++++++++++++++++ .../telemetry/metrics/generalNodeCounters.ts | 58 ++++++++++++ .../metrics/generalNodeUpDownCounters.ts | 6 ++ .../metrics/observeMessageMetricsCollector.ts | 41 +++++++++ .../telemetry/metrics/systemMessageMetrics.ts | 11 +++ .../telemetry/utils/activeSpanDecorator.ts | 89 ++++++++++++++++++ .../broker/src/telemetry/utils/middlewares.ts | 65 +++++++++++++ .../src/telemetry/utils/withActiveSpan.ts | 21 +++++ 12 files changed, 536 insertions(+) create mode 100644 packages/broker/src/helpers/middlewares.ts create mode 100644 packages/broker/src/telemetry/context/helpers.ts create mode 100644 packages/broker/src/telemetry/context/index.ts create mode 100644 packages/broker/src/telemetry/metrics/cassandraCounters.ts create mode 100644 packages/broker/src/telemetry/metrics/generalNodeActions.ts create mode 100644 packages/broker/src/telemetry/metrics/generalNodeCounters.ts create mode 100644 packages/broker/src/telemetry/metrics/generalNodeUpDownCounters.ts create mode 100644 packages/broker/src/telemetry/metrics/observeMessageMetricsCollector.ts create mode 100644 packages/broker/src/telemetry/metrics/systemMessageMetrics.ts create mode 100644 packages/broker/src/telemetry/utils/activeSpanDecorator.ts create mode 100644 packages/broker/src/telemetry/utils/middlewares.ts create mode 100644 packages/broker/src/telemetry/utils/withActiveSpan.ts diff --git a/packages/broker/src/helpers/middlewares.ts b/packages/broker/src/helpers/middlewares.ts new file mode 100644 index 00000000..da32ff24 --- /dev/null +++ b/packages/broker/src/helpers/middlewares.ts @@ -0,0 +1,11 @@ +export type Middleware = (next: T) => T; + +export const applyMiddlewares = ( + initialState: T, + middlewares: Middleware[] +): T => { + return middlewares.reduceRight( + (acc, middleware) => middleware(acc), + initialState + ); +}; diff --git a/packages/broker/src/telemetry/context/helpers.ts b/packages/broker/src/telemetry/context/helpers.ts new file mode 100644 index 00000000..a3e55ee2 --- /dev/null +++ b/packages/broker/src/telemetry/context/helpers.ts @@ -0,0 +1,37 @@ +import { Counter } from '@opentelemetry/api'; +import { AsyncLocalStorage } from 'async_hooks'; + +type AsyncLocalStorageWithGetter = [ + AsyncLocalStorage, + (a: T | undefined) => any, +]; +type ContextMapperValue = + | AsyncLocalStorage + | AsyncLocalStorageWithGetter; +type ContextMapper> = { + [K in keyof T]: ContextMapperValue; +}; + +/** + * Increment a counter with additional context values using a context mapper function. + */ +export const incrementCounterWithCtx = + >( + counter: Counter, + ctxMapper: ContextMapper + ) => + (n: number) => { + const stores = Object.entries(ctxMapper).reduce( + (acc, [key, store]) => { + if (Array.isArray(store)) { + acc[key] = store[1](store[0].getStore()); + } else { + acc[key] = store.getStore(); + } + return acc; + }, + {} as Record + ); + + return counter.add(n, stores); + }; diff --git a/packages/broker/src/telemetry/context/index.ts b/packages/broker/src/telemetry/context/index.ts new file mode 100644 index 00000000..adf92650 --- /dev/null +++ b/packages/broker/src/telemetry/context/index.ts @@ -0,0 +1,17 @@ +import { QueryType } from '@logsn/protocol'; +import { AsyncLocalStorage } from 'async_hooks'; + +type OperationType = + | 'query_request' + | 'consensus' + | 'user_request' + | 'recovery' + | 'proof_of_message_stored'; + +export const ctx = { + operation: new AsyncLocalStorage(), + nodeInfo: new AsyncLocalStorage<{ id: string }>(), + queryType: new AsyncLocalStorage(), +}; + +export type StoreType = T extends AsyncLocalStorage ? U : never; diff --git a/packages/broker/src/telemetry/metrics/cassandraCounters.ts b/packages/broker/src/telemetry/metrics/cassandraCounters.ts new file mode 100644 index 00000000..c7eb0644 --- /dev/null +++ b/packages/broker/src/telemetry/metrics/cassandraCounters.ts @@ -0,0 +1,88 @@ +import { BatchObservableCallback, ValueType } from '@opentelemetry/api'; +import { Client } from 'cassandra-driver'; + +import { ctx } from '../context'; +import { meter } from '../globalTelemetryObjects'; + +const totalMessagesStored = meter.createObservableCounter( + 'cassandra_db.messages_stored', + { + unit: '1', + valueType: ValueType.INT, + description: + 'Total number of messages stored in a keyspace of the database.', + } +); + +const totalBytesStored = meter.createObservableCounter( + 'cassandra_db.bytes_stored', + { + unit: 'By', // Standardized unit symbol for bytes + valueType: ValueType.INT, + description: 'Total number of bytes stored in a keyspace of the database.', + } +); + +const numberOfBuckets = meter.createObservableCounter( + 'cassandra_db.number_of_buckets', + { + unit: '1', + valueType: ValueType.INT, + description: 'Total number of buckets used for storing messages.', + } +); + +// ====== Helpers ====== + +const getTotalMessagesStored = (client: Client) => + client + .execute( + `SELECT SUM(records) as count + FROM bucket` + ) + .then((result) => result.rows[0].count as number); + +const getTotalBytesStored = (client: Client) => + client + .execute( + `SELECT SUM(size) as count + FROM bucket` + ) + .then((result) => result.rows[0].count as number); + +const getNumberOfBuckets = (client: Client) => + client + .execute( + `SELECT COUNT(*) as count + FROM bucket` + ) + .then((result) => Number(result.rows[0].count)); + +// ====== Collect directly ====== +// This won't be executed unless the metrics collecting is enabled for this broker node + +export const observeCassandraStorage = (client: Client) => { + const observables = [totalMessagesStored, totalBytesStored, numberOfBuckets]; + const nodeId = ctx.nodeInfo.getStore()?.id; + + const batchCallback: BatchObservableCallback = async (observableResult) => { + const [messages, bytes, buckets] = await Promise.all([ + getTotalMessagesStored(client), + getTotalBytesStored(client), + getNumberOfBuckets(client), + ]); + observableResult.observe(totalMessagesStored, messages, { + nodeId, + }); + observableResult.observe(totalBytesStored, bytes, { + nodeId, + }); + observableResult.observe(numberOfBuckets, buckets, { + nodeId, + }); + }; + + meter.addBatchObservableCallback(batchCallback, observables); + + return () => meter.removeBatchObservableCallback(batchCallback, observables); +}; diff --git a/packages/broker/src/telemetry/metrics/generalNodeActions.ts b/packages/broker/src/telemetry/metrics/generalNodeActions.ts new file mode 100644 index 00000000..15b1f25b --- /dev/null +++ b/packages/broker/src/telemetry/metrics/generalNodeActions.ts @@ -0,0 +1,92 @@ +import { QueryType } from '@logsn/protocol'; + +import { ctx, StoreType } from '../context'; +import { generalCounters } from './generalNodeCounters'; + +type WithoutContext = Omit; + +type NewMessageEvent = { + type: 'read' | 'write'; + qty: number; + context: { + operation: StoreType; + queryType?: StoreType; + }; +}; + +type NewBytesEvent = { + type: 'read' | 'write'; + qty: number; + context: { + operation: StoreType; + queryType?: StoreType; + }; +}; + +type NewQueryEvent = { + qty: number; + statusCode: string; + context: { + type: QueryType; + operation: StoreType; + }; +}; + +type NewMessagesReturnedFromQueryEvent = { + qty: number; + context: { + operation: StoreType; + queryType?: StoreType; + }; +}; + +export const addNewMessageEvent = (event: WithoutContext) => { + const operation = ctx.operation.getStore(); + const nodeInfo = ctx.nodeInfo.getStore(); + const queryType = ctx.queryType.getStore(); + + return generalCounters[`${event.type}Messages`].add(event.qty, { + operation, + nodeId: nodeInfo?.id, + queryType, + }); +}; + +export const addNewBytesEvent = (event: WithoutContext) => { + const operation = ctx.operation.getStore(); + const nodeInfo = ctx.nodeInfo.getStore(); + const queryType = ctx.queryType.getStore(); + + return generalCounters[`${event.type}Bytes`].add(event.qty, { + operation, + nodeId: nodeInfo?.id, + queryType, + }); +}; + +export const addNewQueryEvent = (event: WithoutContext) => { + const operation = ctx.operation.getStore(); + const nodeInfo = ctx.nodeInfo.getStore(); + const queryType = ctx.queryType.getStore(); + + return generalCounters.httpQueries.add(event.qty, { + nodeId: nodeInfo?.id, + queryType: queryType, + operation, + statusCode: event.statusCode, + }); +}; + +export const addNewMessagesReturnedFromQueryEvent = ( + event: WithoutContext +) => { + const operation = ctx.operation.getStore(); + const nodeInfo = ctx.nodeInfo.getStore(); + const queryType = ctx.queryType.getStore(); + + return generalCounters.httpQueryMessages.add(event.qty, { + operation, + nodeId: nodeInfo?.id, + queryType, + }); +}; diff --git a/packages/broker/src/telemetry/metrics/generalNodeCounters.ts b/packages/broker/src/telemetry/metrics/generalNodeCounters.ts new file mode 100644 index 00000000..bd3a3dd4 --- /dev/null +++ b/packages/broker/src/telemetry/metrics/generalNodeCounters.ts @@ -0,0 +1,58 @@ +import { ValueType } from '@opentelemetry/api/build/src/metrics/Metric'; + +import { meter } from '../globalTelemetryObjects'; + +const readBytes = meter.createCounter('node.db_read_bytes', { + description: 'Total bytes read, emitted when a new message is read from DB', + unit: 'By', + valueType: ValueType.INT, +}); + +const writeBytes = meter.createCounter('node.db_write_bytes', { + description: + 'Total bytes written, emitted when a new message is read from DB', + unit: 'By', + valueType: ValueType.INT, +}); + +const readMessages = meter.createCounter('node.db_read_messages', { + description: + 'Total messages read, emitted when a new message is read from DB', + unit: '1', + valueType: ValueType.INT, +}); + +const writeMessages = meter.createCounter('node.db_write_messages', { + description: + 'Total messages written, emitted when a new message is read from DB', + unit: '1', + valueType: ValueType.INT, +}); + +const httpQueries = meter.createCounter('node.http_queries', { + description: 'Total queries received on HTTP endpoint', + unit: '1', + valueType: ValueType.INT, +}); + +const httpQueryMessages = meter.createCounter('node.http_query_messages', { + description: 'Total messages returned from queries', + unit: '1', + valueType: ValueType.INT, +}); + +const recoveryRequests = meter.createCounter('node.recovery_requests', { + description: 'Total recovery requests', + unit: '1', + valueType: ValueType.INT, +}); + +export const generalCounters = { + readBytes, + writeBytes, + readMessages, + writeMessages, + httpQueries, + recoveryRequests, + httpQueryMessages +}; diff --git a/packages/broker/src/telemetry/metrics/generalNodeUpDownCounters.ts b/packages/broker/src/telemetry/metrics/generalNodeUpDownCounters.ts new file mode 100644 index 00000000..5122ffc7 --- /dev/null +++ b/packages/broker/src/telemetry/metrics/generalNodeUpDownCounters.ts @@ -0,0 +1,6 @@ +import { meter } from '../globalTelemetryObjects'; + +const openStreams = meter.createUpDownCounter('node.open_streams', { + description: 'Total number of open streams', + unit: 'streams', +}); diff --git a/packages/broker/src/telemetry/metrics/observeMessageMetricsCollector.ts b/packages/broker/src/telemetry/metrics/observeMessageMetricsCollector.ts new file mode 100644 index 00000000..af805b3b --- /dev/null +++ b/packages/broker/src/telemetry/metrics/observeMessageMetricsCollector.ts @@ -0,0 +1,41 @@ +import { BatchObservableResult } from '@opentelemetry/api'; +import _ from 'lodash'; + +import type { MessageMetricsCollector } from '../../plugins/logStore/MessageMetricsCollector'; +import { ctx } from '../context'; +import { meter } from '../globalTelemetryObjects'; +import { systemMessageObservableCounters } from './systemMessageMetrics'; + +/** + * Message Metrics Collector is a class designed to help us collect metrics + * from the system messages that are sent and received by the broker. + * + * We're using the OpenTelemetry BatchObservableResult to collect metrics from this + * same class, to reuse the same code. + * + * @param collector + */ +export const observeMessageMetricsCollector = ( + collector: MessageMetricsCollector +) => { + const { bytes, count, lost } = systemMessageObservableCounters; + const observables = [bytes, count, lost]; + const nodeId = ctx.nodeInfo.getStore()?.id; + + const batchCallback = (observableResult: BatchObservableResult) => { + const metrics = _.compact(collector.summary); + + for (const metric of metrics) { + const attributes = { + nodeId, + subject: metric.subject, + }; + observableResult.observe(bytes, metric?.bytes, attributes); + observableResult.observe(count, metric?.count, attributes); + observableResult.observe(lost, metric?.lost, attributes); + } + }; + meter.addBatchObservableCallback(batchCallback, observables); + + return () => meter.removeBatchObservableCallback(batchCallback, observables); +}; diff --git a/packages/broker/src/telemetry/metrics/systemMessageMetrics.ts b/packages/broker/src/telemetry/metrics/systemMessageMetrics.ts new file mode 100644 index 00000000..bad7ca02 --- /dev/null +++ b/packages/broker/src/telemetry/metrics/systemMessageMetrics.ts @@ -0,0 +1,11 @@ +import { ObservableCounter } from '@opentelemetry/api'; + +import { meter } from '../globalTelemetryObjects'; + +export const systemMessageObservableCounters = { + lost: meter.createObservableCounter('systemMessage.lost'), + bytes: meter.createObservableCounter('systemMessage.bytes'), + count: meter.createObservableCounter('systemMessage.count'), +} satisfies { + [key in 'bytes' | 'count' | 'lost']: ObservableCounter; +}; diff --git a/packages/broker/src/telemetry/utils/activeSpanDecorator.ts b/packages/broker/src/telemetry/utils/activeSpanDecorator.ts new file mode 100644 index 00000000..db27b443 --- /dev/null +++ b/packages/broker/src/telemetry/utils/activeSpanDecorator.ts @@ -0,0 +1,89 @@ +import { context, Context, Span, SpanOptions } from '@opentelemetry/api'; + +import { globalTelemetryObjects } from '../globalTelemetryObjects'; + +/** + * Creates a traced method decorator that wraps the original method with tracing functionality. + * When the decorated method is called, a new span is started and ended around the execution of the original method. + * The span will have the specified name and options, and will be created with the specified context or the active context if not provided. + * If an exception is thrown during the execution of the method, the span's status code will be set to ERROR and the exception will be re-thrown. + * If the method returns a Promise, the span will be ended when the Promise is settled. + * Otherwise, the span will be ended immediately after the method returns. + * + * @param spanName - The name of the span to be created. + * @param spanOptions - The options to be applied to the span. Optional. + * @param spanContext - The context to be used for creating the span. Optional. If not provided, the active context will be used. + * + * @returns The decorated descriptor. + */ +function createTracedMethod( + spanName: string, + spanOptions?: SpanOptions, + spanContext?: Context +) { + return function ( + _target: object, + _propertyKey: string, + descriptor: PropertyDescriptor + ) { + const originalMethod = descriptor.value; + + descriptor.value = function (...args: any[]) { + let result: unknown; + globalTelemetryObjects.startActiveSpan( + spanName, + spanOptions ?? {}, + spanContext ?? context.active(), + (span: Span) => { + try { + result = originalMethod.apply(this, args); + span.setStatus({ code: 0 }); // Set status code to OK + } catch (e) { + span.setStatus({ + code: 2, // Set status code to ERROR + message: e.message, + }); + throw e; // re-throw the error + } finally { + if (result instanceof Promise) { + result.finally(() => span.end()).catch(() => span.end()); + } else { + span.end(); + } + } + } + ); + return result; + }; + + return descriptor; + }; +} + +/** + * Decorator that starts an active span for a method. + * + * @param {string} [spanName] - The name of the span. If not provided, it will use the target's constructor name and property key. + * @param {SpanOptions} [spanOptions] - The options to configure the span. + * @param {Context} [spanContext] - The context of the span. + * @returns {Function} - The decorated method. + */ +export function StartActiveSpan( + spanName?: string, + spanOptions?: SpanOptions, + spanContext?: Context +) { + return function ( + target: object, + propertyKey: string, + descriptor: PropertyDescriptor + ) { + const usedSpanName = + spanName ?? `${target.constructor.name}.${propertyKey}`; + return createTracedMethod(usedSpanName, spanOptions, spanContext)( + target, + propertyKey, + descriptor + ); + }; +} diff --git a/packages/broker/src/telemetry/utils/middlewares.ts b/packages/broker/src/telemetry/utils/middlewares.ts new file mode 100644 index 00000000..a737936b --- /dev/null +++ b/packages/broker/src/telemetry/utils/middlewares.ts @@ -0,0 +1,65 @@ +import { PublishMiddlewares } from '../../shared/BroadbandPublisher'; +import { SubscriptionMiddleware } from '../../shared/BroadbandSubscriber'; + +/** + * Middleware function that adds telemetry context to a published message. + * + * @param {Function} next - The next middleware function in the pipeline. + * @returns {Function} - The middleware function. + * + * @see https://opentelemetry.io/docs/instrumentation/js/propagation/ + */ +export const addTelemetryContextToPublishedMessageMiddleware: PublishMiddlewares = + (next) => { + // return (message: T) => { + // const activeContext = api.context.active(); + // const carrier = {}; + // propagation.inject(activeContext, carrier); + // const telemetryContextStr = JSON.stringify(carrier); + // const messageStr = JSON.stringify(message); + // return next(`${messageStr}!context!${telemetryContextStr}`); + // }; + + // todo - create non invasive way to do this + return next; + }; + +/** + * Middleware for retrieving telemetry context from a message. + * + * This middleware checks if the message contains telemetry context information, + * and if so, extracts and sets the telemetry context before calling the next middleware. + * If the message does not contain telemetry context, it simply calls the next middleware. + * + * @param {Function} next - The next middleware to call. + * @returns {Function} A middleware function that takes in a message and metadata. + * + * @see https://opentelemetry.io/docs/instrumentation/js/propagation/ + */ +export const getTelemetryContextMiddleware: SubscriptionMiddleware = + (next) => (message, metadata) => { + // so to deserialize the message we need to do this: + // first let's detect if the message is a telemetry message + // if (typeof message === 'string' && message.includes('!context!')) { + // // if it is, we need to split it + // const [messageStr, telemetryContextStr] = message.split('!context!'); + // // and then we need to parse the message + // const nextMessage = JSON.parse(messageStr); + // // and the telemetry context + // const telemetryContext = JSON.parse(telemetryContextStr); + // // and then we need to set the telemetry context + // const activeContext = propagation.extract( + // api.context.active(), + // telemetryContext + // ); + // api.context.with(activeContext, () => { + // next(nextMessage, metadata); + // }); + // } else { + // // if it's not a telemetry message, we just call the next middleware + // next(message, metadata); + // } + + // todo - create non invasive way to do this + next(message, metadata); + }; diff --git a/packages/broker/src/telemetry/utils/withActiveSpan.ts b/packages/broker/src/telemetry/utils/withActiveSpan.ts new file mode 100644 index 00000000..0568093e --- /dev/null +++ b/packages/broker/src/telemetry/utils/withActiveSpan.ts @@ -0,0 +1,21 @@ +import { Span } from '@opentelemetry/api'; + +import { globalTelemetryObjects } from '../globalTelemetryObjects'; + +export function withActiveSpan( + name: string, + fn: (span: Span) => Promise | T +) { + return globalTelemetryObjects.startActiveSpan(name, async (span) => { + // fn(span).finally(() => span.end()) + try { + const res = await fn(span); + return res; + } catch (error) { + span.recordException(error); + throw error; + } finally { + span.end(); + } + }); +} From 2865a522126b84e19119aca75437f678fab7227b Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:45:18 -0300 Subject: [PATCH 05/23] Enhance telemetry by observing Cassandra and tracking node actions --- .../broker/src/plugins/logStore/LogStore.ts | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/packages/broker/src/plugins/logStore/LogStore.ts b/packages/broker/src/plugins/logStore/LogStore.ts index 10839b63..4ef9e726 100644 --- a/packages/broker/src/plugins/logStore/LogStore.ts +++ b/packages/broker/src/plugins/logStore/LogStore.ts @@ -6,6 +6,12 @@ import merge2 from 'merge2'; import { pipeline, Readable, Transform } from 'stream'; import { v1 as uuidv1 } from 'uuid'; +import { observeCassandraStorage } from '../../telemetry/metrics/cassandraCounters'; +import { + addNewBytesEvent, + addNewMessageEvent, +} from '../../telemetry/metrics/generalNodeActions'; +import { StartActiveSpan } from '../../telemetry/utils/activeSpanDecorator'; import { BatchManager } from './BatchManager'; import { Bucket, BucketId } from './Bucket'; import { BucketManager, BucketManagerOptions } from './BucketManager'; @@ -55,6 +61,7 @@ export class LogStore extends EventEmitter { bucketManager: BucketManager; batchManager: BatchManager; pendingStores: Map; + unobserveCassandraStorage?: () => void; constructor(cassandraClient: Client, opts: LogStoreOptions) { super(); @@ -75,8 +82,12 @@ export class LogStore extends EventEmitter { useTtl: this.opts.useTtl, }); this.pendingStores = new Map(); + this.unobserveCassandraStorage = observeCassandraStorage( + this.cassandraClient + ); } + @StartActiveSpan() async store(streamMessage: StreamMessage): Promise { logger.debug('Store message'); @@ -118,6 +129,7 @@ export class LogStore extends EventEmitter { }); } + @StartActiveSpan() requestLast( streamId: string, partition: number, @@ -227,6 +239,7 @@ export class LogStore extends EventEmitter { return resultStream; } + @StartActiveSpan() requestFrom( streamId: string, partition: number, @@ -257,6 +270,7 @@ export class LogStore extends EventEmitter { ); } + @StartActiveSpan() requestRange( streamId: string, partition: number, @@ -311,13 +325,29 @@ export class LogStore extends EventEmitter { metricsContext.addMetrics('broker.plugin.logStore', metrics); this.on('read', (streamMessage: StreamMessage) => { metrics.readMessagesPerSecond.record(1); - metrics.readBytesPerSecond.record(streamMessage.getContent(false).length); + addNewMessageEvent({ + type: 'read', + qty: 1, + }); + const bytes = streamMessage.getContent(false).length; + metrics.readBytesPerSecond.record(bytes); + addNewBytesEvent({ + type: 'read', + qty: bytes, + }); }); this.on('write', (streamMessage: StreamMessage) => { metrics.writeMessagesPerSecond.record(1); - metrics.writeBytesPerSecond.record( - streamMessage.getContent(false).length - ); + addNewMessageEvent({ + type: 'write', + qty: 1, + }); + const bytes = streamMessage.getContent(false).length; + metrics.writeBytesPerSecond.record(bytes); + addNewBytesEvent({ + type: 'write', + qty: bytes, + }); }); } @@ -331,9 +361,11 @@ export class LogStore extends EventEmitter { this.bucketManager.stop(); this.batchManager.stop(); + this.unobserveCassandraStorage?.(); return this.cassandraClient.shutdown(); } + @StartActiveSpan() private fetchRange( streamId: string, partition: number, @@ -658,6 +690,7 @@ export class LogStore extends EventEmitter { return count; } + @StartActiveSpan() async getTotalBytesInStream( streamId: string, partition: number From 1024ea2eee03604bd727896404d8b5e182177491 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:47:55 -0300 Subject: [PATCH 06/23] Add telemetry context and active span in logStore Telemetry context and active span decorators were added to ConsensusManger, SystemRecovery, and QueryRequestHandler methods in the logStore package. This change is intended to enhance observability of these operations. Furthermore, observeMessageMetricsCollector was used in LogStorePlugin, which collects metrics for observing messages. Unnecessary comments were also removed from LogStorePlugin. --- packages/broker/src/plugins/logStore/ConsensusManger.ts | 4 ++++ packages/broker/src/plugins/logStore/LogStorePlugin.ts | 7 ++++++- .../broker/src/plugins/logStore/QueryRequestHandler.ts | 4 ++++ packages/broker/src/plugins/logStore/SystemRecovery.ts | 2 ++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/broker/src/plugins/logStore/ConsensusManger.ts b/packages/broker/src/plugins/logStore/ConsensusManger.ts index 71c5cb44..3eb6e0d5 100644 --- a/packages/broker/src/plugins/logStore/ConsensusManger.ts +++ b/packages/broker/src/plugins/logStore/ConsensusManger.ts @@ -10,6 +10,8 @@ import { Logger } from '@streamr/utils'; import { BroadbandPublisher } from '../../shared/BroadbandPublisher'; import { BroadbandSubscriber } from '../../shared/BroadbandSubscriber'; +import { ctx } from '../../telemetry/context'; +import { StartActiveSpan } from '../../telemetry/utils/activeSpanDecorator'; import { Consensus } from './Consensus'; const logger = new Logger(module); @@ -33,6 +35,7 @@ export class ConsensusManager { await this.subscriber.unsubscribe(); } + @StartActiveSpan() public async getConsensus(queryRequest: QueryRequest) { const requestPublisherId = await this.publisher.getAddress(); const awaitingResponses = (await this.nodeManager.totalNodes()).toNumber(); @@ -55,6 +58,7 @@ export class ConsensusManager { } private onMessage(content: unknown, metadata: MessageMetadata) { + ctx.operation.enterWith('consensus'); const systemMessage = SystemMessage.deserialize(content); if (systemMessage.messageType != SystemMessageType.QueryResponse) { return; diff --git a/packages/broker/src/plugins/logStore/LogStorePlugin.ts b/packages/broker/src/plugins/logStore/LogStorePlugin.ts index 5774e949..eba1804d 100644 --- a/packages/broker/src/plugins/logStore/LogStorePlugin.ts +++ b/packages/broker/src/plugins/logStore/LogStorePlugin.ts @@ -2,10 +2,10 @@ import { Stream } from '@logsn/client'; import { EthereumAddress, Logger, MetricsContext } from '@streamr/utils'; import { Schema } from 'ajv'; -// import reportData from '../../../test/unit/plugins/logStore/data/report.json'; import { Plugin, PluginOptions } from '../../Plugin'; import { BroadbandPublisher } from '../../shared/BroadbandPublisher'; import { BroadbandSubscriber } from '../../shared/BroadbandSubscriber'; +import { observeMessageMetricsCollector } from '../../telemetry/metrics/observeMessageMetricsCollector'; import PLUGIN_CONFIG_SCHEMA from './config.schema.json'; import { ConsensusManager } from './ConsensusManger'; import { createDataQueryEndpoint } from './http/dataQueryEndpoint'; @@ -62,6 +62,7 @@ export class LogStorePlugin extends Plugin { private readonly queryRequestHandler: QueryRequestHandler; private readonly reportPoller: ReportPoller; private readonly messageListener: MessageListener; + private stopSharingWithTelemetryCollector?: () => void; private seqNum: number = 0; @@ -185,10 +186,14 @@ export class LogStorePlugin extends Plugin { this.logMetrics.bind(this), METRICS_INTERVAL ); + this.stopSharingWithTelemetryCollector = observeMessageMetricsCollector( + this.messageMetricsCollector + ); } async stop(): Promise { clearInterval(this.metricsTimer); + this.stopSharingWithTelemetryCollector?.(); await Promise.all([ this.messageMetricsCollector.stop(), diff --git a/packages/broker/src/plugins/logStore/QueryRequestHandler.ts b/packages/broker/src/plugins/logStore/QueryRequestHandler.ts index 69b8b5ac..f17f23bb 100644 --- a/packages/broker/src/plugins/logStore/QueryRequestHandler.ts +++ b/packages/broker/src/plugins/logStore/QueryRequestHandler.ts @@ -16,6 +16,8 @@ import { Readable } from 'stream'; import { BroadbandPublisher } from '../../shared/BroadbandPublisher'; import { BroadbandSubscriber } from '../../shared/BroadbandSubscriber'; +import { ctx } from '../../telemetry/context'; +import { StartActiveSpan } from '../../telemetry/utils/activeSpanDecorator'; import { LogStore, MAX_SEQUENCE_NUMBER_VALUE, @@ -46,12 +48,14 @@ export class QueryRequestHandler { await this.subscriber.unsubscribe(); } + @StartActiveSpan() private async onMessage(content: unknown, metadata: MessageMetadata) { const systemMessage = SystemMessage.deserialize(content); if (systemMessage.messageType !== SystemMessageType.QueryRequest) { return; } + ctx.operation.enterWith('query_request'); const queryRequest = systemMessage as QueryRequest; logger.debug( diff --git a/packages/broker/src/plugins/logStore/SystemRecovery.ts b/packages/broker/src/plugins/logStore/SystemRecovery.ts index 4b6e5e6f..4fb80744 100644 --- a/packages/broker/src/plugins/logStore/SystemRecovery.ts +++ b/packages/broker/src/plugins/logStore/SystemRecovery.ts @@ -13,6 +13,7 @@ import { } from '@logsn/protocol'; import { Logger } from '@streamr/utils'; +import { ctx } from '../../telemetry/context'; import { SystemCache } from './SystemCache'; const INTERVAL = 100; @@ -49,6 +50,7 @@ export class SystemRecovery { } private async onMessage(message: unknown) { + ctx.operation.enterWith('recovery'); const systemMessage = SystemMessage.deserialize(message); if (systemMessage.messageType !== SystemMessageType.RecoveryRequest) { return; From e9612098a1368812faee821f91b72ffc60b3fdde Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:50:57 -0300 Subject: [PATCH 07/23] Add OpenTelemetry for tracing broker request metrics Integrated OpenTelemetry for better request tracing within the broker service. This change provides enhanced visibility into 'last', 'from', and 'range' queries in the dataQueryEndpoint, by creating spans and adding contexts to requests. This will greatly aid in monitoring and maintaining the system. --- packages/broker/src/broker.ts | 5 +- .../logStore/http/dataQueryEndpoint.ts | 100 +++++++++++++----- .../src/plugins/logStore/recoveryEndpoint.ts | 2 +- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/packages/broker/src/broker.ts b/packages/broker/src/broker.ts index e69f01c7..a76bd350 100644 --- a/packages/broker/src/broker.ts +++ b/packages/broker/src/broker.ts @@ -19,6 +19,7 @@ import { generateMnemonicFromAddress } from './helpers/generateMnemonicFromAddre import { startServer as startHttpServer, stopServer } from './httpServer'; import { HttpServerEndpoint, Plugin, PluginOptions } from './Plugin'; import { createPlugin } from './pluginRegistry'; +import { ctx } from './telemetry/context'; const logger = new Logger(module); @@ -101,6 +102,9 @@ export const createBroker = async ( return { getNode, start: async () => { + const nodeId = (await logStoreClient.getNode()).getNodeId(); + ctx.nodeInfo.enterWith({ id: nodeId }); + logger.info(`Starting LogStore broker version ${CURRENT_VERSION}`); await Promise.all(plugins.map((plugin) => plugin.start())); const httpServerEndpoints = plugins.flatMap((plugin: Plugin) => { @@ -119,7 +123,6 @@ export const createBroker = async ( ); } - const nodeId = (await logStoreClient.getNode()).getNodeId(); const brokerAddress = await logStoreClient.getAddress(); const mnemonic = generateMnemonicFromAddress( toEthereumAddress(brokerAddress) diff --git a/packages/broker/src/plugins/logStore/http/dataQueryEndpoint.ts b/packages/broker/src/plugins/logStore/http/dataQueryEndpoint.ts index 9c45805a..5607990c 100644 --- a/packages/broker/src/plugins/logStore/http/dataQueryEndpoint.ts +++ b/packages/broker/src/plugins/logStore/http/dataQueryEndpoint.ts @@ -3,6 +3,7 @@ */ import { QueryRequest, QueryType } from '@logsn/protocol'; import { getQueryManagerContract } from '@logsn/shared'; +import { api } from '@opentelemetry/sdk-node'; import { Logger, MetricsContext, @@ -17,6 +18,10 @@ import { v4 as uuid } from 'uuid'; import { StrictConfig } from '../../../config/config'; import { HttpServerEndpoint } from '../../../Plugin'; +import { ctx } from '../../../telemetry/context'; +import { globalTelemetryObjects } from '../../../telemetry/globalTelemetryObjects'; +import { addNewQueryEvent } from '../../../telemetry/metrics/generalNodeActions'; +import { withActiveSpan } from '../../../telemetry/utils/withActiveSpan'; import { createBasicAuthenticatorMiddleware } from '../authentication'; import { ConsensusResponse } from '../Consensus'; import { ConsensusManager } from '../ConsensusManger'; @@ -155,7 +160,13 @@ const getDataForLastRequest = async ( consensusManager: ConsensusManager, metrics: MetricsDefinition ) => { - metrics.resendLastQueriesPerSecond.record(1); + const span = globalTelemetryObjects.startSpan('data for last request', { + attributes: { type: 'last' }, + }); + // set active span + api.trace.setSpan(api.context.active(), span); + + metrics.queryLastQueriesPerSecond.record(1); const count = getCountForLastRequest(req); if (count === 'NOT_A_NUMBER') { sendError(`Query parameter "count" not a number: ${req.query.count}`, res); @@ -177,9 +188,9 @@ const getDataForLastRequest = async ( const consensus = await consensusManager.getConsensus(queryMessage); const data = logStore.requestLast(streamId, partition, count!); + span.end(); return { data, consensus }; }; - const getDataForFromRequest = async ( req: FromRequest, streamId: string, @@ -189,7 +200,7 @@ const getDataForFromRequest = async ( consensusManager: ConsensusManager, metrics: MetricsDefinition ) => { - metrics.resendFromQueriesPerSecond.record(1); + metrics.queryFromQueriesPerSecond.record(1); const fromTimestamp = parseIntIfExists(req.query.fromTimestamp); const fromSequenceNumber = parseIntIfExists(req.query.fromSequenceNumber) || MIN_SEQUENCE_NUMBER_VALUE; @@ -253,7 +264,7 @@ const getDataForRangeRequest = async ( consensusManager: ConsensusManager, metrics: MetricsDefinition ) => { - metrics.resendRangeQueriesPerSecond.record(1); + metrics.queryRangeQueriesPerSecond.record(1); const fromTimestamp = parseIntIfExists(req.query.fromTimestamp); const toTimestamp = parseIntIfExists(req.query.toTimestamp); const fromSequenceNumber = @@ -369,27 +380,29 @@ const getDataForRequest = async ( | typeof getDataForRangeRequest > ) => { - const [req, streamId, partition, res, logStore, consensusManager, metrics] = - args; - const rest = [ - streamId, - partition, - res, - logStore, - consensusManager, - metrics, - ] as const; - const reqType = getRequestType(req); - switch (reqType.type) { - case 'last': - return getDataForLastRequest(reqType.req, ...rest); - case 'from': - return getDataForFromRequest(reqType.req, ...rest); - case 'range': - return getDataForRangeRequest(reqType.req, ...rest); - default: - throw new Error(`Unknown query type: ${reqType}`); - } + return withActiveSpan('getDataForRequest', async () => { + const [req, streamId, partition, res, logStore, consensusManager, metrics] = + args; + const rest = [ + streamId, + partition, + res, + logStore, + consensusManager, + metrics, + ] as const; + const reqType = getRequestType(req); + switch (reqType.type) { + case 'last': + return getDataForLastRequest(reqType.req, ...rest); + case 'from': + return getDataForFromRequest(reqType.req, ...rest); + case 'range': + return getDataForRangeRequest(reqType.req, ...rest); + default: + throw new Error(`Unknown query type: ${reqType}`); + } + }); }; const createHandler = ( @@ -457,17 +470,46 @@ export const createDataQueryEndpoint = ( metricsContext: MetricsContext ): HttpServerEndpoint => { const metrics = { - resendLastQueriesPerSecond: new RateMetric(), - resendFromQueriesPerSecond: new RateMetric(), - resendRangeQueriesPerSecond: new RateMetric(), + queryLastQueriesPerSecond: new RateMetric(), + queryFromQueriesPerSecond: new RateMetric(), + queryRangeQueriesPerSecond: new RateMetric(), }; - metricsContext.addMetrics('broker.plugin.logstore', metrics); + metricsContext.addMetrics('broker.plugin.logStore', metrics); return { path: `/streams/:id/data/partitions/:partition/:queryType`, method: 'get', requestHandlers: [ createBasicAuthenticatorMiddleware(), + otelMiddleware, createHandler(config, logStore, consensusManager, metrics), ], }; }; + +const otelMiddleware = (req: Request, res: Response, next: () => void) => { + globalTelemetryObjects.startActiveSpan( + 'logstore.http.query', + async (span) => { + ctx.operation.enterWith('user_request'); + ctx.queryType.enterWith(req.params.queryType as QueryType); + + res.once('finish', () => { + addNewQueryEvent({ + qty: 1, + statusCode: res.statusCode.toString(), + }); + span?.end(); + }); + + res.once('close', () => { + span?.end(); + }); + + res.once('error', () => { + span?.end(); + }); + + next(); + } + ); +}; diff --git a/packages/broker/src/plugins/logStore/recoveryEndpoint.ts b/packages/broker/src/plugins/logStore/recoveryEndpoint.ts index 1d43bc29..1a3e51ba 100644 --- a/packages/broker/src/plugins/logStore/recoveryEndpoint.ts +++ b/packages/broker/src/plugins/logStore/recoveryEndpoint.ts @@ -42,7 +42,7 @@ export const createRecoveryEndpoint = ( const metrics = { recoveryRequestsPerSecond: new RateMetric(), }; - metricsContext.addMetrics('broker.plugin.logstore', metrics); + metricsContext.addMetrics('broker.plugin.logStore', metrics); return { path: `/recovery`, method: 'post', From cc95f052e0c5bc566f1ebcaf6b428984df187e40 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:52:14 -0300 Subject: [PATCH 08/23] Added support for middlewares in BroadbandSubscriber and BroadbandPublisher The BroadbandSubscriber and BroadbandPublisher classes have been updated to support the application of different kinds of middlewares for processing. The use of middlewares will allow enhancements in the data processing flow, thereby enabling the code to be more maintainable, readable, and expandable in the future. The default set of middlewares has also been defined. These can be overridden or new middlewares can be added as per the application needs. --- .../broker/src/shared/BroadbandPublisher.ts | 23 +++++++++++++++++-- .../broker/src/shared/BroadbandSubscriber.ts | 18 +++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/packages/broker/src/shared/BroadbandPublisher.ts b/packages/broker/src/shared/BroadbandPublisher.ts index 2a0a51d2..88604d9d 100644 --- a/packages/broker/src/shared/BroadbandPublisher.ts +++ b/packages/broker/src/shared/BroadbandPublisher.ts @@ -1,4 +1,15 @@ import { LogStoreClient, Stream } from '@logsn/client'; +import { trace } from '@opentelemetry/api'; +import _ from 'lodash'; + +import { applyMiddlewares, Middleware } from '../helpers/middlewares'; +import { addTelemetryContextToPublishedMessageMiddleware } from '../telemetry/utils/middlewares'; + +export type PublishMiddlewares = Middleware<(message: string) => string>; + +const defaultMiddlewares: PublishMiddlewares[] = [ + addTelemetryContextToPublishedMessageMiddleware, +]; export class BroadbandPublisher { private readonly partitions: number; @@ -6,7 +17,8 @@ export class BroadbandPublisher { constructor( private readonly client: LogStoreClient, - private readonly stream: Stream + private readonly stream: Stream, + private readonly middlewares: PublishMiddlewares[] = [] ) { this.partitions = this.stream.getMetadata().partitions; } @@ -17,13 +29,20 @@ export class BroadbandPublisher { public async publish(message: unknown) { const partition = this.counter % this.partitions; + const span = trace.getActiveSpan(); + span?.addEvent('publish'); + + const getFinalMessage = applyMiddlewares(_.identity, [ + ...defaultMiddlewares, + ...this.middlewares, + ]); await this.client.publish( { id: this.stream.id, partition, }, - message + getFinalMessage(message as string) ); this.counter++; diff --git a/packages/broker/src/shared/BroadbandSubscriber.ts b/packages/broker/src/shared/BroadbandSubscriber.ts index 9b6724eb..702cc8c2 100644 --- a/packages/broker/src/shared/BroadbandSubscriber.ts +++ b/packages/broker/src/shared/BroadbandSubscriber.ts @@ -5,13 +5,23 @@ import { Subscription, } from '@logsn/client'; +import { applyMiddlewares, Middleware } from '../helpers/middlewares'; +import { getTelemetryContextMiddleware } from '../telemetry/utils/middlewares'; + +export type SubscriptionMiddleware = Middleware; + +const defaultMiddlewares: SubscriptionMiddleware[] = [ + getTelemetryContextMiddleware, +]; + export class BroadbandSubscriber { private readonly partitions: number; private readonly subscriptions: Subscription[] = []; constructor( private readonly client: LogStoreClient, - private readonly stream: Stream + private readonly stream: Stream, + private readonly middlewares: SubscriptionMiddleware[] = [] ) { this.partitions = this.stream.getMetadata().partitions; } @@ -19,8 +29,12 @@ export class BroadbandSubscriber { public async subscribe(onMessage: MessageListener) { const promises = []; for (let partition = 0; partition < this.partitions; partition++) { + const listener = applyMiddlewares(onMessage, [ + ...defaultMiddlewares, + ...this.middlewares, + ]); promises.push( - this.client.subscribe({ id: this.stream.id, partition }, onMessage) + this.client.subscribe({ id: this.stream.id, partition }, listener) ); } From d8e1ca8ec35656f85c870450626ef0ea933fd128 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:53:20 -0300 Subject: [PATCH 09/23] Modify ReportPoller's config parameter and access modifier for KyvePool Relaxed the definition of the config parameter in the constructor from StrictConfig to Pick for the ReportPoller class for better flexibility. Also, the access modifier of kyvePool is updated from 'private' to 'protected' to make it accessible in inheriting classes. --- packages/broker/src/plugins/logStore/ReportPoller.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/broker/src/plugins/logStore/ReportPoller.ts b/packages/broker/src/plugins/logStore/ReportPoller.ts index 6d6a38d9..7b68ec88 100644 --- a/packages/broker/src/plugins/logStore/ReportPoller.ts +++ b/packages/broker/src/plugins/logStore/ReportPoller.ts @@ -24,7 +24,7 @@ const logger = new Logger(module); const REPORT_TRESHOLD_MULTIPLIER = 0.5; export class ReportPoller { - private readonly kyvePool: KyvePool; + protected readonly kyvePool: KyvePool; private readonly poolConfig: StrictConfig['pool']; private readonly signer: Signer; private readonly publisher: BroadbandPublisher; @@ -41,7 +41,7 @@ export class ReportPoller { constructor( kyvePool: KyvePool, - config: StrictConfig, + config: Pick, signer: Signer, publisher: BroadbandPublisher, subscriber: BroadbandSubscriber From 946b3d0762d5daa8d9f1edb28309d94113831b27 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:56:23 -0300 Subject: [PATCH 10/23] Add OpenTelemetry support to test suite Multiple changes were made in the broker package to embed OpenTelemetry metrics and traces into the testing process. This was accomplished by importing the 'metrics' and 'trace' modules from @opentelemetry/api into the test utils, and by defining functions that initialize and shut down the OpenTelemetry SDK within the test environment setup and teardown procedures. Also, new unit tests were added to ensure the correctness of metrics and traces. --- packages/broker/test/OTelEnvironment.ts | 39 +++++++++++++++ packages/broker/test/unit/context.test.ts | 50 +++++++++++++++++++ .../broker/test/unit/opentelemetry.test.ts | 20 ++++++++ packages/broker/test/utils.ts | 22 ++++++++ 4 files changed, 131 insertions(+) create mode 100644 packages/broker/test/OTelEnvironment.ts create mode 100644 packages/broker/test/unit/context.test.ts create mode 100644 packages/broker/test/unit/opentelemetry.test.ts diff --git a/packages/broker/test/OTelEnvironment.ts b/packages/broker/test/OTelEnvironment.ts new file mode 100644 index 00000000..2f26b3b6 --- /dev/null +++ b/packages/broker/test/OTelEnvironment.ts @@ -0,0 +1,39 @@ +import NodeEnvironment from 'jest-environment-node'; + +import { + metricReader, + sdk, + traceExporter, +} from '../src/telemetry/setup/setupSdk'; + +class CustomEnvironment extends NodeEnvironment { + override async setup() { + sdk.start(); + await super.setup(); + + this.global.teardownOTel = async () => { + // TODO this won't work, I didn't figure how to get metrics exported + // correctly from our tests + // The problem is that the tests are run in a separate process, so + // the metrics are not exported, as per isolation of modules during tests + // so best current way to test is mocking the counters. + // Tracing works nicely + await metricReader?.forceFlush(); + await traceExporter?.forceFlush(); + await sdk.shutdown(); + }; + } + + override async teardown() { + // @ts-ignore + await this.global.teardownOTel(); + await super.teardown(); + } + + runScript(script: any) { + // @ts-ignore + return super.runScript(script); + } +} + +module.exports = CustomEnvironment; diff --git a/packages/broker/test/unit/context.test.ts b/packages/broker/test/unit/context.test.ts new file mode 100644 index 00000000..9b199628 --- /dev/null +++ b/packages/broker/test/unit/context.test.ts @@ -0,0 +1,50 @@ +import { ctx } from '../../src/telemetry/context'; + +it('context works as expected', async () => { + ctx.operation.enterWith('test'); + let referenceObject = 'test'; + + expect(ctx.operation.getStore()).toBe('test'); + expect(referenceObject).toBe('test'); + + ctx.operation.enterWith('test2'); + referenceObject = 'test2'; + + const execution1 = executeAfterDelay(() => { + expect(ctx.operation.getStore()).toBe('test2'); + // different because we set it after the execution + expect(referenceObject).toBe('test3'); + }, 100); + + referenceObject = 'test3'; + ctx.operation.enterWith('test3'); + + expect(ctx.operation.getStore()).toBe('test3'); + + // we await to simplify the test + await execution1; + + await new Promise((resolve) => { + setTimeout(() => { + expect(ctx.operation.getStore()).toBe('test3'); + resolve(); + }, 100); + ctx.operation.enterWith('test4'); + referenceObject = 'test4'; + }); + + const execution2 = executeAfterDelay(() => { + expect(ctx.operation.getStore()).toBe('test4'); + ctx.operation.enterWith('test5'); + referenceObject = 'test5'; + }, 100); + + expect(ctx.operation.getStore()).toBe('test4'); + expect(referenceObject).toBe('test4'); + await execution2; + expect(ctx.operation.getStore()).toBe('test4'); + expect(referenceObject).toBe('test5'); +}); + +const executeAfterDelay = (fn: (...args: any[]) => any, delay: number) => + new Promise((resolve) => setTimeout(() => resolve(fn()), delay)); diff --git a/packages/broker/test/unit/opentelemetry.test.ts b/packages/broker/test/unit/opentelemetry.test.ts new file mode 100644 index 00000000..1646f0dc --- /dev/null +++ b/packages/broker/test/unit/opentelemetry.test.ts @@ -0,0 +1,20 @@ +import { metrics, trace } from '@opentelemetry/api'; + +import { addOpentelemetry } from '../utils'; + +const { meter } = addOpentelemetry(); +test('can see messages on prometheus', async () => { + const counter = metrics.getMeter('oo').createCounter('test-counter', { + description: 'Example of a Counter', + unit: '1', + }); + counter.add(11, { myLabel: 'my-counter' }); + expect(1 + 1).toBe(2); +}, 10_000); + +test('tracing', async () => { + const span = trace.getTracer('test-trace').startSpan('test-span'); + + span.end(); + expect(1 + 1).toBe(2); +}); diff --git a/packages/broker/test/utils.ts b/packages/broker/test/utils.ts index ebb5cafc..164f8900 100644 --- a/packages/broker/test/utils.ts +++ b/packages/broker/test/utils.ts @@ -5,6 +5,7 @@ import { Stream, StreamMetadata, } from '@logsn/client'; +import { metrics, trace } from '@opentelemetry/api'; import { TEST_CONFIG } from '@streamr/network-node'; import { startTracker, Tracker } from '@streamr/network-tracker'; import { @@ -17,6 +18,7 @@ import _ from 'lodash'; import { Broker, createBroker as createLogStoreBroker } from '../src/broker'; import { Config } from '../src/config/config'; +import { metricReader, sdk } from '../src/telemetry/setup/setupSdk'; export const STREAMR_DOCKER_DEV_HOST = process.env.STREAMR_DOCKER_DEV_HOST || '127.0.0.1'; @@ -166,3 +168,23 @@ export async function sleep(ms = 0): Promise { setTimeout(resolve, ms); }); } + +export function addOpentelemetry() { + const meter = metrics.getMeter('test-logstore'); + const tracer = trace.getTracer('test-logstore'); + jest.setMock('../src/telemetry/globalTelemetryObjects', () => ({ + meter: meter, + tracer: tracer, + })); + + beforeAll(async () => { + sdk.start(); + }); + afterAll(async () => { + const collection = await metricReader?.collect(); + await metricReader?.forceFlush(); + await sdk.shutdown(); + }); + + return { meter, tracer }; +} From a786309a6e4ca9b00f7ecdc3bec1d02ce72ec13f Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:58:01 -0300 Subject: [PATCH 11/23] Refactor client package.json to ensure correct defaults first --- packages/client/package.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/client/package.json b/packages/client/package.json index ecd068e5..23a3dcf8 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -10,9 +10,6 @@ "url": "git+https://github.com/usherlabs/logstore.git", "directory": "packages/client" }, - "types": "./dist/types/src/index.d.ts", - "main": "./dist/src/exports-commonjs.js", - "browser": "./dist/logstore-client.web.js", "exports": { "browser": "./dist/logstore-client.web.js", "default": { @@ -20,6 +17,9 @@ "require": "./dist/src/exports-commonjs.js" } }, + "types": "./dist/types/src/index.d.ts", + "main": "./dist/src/exports-commonjs.js", + "browser": "./dist/logstore-client.web.js", "files": [ "dist" ], From 30cbc0e2f302531d354c27c31bbab4a0fa74a174 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 07:58:17 -0300 Subject: [PATCH 12/23] Add new exports to resolve conflict Added CONFIG_TEST from ConfigTest and validateConfig from Config to the exports in "exports-esm.mjs". This resolves a previously existing conflict with the streamr re-exporting. --- packages/client/src/exports-esm.mjs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/client/src/exports-esm.mjs b/packages/client/src/exports-esm.mjs index 5caa7d6c..c49da7da 100644 --- a/packages/client/src/exports-esm.mjs +++ b/packages/client/src/exports-esm.mjs @@ -1,6 +1,11 @@ // ESM EntryPoint -import LogStoreClient from './index.js'; -export * from './index.js'; +import LogStoreClient from "./index.js"; + +export * from "./index.js"; +// Necessary to solve conflict with streamr re-exports +export { CONFIG_TEST } from "./ConfigTest"; +export { validateConfig } from "./Config"; + // required to get import LogStoreClient from '@logsn/streamr-client' to work export default LogStoreClient.default; // note this file is manually copied as-is into dist/src since we don't want tsc to compile it to commonjs From 26ed6d86ac840b377674c22bb0c7d652ce240f10 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 08:17:19 -0300 Subject: [PATCH 13/23] Update prettier config and upgrade Typescript version Adjusted the importOrder in the prettier configuration file (.prettierrc) to include a specific pattern for the 'startOpenTelemetry' module to make sure open telemetry is imported before others. --- .prettierrc | 6 +++++- package.json | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.prettierrc b/.prettierrc index d271afe8..751a952c 100644 --- a/.prettierrc +++ b/.prettierrc @@ -25,7 +25,11 @@ "options": { "plugins": ["@trivago/prettier-plugin-sort-imports"], "parser": "typescript", - "importOrder": ["", "^[./]"], + "importOrder": [ + ".*startOpenTelemetry*", + "", + "^[./]" + ], "importOrderParserPlugins": ["typescript", "decorators-legacy"], "importOrderSeparation": true, "trailingComma": "es5", diff --git a/package.json b/package.json index 06f8dcce..e3302cd1 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,6 @@ "prettier-2": "npm:prettier@^2", "prettier-plugin-solidity": "^1.1.3", "turbo": "^1.8.3", - "typescript": "^4.9.5" + "typescript": "^5.2.2" } } From 154614552907c0b87fc5bc798118daf47530349e Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 08:18:51 -0300 Subject: [PATCH 14/23] Create Observer package and configuration settings --- packages/observer/.eslintrc | 11 +++ .../observer/configs/development-1.env.json | 74 +++++++++++++++++++ packages/observer/package.json | 62 ++++++++++++++++ packages/observer/tsconfig.json | 33 +++++++++ packages/observer/vitest.config.ts | 12 +++ 5 files changed, 192 insertions(+) create mode 100644 packages/observer/.eslintrc create mode 100644 packages/observer/configs/development-1.env.json create mode 100644 packages/observer/package.json create mode 100644 packages/observer/tsconfig.json create mode 100644 packages/observer/vitest.config.ts diff --git a/packages/observer/.eslintrc b/packages/observer/.eslintrc new file mode 100644 index 00000000..428cfd51 --- /dev/null +++ b/packages/observer/.eslintrc @@ -0,0 +1,11 @@ +{ + "extends": "../../.eslintrc", + "rules": { + // Imported from eslint-config-streamr-ts (https://www.npmjs.com/package/eslint-config-streamr-ts) + "no-restricted-syntax": ["error", "LabeledStatement", "WithStatement"], + "@typescript-eslint/no-explicit-any": 0, + "@typescript-eslint/no-non-null-assertion": 0, + "@typescript-eslint/no-floating-promises": 0, + "@typescript-eslint/no-empty-function": 0 + } +} diff --git a/packages/observer/configs/development-1.env.json b/packages/observer/configs/development-1.env.json new file mode 100644 index 00000000..71d3f121 --- /dev/null +++ b/packages/observer/configs/development-1.env.json @@ -0,0 +1,74 @@ +{ + "$schema": "https://schema.streamr.network/config-v2.schema.json", + "client": { + "logLevel": "trace", + "auth": { + "privateKey": "0x3333333333333333333333333333333333333333333333333333333333333333" + }, + "network": { + "id": "0x5cbdd86a2fa8dc4bddd8a8f69dba48572eec07fb", + "trackers": [ + { + "id": "0xb9e7cEBF7b03AE26458E32a059488386b05798e8", + "ws": "ws://127.0.0.1:30301", + "http": "http://127.0.0.1:30301" + }, + { + "id": "0x0540A3e144cdD81F402e7772C76a5808B71d2d30", + "ws": "ws://127.0.0.1:30302", + "http": "http://127.0.0.1:30302" + }, + { + "id": "0xf2C195bE194a2C91e93Eacb1d6d55a00552a85E2", + "ws": "ws://127.0.0.1:30303", + "http": "http://127.0.0.1:30303" + } + ], + "location": { + "latitude": 60.19, + "longitude": 24.95, + "country": "Finland", + "city": "Helsinki" + }, + "webrtcDisallowPrivateAddresses": false + }, + "contracts": { + "streamRegistryChainAddress": "0x6cCdd5d866ea766f6DF5965aA98DeCCD629ff222", + "streamStorageRegistryChainAddress": "0xd04af489677001444280366Dd0885B03dAaDe71D", + "storageNodeRegistryChainAddress": "0x231b810D98702782963472e1D60a25496999E75D", + "logStoreNodeManagerChainAddress": "0x85ac4C8E780eae81Dd538053D596E382495f7Db9", + "logStoreStoreManagerChainAddress": "0x8560200b8E7477FB09281A0566B50fa6E7a66a34", + "streamRegistryChainRPCs": { + "chainId": 8997, + "rpcs": [ + { + "url": "http://127.0.0.1:8546" + } + ] + }, + "mainChainRPCs": { + "chainId": 8995, + "rpcs": [ + { + "url": "http://127.0.0.1:8545" + } + ] + }, + "theGraphUrl": "http://127.0.0.1:8000/subgraphs/name/streamr-dev/network-contracts", + "logStoreTheGraphUrl": "http://127.0.0.1:8000/subgraphs/name/logstore-dev/network-contracts" + }, + "metrics": false + }, + "pool": { + "id": "0", + "url": "http://logstore-kyve:1317", + "pollInterval": 60000 + }, + "plugins": { + "observer": { + "logStoreConfig": { + "refreshInterval": 10000 + } + } + } +} diff --git a/packages/observer/package.json b/packages/observer/package.json new file mode 100644 index 00000000..f6d70f96 --- /dev/null +++ b/packages/observer/package.json @@ -0,0 +1,62 @@ +{ + "name": "@logsn/observer", + "version": "0.0.1", + "description": "The Log Store Network Broker Node", + "author": "Ryan Soury , Victor Shevtsov ", + "license": "SEE LICENSE IN ./LICENSE", + "private": false, + "type": "module", + "repository": { + "type": "git", + "url": "git+https://github.com/usherlabs/logstore.git", + "directory": "packages/observer" + }, + "main": "./dist/src/index.ts", + "files": [ + "./dist" + ], + "scripts": { + "build": "echo 'no build'", + "format": "prettier --write .", + "start:dev": "dotenv -- tsx scripts/start-dev-server.ts", + "start:prod": "tsx scripts/start-prod-server.ts", + "ts": "tsc --noEmit", + "lint": "eslint ./src/**/*.ts ./test/**/*.ts", + "test": "dotenv -e .env jest --" + }, + "dependencies": { + "@logsn/client": "workspace:^", + "@logsn/contracts": "workspace:^", + "@logsn/protocol": "workspace:^", + "@logsn/shared": "workspace:^", + "@logsn/broker": "workspace:^", + "@streamr/utils": "^8.5.5", + "@streamr/protocol": "^8.5.5", + "ethers": "^5.7.2", + "ajv": "^8.12.0", + "@opentelemetry/api": "^1.6.0", + "@opentelemetry/context-async-hooks": "^1.17.0", + "@opentelemetry/auto-instrumentations-node": "^0.39.2", + "@opentelemetry/sdk-node": "^0.43.0", + "@opentelemetry/exporter-metrics-otlp-grpc": "^0.43.0", + "@opentelemetry/sdk-metrics": "^1.17.0", + "@opentelemetry/exporter-trace-otlp-grpc": "^0.43.0", + "@opentelemetry/instrumentation-http": "^0.43.0", + "@opentelemetry/semantic-conventions": "^1.17.0", + "@opentelemetry/resources": "^1.17.0", + "lodash": "^4.17.21" + }, + "devDependencies": { + "ts-toolbelt": "^9.6.0", + "@tsconfig/node16": "^1.0.3", + "@types/lodash": "^4.14.191", + "@types/node": "^16.18.39", + "dotenv-cli": "^7.2.1", + "esbuild": "^0.19.4", + "dotenv": "^16.3.1", + "ts-essentials": "^9.3.0", + "tsx": "^3.12.10", + "vitest": "^0.34.4", + "typescript": "^5.1.6" + } +} diff --git a/packages/observer/tsconfig.json b/packages/observer/tsconfig.json new file mode 100644 index 00000000..7b4047b6 --- /dev/null +++ b/packages/observer/tsconfig.json @@ -0,0 +1,33 @@ +{ + "compilerOptions": { + /* Basic Options */ + "target": "ES2020", + "module": "ESNext", + "outDir": "./dist", + "rootDir": ".", + "declaration": true, + /* Strict Type-Checking Options */ + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "strictFunctionTypes": true, + "strictBindCallApply": true, + "strictPropertyInitialization": true, + "noImplicitThis": true, + "alwaysStrict": true, + /* Additional Checks */ + "noUnusedLocals": true, + "noUnusedParameters": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + /* Module Resolution Options */ + "moduleResolution": "node", + "esModuleInterop": true, + "resolveJsonModule": true, + /* Experimental Options */ + "experimentalDecorators": true, + "emitDecoratorMetadata": true + }, + "include": ["./**/*.ts", "./package.json"], + "exclude": ["node_modules", "**/*.spec.ts"] +} diff --git a/packages/observer/vitest.config.ts b/packages/observer/vitest.config.ts new file mode 100644 index 00000000..ddec09a3 --- /dev/null +++ b/packages/observer/vitest.config.ts @@ -0,0 +1,12 @@ +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + test: { + // load env variables + setupFiles: 'dotenv/config', + include: ['**/?(*.){test,spec}.?(c|m)[jt]s?(x)'], + includeSource: ['src/**/*.{js,ts}'], + testTimeout: 220_000, + dangerouslyIgnoreUnhandledErrors: true, + }, +}); From 66c2f6644322dd80f22476f7bc0b5607bbf26deb Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 08:22:19 -0300 Subject: [PATCH 15/23] Add Observer and associated handlers, configuration, metrics, plugins Implemented the ObserverPlugin class, responsible for managing the system messages and reports, and handling start and stop events. Added comprehensive configuration schema, including handling auth, plugins, and pool configurations. Set up detailed metrics for both system messages and reports, this includes observables for bytes, count, lost messages, and bundled metrics such as the total number of reported queries and stored messages etc. Created scripts for start-dev-server and start-prod-server to facilitate testing and deployment. Additional utility function moduleFromMetaUrl added to support conversion from a given meta URL to a NodeJS.Module object. --- packages/observer/scripts/start-dev-server.ts | 18 +++ .../observer/scripts/start-prod-server.ts | 25 +++ packages/observer/src/Plugin.ts | 62 +++++++ .../observer/src/config/config.schema.json | 101 ++++++++++++ packages/observer/src/config/config.ts | 79 +++++++++ packages/observer/src/constants.ts | 4 + packages/observer/src/observer.ts | 152 ++++++++++++++++++ packages/observer/src/pluginRegistry.ts | 15 ++ .../src/plugins/observer/ObserverPlugin.ts | 97 +++++++++++ .../observer/ReportsMetricsCollector.ts | 128 +++++++++++++++ .../observer/SystemMessagesGeneralHandler.ts | 88 ++++++++++ .../observer/src/telemetry/context/index.ts | 7 + .../src/telemetry/globalTelemetryObjects.ts | 6 + .../src/telemetry/metrics/reportMetrics.ts | 97 +++++++++++ .../telemetry/metrics/systemMessageMetrics.ts | 64 ++++++++ .../metrics/uniqueResponsesCounter.ts | 71 ++++++++ .../observer/src/telemetry/setup/setupSdk.ts | 67 ++++++++ .../src/telemetry/setup/startOpenTelemetry.ts | 35 ++++ .../observer/src/utils/moduleFromMetaUrl.ts | 10 ++ packages/observer/tests/basic.test.ts | 26 +++ 20 files changed, 1152 insertions(+) create mode 100644 packages/observer/scripts/start-dev-server.ts create mode 100644 packages/observer/scripts/start-prod-server.ts create mode 100644 packages/observer/src/Plugin.ts create mode 100644 packages/observer/src/config/config.schema.json create mode 100644 packages/observer/src/config/config.ts create mode 100644 packages/observer/src/constants.ts create mode 100644 packages/observer/src/observer.ts create mode 100644 packages/observer/src/pluginRegistry.ts create mode 100644 packages/observer/src/plugins/observer/ObserverPlugin.ts create mode 100644 packages/observer/src/plugins/observer/ReportsMetricsCollector.ts create mode 100644 packages/observer/src/plugins/observer/SystemMessagesGeneralHandler.ts create mode 100644 packages/observer/src/telemetry/context/index.ts create mode 100644 packages/observer/src/telemetry/globalTelemetryObjects.ts create mode 100644 packages/observer/src/telemetry/metrics/reportMetrics.ts create mode 100644 packages/observer/src/telemetry/metrics/systemMessageMetrics.ts create mode 100644 packages/observer/src/telemetry/metrics/uniqueResponsesCounter.ts create mode 100644 packages/observer/src/telemetry/setup/setupSdk.ts create mode 100644 packages/observer/src/telemetry/setup/startOpenTelemetry.ts create mode 100644 packages/observer/src/utils/moduleFromMetaUrl.ts create mode 100644 packages/observer/tests/basic.test.ts diff --git a/packages/observer/scripts/start-dev-server.ts b/packages/observer/scripts/start-dev-server.ts new file mode 100644 index 00000000..7db02331 --- /dev/null +++ b/packages/observer/scripts/start-dev-server.ts @@ -0,0 +1,18 @@ +import '../src/telemetry/setup/startOpenTelemetry'; + +import config from '../configs/development-1.env.json'; +import { Config } from '../src/config/config'; +import { createObserver } from '../src/observer'; + +const typesafeConfig = { + ...config, +} as Config; + +const observer = await createObserver(typesafeConfig); + +try { + await observer.start(); +} catch (error) { + console.error(error); + process.exit(1); +} diff --git a/packages/observer/scripts/start-prod-server.ts b/packages/observer/scripts/start-prod-server.ts new file mode 100644 index 00000000..35fcfdc1 --- /dev/null +++ b/packages/observer/scripts/start-prod-server.ts @@ -0,0 +1,25 @@ +import '../src/telemetry/setup/startOpenTelemetry'; + +import fs from 'fs'; +import * as os from 'os'; + +import { Config } from '../src/config/config'; +import { createObserver } from '../src/observer'; + +const homeDir = os.homedir(); +const configFilename = 'observer.json'; + +const fullConfigPath = `${homeDir}/.logstore/config/${configFilename}`; + +const content = JSON.parse(fs.readFileSync(fullConfigPath, 'utf8')) as + | Config + | undefined; + +const observer = await createObserver(content ?? {}); + +try { + await observer.start(); +} catch (error) { + console.error(error); + process.exit(1); +} diff --git a/packages/observer/src/Plugin.ts b/packages/observer/src/Plugin.ts new file mode 100644 index 00000000..d5d083e7 --- /dev/null +++ b/packages/observer/src/Plugin.ts @@ -0,0 +1,62 @@ +import { validateConfig } from '@logsn/broker/dist/src/config/validateConfig'; +import { LogStoreClient, Stream } from '@logsn/client'; +import { LogStoreNodeManager } from '@logsn/contracts'; +import { Schema } from 'ajv'; +import { Signer } from 'ethers'; + +import { StrictConfig } from './config/config'; + +export interface PluginOptions { + name: string; + logStoreClient: LogStoreClient; + recoveryStream: Stream; + rollCallStream: Stream; + systemStream: Stream; + observerConfig: StrictConfig; + signer: Signer; + nodeManger: LogStoreNodeManager; +} + +export abstract class Plugin { + readonly name: string; + readonly logStoreClient: LogStoreClient; + readonly recoveryStream: Stream; + readonly rollCallStream: Stream; + readonly systemStream: Stream; + readonly observerConfig: StrictConfig; + readonly signer: Signer; + readonly nodeManger: LogStoreNodeManager; + readonly pluginConfig: T; + + constructor(options: PluginOptions) { + this.name = options.name; + this.logStoreClient = options.logStoreClient; + this.recoveryStream = options.recoveryStream; + this.rollCallStream = options.rollCallStream; + this.systemStream = options.systemStream; + this.observerConfig = options.observerConfig; + this.signer = options.signer; + this.nodeManger = options.nodeManger; + this.pluginConfig = options.observerConfig.plugins[this.name]; + const configSchema = this.getConfigSchema(); + if (configSchema !== undefined) { + validateConfig(this.pluginConfig, configSchema, `${this.name} plugin`); + } + } + + /** + * This lifecycle method is called once when Broker starts + */ + abstract start(): Promise; + + /** + * This lifecycle method is called once when Broker stops + * It is be called only if the plugin was started successfully + */ + abstract stop(): Promise; + + // eslint-disable-next-line class-methods-use-this + getConfigSchema(): Schema | undefined { + return undefined; + } +} diff --git a/packages/observer/src/config/config.schema.json b/packages/observer/src/config/config.schema.json new file mode 100644 index 00000000..20483555 --- /dev/null +++ b/packages/observer/src/config/config.schema.json @@ -0,0 +1,101 @@ +{ + "$id": "config.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "Observer configuration format", + "type": "object", + "additionalProperties": false, + "properties": { + "$schema": { + "type": "string" + }, + "client": { + "type": "object", + "description": "Client configuration", + "additionalProperties": true, + "properties": { + "auth": { + "anyOf": [ + { + "type": "object", + "properties": { + "privateKey": { + "type": "string", + "pattern": "^(0x)?[a-fA-F0-9]{64}$" + } + }, + "required": ["privateKey"] + }, + { + "type": "object", + "properties": { + "ethereum": { + "type": "object" + } + }, + "required": ["ethereum"] + } + ] + } + }, + "default": {} + }, + "plugins": { + "type": "object", + "description": "Plugin configurations", + "additionalProperties": true, + "default": {} + }, + "pool": { + "type": "object", + "description": "Kyve Pool configuration", + "additionalProperties": false, + "properties": { + "id": { + "description": "The Id of the pool to poll for a new bundle", + "type": "string" + }, + "url": { + "description": "The URL of the pool to poll for a new bundle", + "type": "string" + }, + "pollInterval": { + "description": "The interval to which to poll the kyve network for newly validated bundles", + "type": "number" + } + }, + "required": ["id", "url", "pollInterval"], + "default": { + "id": "26", + "url": "https://api-eu-1.korellia.kyve.network", + "pollInterval": 60000 + } + } + }, + "definitions": { + "port": { + "type": "number", + "minimum": 0, + "maximum": 65353 + }, + "smartContractConfig": { + "type": "object", + "required": [ + "contractAddress", + "jsonRpcProvider" + ], + "additionalProperties": false, + "properties": { + "contractAddress": { + "type": "string", + "description": "Ethereum address of registry smart contract", + "pattern": "^0x[a-fA-F0-9]{40}$" + }, + "jsonRpcProvider": { + "type": "string", + "description": "URL for JSON RPC Provider", + "format": "uri" + } + } + } + } +} diff --git a/packages/observer/src/config/config.ts b/packages/observer/src/config/config.ts new file mode 100644 index 00000000..b882b386 --- /dev/null +++ b/packages/observer/src/config/config.ts @@ -0,0 +1,79 @@ +import { LogStoreClientConfig } from '@logsn/client'; +import { camelCase, set } from 'lodash'; + +export type ObserverPluginConfig = {}; + +// } +export interface Config { + client?: LogStoreClientConfig; + plugins?: { + observer?: ObserverPluginConfig; + }; + pool?: { + id: string; + url: string; + pollInterval: number; + }; +} + +// StrictConfig is a config object to which some default values have been applied +// (see `default` definitions in config.schema.json) +export type StrictConfig = Config & { + client: Exclude; + plugins: Exclude; + pool: Exclude; +}; + +export interface ConfigFile extends Config { + $schema?: string; +} + +export const getDefaultFile = (): string => { + throw new Error('not implemented'); +}; + +export function overrideConfigToEnvVarsIfGiven(config: Config): void { + const parseValue = (value: string) => { + const number = /^-?\d+\.?\d*$/; + if (number.test(value)) { + return Number(value); + } else if (value === 'true') { + return true; + } else if (value === 'false') { + return false; + } else if (value == 'null') { + return null; + } else { + return value; + } + }; + + const PREFIX = 'LOGSTORE__OBSERVER__'; + Object.keys(process.env).forEach((variableName: string) => { + if (variableName.startsWith(PREFIX)) { + const parts = variableName + .substring(PREFIX.length) + .split('__') + .map((part: string) => { + const groups = part.match(/^([A-Z_]*[A-Z])(_\d+)?$/); + if (groups !== null) { + const base = camelCase(groups[1]); + const suffix = groups[2]; + if (suffix === undefined) { + return base; + } else { + const index = Number(suffix.substring(1)) - 1; + return `${base}[${index}]`; + } + } else { + throw new Error(`Malformed environment variable ${variableName}`); + } + }); + const key = parts.join('.'); + const value = parseValue(process.env[variableName]!); + if (value !== '') { + set(config, key, value); + } + } + }); +} diff --git a/packages/observer/src/constants.ts b/packages/observer/src/constants.ts new file mode 100644 index 00000000..70476d71 --- /dev/null +++ b/packages/observer/src/constants.ts @@ -0,0 +1,4 @@ +export { + name as PACKAGE_NAME, + version as PACKAGE_VERSION, +} from '../package.json'; diff --git a/packages/observer/src/observer.ts b/packages/observer/src/observer.ts new file mode 100644 index 00000000..feb508d4 --- /dev/null +++ b/packages/observer/src/observer.ts @@ -0,0 +1,152 @@ +import { validateConfig } from '@logsn/broker/dist/src/config/validateConfig'; +import { + LogStoreClient, + NetworkNodeStub, + PrivateKeyAuthConfig, + validateConfig as validateClientConfig, +} from '@logsn/client'; +import { getNodeManagerContract } from '@logsn/shared'; +import { toStreamID } from '@streamr/protocol'; +import { Logger, toEthereumAddress } from '@streamr/utils'; +import { ethers } from 'ethers'; + +import { Config } from './config/config'; +import OBSERVER_CONFIG_SCHEMA from './config/config.schema.json'; +import { PACKAGE_NAME, PACKAGE_VERSION } from './constants'; +import { Plugin, PluginOptions } from './Plugin'; +import { createPlugin } from './pluginRegistry'; +import { ctx } from './telemetry/context'; +import { moduleFromMetaUrl } from './utils/moduleFromMetaUrl'; + +const logger = new Logger(moduleFromMetaUrl(import.meta?.url)); + +export interface LogstoreComponent { + getNode: () => Promise; + start: () => Promise; + stop: () => Promise; +} + +export const createObserver = async ( + configWithoutDefaults: Config +): Promise => { + const config = validateConfig(configWithoutDefaults, OBSERVER_CONFIG_SCHEMA); + validateClientConfig(config.client); + + const logStoreClient = new LogStoreClient(config.client); + + // Tweaks suggested by the Streamr Team + // Copied from @logsn/broker + config.client.network!.webrtcSendBufferMaxMessageCount = 5000; + config.client.gapFill = true; + config.client.gapFillTimeout = 30 * 1000; + + const nodeManagerAddress = toEthereumAddress( + config.client.contracts!.logStoreNodeManagerChainAddress! + ); + + const isDevNetwork = + nodeManagerAddress === + toEthereumAddress('0x85ac4C8E780eae81Dd538053D596E382495f7Db9'); + + const recoveryStreamId = isDevNetwork + ? toStreamID('/recovery', nodeManagerAddress) + : '0xa156eda7dcd689ac725ce9595d4505bf28256454/alpha-recovery'; + + const rollcallStreamId = isDevNetwork + ? toStreamID('/rollcall', nodeManagerAddress) + : '0xa156eda7dcd689ac725ce9595d4505bf28256454/alpha-rollcall'; + + const systemStreamId = isDevNetwork + ? toStreamID('/system', nodeManagerAddress) + : '0xa156eda7dcd689ac725ce9595d4505bf28256454/alpha-system'; + + const recoveryStream = await logStoreClient.getStream(recoveryStreamId); + const rollCallStream = await logStoreClient.getStream(rollcallStreamId); + const systemStream = await logStoreClient.getStream(systemStreamId); + + const privateKey = (config.client!.auth as PrivateKeyAuthConfig).privateKey; + + const provider = new ethers.providers.JsonRpcProvider( + config.client!.contracts?.streamRegistryChainRPCs!.rpcs[0] + ); + const signer = new ethers.Wallet(privateKey, provider); + + const nodeManger = await getNodeManagerContract(signer); + + const plugins: Plugin[] = Object.keys(config.plugins).map((name) => { + const pluginOptions: PluginOptions = { + name, + logStoreClient, + recoveryStream, + rollCallStream, + systemStream, + observerConfig: config, + signer, + nodeManger, + }; + return createPlugin(name, pluginOptions); + }); + + let started = false; + + const getNode = async (): Promise => { + if (!started) { + throw new Error('cannot invoke on non-started component'); + } + return logStoreClient.getNode(); + }; + + return { + getNode, + start: async () => { + const nodeId = (await logStoreClient.getNode()).getNodeId(); + ctx.nodeInfo.enterWith({ id: nodeId }); + + logger.info(`Starting ${PACKAGE_NAME} version ${PACKAGE_VERSION}`); + await Promise.all(plugins.map((plugin) => plugin.start())); + + const nodeAddress = await logStoreClient.getAddress(); + + logger.info(`Ethereum address ${nodeAddress}`); + logger.info( + `Tracker Configuration: ${ + config.client.network?.trackers + ? JSON.stringify(config.client.network?.trackers) + : 'default' + }` + ); + + logger.info(`Plugins: ${JSON.stringify(plugins.map((p) => p.name))}`); + + if ( + config.client.network?.webrtcDisallowPrivateAddresses === undefined || + config.client.network.webrtcDisallowPrivateAddresses + ) { + logger.warn( + 'WebRTC private address probing is disabled. ' + + 'This makes it impossible to create network layer connections directly via local routers ' + + 'More info: https://github.com/streamr-dev/network-monorepo/wiki/WebRTC-private-addresses' + ); + } + started = true; + }, + stop: async () => { + await Promise.all(plugins.map((plugin) => plugin.stop())); + await logStoreClient.destroy(); + }, + }; +}; + +process.on('uncaughtException', (err) => { + logger.error(err.message, { + type: 'uncaughtException', + }); + process.exit(1); +}); + +process.on('unhandledRejection', (err) => { + logger.error(String(err), { + type: 'unhandledRejection', + }); + process.exit(1); +}); diff --git a/packages/observer/src/pluginRegistry.ts b/packages/observer/src/pluginRegistry.ts new file mode 100644 index 00000000..8e2dc504 --- /dev/null +++ b/packages/observer/src/pluginRegistry.ts @@ -0,0 +1,15 @@ +import { Plugin, PluginOptions } from './Plugin'; +import { ObserverPlugin } from './plugins/observer/ObserverPlugin'; + +export const createPlugin = ( + name: string, + pluginOptions: PluginOptions +): Plugin | never => { + switch (name) { + case 'observer': + return new ObserverPlugin(pluginOptions); + + default: + throw new Error(`Unknown plugin: ${name}`); + } +}; diff --git a/packages/observer/src/plugins/observer/ObserverPlugin.ts b/packages/observer/src/plugins/observer/ObserverPlugin.ts new file mode 100644 index 00000000..4219f5c6 --- /dev/null +++ b/packages/observer/src/plugins/observer/ObserverPlugin.ts @@ -0,0 +1,97 @@ +// import reportData from '../../../test/unit/plugins/logStore/data/report.json'; +import { KyvePool } from '@logsn/broker/dist/src/plugins/logStore/KyvePool'; +import { MessageMetricsCollector } from '@logsn/broker/dist/src/plugins/logStore/MessageMetricsCollector'; +import { BroadbandSubscriber } from '@logsn/broker/dist/src/shared/BroadbandSubscriber'; +import { SystemMessageType } from '@logsn/protocol'; + +import { ObserverPluginConfig } from '../../config/config'; +import { Plugin, PluginOptions } from '../../Plugin'; +import { observeReportsMetrics } from '../../telemetry/metrics/reportMetrics'; +import { addResponseCountIfUnique } from '../../telemetry/metrics/uniqueResponsesCounter'; +import { ReportsMetricsCollector } from './ReportsMetricsCollector'; +import { SystemMessagesGeneralHandler } from './SystemMessagesGeneralHandler'; +import { observeMessageMetricsCollector } from "../../telemetry/metrics/systemMessageMetrics"; + +export class ObserverPlugin extends Plugin { + private readonly systemSubscriber: BroadbandSubscriber; + private readonly rollcallSubscriber: BroadbandSubscriber; + private readonly kyvePool: KyvePool; + private readonly messageMetricsCollector: MessageMetricsCollector; + private stopSharingWithTelemetryCollector?: Unsubscribe; + private systemMessagesGeneralHandler: SystemMessagesGeneralHandler; + private reportsMetricsCollector: ReportsMetricsCollector; + private unobserveReportsMetrics?: Unsubscribe; + + constructor(options: PluginOptions) { + super(options); + + this.kyvePool = new KyvePool( + this.observerConfig.pool.url, + this.observerConfig.pool.id + ); + + this.systemSubscriber = new BroadbandSubscriber( + this.logStoreClient, + this.systemStream + ); + + this.rollcallSubscriber = new BroadbandSubscriber( + this.logStoreClient, + this.rollCallStream + ); + + this.messageMetricsCollector = new MessageMetricsCollector( + this.logStoreClient, + this.systemSubscriber, + this.rollcallSubscriber, + this.recoveryStream + ); + + this.systemMessagesGeneralHandler = new SystemMessagesGeneralHandler( + this.systemSubscriber + ); + + this.reportsMetricsCollector = new ReportsMetricsCollector( + this.kyvePool, + this.observerConfig, + this.signer, + this.systemSubscriber + ); + } + + getApiAuthentication(): undefined { + return undefined; + } + + /// Start receiving messages. Must be explicitly called. + async start(): Promise { + await this.messageMetricsCollector.start(); + await this.systemMessagesGeneralHandler.start(); + this.reportsMetricsCollector.start(); + + this.systemMessagesGeneralHandler.addHandler( + SystemMessageType.QueryResponse, + addResponseCountIfUnique + ); + + this.unobserveReportsMetrics = observeReportsMetrics( + this.reportsMetricsCollector + ); + + this.stopSharingWithTelemetryCollector = observeMessageMetricsCollector( + this.messageMetricsCollector + ); + } + + /// Stops the subscription + async stop(): Promise { + await Promise.all([ + this.messageMetricsCollector.stop(), + this.stopSharingWithTelemetryCollector?.(), + this.reportsMetricsCollector?.stop(), + this.unobserveReportsMetrics?.(), + ]); + } +} + +type Unsubscribe = () => void; diff --git a/packages/observer/src/plugins/observer/ReportsMetricsCollector.ts b/packages/observer/src/plugins/observer/ReportsMetricsCollector.ts new file mode 100644 index 00000000..00852fa8 --- /dev/null +++ b/packages/observer/src/plugins/observer/ReportsMetricsCollector.ts @@ -0,0 +1,128 @@ +import { KyvePool } from '@logsn/broker/dist/src/plugins/logStore/KyvePool'; +import { ReportPoller } from '@logsn/broker/dist/src/plugins/logStore/ReportPoller'; +import { BroadbandSubscriber } from '@logsn/broker/dist/src/shared/BroadbandSubscriber'; +import { IReportV1 } from '@logsn/protocol'; +import { Logger } from '@streamr/utils'; +import { Signer } from 'ethers'; +import { Promise } from 'ts-toolbelt/out/Any/Promise'; + +import { StrictConfig } from '../../config/config'; +import { moduleFromMetaUrl } from '../../utils/moduleFromMetaUrl'; + +const logger = new Logger(moduleFromMetaUrl(import.meta?.url)); + +export class ReportsMetricsCollector extends ReportPoller { + private reportsAggregation = { + storedMessages: 0, + storedBytes: 0, + totalQueries: 0, + totalBytesQueried: 0, + // totalMessagesQueried: 0, + totalBundles: 0, + }; + private latestProcessedBundle: number = 0; + /** + * Indicates if the summary is ready to be used. Necessary to prevent early access, and reports less than + * the latest bundle indicates. + */ + private allBundlesProcessed = false; + private pollTimer: NodeJS.Timer | null = null; + + constructor( + kyvePool: KyvePool, + private config: Pick, + signer: Signer, + subscriber: BroadbandSubscriber + ) { + super(kyvePool, config, signer, {} as any, subscriber); + } + + /// Start collecting metrics. Must be explicitly called. + override async start(): Promise { + logger.info('Starting reports metrics collector'); + + // do once and then start polling + this.fetchAndUpdateLatestReports(); + this.pollTimer = setInterval(() => { + this.fetchAndUpdateLatestReports(); + }, this.config.pool.pollInterval); + } + + /// Clear the timers, cleanups + public stop() { + if (this.pollTimer) { + clearInterval(this.pollTimer); + this.pollTimer = null; + } + } + + /** + * Fetches and updates the latest reports from the pool. + */ + private async fetchAndUpdateLatestReports() { + const { totalBundles } = await this.kyvePool.fetchPoolData(); + const latestAvailableBundle = totalBundles - 1; + logger.debug( + `Updating reports data. Latest bundle: ${latestAvailableBundle}` + ); + if (latestAvailableBundle <= this.latestProcessedBundle) { + return; + } + + await this.processBundlesUntilLatest(latestAvailableBundle); + this.allBundlesProcessed = true; + } + + /** + * Processes specified bundles starting from the first unprocessed one. + * @param latestAvailableBundle - The latest bundle to process. + */ + private async processBundlesUntilLatest(latestAvailableBundle: number) { + const firstUnprocessedBundle = this.latestProcessedBundle + 1; + for (let i = firstUnprocessedBundle; i <= latestAvailableBundle; ++i) { + logger.debug(`Fetching report data for bundle ${i}`); + const report = await super.fetchReportData(i); + this.aggregateReport(report.deserialize() as IReportV1); + this.latestProcessedBundle = i; + } + } + + public get summary() { + return this.reportsAggregation; + } + + public get isReady() { + return this.allBundlesProcessed; + } + + /** + * Aggregates the report data into the summary. + */ + private aggregateReport(report: IReportV1) { + const { events } = report; + const queries = events?.queries ?? []; + const queriesCountIncrease = queries.length; + const queriesByteIncrease = queries.reduce( + (prev, q) => (q.size ?? 0) + prev, + 0 + ); + + const storageEvents = events?.storage ?? []; + const storeCountIncrease = storageEvents.length; + const storeByteIncrease = storageEvents.reduce( + (prev, q) => (q.size ?? 0) + prev, + 0 + ); + + logger.info( + `Aggregating report data. Bundle: ${report.id}. Queries: ${queriesCountIncrease}. Storage: ${storeCountIncrease}` + ); + + this.reportsAggregation.storedMessages += storeCountIncrease; + this.reportsAggregation.storedBytes += storeByteIncrease; + this.reportsAggregation.totalQueries += queriesCountIncrease; + this.reportsAggregation.totalBytesQueried += queriesByteIncrease; + // this.reportsAggregation.totalMessagesQueried += queriesCountIncrease; + this.reportsAggregation.totalBundles += 1; + } +} diff --git a/packages/observer/src/plugins/observer/SystemMessagesGeneralHandler.ts b/packages/observer/src/plugins/observer/SystemMessagesGeneralHandler.ts new file mode 100644 index 00000000..52325889 --- /dev/null +++ b/packages/observer/src/plugins/observer/SystemMessagesGeneralHandler.ts @@ -0,0 +1,88 @@ +import { BroadbandSubscriber } from '@logsn/broker/dist/src/shared/BroadbandSubscriber'; +import { MessageMetadata } from '@logsn/client'; +import { + QueryRequest, + QueryResponse, + SystemMessage, + SystemMessageType, +} from '@logsn/protocol'; + +type SystemMessageMap = { + [SystemMessageType.QueryResponse]: QueryResponse; + [SystemMessageType.QueryRequest]: QueryRequest; +}; + +type SystemMessageHandler< + T extends keyof SystemMessageMap = keyof SystemMessageMap, +> = ( + message: SystemMessageMap[T], + metadata: MessageMetadata +) => Promise | void; + +/** + * The SystemMessagesGeneralHandler class is responsible for managing system messages + * that flow through the BroadbandSubscriber. It allows dynamic addition and removal of + * handlers for different system message types, providing a flexible way to extend functionality. + */ +export class SystemMessagesGeneralHandler { + /** + * Map to store handlers for each SystemMessageType. + * The use of a Set ensures unique handlers for each message type. + * @private + */ + private handlersBySystemMessage = new Map< + SystemMessageType, + Set + >(); + + constructor(private readonly subscriber: BroadbandSubscriber) {} + + /// Start receiving messages. Must be explicitly called. + public async start() { + await this.subscriber.subscribe(this.onMessage.bind(this)); + } + + /// Stops the subscription and clears the handlers. + public async stop() { + await this.subscriber.unsubscribe(); + this.handlersBySystemMessage.clear(); + } + + // ========= Add and removal of handles ======== + public addHandler( + type: T, + handler: SystemMessageHandler + ) { + this.handlersBySystemMessage + .get(type) + ?.add(handler as SystemMessageHandler); + } + + public removeHandler( + type: T, + handler: SystemMessageHandler + ) { + this.handlersBySystemMessage + .get(type) + ?.delete(handler as SystemMessageHandler); + } + // ============================================== + + private async onMessage(content: unknown, metadata: MessageMetadata) { + const systemMessage = SystemMessage.deserialize(content); + const messageType = systemMessage.messageType; + + const handlers = this.handlersBySystemMessage.get(messageType); + + if (!handlers) { + return; + } else { + for (const handler of handlers) { + handler( + systemMessage as SystemMessageMap[keyof SystemMessageMap], + metadata + ); + } + } + } +} diff --git a/packages/observer/src/telemetry/context/index.ts b/packages/observer/src/telemetry/context/index.ts new file mode 100644 index 00000000..4fba681e --- /dev/null +++ b/packages/observer/src/telemetry/context/index.ts @@ -0,0 +1,7 @@ +import { AsyncLocalStorage } from 'async_hooks'; + +export const ctx = { + nodeInfo: new AsyncLocalStorage<{ id: string }>(), +}; + +export type StoreType = T extends AsyncLocalStorage ? U : never; diff --git a/packages/observer/src/telemetry/globalTelemetryObjects.ts b/packages/observer/src/telemetry/globalTelemetryObjects.ts new file mode 100644 index 00000000..1964715c --- /dev/null +++ b/packages/observer/src/telemetry/globalTelemetryObjects.ts @@ -0,0 +1,6 @@ +import { metrics, trace } from '@opentelemetry/api'; + +import { PACKAGE_NAME } from '../constants'; + +export const tracer = trace.getTracer(PACKAGE_NAME); +export const meter = metrics.getMeter(PACKAGE_NAME); diff --git a/packages/observer/src/telemetry/metrics/reportMetrics.ts b/packages/observer/src/telemetry/metrics/reportMetrics.ts new file mode 100644 index 00000000..79298224 --- /dev/null +++ b/packages/observer/src/telemetry/metrics/reportMetrics.ts @@ -0,0 +1,97 @@ +import { BatchObservableResult, ValueType } from '@opentelemetry/api'; + +import { ReportsMetricsCollector } from '../../plugins/observer/ReportsMetricsCollector'; +import { ctx } from '../context'; +import { meter } from '../globalTelemetryObjects'; + +const reportTotalQueries = meter.createObservableCounter( + 'bundle.total_reported_queries', + { + unit: '1', + description: + 'Total number of queries aggregated from all reports inside valid bundles', + valueType: ValueType.INT, + } +); + +const reportStoredMessages = meter.createObservableCounter( + 'bundle.total_stored_messages', + { + unit: '1', + description: 'Total number of messages stored inside valid bundles', + valueType: ValueType.INT, + } +); + +const reportStoredBytes = meter.createObservableCounter( + 'bundle.total_stored_bytes', + { + unit: '1', + description: 'Total number of bytes stored inside valid bundles', + valueType: ValueType.INT, + } +); + +const reportTotalBytesQueried = meter.createObservableCounter( + 'bundle.total_bytes_queried', + { + unit: '1', + description: + 'Total number of bytes queried from all reports inside valid bundles', + valueType: ValueType.INT, + } +); + +const totalBundles = meter.createObservableCounter('bundle.total_bundles', { + unit: '1', + description: 'Total number of bundles', + valueType: ValueType.INT, +}); + +export const observeReportsMetrics = ( + reportsMetricsCollector: ReportsMetricsCollector +) => { + const observables = [ + reportTotalQueries, + reportStoredMessages, + reportStoredBytes, + reportTotalBytesQueried, + totalBundles, + ]; + const nodeId = ctx.nodeInfo.getStore()?.id; + + const batchCallback = (observableResult: BatchObservableResult) => { + const metrics = reportsMetricsCollector.summary; + if (!reportsMetricsCollector.isReady) { + return; + } + const attributes = { + nodeId: nodeId, + }; + observableResult.observe( + reportTotalQueries, + metrics.totalQueries, + attributes + ); + observableResult.observe( + reportStoredMessages, + metrics.storedMessages, + attributes + ); + observableResult.observe( + reportStoredBytes, + metrics.storedBytes, + attributes + ); + observableResult.observe( + reportTotalBytesQueried, + metrics.totalBytesQueried, + attributes + ); + observableResult.observe(totalBundles, metrics.totalBundles, attributes); + }; + + meter.addBatchObservableCallback(batchCallback, observables); + + return () => meter.removeBatchObservableCallback(batchCallback, observables); +}; diff --git a/packages/observer/src/telemetry/metrics/systemMessageMetrics.ts b/packages/observer/src/telemetry/metrics/systemMessageMetrics.ts new file mode 100644 index 00000000..04d6c471 --- /dev/null +++ b/packages/observer/src/telemetry/metrics/systemMessageMetrics.ts @@ -0,0 +1,64 @@ +import type { MessageMetricsCollector } from '@logsn/broker/dist/src/plugins/logStore/MessageMetricsCollector'; +import { + BatchObservableResult, + ObservableCounter, + ValueType, +} from '@opentelemetry/api'; +import _ from 'lodash'; + +import { ctx } from '../context'; +import { meter } from '../globalTelemetryObjects'; + +const systemMessageObservableCounters = { + lost: meter.createObservableCounter('systemMessage.lost', { + unit: '1', + valueType: ValueType.INT, + description: 'Total number of messages lost on system stream', + }), + bytes: meter.createObservableCounter('systemMessage.bytes', { + unit: 'By', + valueType: ValueType.INT, + description: 'Total number of bytes lost on system stream messages', + }), + count: meter.createObservableCounter('systemMessage.count', { + unit: '1', + valueType: ValueType.INT, + description: 'Total number of messages on system stream', + }), +} satisfies { + [key in 'bytes' | 'count' | 'lost']: ObservableCounter; +}; + +/** + * Message Metrics Collector is a class designed to help us collect metrics + * from the system messages that are sent and received by the broker. + * + * We're using the OpenTelemetry BatchObservableResult to collect metrics from this + * same class, to reuse the same code. + * + * @param collector + */ +export const observeMessageMetricsCollector = ( + collector: MessageMetricsCollector +) => { + const { bytes, count, lost } = systemMessageObservableCounters; + const observables = [bytes, count, lost]; + const nodeId = ctx.nodeInfo.getStore()?.id; + + const batchCallback = (observableResult: BatchObservableResult) => { + const metrics = _.compact(collector.summary); + + for (const metric of metrics) { + const attributes = { + nodeId, + subject: metric.subject, + }; + observableResult.observe(bytes, metric?.bytes, attributes); + observableResult.observe(count, metric?.count, attributes); + observableResult.observe(lost, metric?.lost, attributes); + } + }; + meter.addBatchObservableCallback(batchCallback, observables); + + return () => meter.removeBatchObservableCallback(batchCallback, observables); +}; diff --git a/packages/observer/src/telemetry/metrics/uniqueResponsesCounter.ts b/packages/observer/src/telemetry/metrics/uniqueResponsesCounter.ts new file mode 100644 index 00000000..9b0b246d --- /dev/null +++ b/packages/observer/src/telemetry/metrics/uniqueResponsesCounter.ts @@ -0,0 +1,71 @@ +import { QueryResponse } from '@logsn/protocol'; +import { ValueType } from '@opentelemetry/api'; + +import { ctx } from '../context'; +import { meter } from '../globalTelemetryObjects'; + +/// Counter for tracking unique query responses to prevent processing duplicates. +const uniqueResponsesCounter = meter.createCounter('unique_query_responses', { + description: + 'Total number of unique query responses, deduplicated by requestId. Based on QueryResponse system message.', + unit: '1', + valueType: ValueType.INT, +}); + +/** + * Increment the unique responses counter. + * This is a part of deduplication logic to keep track of unique messages processed. + */ +const addUniqueResponsesCounter = (count: number) => { + const nodeId = ctx.nodeInfo.getStore()?.id; + uniqueResponsesCounter.add(count, { + nodeId, + }); +}; + +const NEW_MESSAGES_TIMEOUT = 30_000; +const cachedMessagesMap = new Map(); + +/** + * Determine if a response has been processed before. + * This helps in deduplication by ensuring a message is only processed once, + * even if received multiple times within a short timeframe. + */ +const isThisResponseMissing = (response: QueryResponse) => { + const cachedMessage = cachedMessagesMap.get(response.requestId); + if (cachedMessage) { + return false; + } + + const now = Date.now(); + cachedMessagesMap.set(response.requestId, now); + return true; +}; + +/** + * Periodically clear old entries from the cache. + * This is essential to prevent memory leaks by removing entries that are no longer needed, + * based on the assumption that duplicates are not expected after a certain time period. + */ +const clearOldMessages = () => { + const now = Date.now(); + for (const [key, value] of cachedMessagesMap) { + const difference = now - value; + if (difference > NEW_MESSAGES_TIMEOUT) { + cachedMessagesMap.delete(key); + } + } +}; + +setInterval(clearOldMessages, NEW_MESSAGES_TIMEOUT); + +/** + * Process a new response if it hasn't been processed before. + * This is the main entry point for handling incoming messages, + * ensuring deduplication and updating metrics accordingly. + */ +export const addResponseCountIfUnique = (response: QueryResponse) => { + if (isThisResponseMissing(response)) { + addUniqueResponsesCounter(1); + } +}; diff --git a/packages/observer/src/telemetry/setup/setupSdk.ts b/packages/observer/src/telemetry/setup/setupSdk.ts new file mode 100644 index 00000000..d2558a14 --- /dev/null +++ b/packages/observer/src/telemetry/setup/setupSdk.ts @@ -0,0 +1,67 @@ +import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; +import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc'; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'; +import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; +import { Resource } from '@opentelemetry/resources'; +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; +import { NodeSDK } from '@opentelemetry/sdk-node'; +import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; +import process from 'process'; + +import { PACKAGE_NAME } from '../../constants'; + +const otelTracingExport = process.env.TRACING_URL; +const otelMetricsExport = process.env.METRICS_URL; + +const exporterOptions = { + url: otelTracingExport, +} satisfies ConstructorParameters[0]; + +export const traceExporter = otelTracingExport + ? new OTLPTraceExporter(exporterOptions) + : undefined; + +export const metricReader = otelMetricsExport + ? new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporter({ + url: otelMetricsExport, + }), + exportIntervalMillis: 30_000, + }) + : undefined; + +const isLocal = process.env.IS_LOCAL === 'true'; + +const contextManager = new AsyncHooksContextManager(); +contextManager.enable(); + +export const sdk: NodeSDK = new NodeSDK({ + traceExporter, + metricReader: metricReader, + contextManager: contextManager, + instrumentations: [ + getNodeAutoInstrumentations({ + '@opentelemetry/instrumentation-fs': { + enabled: false, + }, + }), + new HttpInstrumentation({ + enabled: true, + }), + ], + resource: new Resource({ + [SemanticResourceAttributes.SERVICE_NAME]: isLocal + ? `${PACKAGE_NAME} (local)` + : PACKAGE_NAME, + }), +}); + +export const enabledObservabilityFeatures = { + tracing: !!traceExporter, + metrics: !!metricReader, +}; + +export const isAnyObservabilityFeatureEnabled = Object.values( + enabledObservabilityFeatures +).some((value) => value); diff --git a/packages/observer/src/telemetry/setup/startOpenTelemetry.ts b/packages/observer/src/telemetry/setup/startOpenTelemetry.ts new file mode 100644 index 00000000..ba194b26 --- /dev/null +++ b/packages/observer/src/telemetry/setup/startOpenTelemetry.ts @@ -0,0 +1,35 @@ +import { Logger } from '@streamr/utils'; +import process from 'process'; + +import { moduleFromMetaUrl } from '../../utils/moduleFromMetaUrl'; +import { enabledObservabilityFeatures, sdk } from './setupSdk'; + +const logger = new Logger(moduleFromMetaUrl(import.meta.url)); + +if (!enabledObservabilityFeatures.metrics) { + throw new Error( + 'Metrics exporting is not enabled, the observer should not be running' + ); +} + +// initialize the SDK and register with the OpenTelemetry API +// this enables the API to record telemetry +// We create errors here because this package is all about observing and collecting metrics +try { + sdk.start(); +} catch (err) { + logger.error('Error starting OpenTelemetry SDK'); + logger.error(String(err)); + process.exit(1); +} + +logger.info('Tracing initialized'); + +// gracefully shut down the SDK on process exit +process.on('SIGTERM', () => { + sdk + .shutdown() + .then(() => logger.info('Tracing terminated')) + .catch((error: Error) => logger.error('Error terminating tracing', error)) + .finally(() => process.exit(0)); +}); diff --git a/packages/observer/src/utils/moduleFromMetaUrl.ts b/packages/observer/src/utils/moduleFromMetaUrl.ts new file mode 100644 index 00000000..f66f90e2 --- /dev/null +++ b/packages/observer/src/utils/moduleFromMetaUrl.ts @@ -0,0 +1,10 @@ +import { fileURLToPath } from 'url'; + +/** + * Creates a NodeJS.Module object from a given meta URL. Used because + * at ESM modules, the module object is not available in the global scope. + */ +export const moduleFromMetaUrl = (url: string | undefined) => + ({ + id: fileURLToPath(url ?? ''), + }) as NodeJS.Module; diff --git a/packages/observer/tests/basic.test.ts b/packages/observer/tests/basic.test.ts new file mode 100644 index 00000000..e7857cca --- /dev/null +++ b/packages/observer/tests/basic.test.ts @@ -0,0 +1,26 @@ +import { expect, it } from 'vitest'; + +import config from '../configs/development-1.env.json'; +import { Config } from '../src/config/config'; +import { createObserver } from '../src/observer'; + +// todo spy on classes as identify if components start correctly + +it('is able to start', async () => { + const typesafeConfig = { + ...config, + } as Config; + + const observer = await createObserver(typesafeConfig); + + try { + await observer.start(); + } catch (error) { + console.error(error); + process.exit(1); + } + + const node = await observer.getNode(); + + expect(node.getNodeId()).toBe('0x5cbdd86a2fa8dc4bddd8a8f69dba48572eec07fb'); +}); From 24e8d861b1ad31bb64bcdcfa7295404600e72dd0 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 08:23:46 -0300 Subject: [PATCH 16/23] Add observer package to Dockerfile and configure it In this commit, the observer package is added to the Dockerfile.base from dev-network. This incorporate the observer functionality to the docker image. Necessary configuration files such as observer.json and Dockerfile.observer, a shell script start-in-docker.sh is also added to aid in starting the observer in a Docker container. This change was necessary to facilitate observer functionality in development phase. --- dev-network/Dockerfile.base | 1 + dev-network/Dockerfile.observer | 8 +++ dev-network/assets/observer/observer.json | 56 +++++++++++++++++++ .../assets/observer/start-in-docker.sh | 3 + 4 files changed, 68 insertions(+) create mode 100644 dev-network/Dockerfile.observer create mode 100644 dev-network/assets/observer/observer.json create mode 100755 dev-network/assets/observer/start-in-docker.sh diff --git a/dev-network/Dockerfile.base b/dev-network/Dockerfile.base index b4789585..cf915d02 100644 --- a/dev-network/Dockerfile.base +++ b/dev-network/Dockerfile.base @@ -27,6 +27,7 @@ COPY --chown=node:node ./packages/broker/package.json ./packages/broker/ COPY --chown=node:node ./packages/validator/package.json ./packages/validator/ COPY --chown=node:node ./packages/heartbeat/package.json ./packages/heartbeat/ COPY --chown=node:node ./packages/benchmarks/package.json ./packages/benchmarks/ +COPY --chown=node:node ./packages/observer/package.json ./packages/observer/ RUN pnpm install diff --git a/dev-network/Dockerfile.observer b/dev-network/Dockerfile.observer new file mode 100644 index 00000000..3214d4aa --- /dev/null +++ b/dev-network/Dockerfile.observer @@ -0,0 +1,8 @@ +FROM logstore-base + +USER root + +USER node +WORKDIR /home/node/logstore + +CMD [ "start-in-docker" ] diff --git a/dev-network/assets/observer/observer.json b/dev-network/assets/observer/observer.json new file mode 100644 index 00000000..562db7dc --- /dev/null +++ b/dev-network/assets/observer/observer.json @@ -0,0 +1,56 @@ +{ + "$schema": "https://schema.streamr.network/config-v2.schema.json", + "client": { + "logLevel": "trace", + "auth": { + "privateKey": "0x2222222222222222222222222222222222222222222222222222222222222222" + }, + "network": { + "id": "0x1563915e194d8cfba1943570603f7606a3115508", + "trackers": { + "contractAddress": "0xBFCF120a8fD17670536f1B27D9737B775b2FD4CF" + }, + "location": { + "latitude": 60.19, + "longitude": 24.95, + "country": "Finland", + "city": "Helsinki" + }, + "webrtcDisallowPrivateAddresses": false + }, + "contracts": { + "streamRegistryChainAddress": "0x6cCdd5d866ea766f6DF5965aA98DeCCD629ff222", + "streamStorageRegistryChainAddress": "0xd04af489677001444280366Dd0885B03dAaDe71D", + "storageNodeRegistryChainAddress": "0x231b810D98702782963472e1D60a25496999E75D", + "logStoreNodeManagerChainAddress": "0x85ac4C8E780eae81Dd538053D596E382495f7Db9", + "logStoreStoreManagerChainAddress": "0x8560200b8E7477FB09281A0566B50fa6E7a66a34", + "streamRegistryChainRPCs": { + "chainId": 8997, + "rpcs": [ + { + "url": "http://10.200.10.1:8546" + } + ] + }, + "mainChainRPCs": { + "chainId": 8995, + "rpcs": [ + { + "url": "http://10.200.10.1:8545" + } + ] + }, + "theGraphUrl": "http://10.200.10.1:8000/subgraphs/name/streamr-dev/network-contracts", + "logStoreTheGraphUrl": "http://10.200.10.1:8000/subgraphs/name/logstore-dev/network-contracts" + }, + "metrics": false + }, + "pool": { + "id": "0", + "url": "http://logstore-kyve:1317", + "pollInterval": 60000 + }, + "plugins": { + "observer": {} + } +} diff --git a/dev-network/assets/observer/start-in-docker.sh b/dev-network/assets/observer/start-in-docker.sh new file mode 100755 index 00000000..6feb7ff5 --- /dev/null +++ b/dev-network/assets/observer/start-in-docker.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +pnpm --filter @logsn/observer start:prod From 636c715e3aa4436b62f220cb3812adfaecc7a534 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 08:49:14 -0300 Subject: [PATCH 17/23] Add @opentelemetry/api to peerDependencies in protocol package.json --- packages/protocol/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/protocol/package.json b/packages/protocol/package.json index a568053f..ff4ed255 100644 --- a/packages/protocol/package.json +++ b/packages/protocol/package.json @@ -39,5 +39,8 @@ "@ethersproject/bignumber": "^5.7.0", "@ethersproject/bytes": "^5.7.0", "@ethersproject/solidity": "^5.7.0" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.0.0" } } From c58f308298c0bada2363796c02435177ead925f8 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 09:10:03 -0300 Subject: [PATCH 18/23] Add observer config for dev-network and update .gitignore Updated the .gitignore file to ensure that '.env' files within the 'dev-network/assets' directory are no longer ignored by Git, allowing for development-specific environment variables to be tracked. In addition, a '.env.observer' file has been created within the 'dev-network/assets/observer' directory. This file contains settings related to log level and telemetry data collection, including a workaround to allow 'node-fetch' to call 'arweave.net' with a self-signed certificate. --- .gitignore | 1 + dev-network/assets/observer/.env.observer | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 dev-network/assets/observer/.env.observer diff --git a/.gitignore b/.gitignore index aff0a06f..9fdb88f3 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ yarn-error.log* !.env.example *.env .env* +!dev-network/assets/**/.env.* # Typescript tsconfig.tsbuildinfo diff --git a/dev-network/assets/observer/.env.observer b/dev-network/assets/observer/.env.observer new file mode 100644 index 00000000..2b9812ac --- /dev/null +++ b/dev-network/assets/observer/.env.observer @@ -0,0 +1,7 @@ +LOG_LEVEL=debug + +# node-fetch tweak to call arweave.net with self-signed certificate +# https://stackoverflow.com/a/52479399 +NODE_TLS_REJECT_UNAUTHORIZED=0 +METRICS_URL=http://opentelemetry-collector:4317 +TRACING_URL=http://opentelemetry-collector:4317 From bb6c40eb362840c13bbd4082fcc20c55f073b7d9 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 17:06:45 -0300 Subject: [PATCH 19/23] Change logLevel from 'trace' to 'debug' in observer.json --- dev-network/assets/observer/observer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-network/assets/observer/observer.json b/dev-network/assets/observer/observer.json index 562db7dc..0bc845ab 100644 --- a/dev-network/assets/observer/observer.json +++ b/dev-network/assets/observer/observer.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.streamr.network/config-v2.schema.json", "client": { - "logLevel": "trace", + "logLevel": "debug", "auth": { "privateKey": "0x2222222222222222222222222222222222222222222222222222222222222222" }, From 4650c94ba4685918fa12111ad14fa72e48dcd5aa Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 17:07:16 -0300 Subject: [PATCH 20/23] Add Grafana, OpenTelemetry Collector to ports, new observer Added Grafana and OpenTelemetry Collector to the list of ports in ports.md. Grafana will enable better data visualization for Logstore, and OpenTelemetry Collector will enhance monitoring capabilities. Also, created observer.md with the observer information for Logstore DevNetwork. Updated the connect.sh script to ensure both new services can be accessed when SSH'ing into the server. --- dev-network/bin/scripts/connect.sh | 4 +++- dev-network/docs/observer.md | 6 ++++++ dev-network/docs/ports.md | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 dev-network/docs/observer.md diff --git a/dev-network/bin/scripts/connect.sh b/dev-network/bin/scripts/connect.sh index 6978971b..4b94d4da 100755 --- a/dev-network/bin/scripts/connect.sh +++ b/dev-network/bin/scripts/connect.sh @@ -13,14 +13,16 @@ Keep the script running. Hit [Ctrl+C] to abort. " -$SSH \ +sudo $SSH \ -o ServerAliveInterval=60 \ -N \ -L 80:localhost:80 \ -L 443:localhost:443 \ -L 1317:localhost:1317 \ -L 3000:localhost:3000 \ + -L 3300:localhost:3300 \ -L 4001:localhost:4001 \ + -L 4317:localhost:4317 \ -L 5432:localhost:5432 \ -L 7771:localhost:7771 \ -L 7772:localhost:7772 \ diff --git a/dev-network/docs/observer.md b/dev-network/docs/observer.md new file mode 100644 index 00000000..bb2c1b9e --- /dev/null +++ b/dev-network/docs/observer.md @@ -0,0 +1,6 @@ +# LogStore DevNetwork - Observer + +| Properby | Value | +| ----------- | ------------------------------------------------------------------ | +| Address | `0x1563915e194d8cfba1943570603f7606a3115508` | +| Private Key | `2222222222222222222222222222222222222222222222222222222222222222` | diff --git a/dev-network/docs/ports.md b/dev-network/docs/ports.md index c1cf8c6a..e6d1d49e 100644 --- a/dev-network/docs/ports.md +++ b/dev-network/docs/ports.md @@ -7,10 +7,12 @@ | 443 | Streamr APP | streamr-dev-nginx | Streamr | | 1317 | KYVE REST API | logstore-nginx | LogStore | | 1984 | Arweave | logstore-arweave | LogStore | +| 3300 | Grafana | logstore-grafana | Logstore | | 3306 | MySql | streamr-dev-mysql | Streamr | | 3333 | | streamr-dev-platform | Streamr | | 3334 | | streamr-dev-network-explorer | Streamr | | 4001 | | streamr-dev-stream-metrics-index | Streamr | +| 4317 | OpenTelemetry Collector | opentelemetry-collector | LogStore | | 5001 | | streamr-dev-ipfs | Streamr | | 5432 | | streamr-dev-postgres | Streamr | | 6379 | Redis | streamr-dev-redis | Streamr | From d04d4be3de6748439032203051b4e9c77d11af67 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 17:08:10 -0300 Subject: [PATCH 21/23] Update config loading mechanism and development keys A CONFIG_PATH environment variable was introduced to allow customization of the configuration file path. Now, the system will check for this variable and use the corresponding file if it's available, otherwise, it will fall back to the default path. This provides flexibility in configurations for different deployment scenarios. Also, we updated the privateKey and network id within the development-1.env.json. --- packages/observer/configs/development-1.env.json | 4 ++-- packages/observer/scripts/start-prod-server.ts | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/observer/configs/development-1.env.json b/packages/observer/configs/development-1.env.json index 71d3f121..756b05c8 100644 --- a/packages/observer/configs/development-1.env.json +++ b/packages/observer/configs/development-1.env.json @@ -3,10 +3,10 @@ "client": { "logLevel": "trace", "auth": { - "privateKey": "0x3333333333333333333333333333333333333333333333333333333333333333" + "privateKey": "0x2222222222222222222222222222222222222222222222222222222222222222" }, "network": { - "id": "0x5cbdd86a2fa8dc4bddd8a8f69dba48572eec07fb", + "id": "0x1563915e194d8cfba1943570603f7606a3115508", "trackers": [ { "id": "0xb9e7cEBF7b03AE26458E32a059488386b05798e8", diff --git a/packages/observer/scripts/start-prod-server.ts b/packages/observer/scripts/start-prod-server.ts index 35fcfdc1..4510e57a 100644 --- a/packages/observer/scripts/start-prod-server.ts +++ b/packages/observer/scripts/start-prod-server.ts @@ -9,9 +9,11 @@ import { createObserver } from '../src/observer'; const homeDir = os.homedir(); const configFilename = 'observer.json'; -const fullConfigPath = `${homeDir}/.logstore/config/${configFilename}`; +const defaultConfigPath = `${homeDir}/.logstore/config/${configFilename}`; -const content = JSON.parse(fs.readFileSync(fullConfigPath, 'utf8')) as +const configPath = process.env.CONFIG_PATH ?? defaultConfigPath; + +const content = JSON.parse(fs.readFileSync(configPath, 'utf8')) as | Config | undefined; From 9425aa115f7ffd0447ea60933b0cba8a474b0b9c Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 17:08:30 -0300 Subject: [PATCH 22/23] Add README.md for Observer package Introduced a README markdown file for the 'Observer' package. This documentation provides an overview, key features, quick start guide, environment variables, configuration, and deployment steps for the package. The purpose of this addition is to aid users in understanding and working with the Observer package, which is central to inspecting network activities and collecting telemetry data. --- packages/observer/README.md | 47 +++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 packages/observer/README.md diff --git a/packages/observer/README.md b/packages/observer/README.md new file mode 100644 index 00000000..194e228c --- /dev/null +++ b/packages/observer/README.md @@ -0,0 +1,47 @@ +# @logsn/observer + +## **Overview** + +The Observer package is responsible for inspecting network activities and collecting telemetry data. It passively listens to network streams, gathers metrics, and forwards them to the Open Telemetry Collector for further processing and visualization via Grafana. + +## **Key Features** + +- **System Message Inspection**: Monitors and logs system messages like **`QueryRequest`**. +- **Bundle Reports**: Aggregates and summarizes bundle reports across the network. +- **High Configurability**: Similar configuration steps to the broker's setup. +- **Secure**: Future-proofed with private key configurations. + +## **Quick Start** + +### **Running Observer** + +- **Development Mode** + +```bash +pnpm run start:dev +``` + +- **Production Mode** + +```bash +pnpm run start:prod +``` + + +### **Environment Variables** + +For custom configurations, you can set environment variables in a **`.env`** file within the Observer directory. + +- **`CONFIG_PATH`**: Path to the Observer configuration file. (Default: **`~/.logstore/config/observer.json`**) +- **`TRACING_URL`**: URL for tracing data. +- **`METRICS_URL`**: URL for metrics data. + +## **Configuration** + +See the example configuration **[here](https://github.com/usherlabs/logstore/blob/main/dev-network/assets/observer/observer.json)**. + +## **Deployment** + +- Check the devnetwork **`docker-compose.yml`** file to inspect the Observer service configuration as an example. +- Ensure the necessary telemetry pipeline is properly configured and running for metrics and tracing. E.g., OpenTelemetry Collector, Prometheus, Tempo and Grafana. +- Always ensure your environment variables are correctly set for the deployment context (Dev or Prod). From feb13028c3d68247667a299e9d319ebe5d3eeb15 Mon Sep 17 00:00:00 2001 From: Raffael Campos Date: Tue, 3 Oct 2023 17:11:03 -0300 Subject: [PATCH 23/23] revert accidental command change on dev-network connect.sh --- dev-network/bin/scripts/connect.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-network/bin/scripts/connect.sh b/dev-network/bin/scripts/connect.sh index 4b94d4da..bae97fdf 100755 --- a/dev-network/bin/scripts/connect.sh +++ b/dev-network/bin/scripts/connect.sh @@ -13,7 +13,7 @@ Keep the script running. Hit [Ctrl+C] to abort. " -sudo $SSH \ +$SSH \ -o ServerAliveInterval=60 \ -N \ -L 80:localhost:80 \