Skip to content

Commit ac34d48

Browse files
committed
add docker mounting
1 parent 8856c6b commit ac34d48

16 files changed

Lines changed: 1070 additions & 21 deletions

docs/API.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,32 @@ List files in a bucket.
16861686

16871687
---
16881688

1689+
### `HTTP` GET /api/services/persistentStorage/buckets/:bucketId/files/:fileName/object
1690+
1691+
#### Description
1692+
1693+
Return the `fileObject` for a specific file in a bucket (useful for passing references to other subsystems like compute).
1694+
1695+
#### Query Parameters
1696+
1697+
| name | type | required | description |
1698+
| --------------- | ------ | -------- | ----------- |
1699+
| consumerAddress | string | v | consumer address |
1700+
| signature | string | v | signed message (consumerAddress + nonce + command) |
1701+
| nonce | string | v | request nonce |
1702+
1703+
#### Response (200)
1704+
1705+
```json
1706+
{
1707+
"type": "nodePersistentStorage",
1708+
"bucketId": "uuid",
1709+
"fileName": "hello.txt"
1710+
}
1711+
```
1712+
1713+
---
1714+
16891715
### `HTTP` POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName
16901716

16911717
#### Description

docs/persistentStorage.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ All persistent storage operations are implemented as protocol commands in the ha
144144
- `persistentStorageCreateBucket`
145145
- `persistentStorageGetBuckets`
146146
- `persistentStorageListFiles`
147+
- `persistentStorageGetFileObject`
147148
- `persistentStorageUploadFile`
148149
- `persistentStorageDeleteFile`
149150

@@ -157,6 +158,7 @@ At a glance:
157158
- `POST /api/services/persistentStorage/buckets`
158159
- `GET /api/services/persistentStorage/buckets`
159160
- `GET /api/services/persistentStorage/buckets/:bucketId/files`
161+
- `GET /api/services/persistentStorage/buckets/:bucketId/files/:fileName/object`
160162
- `POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName`
161163
- `DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName`
162164

src/@types/PersistentStorage.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { AccessList } from './AccessList'
2-
2+
import type { BaseFileObject } from './fileObject.js'
33
export type PersistentStorageType = 'localfs' | 's3'
44

55
export interface PersistentStorageLocalFSOptions {
@@ -22,3 +22,20 @@ export interface PersistentStorageConfig {
2222
accessLists: AccessList[]
2323
options: PersistentStorageLocalFSOptions | PersistentStorageS3Options
2424
}
25+
26+
/**
27+
* Docker mount descriptor used by the Docker C2D engine.
28+
* Mirrors Dockerode `HostConfig.Mounts[]` item shape.
29+
*/
30+
export interface DockerMountObject {
31+
Type: 'bind'
32+
Source: string
33+
Target: string
34+
ReadOnly: boolean
35+
}
36+
37+
export interface PersistentStorageObject extends BaseFileObject {
38+
type: 'nodePersistentStorage'
39+
bucketId: string
40+
fileName: string
41+
}

src/@types/commands.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,14 @@ export interface PersistentStorageUploadFileCommand extends Command {
348348
fileName: string
349349
}
350350

351+
export interface PersistentStorageGetFileObjectCommand extends Command {
352+
consumerAddress: string
353+
signature: string
354+
nonce: string
355+
bucketId: string
356+
fileName: string
357+
}
358+
351359
export interface PersistentStorageDeleteFileCommand extends Command {
352360
consumerAddress: string
353361
signature: string

src/components/c2d/compute_engine_docker.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1820,6 +1820,64 @@ export class C2DEngineDocker extends C2DEngine {
18201820
}
18211821
containerInfo.Env = envVars
18221822
}
1823+
// persistent Storage: bind-mount bucket files into the job container (localfs backend)
1824+
for (const i in job.assets) {
1825+
const asset = job.assets[i]
1826+
if (!asset.fileObject || asset.fileObject.type !== 'nodePersistentStorage') {
1827+
continue
1828+
}
1829+
const fo = asset.fileObject as { bucketId?: string; fileName?: string }
1830+
if (!fo.bucketId || !fo.fileName) {
1831+
CORE_LOGGER.error(
1832+
`Job ${job.jobId} asset ${i}: nodePersistentStorage requires bucketId and fileName`
1833+
)
1834+
job.status = C2DStatusNumber.DataProvisioningFailed
1835+
job.statusText = C2DStatusText.DataProvisioningFailed
1836+
job.isRunning = false
1837+
job.dateFinished = String(Date.now() / 1000)
1838+
await this.db.updateJob(job)
1839+
await this.cleanupJob(job)
1840+
return
1841+
}
1842+
const ps = OceanNode.getInstance().getPersistentStorage()
1843+
if (!ps) {
1844+
CORE_LOGGER.error(
1845+
`Job ${job.jobId} asset ${i}: persistent storage is not configured on this node`
1846+
)
1847+
job.status = C2DStatusNumber.DataProvisioningFailed
1848+
job.statusText = C2DStatusText.DataProvisioningFailed
1849+
job.isRunning = false
1850+
job.dateFinished = String(Date.now() / 1000)
1851+
await this.db.updateJob(job)
1852+
await this.cleanupJob(job)
1853+
return
1854+
}
1855+
try {
1856+
const bindMount = await ps.getDockerMountObject(
1857+
fo.bucketId,
1858+
fo.fileName,
1859+
job.owner
1860+
)
1861+
CORE_LOGGER.debug(
1862+
`Mounting bucket ${fo.bucketId} to folder ${bindMount.Target}`
1863+
)
1864+
hostConfig.Mounts.push(bindMount)
1865+
mountVols[bindMount.Target] = {}
1866+
} catch (e) {
1867+
const errMsg = e instanceof Error ? e.message : String(e)
1868+
CORE_LOGGER.error(
1869+
`Job ${job.jobId} asset ${i}: failed to resolve persistent storage bind: ${errMsg}`
1870+
)
1871+
job.status = C2DStatusNumber.DataProvisioningFailed
1872+
job.statusText = C2DStatusText.DataProvisioningFailed
1873+
job.isRunning = false
1874+
job.dateFinished = String(Date.now() / 1000)
1875+
await this.db.updateJob(job)
1876+
await this.cleanupJob(job)
1877+
return
1878+
}
1879+
}
1880+
18231881
const container = await this.createDockerContainer(containerInfo, true)
18241882
if (container) {
18251883
job.status = C2DStatusNumber.Provisioning
@@ -2767,6 +2825,10 @@ export class C2DEngineDocker extends C2DEngine {
27672825
if (asset.fileObject) {
27682826
try {
27692827
if (asset.fileObject.type) {
2828+
if (asset.fileObject.type === 'nodePersistentStorage') {
2829+
// local storage is handled later, when we start the container and create the binds
2830+
continue
2831+
}
27702832
storage = Storage.getStorageClass(asset.fileObject, config)
27712833
} else {
27722834
CORE_LOGGER.info('asset file object seems to be encrypted, checking it...')

src/components/core/compute/initialize.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ import {
3939
validateAlgoForDataset,
4040
validateOutput
4141
} from './utils.js'
42+
import {
43+
ensureConsumerAllowedForPersistentStorageLocalfsFileObject,
44+
rejectPersistentStorageFileObjectOnAlgorithm
45+
} from '../../persistentStorage/PersistentStorageFactory.js'
4246

4347
export class ComputeInitializeHandler extends CommandHandler {
4448
validate(command: ComputeInitializeCommand): ValidateParams {
@@ -220,6 +224,22 @@ export class ComputeInitializeHandler extends CommandHandler {
220224
if (isValidOutput.status.httpStatus !== 200) {
221225
return isValidOutput
222226
}
227+
const algoPersistentStorageBan = rejectPersistentStorageFileObjectOnAlgorithm(
228+
task.algorithm.fileObject
229+
)
230+
if (algoPersistentStorageBan) {
231+
return algoPersistentStorageBan
232+
}
233+
for (const dataset of task.datasets) {
234+
const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject(
235+
node,
236+
task.consumerAddress,
237+
dataset.fileObject
238+
)
239+
if (psAccess) {
240+
return psAccess
241+
}
242+
}
223243
// check algo
224244
let index = 0
225245
const policyServer = new PolicyServer()

src/components/core/compute/startCompute.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ import { getNonceAsNumber } from '../utils/nonceHandler.js'
4343
import { PolicyServer } from '../../policyServer/index.js'
4444
import { checkCredentials } from '../../../utils/credentials.js'
4545
import { checkAddressOnAccessList } from '../../../utils/accessList.js'
46+
import {
47+
ensureConsumerAllowedForPersistentStorageLocalfsFileObject,
48+
rejectPersistentStorageFileObjectOnAlgorithm
49+
} from '../../persistentStorage/PersistentStorageFactory.js'
4650

4751
export class CommonComputeHandler extends CommandHandler {
4852
validate(command: PaidComputeStartCommand): ValidateParams {
@@ -225,7 +229,23 @@ export class PaidComputeStartHandler extends CommonComputeHandler {
225229
}
226230
}
227231
const policyServer = new PolicyServer()
228-
// check algo
232+
const algoPersistentStorageBan = rejectPersistentStorageFileObjectOnAlgorithm(
233+
task.algorithm.fileObject
234+
)
235+
if (algoPersistentStorageBan) {
236+
return algoPersistentStorageBan
237+
}
238+
for (const dataset of task.datasets) {
239+
const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject(
240+
node,
241+
task.consumerAddress,
242+
dataset.fileObject
243+
)
244+
if (psAccess) {
245+
return psAccess
246+
}
247+
}
248+
// check algo and datasets (orders, credentials, etc.)
229249
for (const elem of [...[task.algorithm], ...task.datasets]) {
230250
const result: any = { validOrder: false }
231251
if ('documentId' in elem && elem.documentId) {
@@ -747,6 +767,22 @@ export class FreeComputeStartHandler extends CommonComputeHandler {
747767
return isValidOutput
748768
}
749769
const policyServer = new PolicyServer()
770+
const algoPersistentStorageBanFree = rejectPersistentStorageFileObjectOnAlgorithm(
771+
task.algorithm.fileObject
772+
)
773+
if (algoPersistentStorageBanFree) {
774+
return algoPersistentStorageBanFree
775+
}
776+
for (const dataset of task.datasets) {
777+
const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject(
778+
thisNode,
779+
task.consumerAddress,
780+
dataset.fileObject
781+
)
782+
if (psAccess) {
783+
return psAccess
784+
}
785+
}
750786
for (const elem of [...[task.algorithm], ...task.datasets]) {
751787
if (!('documentId' in elem)) {
752788
continue

src/components/core/handler/coreHandlersRegistry.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import {
5151
PersistentStorageCreateBucketHandler,
5252
PersistentStorageDeleteFileHandler,
5353
PersistentStorageGetBucketsHandler,
54+
PersistentStorageGetFileObjectHandler,
5455
PersistentStorageListFilesHandler,
5556
PersistentStorageUploadFileHandler
5657
} from './persistentStorage.js'
@@ -190,6 +191,10 @@ export class CoreHandlersRegistry {
190191
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE,
191192
new PersistentStorageUploadFileHandler(node)
192193
)
194+
this.registerCoreHandler(
195+
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT,
196+
new PersistentStorageGetFileObjectHandler(node)
197+
)
193198
this.registerCoreHandler(
194199
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE,
195200
new PersistentStorageDeleteFileHandler(node)

src/components/core/handler/persistentStorage.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type {
33
PersistentStorageCreateBucketCommand,
44
PersistentStorageDeleteFileCommand,
55
PersistentStorageGetBucketsCommand,
6+
PersistentStorageGetFileObjectCommand,
67
PersistentStorageListFilesCommand,
78
PersistentStorageUploadFileCommand
89
} from '../../../@types/commands.js'
@@ -220,6 +221,60 @@ export class PersistentStorageListFilesHandler extends CommandHandler {
220221
}
221222
}
222223

224+
export class PersistentStorageGetFileObjectHandler extends CommandHandler {
225+
validate(command: PersistentStorageGetFileObjectCommand): ValidateParams {
226+
const base = validateCommandParameters(command, [
227+
'consumerAddress',
228+
'signature',
229+
'nonce',
230+
'bucketId',
231+
'fileName'
232+
])
233+
if (!base.valid) return base
234+
return { valid: true }
235+
}
236+
237+
async handle(task: PersistentStorageGetFileObjectCommand): Promise<P2PCommandResponse> {
238+
const validationResponse = await this.verifyParamsAndRateLimits(task)
239+
if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse
240+
241+
const isAuthRequestValid = await this.validateTokenOrSignature(
242+
task.authorization,
243+
task.consumerAddress,
244+
task.nonce,
245+
task.signature,
246+
task.command
247+
)
248+
if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid
249+
250+
try {
251+
const storage = await requirePersistentStorage(this)
252+
const obj = await storage.getFileObject(
253+
task.bucketId,
254+
task.fileName,
255+
task.consumerAddress
256+
)
257+
return {
258+
stream: Readable.from(JSON.stringify(obj)),
259+
status: { httpStatus: 200, error: null }
260+
}
261+
} catch (e) {
262+
if (e instanceof PersistentStorageAccessDeniedError) {
263+
return {
264+
stream: null,
265+
status: { httpStatus: 403, error: e.message }
266+
}
267+
}
268+
const message = e instanceof Error ? e.message : String(e)
269+
if (message.toLowerCase().includes('file not found')) {
270+
return { stream: null, status: { httpStatus: 404, error: message } }
271+
}
272+
CORE_LOGGER.error(`PersistentStorageGetFileObjectHandler error: ${message}`)
273+
return { stream: null, status: { httpStatus: 500, error: message } }
274+
}
275+
}
276+
}
277+
223278
export class PersistentStorageUploadFileHandler extends CommandHandler {
224279
validate(command: PersistentStorageUploadFileCommand): ValidateParams {
225280
const base = validateCommandParameters(command, [

src/components/httpRoutes/persistentStorage.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
PersistentStorageCreateBucketHandler,
1010
PersistentStorageDeleteFileHandler,
1111
PersistentStorageGetBucketsHandler,
12+
PersistentStorageGetFileObjectHandler,
1213
PersistentStorageListFilesHandler,
1314
PersistentStorageUploadFileHandler
1415
} from '../core/handler/persistentStorage.js'
@@ -108,6 +109,36 @@ persistentStorageRoutes.get(
108109
}
109110
)
110111

112+
// Get file object for a file in a bucket
113+
persistentStorageRoutes.get(
114+
`${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName/object`,
115+
async (req, res) => {
116+
try {
117+
const response = await new PersistentStorageGetFileObjectHandler(
118+
req.oceanNode
119+
).handle({
120+
command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT,
121+
consumerAddress: req.query.consumerAddress as string,
122+
signature: req.query.signature as string,
123+
nonce: req.query.nonce as string,
124+
bucketId: req.params.bucketId,
125+
fileName: req.params.fileName,
126+
authorization: req.headers?.authorization,
127+
caller: req.caller
128+
} as any)
129+
if (!response.stream) {
130+
res.status(response.status.httpStatus).send(response.status.error)
131+
return
132+
}
133+
const payload = await streamToObject(response.stream as Readable)
134+
res.status(200).json(payload)
135+
} catch (error) {
136+
HTTP_LOGGER.error(`PersistentStorage get file object error: ${error}`)
137+
res.status(500).send('Internal Server Error')
138+
}
139+
}
140+
)
141+
111142
// Upload file to bucket. Body is treated as raw bytes.
112143
persistentStorageRoutes.post(
113144
`${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`,

0 commit comments

Comments
 (0)