Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ The `toString` method uses `toHex`.
| `pssSend` | `POST /pss/send/:topic/:target` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1send~1%7Btopic%7D~1%7Btargets%7D/post) | ❌✅✅ |
| `pssSubscribe` _Websocket_ | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ |
| `pssReceive` | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ |
| `gsocSubscribe` _WebSocket_ | `GET /gsoc/subscribe/:address` | ❌❌✅ |
| `pubsubConnect` _WebSocket_ | `GET /pubsub/:topicAddress` | ❌❌✅ |
| `listPubsubTopics` | `GET /pubsub/` | ❌❌✅ |
| `getPostageBatches` | `GET /stamps` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps/get) | ❌✅✅ |
| `getGlobalPostageBatches` | `GET /batches` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1batches/get) | ❌✅✅ |
| `getPostageBatch` | `GET /stamps/:batchId` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps~1%7Bbatch_id%7D/get) | ❌✅✅ |
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
],
"scripts": {
"prepublishOnly": "NODE_ENV=production npm run build",
"prepare": "npm run build",
"build": "rimraf dist && npm run build:node && npm run build:types && npm run build:browser",
"build:node": "tsc -p tsconfig.json && tsc -p tsconfig-mjs.json && ./build-fixup && babel --plugins \"babel-plugin-add-import-extension\" --out-dir dist/mjs/ dist/mjs/",
"build:types": "tsc --emitDeclarationOnly --declaration --outDir dist/types",
Expand Down
144 changes: 144 additions & 0 deletions src/bee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import * as grantee from './modules/grantee'
import * as gsoc from './modules/gsoc'
import * as pinning from './modules/pinning'
import * as pubsub from './modules/pubsub'
import type { PubsubModeParams } from './modules/pubsub'
import * as pss from './modules/pss'
import { rchash } from './modules/rchash'
import * as status from './modules/status'
Expand Down Expand Up @@ -56,6 +58,10 @@
GsocMessageHandler,
GsocSubscription,
Health,
PubsubMessageHandler,
PubsubMode,
PubsubSubscription,
PubsubTopicListResponse,
LastCashoutActionResponse,
LastChequesForPeerResponse,
LastChequesResponse,
Expand Down Expand Up @@ -1155,7 +1161,7 @@
for (let i = 0n; i < 0xffffn; i++) {
const signer = new PrivateKey(Binary.numberToUint256(start + i, 'BE'))
const socAddress = makeSOCAddress(identifier, signer.publicKey().address())
// TODO: test the significance of the hardcoded 256

Check warning on line 1164 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: test the significance of the...'
const actualProximity = 256 - Binary.proximity(socAddress.toUint8Array(), targetOverlay.toUint8Array())

if (actualProximity <= 256 - proximity) {
Expand Down Expand Up @@ -1287,6 +1293,144 @@
return subscription
}

/**
* Connects to a pubsub topic via WebSocket, acting as a publisher (read + write).
*
* The mode enum and its constructor arguments are passed directly
*
* @param mode Pubsub mode enum value (e.g. `PubsubMode.GSOC_EPHEMERAL`)
* @param handler Message handler with `onMessage`, `onError`, `onClose` callbacks
* @param brokerPeer Multiaddress of the broker peer to connect to
* @param modeParams Constructor arguments for the selected mode (topic, optional params)
* @returns A {@link PubsubSubscription} with `send(payload)` and `cancel()` methods
*/
pubsubConnect<M extends PubsubMode>(
mode: M,
handler: PubsubMessageHandler,
brokerPeer: string,
...modeParams: PubsubModeParams[M]
): PubsubSubscription {
const modeInstance = pubsub.createPubsubMode(mode, ...modeParams)

const ws = pubsub.connect(
this.url,
modeInstance.topicAddress,
brokerPeer,
modeInstance.getPublisherHeaders() ?? undefined,
this.requestOptions.headers,
)
// Ensure binary frames are delivered as ArrayBuffer (not Blob) in browser environments.
// prepareWebsocketData handles ArrayBuffer but not Blob.
ws.binaryType = 'arraybuffer'

const PING_INTERVAL_MS = 50_000
let pingTimer: ReturnType<typeof setInterval> | null = null

const startPing = () => {
if (typeof ws.ping === 'function') {
pingTimer = setInterval(() => {
try {
ws.ping()
} catch {
// ignore errors on closed sockets
}
}, PING_INTERVAL_MS)
}
}

const stopPing = () => {
if (pingTimer !== null) {
clearInterval(pingTimer)
pingTimer = null
}
}

let cancelled = false
const cancel = () => {
if (!cancelled) {
cancelled = true
stopPing()

if (ws.terminate) {
ws.terminate()
} else {
ws.close()
}
}
}

let ready = false
const sendQueue: Uint8Array[] = []

const flushQueue = () => {
ready = true

for (const msg of sendQueue) {
ws.send(msg)
}
sendQueue.length = 0
}

const subscription: PubsubSubscription = {
cancel,
send: async (payload: Uint8Array | string): Promise<void> => {
const encoded = await modeInstance.encodeMessage(payload)

if (ready) {
ws.send(encoded)
} else {
sendQueue.push(encoded)
}
},
}

ws.onopen = () => {
if (cancelled) {
ws.close()

return
}

startPing()
flushQueue()

if (handler.onOpen) {
handler.onOpen(subscription)
}
}

ws.onmessage = async event => {
const data = await prepareWebsocketData(event.data)

if (data.length) {
handler.onMessage(modeInstance.decodeMessage(data), subscription)
}
}
ws.onerror = event => {
if (!cancelled) {
handler.onError(new BeeError(event.message), subscription)
}
}
ws.onclose = () => {
stopPing()

if (!cancelled) {
handler.onClose(subscription)
}
}

return subscription
}

/**
* Lists all active pubsub topics this node is participating in.
*
* @param requestOptions Options for making requests, such as timeouts, custom HTTP agents, headers, etc.
*/
async listPubsubTopics(requestOptions?: BeeRequestOptions): Promise<PubsubTopicListResponse> {
return pubsub.listTopics(this.getRequestOptionsForCall(requestOptions))
}

/**
* Creates a feed manifest chunk and returns the reference to it.
*
Expand Down Expand Up @@ -1845,7 +1989,7 @@
gasPrice?: NumberString | string | bigint,
requestOptions?: BeeRequestOptions,
): Promise<TransactionId> {
// TODO: check BZZ in tests

Check warning on line 1992 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: check BZZ in tests'
const amountString =
amount instanceof BZZ ? amount.toPLURString() : asNumberString(amount, { min: 1n, name: 'amount' })

Expand Down Expand Up @@ -2520,7 +2664,7 @@
* @deprecated Use `getPostageBatches` instead
*/
async getAllPostageBatch(requestOptions?: BeeRequestOptions): Promise<PostageBatch[]> {
return stamps.getAllPostageBatches(this.getRequestOptionsForCall(requestOptions)) // TODO: remove in June 2025

Check warning on line 2667 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: remove in June 2025'
}

/**
Expand All @@ -2531,7 +2675,7 @@
* @deprecated Use `getGlobalPostageBatches` instead
*/
async getAllGlobalPostageBatch(requestOptions?: BeeRequestOptions): Promise<GlobalPostageBatch[]> {
return stamps.getGlobalPostageBatches(this.getRequestOptionsForCall(requestOptions)) // TODO: remove in June 2025

Check warning on line 2678 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: remove in June 2025'
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Stamper } from './stamper/stamper'
export { MerkleTree } from 'cafe-utility'
export type { Chunk } from './chunk/cac'
export type { SingleOwnerChunk } from './chunk/soc'
export { GsocEphemeralMode, createPubsubMode } from './modules/pubsub'
export type { PubsubModeParams, PubsubModeInstance } from './modules/pubsub'
export { MantarayNode } from './manifest/manifest'
export { SUPPORTED_BEE_VERSION, SUPPORTED_BEE_VERSION_EXACT } from './modules/debug/status'
export * from './types'
Expand Down
13 changes: 4 additions & 9 deletions src/modules/gsoc.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { System } from 'cafe-utility'
import WebSocket from 'isomorphic-ws'
import { BeeRequestOptions, UploadOptions } from '..'
import { SingleOwnerChunk, uploadSingleOwnerChunk } from '../chunk/soc'
import { BatchId, Reference } from '../utils/typed-bytes'
import { prepareWebsocketConnection } from '../utils/data'

const endpoint = 'gsoc'

Expand All @@ -15,14 +15,9 @@ export async function send(
return uploadSingleOwnerChunk(requestOptions, soc, stamp, options)
}

export function subscribe(url: string, reference: Reference, headers?: Record<string, string>) {
export function subscribe(url: string, reference: Reference, headers?: Record<string, string>): WebSocket {
const wsUrl = url.replace(/^http/i, 'ws')
const wsUrlWithParams = `${wsUrl}/${endpoint}/subscribe/${reference.toHex()}`

if (System.whereAmI() === 'browser') {
return new WebSocket(`${wsUrl}/${endpoint}/subscribe/${reference.toHex()}`)
}

return new WebSocket(`${wsUrl}/${endpoint}/subscribe/${reference.toHex()}`, {
headers,
})
return prepareWebsocketConnection(wsUrlWithParams, headers)
}
11 changes: 3 additions & 8 deletions src/modules/pss.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { System } from 'cafe-utility'
import WebSocket from 'isomorphic-ws'
import type { BeeRequestOptions } from '../types'
import { prepareRequestHeaders } from '../utils/headers'
import { http } from '../utils/http'
import { BatchId, PublicKey, Topic } from '../utils/typed-bytes'
import { prepareWebsocketConnection } from '../utils/data'

const endpoint = 'pss'

Expand Down Expand Up @@ -44,12 +44,7 @@ export async function send(
*/
export function subscribe(url: string, topic: Topic, headers?: Record<string, string>): WebSocket {
const wsUrl = url.replace(/^http/i, 'ws')
const wsUrlWithParams = `${wsUrl}/${endpoint}/subscribe/${topic.toHex()}`

if (System.whereAmI() === 'browser') {
return new WebSocket(`${wsUrl}/${endpoint}/subscribe/${topic.toHex()}`)
}

return new WebSocket(`${wsUrl}/${endpoint}/subscribe/${topic.toHex()}`, {
headers,
})
return prepareWebsocketConnection(wsUrlWithParams, headers)
}
Loading
Loading