diff --git a/src/@types/Provider.ts b/src/@types/Provider.ts index 4db6aec3e..cd71db8ad 100644 --- a/src/@types/Provider.ts +++ b/src/@types/Provider.ts @@ -1,3 +1,6 @@ +import { type Multiaddr } from '@multiformats/multiaddr' +import { Signer } from 'ethers' +import type { PeerId } from '@libp2p/interface' import type { AccessList } from './AccessList.js' export interface ProviderFees { providerFeeAddress: string @@ -44,6 +47,12 @@ export interface ServiceEndpoint { urlPath: string } +export interface NodeP2P { + nodeId: string + multiaddress?: Multiaddr[] +} +export type OceanNode = string | NodeP2P | PeerId + export interface NodeStatusProvider { chainId: string network: string @@ -155,3 +164,11 @@ export interface NodeLogEntry { message: string meta?: Record } + +export interface CompleteSignature { + consumerAddress: string + nonce: string + signature: string +} + +export type SignerOrAuthTokenOrSignature = string | Signer | CompleteSignature diff --git a/src/services/providers/BaseProvider.ts b/src/services/providers/BaseProvider.ts index 87086df1c..3d9b7d6f3 100644 --- a/src/services/providers/BaseProvider.ts +++ b/src/services/providers/BaseProvider.ts @@ -1,5 +1,5 @@ import { peerIdFromString } from '@libp2p/peer-id' -import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' +import { multiaddr } from '@multiformats/multiaddr' import { Signer } from 'ethers' import { StorageObject, @@ -22,14 +22,17 @@ import { DownloadResponse, NodeStatus, NodeComputeJob, - NodeLogsParams, NodeLogEntry, PersistentStorageAccessList, PersistentStorageBucket, PersistentStorageCreateBucketRequest, PersistentStorageDeleteFileResponse, PersistentStorageFileEntry, - PersistentStorageObject + PersistentStorageObject, + OceanNode, + NodeP2P, + CompleteSignature, + SignerOrAuthTokenOrSignature } from '../../@types/index.js' import { type DDO, type ValidateMetadata } from '@oceanprotocol/ddo-js' import { decodeJwt } from '../../utils/Jwt.js' @@ -40,56 +43,103 @@ import { P2pProvider, type P2PConfig, type P2PRequestBodyStream } from './P2pPro export { OCEAN_P2P_PROTOCOL, type P2PConfig } from './P2pProvider.js' export async function getConsumerAddress( - signerOrAuthToken: Signer | string + signerOrAuthToken: SignerOrAuthTokenOrSignature ): Promise { - return typeof signerOrAuthToken === 'string' - ? decodeJwt(signerOrAuthToken).address - : signerOrAuthToken.getAddress() + if (isAgentSignature(signerOrAuthToken)) return signerOrAuthToken.consumerAddress + if (typeof signerOrAuthToken === 'string') return decodeJwt(signerOrAuthToken).address + + return signerOrAuthToken.getAddress() } export async function getSignature( - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, nonce: string, command: string ): Promise { - if (typeof signerOrAuthToken === 'string') return null + if (typeof signerOrAuthToken === 'string') { + return null + } + if (isAgentSignature(signerOrAuthToken)) { + return signerOrAuthToken.signature + } const message = String( String(await signerOrAuthToken.getAddress()) + String(nonce) + String(command) ) return signRequest(signerOrAuthToken, message) } -export function getAuthorization(signerOrAuthToken: Signer | string): string | undefined { +export function getAuthorization( + signerOrAuthToken: SignerOrAuthTokenOrSignature +): string | undefined { return typeof signerOrAuthToken === 'string' ? signerOrAuthToken : undefined } -export function isP2pUri(nodeUri: string | Multiaddr[]): boolean { - if (Array.isArray(nodeUri)) return true - if (!nodeUri) return false +export function isAgentSignature(v: unknown): v is CompleteSignature { + return ( + !!v && + typeof v === 'object' && + typeof (v as any).consumerAddress === 'string' && + typeof (v as any).nonce === 'string' && + typeof (v as any).signature === 'string' + ) +} + +function isPeerIdOrMultiAddr(param: string) { try { - multiaddr(nodeUri) + multiaddr(param) return true } catch {} try { - peerIdFromString(nodeUri) + peerIdFromString(param) return true } catch { return false } } +export function isP2pUri(node: OceanNode): boolean { + if (!node) return false + if (typeof node === 'string') { + return isPeerIdOrMultiAddr(node) + } + + // NodeP2P -> p2p + if (typeof node === 'object' && ('nodeId' in node || 'multiaddress' in node)) { + const nodeP2p = node as NodeP2P + if (Array.isArray(nodeP2p.multiaddress) && nodeP2p.multiaddress.length > 0) + return true + if (nodeP2p.nodeId) { + return isPeerIdOrMultiAddr(nodeP2p.nodeId) + } + } + + // PeerId (libp2p) -> p2p + if (typeof node === 'object' && typeof (node as any).toString === 'function') { + const s = String((node as any).toString()) + try { + peerIdFromString(s) + return true + } catch { + return false + } + } + return false +} export class BaseProvider { private httpProvider = new HttpProvider() private p2pProvider = new P2pProvider() // eslint-disable-next-line @typescript-eslint/no-explicit-any - protected getImpl(nodeUri: string | Multiaddr[]): any { - if (Array.isArray(nodeUri)) return this.p2pProvider - return isP2pUri(nodeUri) ? this.p2pProvider : this.httpProvider + protected getImpl(node: OceanNode): any { + return isP2pUri(node) ? this.p2pProvider : this.httpProvider + } + + public getP2PProvider() { + return this.p2pProvider } public async getNonce( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, consumerAddress: string, signal?: AbortSignal, providerEndpoints?: any, @@ -107,8 +157,8 @@ export class BaseProvider { public async encrypt( data: any, chainId: number, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, policyServer?: any, signal?: AbortSignal ): Promise { @@ -125,7 +175,7 @@ export class BaseProvider { public async checkDidFiles( did: string, serviceId: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, withChecksum: boolean = false, signal?: AbortSignal ): Promise { @@ -140,7 +190,7 @@ export class BaseProvider { public async getFileInfo( file: StorageObject, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, withChecksum: boolean = false, signal?: AbortSignal ): Promise { @@ -148,7 +198,7 @@ export class BaseProvider { } public async getComputeEnvironments( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { return this.getImpl(nodeUri).getComputeEnvironments(nodeUri, signal) @@ -159,7 +209,7 @@ export class BaseProvider { serviceId: string, fileIndex: number, consumerAddress: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal, userCustomParameters?: UserCustomParameters, computeEnv?: string, @@ -184,7 +234,7 @@ export class BaseProvider { computeEnv: string, token: string, validUntil: number, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, consumerAddress: string, resources: ComputeResourceRequest[], chainId: number, @@ -217,8 +267,8 @@ export class BaseProvider { serviceId: string, fileIndex: number, transferTxId: string, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, policyServer?: any, userCustomParameters?: UserCustomParameters ): Promise { @@ -235,8 +285,8 @@ export class BaseProvider { } public async computeStart( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -273,8 +323,8 @@ export class BaseProvider { } public async freeComputeStart( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -305,8 +355,8 @@ export class BaseProvider { } public async computeStreamableLogs( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, signal?: AbortSignal ): Promise { @@ -320,8 +370,8 @@ export class BaseProvider { public async computeStop( jobId: string, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, agreementId?: string, signal?: AbortSignal ): Promise { @@ -335,8 +385,8 @@ export class BaseProvider { } public async computeStatus( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId?: string, agreementId?: string, signal?: AbortSignal @@ -351,8 +401,8 @@ export class BaseProvider { } public async getComputeResultUrl( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number ): Promise { @@ -365,8 +415,8 @@ export class BaseProvider { } public async getComputeResult( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number, offset: number = 0 @@ -382,7 +432,7 @@ export class BaseProvider { public async generateAuthToken( consumer: Signer, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { return this.getImpl(nodeUri).generateAuthToken(consumer, nodeUri, signal) @@ -392,7 +442,7 @@ export class BaseProvider { address: string, signature: string, nonce: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { return this.p2pProvider.generateSignedAuthToken( @@ -407,14 +457,14 @@ export class BaseProvider { public async invalidateAuthToken( consumer: Signer, token: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise<{ success: boolean }> { return this.getImpl(nodeUri).invalidateAuthToken(consumer, token, nodeUri, signal) } public async resolveDdo( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, did: string, signal?: AbortSignal ): Promise { @@ -422,23 +472,20 @@ export class BaseProvider { } public async validateDdo( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, ddo: DDO, - signer: Signer, + signerOrAuthToken: SignerOrAuthTokenOrSignature, signal?: AbortSignal ): Promise { - return this.getImpl(nodeUri).validateDdo(nodeUri, ddo, signer, signal) + return this.getImpl(nodeUri).validateDdo(nodeUri, ddo, signerOrAuthToken, signal) } - public async isValidProvider( - url: string | Multiaddr[], - signal?: AbortSignal - ): Promise { + public async isValidProvider(url: OceanNode, signal?: AbortSignal): Promise { return this.getImpl(url).isValidProvider(url, signal) } public async PolicyServerPassthrough( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, request: PolicyServerPassthroughCommand, signal?: AbortSignal ): Promise { @@ -446,7 +493,7 @@ export class BaseProvider { } public async initializePSVerification( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, request: PolicyServerInitializeCommand, signal?: AbortSignal ): Promise { @@ -454,8 +501,8 @@ export class BaseProvider { } public async downloadNodeLogs( - nodeUri: string | Multiaddr[], - signer: Signer, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, startTime: string, endTime: string, maxLogs?: number, @@ -466,7 +513,7 @@ export class BaseProvider { ): Promise { return this.getImpl(nodeUri).downloadNodeLogs( nodeUri, - signer, + signerOrAuthToken, startTime, endTime, maxLogs, @@ -478,14 +525,14 @@ export class BaseProvider { } public async getNodeStatus( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { return this.getImpl(nodeUri).getNodeStatus(nodeUri, signal) } public async getNodeJobs( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, fromTimestamp?: number, signal?: AbortSignal ): Promise { @@ -510,23 +557,9 @@ export class BaseProvider { return this.p2pProvider.getMultiaddrFromPeerId(peerId) } - /** - * Fetch node logs via P2P with a pre-signed payload. - * For auto-signed log fetching (HTTP or P2P), use downloadNodeLogs(). - */ - public async fetchNodeLogs( - nodeUri: string | Multiaddr[], - address: string, - signature: string, - nonce: string, - logParams?: NodeLogsParams - ): Promise { - return this.p2pProvider.fetchNodeLogs(nodeUri, address, signature, nonce, logParams) - } - public async createPersistentStorageBucket( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, payload: PersistentStorageCreateBucketRequest, signal?: AbortSignal ): Promise<{ @@ -543,8 +576,8 @@ export class BaseProvider { } public async getPersistentStorageBuckets( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, owner: string, signal?: AbortSignal ): Promise { @@ -557,8 +590,8 @@ export class BaseProvider { } public async listPersistentStorageFiles( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, signal?: AbortSignal ): Promise { @@ -571,8 +604,8 @@ export class BaseProvider { } public async getPersistentStorageFileObject( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal @@ -587,8 +620,8 @@ export class BaseProvider { } public async uploadPersistentStorageFile( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, content: P2PRequestBodyStream, @@ -605,8 +638,8 @@ export class BaseProvider { } public async deletePersistentStorageFile( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal @@ -621,14 +654,14 @@ export class BaseProvider { } public async fetchConfig( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, payload: Record ): Promise { return this.p2pProvider.fetchConfig(nodeUri, payload) } public async pushConfig( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, payload: Record ): Promise { return this.p2pProvider.pushConfig(nodeUri, payload) diff --git a/src/services/providers/HttpProvider.ts b/src/services/providers/HttpProvider.ts index cff27d275..03015df3d 100644 --- a/src/services/providers/HttpProvider.ts +++ b/src/services/providers/HttpProvider.ts @@ -27,40 +27,54 @@ import { PersistentStorageCreateBucketRequest, PersistentStorageDeleteFileResponse, PersistentStorageFileEntry, - PersistentStorageObject + PersistentStorageObject, + SignerOrAuthTokenOrSignature, + CompleteSignature } from '../../@types/index.js' import { PROTOCOL_COMMANDS } from '../../@types/Provider.js' import { type DDO, type ValidateMetadata } from '@oceanprotocol/ddo-js' import { eciesencrypt } from '../../utils/eciesencrypt.js' import { signRequest } from '../../utils/SignatureUtils.js' -import { getConsumerAddress, getSignature, getAuthorization } from './BaseProvider.js' +import { + getConsumerAddress, + getSignature, + getAuthorization, + isAgentSignature +} from './BaseProvider.js' import { type P2PRequestBodyStream } from './P2pProvider.js' export class HttpProvider { - protected getConsumerAddress(s: Signer | string) { - return getConsumerAddress(s) - } - - protected getSignature(s: Signer | string, nonce: string, command: string) { - return getSignature(s, nonce, command) - } - - protected getAuthorization(s: Signer | string) { + protected getAuthorization(s: SignerOrAuthTokenOrSignature) { return getAuthorization(s) } - private async getPersistentStorageSignaturePayload( + private async getSignedCommandParams( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, command: string, - signal?: AbortSignal - ): Promise<{} | { consumerAddress: string; nonce: string; signature: string }> { + signal?: AbortSignal, + providerEndpoints?: any, + serviceEndpoints?: any + ): Promise { + if (isAgentSignature(signerOrAuthToken)) { + return { + consumerAddress: signerOrAuthToken.consumerAddress, + nonce: signerOrAuthToken.nonce, + signature: signerOrAuthToken.signature + } + } if (typeof signerOrAuthToken === 'string') { - return {} + return { + consumerAddress: await getConsumerAddress(signerOrAuthToken), + nonce: undefined, + signature: undefined + } } - const providerEndpoints = await this.getEndpoints(nodeUri) - const serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) + if (!providerEndpoints) providerEndpoints = await this.getEndpoints(nodeUri) + if (!serviceEndpoints) + serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) + + const consumerAddress = await getConsumerAddress(signerOrAuthToken) const nonce = ( (await this.getNonce( nodeUri, @@ -70,7 +84,7 @@ export class HttpProvider { serviceEndpoints )) + 1 ).toString() - const signature = await this.getSignature(signerOrAuthToken, nonce, command) + const signature = await getSignature(signerOrAuthToken, nonce, command) if (!signature) throw new Error('Could not sign persistent storage request.') return { consumerAddress, nonce, signature } } @@ -251,26 +265,19 @@ export class HttpProvider { data: any, chainId: number, nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, policyServer?: any, signal?: AbortSignal ): Promise { const providerEndpoints = await this.getEndpoints(nodeUri) const serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.ENCRYPT + PROTOCOL_COMMANDS.ENCRYPT, + signal, + providerEndpoints, + serviceEndpoints ) let path = @@ -278,9 +285,9 @@ export class HttpProvider { ? this.getEndpointURL(serviceEndpoints, 'encrypt').urlPath : null) + `?chainId=${chainId}` if (!path) return null - path += `&nonce=${nonce}` - path += `&consumerAddress=${consumerAddress}` - path += `&signature=${signature}` + if (nonce) path += `&nonce=${nonce}` + if (consumerAddress) path += `&consumerAddress=${consumerAddress}` + if (signature) path += `&signature=${signature}` try { const response = await fetch(path, { @@ -625,7 +632,7 @@ export class HttpProvider { fileIndex: number, transferTxId: string, nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, policyServer?: any, userCustomParameters?: UserCustomParameters ): Promise { @@ -635,34 +642,26 @@ export class HttpProvider { ? this.getEndpointURL(serviceEndpoints, 'download').urlPath : null if (!downloadUrl) return null - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - null, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.DOWNLOAD + PROTOCOL_COMMANDS.DOWNLOAD, + undefined, + providerEndpoints, + serviceEndpoints ) let consumeUrl = downloadUrl consumeUrl += `?fileIndex=${fileIndex}` consumeUrl += `&documentId=${did}` consumeUrl += `&transferTxId=${transferTxId}` consumeUrl += `&serviceId=${serviceId}` - consumeUrl += `&consumerAddress=${consumerAddress}` - consumeUrl += `&nonce=${nonce}` + if (consumerAddress) consumeUrl += `&consumerAddress=${consumerAddress}` + if (nonce) consumeUrl += `&nonce=${nonce}` + if (signature) consumeUrl += `&signature=${signature}` if (policyServer) { consumeUrl += '&policyServer=' + encodeURI(JSON.stringify(policyServer)) } - consumeUrl += `&signature=${signature}` if (userCustomParameters) consumeUrl += '&userdata=' + encodeURI(JSON.stringify(userCustomParameters)) return consumeUrl @@ -688,7 +687,7 @@ export class HttpProvider { */ public async computeStart( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -718,20 +717,13 @@ export class HttpProvider { return null } - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_START + PROTOCOL_COMMANDS.COMPUTE_START, + signal, + providerEndpoints, + serviceEndpoints ) const payload = Object() payload.consumerAddress = consumerAddress @@ -774,12 +766,13 @@ export class HttpProvider { if (queueMaxWaitTime) payload.queueMaxWaitTime = queueMaxWaitTime let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(computeStartUrl, { method: 'POST', body: JSON.stringify(payload), headers: { 'Content-Type': 'application/json', - Authorization: this.getAuthorization(signerOrAuthToken) + ...(authHeader ? { Authorization: authHeader } : {}) }, signal }) @@ -821,7 +814,7 @@ export class HttpProvider { */ public async freeComputeStart( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -848,21 +841,13 @@ export class HttpProvider { return null } - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.FREE_COMPUTE_START + PROTOCOL_COMMANDS.FREE_COMPUTE_START, + signal, + providerEndpoints, + serviceEndpoints ) const payload = Object() payload.consumerAddress = consumerAddress @@ -897,12 +882,13 @@ export class HttpProvider { if (queueMaxWaitTime) payload.queueMaxWaitTime = queueMaxWaitTime let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(computeStartUrl, { method: 'POST', body: JSON.stringify(payload), headers: { 'Content-Type': 'application/json', - Authorization: this.getAuthorization(signerOrAuthToken) + ...(authHeader ? { Authorization: authHeader } : {}) }, signal }) @@ -936,11 +922,10 @@ export class HttpProvider { */ public async computeStreamableLogs( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, signal?: AbortSignal ): Promise { - const isAuthToken = typeof signerOrAuthToken === 'string' const providerEndpoints = await this.getEndpoints(nodeUri) const serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) @@ -957,36 +942,28 @@ export class HttpProvider { ) return null } - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.COMPUTE_GET_STREAMABLE_LOGS, + signal, + providerEndpoints, + serviceEndpoints + ) - let url = `?consumerAddress=${consumerAddress}&jobId=${jobId}` - // Is signer, add signature and nonce - if (!isAuthToken) { - const signature = await this.getSignature( - signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_GET_STREAMABLE_LOGS - ) - url += `&signature=${signature}` - url += `&nonce=${nonce}` - } + let url = `?jobId=${jobId}` + if (consumerAddress) url += `&consumerAddress=${consumerAddress}` + if (signature) url += `&signature=${signature}` + if (nonce) url += `&nonce=${nonce}` let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(computeStreamableLogs + url, { method: 'GET', headers: { 'Content-Type': 'application/json', - Authorization: this.getAuthorization(signerOrAuthToken) + ...(authHeader ? { Authorization: authHeader } : {}) }, signal }) @@ -1018,41 +995,29 @@ export class HttpProvider { public async computeStop( jobId: string, nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, agreementId?: string, signal?: AbortSignal ): Promise { - const isAuthToken = typeof signerOrAuthToken === 'string' const providerEndpoints = await this.getEndpoints(nodeUri) const serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) const computeStopUrl = this.getEndpointURL(serviceEndpoints, 'computeStop') ? this.getEndpointURL(serviceEndpoints, 'computeStop').urlPath : null - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_STOP + PROTOCOL_COMMANDS.COMPUTE_STOP, + signal, + providerEndpoints, + serviceEndpoints ) const queryParams = new URLSearchParams() - queryParams.set('consumerAddress', consumerAddress) - queryParams.set('nonce', nonce) + if (consumerAddress) queryParams.set('consumerAddress', consumerAddress) + if (nonce) queryParams.set('nonce', nonce) + if (signature) queryParams.set('signature', signature) queryParams.set('jobId', jobId) - if (!isAuthToken) { - queryParams.set('signature', signature) - } if (agreementId) queryParams.set('agreementId', agreementId) @@ -1060,11 +1025,12 @@ export class HttpProvider { if (!queryString) return null let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(computeStopUrl + '?' + queryString, { method: 'PUT', headers: { 'Content-Type': 'application/json', - Authorization: this.getAuthorization(signerOrAuthToken) + ...(authHeader ? { Authorization: authHeader } : {}) }, signal }) @@ -1098,13 +1064,12 @@ export class HttpProvider { */ public async computeStatus( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId?: string, agreementId?: string, signal?: AbortSignal ): Promise { const consumerAddress = await getConsumerAddress(signerOrAuthToken) - const authorization = getAuthorization(signerOrAuthToken) const providerEndpoints = await this.getEndpoints(nodeUri) const serviceEndpoints = await this.getServiceEndpoints(nodeUri, providerEndpoints) const computeStatusUrl = this.getEndpointURL(serviceEndpoints, 'computeStatus') @@ -1118,9 +1083,13 @@ export class HttpProvider { if (!computeStatusUrl) return null let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(computeStatusUrl + url, { method: 'GET', - headers: { 'Content-Type': 'application/json', Authorization: authorization }, + headers: { + 'Content-Type': 'application/json', + ...(authHeader ? { Authorization: authHeader } : {}) + }, signal }) } catch (e) { @@ -1160,7 +1129,7 @@ export class HttpProvider { */ public async getComputeResultUrl( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number ): Promise { @@ -1171,20 +1140,13 @@ export class HttpProvider { ? this.getEndpointURL(serviceEndpoints, 'computeResult').urlPath : null - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - null, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_GET_RESULT + PROTOCOL_COMMANDS.COMPUTE_GET_RESULT, + undefined, + providerEndpoints, + serviceEndpoints ) if (!computeResultUrl) return null let resultUrl = computeResultUrl @@ -1200,7 +1162,7 @@ export class HttpProvider { public async getComputeResult( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number, offset: number = 0 @@ -1245,7 +1207,7 @@ export class HttpProvider { )) + 1 ).toString() - const signature = await this.getSignature( + const signature = await getSignature( consumer, nonce, PROTOCOL_COMMANDS.CREATE_AUTH_TOKEN @@ -1478,7 +1440,7 @@ export class HttpProvider { */ public async downloadNodeLogs( nodeUri: string, - signer: Signer, + signerOrAuthToken: SignerOrAuthTokenOrSignature, startTime: string, endTime: string, maxLogs?: number, @@ -1500,18 +1462,14 @@ export class HttpProvider { ) return null } - const consumerAddress = await signer.getAddress() - const nonce = ( - (await this.getNonce( - nodeUri, - consumerAddress, - signal, - providerEndpoints, - serviceEndpoints - )) + 1 - ).toString() - - const signature = await this.getSignature(signer, nonce, PROTOCOL_COMMANDS.GET_LOGS) + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.GET_LOGS, + signal, + providerEndpoints, + serviceEndpoints + ) let url = logsUrl + `?startTime=${startTime}&endTime=${endTime}` if (maxLogs) url += `&maxLogs=${maxLogs}` if (moduleName) url += `&moduleName=${moduleName}` @@ -1520,6 +1478,7 @@ export class HttpProvider { let response try { + const authHeader = this.getAuthorization(signerOrAuthToken) response = await fetch(url, { method: 'POST', body: JSON.stringify({ @@ -1527,7 +1486,10 @@ export class HttpProvider { nonce, address: consumerAddress }), - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + ...(authHeader ? { Authorization: authHeader } : {}) + }, signal }) } catch (e) { @@ -1599,20 +1561,28 @@ export class HttpProvider { public async validateDdo( nodeUri: string, ddo: DDO, - signer: Signer, + signerOrAuthToken: SignerOrAuthTokenOrSignature, signal?: AbortSignal ): Promise { - const publisherAddress = await signer.getAddress() - const nonceResp = await ( - await this.getData(`${nodeUri}/api/services/nonce?userAddress=${publisherAddress}`) - ).json() - const nonce = (Number(nonceResp.nonce ?? 0) + 1).toString() - const message = publisherAddress + nonce + PROTOCOL_COMMANDS.VALIDATE_DDO - const signature = await signRequest(signer, message) + const { + consumerAddress: publisherAddress, + nonce, + signature + } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.VALIDATE_DDO, + signal + ) + + const authHeader = this.getAuthorization(signerOrAuthToken) const response = await fetch(`${nodeUri}/api/aquarius/assets/ddo/validate`, { method: 'POST', body: JSON.stringify({ ddo, publisherAddress, nonce, signature }), - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + ...(authHeader ? { Authorization: authHeader } : {}) + }, signal }) if (!response.ok) return null @@ -1636,7 +1606,7 @@ export class HttpProvider { public async createPersistentStorageBucket( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, payload: PersistentStorageCreateBucketRequest, signal?: AbortSignal ): Promise<{ @@ -1652,7 +1622,7 @@ export class HttpProvider { ['persistentStorageCreateBucket'], '/api/services/persistentStorage/buckets' ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, @@ -1677,7 +1647,7 @@ export class HttpProvider { public async getPersistentStorageBuckets( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, owner: string, signal?: AbortSignal ): Promise { @@ -1689,7 +1659,7 @@ export class HttpProvider { ['persistentStorageGetBuckets'], '/api/services/persistentStorage/buckets' ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, @@ -1714,7 +1684,7 @@ export class HttpProvider { public async listPersistentStorageFiles( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, signal?: AbortSignal ): Promise { @@ -1726,7 +1696,7 @@ export class HttpProvider { ['persistentStorageListFiles'], `/api/services/persistentStorage/buckets/${encodeURIComponent(bucketId)}/files` ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, @@ -1751,7 +1721,7 @@ export class HttpProvider { public async getPersistentStorageFileObject( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal @@ -1766,7 +1736,7 @@ export class HttpProvider { bucketId )}/files/${encodeURIComponent(fileName)}/object` ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, @@ -1790,7 +1760,7 @@ export class HttpProvider { public async uploadPersistentStorageFile( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, content: P2PRequestBodyStream, @@ -1806,7 +1776,7 @@ export class HttpProvider { bucketId )}/files/${encodeURIComponent(fileName)}` ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, @@ -1880,17 +1850,14 @@ export class HttpProvider { }) as unknown as BodyInit } - const uploadHeaders: Record = { - 'Content-Type': 'application/octet-stream' - } - const maybeAuth = this.getAuthorization(signerOrAuthToken) - if (maybeAuth) uploadHeaders.Authorization = maybeAuth - + const authHeader = this.getAuthorization(signerOrAuthToken) const response = await fetch(`${routeBase}?${query.toString()}`, { method: 'POST', body, + headers: { - ...uploadHeaders + 'Content-Type': 'application/octet-stream', + ...(authHeader ? { Authorization: authHeader } : {}) }, signal }) @@ -1900,7 +1867,7 @@ export class HttpProvider { public async deletePersistentStorageFile( nodeUri: string, - signerOrAuthToken: Signer | string, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal @@ -1915,7 +1882,7 @@ export class HttpProvider { bucketId )}/files/${encodeURIComponent(fileName)}` ) - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, diff --git a/src/services/providers/P2pProvider.ts b/src/services/providers/P2pProvider.ts index d93962e01..9363eb268 100644 --- a/src/services/providers/P2pProvider.ts +++ b/src/services/providers/P2pProvider.ts @@ -5,8 +5,8 @@ import { webSockets } from '@libp2p/websockets' import { tcp } from '@libp2p/tcp' import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' import { bootstrap } from '@libp2p/bootstrap' -import { identify } from '@libp2p/identify' -import { EventTypes, KadDHT, kadDHT } from '@libp2p/kad-dht' +import { identify, identifyPush } from '@libp2p/identify' +import { EventTypes, KadDHT, kadDHT, passthroughMapper } from '@libp2p/kad-dht' import { ping } from '@libp2p/ping' import { peerIdFromString } from '@libp2p/peer-id' import { lpStream, UnexpectedEOFError } from '@libp2p/utils' @@ -15,7 +15,7 @@ import { Signer } from 'ethers' import { sleep } from '../../utils/General.js' import { LoggerInstance } from '../../utils/Logger.js' import { concatUint8Arrays } from '../../utils/bytes.js' -import type { Connection, Stream } from '@libp2p/interface' +import type { Connection, Stream, PeerId } from '@libp2p/interface' import { StorageObject, FileInfo, @@ -41,13 +41,26 @@ import { PersistentStorageCreateBucketRequest, PersistentStorageDeleteFileResponse, PersistentStorageFileEntry, - PersistentStorageObject + PersistentStorageObject, + OceanNode, + NodeP2P, + SignerOrAuthTokenOrSignature, + CompleteSignature } from '../../@types/index.js' -import { PROTOCOL_COMMANDS, NodeLogsParams, NodeLogEntry } from '../../@types/Provider.js' +import { PROTOCOL_COMMANDS, NodeLogEntry } from '../../@types/Provider.js' import { type DDO, type ValidateMetadata } from '@oceanprotocol/ddo-js' import { signRequest } from '../../utils/SignatureUtils.js' -import { getConsumerAddress, getSignature, getAuthorization } from './BaseProvider.js' +import { + getConsumerAddress, + getSignature, + getAuthorization, + isAgentSignature +} from './BaseProvider.js' import { eciesencrypt } from '../../utils/eciesencrypt.js' +import { CID } from 'multiformats/cid' +import { sha256 } from 'multiformats/hashes/sha2' +import * as multiFormatRaw from 'multiformats/codecs/raw' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' export const OCEAN_P2P_PROTOCOL = '/ocean/nodes/1.0.0' const OCEAN_DHT_PROTOCOL = '/ocean/nodes/1.0.0/kad/1.0.0' @@ -254,8 +267,15 @@ export class P2pProvider { ], services: { identify: identify(), + identifyPush: identifyPush(), ping: ping(), - dht: kadDHT({ protocol: OCEAN_DHT_PROTOCOL, clientMode: true }) + dht: kadDHT({ + peerInfoMapper: passthroughMapper, + allowQueryWithZeroPeers: false, + kBucketSize: 20, + protocol: OCEAN_DHT_PROTOCOL, + clientMode: true // Servers can better query the network + }) }, // Without this we are blocking connection to plain ws - the bundler thinks we are in a browser. // This also applies to local nodes. @@ -301,6 +321,34 @@ export class P2pProvider { return chunk instanceof Uint8Array ? chunk : chunk.subarray() } + public async cidFromRawString(data: string) { + const hash = await sha256.digest(uint8ArrayFromString(data)) + const cid = CID.create(1, multiFormatRaw.code, hash) + return cid + } + + async getProvidersForString( + input: string, + signal?: AbortSignal + ): Promise> { + const node = await this.getOrCreateLibp2pNode() + const cid = await this.cidFromRawString(input) + const peersFound = [] + try { + for await (const result of node.contentRouting.findProviders(cid, { + useCache: false, + useNetwork: true, + signal + })) { + peersFound.push(result) + } + } catch (err) {} + return peersFound.map((peer) => ({ + id: peer.id.toString(), + multiaddrs: peer.multiaddrs + })) + } + private isDialable(ma: Multiaddr): boolean { // Node.js can dial any transport (TCP, WS, WSS) if (typeof window === 'undefined') return true @@ -310,6 +358,14 @@ export class P2pProvider { return str.includes('/tls/sni') } + /** + * True when the multiaddr does not include the relay `p2p-circuit` protocol segment. + * (Direct / transport paths omit it; relay paths contain `/p2p-circuit/...`.) + */ + private isNotP2PCircuit(ma: Multiaddr): boolean { + return !/\/p2p-circuit(\/|$)/.test(ma.toString()) + } + private peerIdFromMultiaddr(ma: Multiaddr): string | null { const parts = ma.toString().split('/p2p/') if (parts.length <= 1) return null @@ -318,200 +374,178 @@ export class P2pProvider { return raw.split('/')[0] || null } + /* Dials a new connection */ private async getConnection( - nodeUri: string | Multiaddr[], - signal: AbortSignal + nodeUri: OceanNode, + signal: AbortSignal, + includeP2PCircuit: boolean = false ): Promise { const node = await this.getOrCreateLibp2pNode() - - if (Array.isArray(nodeUri)) { - const dialable = nodeUri.filter((ma) => this.isDialable(ma)) - - if (dialable.length > 0) { - LoggerInstance.debug(`[P2P] dial array: ${dialable.length} dialable addrs`) - try { - const conn = await node.dial(dialable, { signal }) - LoggerInstance.debug(`[P2P] dial array SUCCESS via ${conn.remoteAddr}`) - return conn - } catch (err: any) { - LoggerInstance.debug(`[P2P] dial array failed: ${err.message}`) - } - } - - for (const ma of nodeUri) { - const pid = this.peerIdFromMultiaddr(ma) - if (pid) { - LoggerInstance.debug(`[P2P] Not multiaddrs, fallback to peerId: ${pid}`) - return this.dialByPeerId(node, pid, signal) + const hasDialable = () => addrs.some((ma) => this.isDialable(ma)) + let peerId: PeerId | null = null + const addrs: Multiaddr[] = [] + if (nodeUri && typeof nodeUri === 'string') { + try { + const addr = multiaddr(nodeUri) + addrs.push(addr) + if (!peerId) { + const pidStr = this.peerIdFromMultiaddr(addr) + if (pidStr) peerId = peerIdFromString(pidStr) } - } - throw new Error('No valid addresses and no peer ID in multiaddrs') + } catch {} + try { + if (!peerId) peerId = peerIdFromString(nodeUri) + } catch {} } - - try { - const ma = multiaddr(nodeUri) - if (this.isDialable(ma)) { - LoggerInstance.debug(`[P2P] dial single addr: ${ma}`) - try { - const conn = await node.dial(ma, { signal }) - LoggerInstance.debug(`[P2P] dial single SUCCESS via ${conn.remoteAddr}`) - return conn - } catch (err: any) { - LoggerInstance.debug(`[P2P] dial single failed: ${err.message}`) + if (typeof nodeUri === 'object' && nodeUri !== null && !Array.isArray(nodeUri)) { + if ('nodeId' in nodeUri || 'multiaddress' in nodeUri) { + const nodeP2p = nodeUri as NodeP2P + if (Array.isArray(nodeP2p.multiaddress) && nodeP2p.multiaddress.length > 0) { + for (const addr of nodeP2p.multiaddress) addrs.push(addr) } - } - const pid = this.peerIdFromMultiaddr(ma) - if (pid) { - LoggerInstance.debug(`[P2P] single fallback -> dialByPeerId ${pid}`) - return this.dialByPeerId(node, pid, signal) - } - throw new Error(`Cannot dial address: ${nodeUri}`) - } catch (err: any) { - if (err.message?.includes('Cannot dial')) throw err - } - - LoggerInstance.debug(`[P2P] bare peerId -> dialByPeerId ${nodeUri}`) - return this.dialByPeerId(node, nodeUri, signal) - } - - private async dialByPeerId( - node: Libp2p, - peerIdStr: string, - signal: AbortSignal - ): Promise { - const peerId = peerIdFromString(peerIdStr) - - const existing = node.getConnections(peerId).filter((c) => c.status === 'open') - if (existing.length > 0) { - LoggerInstance.debug( - `[P2P] ${peerIdStr}: reusing existing connection via ${existing[0].remoteAddr}` - ) - return existing[0] - } - - // Wait briefly for bootstrap if node just started (0 connections) - if (node.getConnections().length === 0) { - LoggerInstance.debug( - `[P2P] ${peerIdStr}: no connections yet, waiting for bootstrap...` - ) - await sleep(3000) - const after = node.getConnections(peerId) - if (after.length > 0) { - LoggerInstance.debug(`[P2P] ${peerIdStr}: connected during bootstrap wait`) - return after[0] + if (nodeP2p.nodeId) { + try { + peerId = peerIdFromString(nodeP2p.nodeId) + } catch {} + } + } else { + peerId = nodeUri as PeerId } } - const seen = new Set() - const allAddrs: Multiaddr[] = [] - const addAddr = (ma: Multiaddr) => { - const key = ma.toString() - if (!seen.has(key)) { - seen.add(key) - allAddrs.push(ma) + // check if we already have a connection + if (peerId) { + const existing = node.getConnections(peerId).filter((c) => c.status === 'open') + if (existing.length > 0) { + LoggerInstance.debug( + `[P2P] ${peerId.toString()}: reusing existing connection via ${ + existing[0].remoteAddr + }` + ) + return existing[0] } } - - try { - const peerData = await node.peerStore.get(peerId) - if (peerData?.addresses) { - for (const addr of peerData.addresses) { - addAddr(addr.multiaddr) + // if there are no dialable ma, search peerstore + if (!hasDialable() && peerId) { + try { + const peerData = await node.peerStore.get(peerId) + if (peerData?.addresses) { + for (const addr of peerData.addresses) { + addrs.push(addr.multiaddr) + } + LoggerInstance.debug( + `[P2P] ${peerId.toString()}: ${peerData.addresses.length} peerStore addrs` + ) } - LoggerInstance.debug( - `[P2P] ${peerIdStr}: ${peerData.addresses.length} peerStore addrs` - ) + } catch { + LoggerInstance.debug(`[P2P] ${peerId.toString()}: not in peerStore`) } - } catch { - LoggerInstance.debug(`[P2P] ${peerIdStr}: not in peerStore`) } - - const knownDialable = allAddrs.filter((ma) => this.isDialable(ma)) - if (knownDialable.length === 0) { - LoggerInstance.debug( - `[P2P] ${peerIdStr}: no dialable addrs in peerStore, querying DHT...` - ) + // if there are no dialable ma, search dht + if (!hasDialable() && peerId) { try { - const dhtSignal = AbortSignal.timeout(this.p2pConfig.dhtLookupTimeout ?? 60_000) - const peerInfo = await node.peerRouting.findPeer(peerId, { signal: dhtSignal }) - for (const ma of peerInfo.multiaddrs) addAddr(ma) + // const dhtSignal = AbortSignal.timeout(this.p2pConfig.dhtLookupTimeout ?? 60_000) + const peerInfo = await node.peerRouting.findPeer(peerId, { signal }) + for (const ma of peerInfo.multiaddrs) addrs.push(ma) LoggerInstance.debug( - `[P2P] ${peerIdStr}: DHT returned ${peerInfo.multiaddrs.length} addrs` + `[P2P] ${peerId.toString()}: DHT returned ${peerInfo.multiaddrs.length} addrs` ) } catch (err: any) { - LoggerInstance.debug(`[P2P] ${peerIdStr}: DHT findPeer failed: ${err.message}`) + LoggerInstance.debug( + `[P2P] ${peerId.toString()}: DHT findPeer failed: ${err.message}` + ) } - } else { - LoggerInstance.debug( - `[P2P] ${peerIdStr}: ${knownDialable.length} dialable addrs from peerStore, skipping DHT` - ) } - - const dialable = allAddrs - .filter((ma) => this.isDialable(ma)) - .map((ma) => { - const str = ma.toString() - return str.includes('/p2p/') ? ma : multiaddr(`${str}/p2p/${peerIdStr}`) - }) - - LoggerInstance.debug( - `[P2P] ${peerIdStr}: ${dialable.length}/${allAddrs.length} addrs are dialable` - ) - - if (dialable.length > 0) { - LoggerInstance.debug(`[P2P] ${peerIdStr}: dialing ${dialable.map(String)}`) - try { - const conn = await node.dial(dialable, { signal }) - LoggerInstance.debug( - `[P2P] ${peerIdStr}: SUCCESS via ${conn.remoteAddr} (limited=${ - conn.limits != null - })` + let dialable = addrs.filter((ma) => this.isDialable(ma)) + const beforePFilter = dialable.length + if (!includeP2PCircuit) dialable = dialable.filter((ma) => this.isNotP2PCircuit(ma)) + + const afterPFilter = dialable.length + + if (dialable.length < 1) { + // try with p2p-circuits if available + if (!includeP2PCircuit && afterPFilter < beforePFilter) { + // we have some p2p-circuit addrs, let's try them + return this.getConnection( + { nodeId: peerId ? peerId.toString() : '', multiaddress: addrs } as NodeP2P, + signal, + true ) - return conn - } catch (err: any) { - LoggerInstance.debug(`[P2P] ${peerIdStr}: direct dial failed: ${err.message}`) } + throw new Error('No valid multiaddresses, cannot connect') + } + // normalize all mas if we have peerId + if (peerId) { + dialable = dialable.map((ma) => { + const str = ma.toString() + return str.includes('/p2p/') ? ma : multiaddr(`${str}/p2p/${peerId.toString()}`) + }) } - - LoggerInstance.debug(`[P2P] ${peerIdStr}: last resort dial by peerId`) try { - const conn = await node.dial(peerId, { signal: AbortSignal.timeout(10_000) }) + const conn = await node.dial(dialable, { signal }) LoggerInstance.debug( - `[P2P] ${peerIdStr}: peerId dial SUCCESS via ${conn.remoteAddr} (limited=${ - conn.limits != null - })` + `[P2P] Dial SUCCESS via ${conn.remoteAddr} (limited=${conn.limits != null})` ) return conn - } catch { + } catch (err: any) { + if (!includeP2PCircuit && afterPFilter < beforePFilter) { + LoggerInstance.debug( + `[P2P] Direct dial failed, falling back to relayed addresses...` + ) + return this.getConnection( + { nodeId: peerId ? peerId.toString() : '', multiaddress: addrs } as NodeP2P, + signal, + true + ) + } throw new Error( - `Cannot reach peer ${peerIdStr}. ` + - (allAddrs.length > 0 - ? `Found addrs: ${allAddrs.map(String).join(', ')} (none dialable). ` + `Cannot dial peer ${peerId?.toString()}. ` + + (addrs.length > 0 + ? `Found addrs: ${addrs.map(String).join(', ')}. ` : 'No addresses found. ') + - `Active connections: ${node.getConnections().length}.` + `Active connections: ${node.getConnections().length}. ` + + err.message ) } } - protected getConsumerAddress(s: Signer | string) { - return getConsumerAddress(s) - } - - protected getSignature(s: Signer | string, nonce: string, command: string) { - return getSignature(s, nonce, command) - } - - private async getNodePublicKey(nodeUri: string | Multiaddr[]): Promise { + private async getNodePublicKey(nodeUri: OceanNode): Promise { const endpoints = await this.getEndpoints(nodeUri) return endpoints?.nodePublicKey } - protected getAuthorization(s: Signer | string) { + protected getAuthorization(s: SignerOrAuthTokenOrSignature) { return getAuthorization(s) } + private async getSignedCommandParams( + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, + command: string, + signal?: AbortSignal + ): Promise { + if (isAgentSignature(signerOrAuthToken)) { + return { + consumerAddress: signerOrAuthToken.consumerAddress, + nonce: signerOrAuthToken.nonce, + signature: signerOrAuthToken.signature + } + } + if (typeof signerOrAuthToken === 'string') { + return { + consumerAddress: await getConsumerAddress(signerOrAuthToken), + nonce: undefined, + signature: undefined + } + } + const consumerAddress = await getConsumerAddress(signerOrAuthToken) + const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() + const signature = await getSignature(signerOrAuthToken, nonce, command) + return { consumerAddress, nonce, signature } + } + private async dialAndStream( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, payload: Record, signal?: AbortSignal, requestBody?: P2PRequestBodyStream @@ -557,10 +591,10 @@ export class P2pProvider { } private async sendP2pCommand( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, command: string, body: Record, - signerOrAuthToken?: Signer | string | null, + signerOrAuthToken?: SignerOrAuthTokenOrSignature | null, signal?: AbortSignal, retrialNumber: number = 0, requestBody?: P2PRequestBodyStream @@ -698,7 +732,7 @@ export class P2pProvider { * Returns node status via P2P STATUS command. * @param {string} nodeUri - multiaddr of the node */ - async getEndpoints(nodeUri: string | Multiaddr[]): Promise { + async getEndpoints(nodeUri: OceanNode): Promise { try { return await this.sendP2pCommand(nodeUri, PROTOCOL_COMMANDS.STATUS, {}) } catch (e) { @@ -708,14 +742,14 @@ export class P2pProvider { } public async getNodeStatus( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { return this.getEndpoints(nodeUri) } public async getNodeJobs( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, fromTimestamp?: number, signal?: AbortSignal ): Promise { @@ -740,7 +774,7 @@ export class P2pProvider { * Get current nonce from the node via P2P. */ public async getNonce( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, consumerAddress: string, signal?: AbortSignal ): Promise { @@ -768,17 +802,16 @@ export class P2pProvider { public async encrypt( data: any, chainId: number, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, _policyServer?: any, signal?: AbortSignal ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.ENCRYPT + PROTOCOL_COMMANDS.ENCRYPT, + signal ) const result = await this.sendP2pCommand( nodeUri, @@ -802,7 +835,7 @@ export class P2pProvider { public async checkDidFiles( did: string, serviceId: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, withChecksum: boolean = false, signal?: AbortSignal ): Promise { @@ -821,7 +854,7 @@ export class P2pProvider { */ public async getFileInfo( file: StorageObject, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, withChecksum: boolean = false, signal?: AbortSignal ): Promise { @@ -839,7 +872,7 @@ export class P2pProvider { * Returns compute environments via P2P. */ public async getComputeEnvironments( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { const result = await this.sendP2pCommand( @@ -860,7 +893,7 @@ export class P2pProvider { serviceId: string, fileIndex: number, consumerAddress: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal, userCustomParameters?: UserCustomParameters, computeEnv?: string, @@ -887,7 +920,7 @@ export class P2pProvider { computeEnv: string, token: string, validUntil: number, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, consumerAddress: string, resources: ComputeResourceRequest[], chainId: number, @@ -940,16 +973,14 @@ export class P2pProvider { serviceId: string, fileIndex: number, transferTxId: string, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, policyServer?: any, userCustomParameters?: UserCustomParameters ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress)) + 1).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, PROTOCOL_COMMANDS.DOWNLOAD ) @@ -1014,8 +1045,8 @@ export class P2pProvider { * Start a paid compute job via P2P. */ public async computeStart( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -1031,13 +1062,11 @@ export class P2pProvider { queueMaxWaitTime?: number, dockerRegistryAuth?: dockerRegistryAuth ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_START + PROTOCOL_COMMANDS.COMPUTE_START, + signal ) const body: Record = { @@ -1085,8 +1114,8 @@ export class P2pProvider { * Start a free compute job via P2P. */ public async freeComputeStart( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, computeEnv: string, datasets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -1099,13 +1128,11 @@ export class P2pProvider { queueMaxWaitTime?: number, dockerRegistryAuth?: dockerRegistryAuth ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.FREE_COMPUTE_START + PROTOCOL_COMMANDS.FREE_COMPUTE_START, + signal ) const body: Record = { @@ -1149,8 +1176,8 @@ export class P2pProvider { * Get streamable compute logs via P2P. Returns an async generator of Uint8Array chunks. */ public async computeStreamableLogs( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, signal?: AbortSignal ): Promise { @@ -1165,12 +1192,11 @@ export class P2pProvider { ) } - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_GET_STREAMABLE_LOGS + PROTOCOL_COMMANDS.COMPUTE_GET_STREAMABLE_LOGS, + signal ) return this.sendP2pCommand( nodeUri, @@ -1186,18 +1212,16 @@ export class P2pProvider { */ public async computeStop( jobId: string, - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, agreementId?: string, signal?: AbortSignal ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - - const signature = await this.getSignature( + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_STOP + PROTOCOL_COMMANDS.COMPUTE_STOP, + signal ) const body: Record = { jobId, consumerAddress, nonce, signature } @@ -1216,13 +1240,13 @@ export class P2pProvider { * Get compute status via P2P. */ public async computeStatus( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId?: string, agreementId?: string, signal?: AbortSignal ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) + const consumerAddress = await getConsumerAddress(signerOrAuthToken) const body: Record = { consumerAddress } if (jobId) body.jobId = jobId if (agreementId) body.agreementId = agreementId @@ -1241,13 +1265,17 @@ export class P2pProvider { * Supports resumable downloads via `offset` (byte position to resume from). */ public async getComputeResult( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number, offset: number = 0 ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.COMPUTE_GET_RESULT + ) const payload: Record = { command: PROTOCOL_COMMANDS.COMPUTE_GET_RESULT, jobId, @@ -1259,13 +1287,8 @@ export class P2pProvider { if (typeof signerOrAuthToken === 'string') { payload.authorization = signerOrAuthToken } else { - const nonce = ((await this.getNonce(nodeUri, consumerAddress)) + 1).toString() payload.nonce = nonce - payload.signature = await this.getSignature( - signerOrAuthToken, - nonce, - PROTOCOL_COMMANDS.COMPUTE_GET_RESULT - ) + payload.signature = signature } const { lp, firstBytes } = await this.dialAndStream(nodeUri, payload) @@ -1290,12 +1313,12 @@ export class P2pProvider { } public async getComputeResultUrl( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, jobId: string, index: number ): Promise { - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) + const consumerAddress = await getConsumerAddress(signerOrAuthToken) const result = await this.sendP2pCommand( nodeUri, PROTOCOL_COMMANDS.COMPUTE_GET_RESULT, @@ -1310,12 +1333,12 @@ export class P2pProvider { */ public async generateAuthToken( consumer: Signer, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { const address = await consumer.getAddress() const nonce = ((await this.getNonce(nodeUri, address, signal)) + 1).toString() - const signature = await this.getSignature( + const signature = await getSignature( consumer, nonce, PROTOCOL_COMMANDS.CREATE_AUTH_TOKEN @@ -1338,7 +1361,7 @@ export class P2pProvider { address: string, signature: string, nonce: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { const result = await this.sendP2pCommand( @@ -1355,7 +1378,7 @@ export class P2pProvider { * Resolve a DDO by DID via P2P GET_DDO command. */ public async resolveDdo( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, did: string, signal?: AbortSignal ): Promise { @@ -1372,22 +1395,26 @@ export class P2pProvider { * Validate a DDO via P2P VALIDATE_DDO command. */ public async validateDdo( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, ddo: DDO, - signer: Signer, + signerOrAuthToken: SignerOrAuthTokenOrSignature, signal?: AbortSignal ): Promise { - const publisherAddress = await signer.getAddress() - const nonce = ( - (await this.getNonce(nodeUri, publisherAddress, signal)) + 1 - ).toString() - const message = publisherAddress + nonce + PROTOCOL_COMMANDS.VALIDATE_DDO - const sig = await signRequest(signer, message) + const { + consumerAddress: publisherAddress, + nonce, + signature + } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.VALIDATE_DDO, + signal + ) const result = await this.sendP2pCommand( nodeUri, PROTOCOL_COMMANDS.VALIDATE_DDO, - { ddo, publisherAddress, nonce, signature: sig }, - null, + { ddo, publisherAddress, nonce, signature }, + signerOrAuthToken, signal ) if (!result || result.error) return null @@ -1409,7 +1436,7 @@ export class P2pProvider { public async invalidateAuthToken( consumer: Signer, token: string, - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise<{ success: boolean }> { const consumerAddress = await consumer.getAddress() @@ -1429,7 +1456,7 @@ export class P2pProvider { * Check if a P2P node is reachable by calling STATUS. */ public async isValidProvider( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, signal?: AbortSignal ): Promise { try { @@ -1454,7 +1481,7 @@ export class P2pProvider { * PolicyServer passthrough via P2P. */ public async PolicyServerPassthrough( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, request: PolicyServerPassthroughCommand, signal?: AbortSignal ): Promise { @@ -1471,7 +1498,7 @@ export class P2pProvider { * Initialize Policy Server verification via P2P. */ public async initializePSVerification( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, request: PolicyServerInitializeCommand, signal?: AbortSignal ): Promise { @@ -1488,8 +1515,8 @@ export class P2pProvider { * Download node logs via P2P. */ public async downloadNodeLogs( - nodeUri: string | Multiaddr[], - signer: Signer, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, startTime: string, endTime: string, maxLogs?: number, @@ -1498,9 +1525,12 @@ export class P2pProvider { page?: number, signal?: AbortSignal ): Promise { - const consumerAddress = await signer.getAddress() - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - const signature = await this.getSignature(signer, nonce, PROTOCOL_COMMANDS.GET_LOGS) + const { consumerAddress, nonce, signature } = await this.getSignedCommandParams( + nodeUri, + signerOrAuthToken, + PROTOCOL_COMMANDS.GET_LOGS, + signal + ) const body: Record = { startTime, @@ -1514,26 +1544,13 @@ export class P2pProvider { if (level) body.level = level if (page) body.page = page - return this.sendP2pCommand(nodeUri, PROTOCOL_COMMANDS.GET_LOGS, body, signer, signal) - } - - /** - * Fetch node logs via P2P with a pre-signed payload. - * P2P only — use downloadNodeLogs() for the auto-signed variant. - */ - public async fetchNodeLogs( - nodeUri: string | Multiaddr[], - address: string, - signature: string, - nonce: string, - logParams?: NodeLogsParams - ): Promise { - return this.sendP2pCommand(nodeUri, PROTOCOL_COMMANDS.GET_LOGS, { - address, - signature, - nonce, - ...logParams - }) + return this.sendP2pCommand( + nodeUri, + PROTOCOL_COMMANDS.GET_LOGS, + body, + signerOrAuthToken, + signal + ) } /** @@ -1541,7 +1558,7 @@ export class P2pProvider { * the caller is responsible for nonce retrieval and signing. */ public async fetchConfig( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, payload: Record ): Promise { return this.sendP2pCommand(nodeUri, PROTOCOL_COMMANDS.FETCH_CONFIG, payload) @@ -1552,33 +1569,27 @@ export class P2pProvider { * the caller is responsible for nonce retrieval and signing. */ public async pushConfig( - nodeUri: string | Multiaddr[], + nodeUri: OceanNode, payload: Record ): Promise { return this.sendP2pCommand(nodeUri, PROTOCOL_COMMANDS.PUSH_CONFIG, payload) } private async getPersistentStorageSignaturePayload( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, command: string, signal?: AbortSignal ): Promise<{} | { consumerAddress: string; nonce: string; signature: string }> { if (typeof signerOrAuthToken === 'string') { return {} } - const consumerAddress = await this.getConsumerAddress(signerOrAuthToken) - const nonce = ((await this.getNonce(nodeUri, consumerAddress, signal)) + 1).toString() - const signature = await this.getSignature(signerOrAuthToken, nonce, command) - if (!signature) { - throw new Error('Could not sign persistent storage request.') - } - return { consumerAddress, nonce, signature } + return this.getSignedCommandParams(nodeUri, signerOrAuthToken, command, signal) } public async createPersistentStorageBucket( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, payload: PersistentStorageCreateBucketRequest, signal?: AbortSignal ): Promise<{ @@ -1586,7 +1597,7 @@ export class P2pProvider { owner: string accessList: PersistentStorageAccessList[] }> { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, @@ -1605,12 +1616,12 @@ export class P2pProvider { } public async getPersistentStorageBuckets( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, owner: string, signal?: AbortSignal ): Promise { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, @@ -1627,12 +1638,12 @@ export class P2pProvider { } public async listPersistentStorageFiles( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, signal?: AbortSignal ): Promise { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, @@ -1649,13 +1660,13 @@ export class P2pProvider { } public async getPersistentStorageFileObject( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal ): Promise { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, @@ -1671,14 +1682,14 @@ export class P2pProvider { } public async uploadPersistentStorageFile( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, content: P2PRequestBodyStream, signal?: AbortSignal ): Promise { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, @@ -1696,13 +1707,13 @@ export class P2pProvider { } public async deletePersistentStorageFile( - nodeUri: string | Multiaddr[], - signerOrAuthToken: Signer | string, + nodeUri: OceanNode, + signerOrAuthToken: SignerOrAuthTokenOrSignature, bucketId: string, fileName: string, signal?: AbortSignal ): Promise { - const authPayload = await this.getPersistentStorageSignaturePayload( + const authPayload = await this.getSignedCommandParams( nodeUri, signerOrAuthToken, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE,