Skip to content

Commit e43975e

Browse files
authored
Merge pull request #418 from jgilbert01/byte-array-s3
Byte array s3
2 parents a92e575 + 3c0d65a commit e43975e

4 files changed

Lines changed: 117 additions & 0 deletions

File tree

src/connectors/s3.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ class Connector {
8484
.then(async (response) => ({ ...response, Body: await response.Body.transformToString() }));
8585
}
8686

87+
getObjectAsByteArray(inputParams, ctx) {
88+
const params = {
89+
Bucket: this.bucketName,
90+
...inputParams,
91+
};
92+
93+
return this._sendCommand(new GetObjectCommand(params), ctx)
94+
.then(async (response) => ({ ...response, Body: await response.Body.transformToByteArray() }));
95+
}
96+
8797
getObjectStream(inputParams, ctx) {
8898
const params = {
8999
Bucket: this.bucketName,

src/queries/s3.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,35 @@ export const getObjectFromS3AsStream = ({
8080
.flatMap(getObject);
8181
};
8282

83+
export const getObjectFromS3AsByteArray = ({
84+
id: pipelineId,
85+
debug = d('s3'),
86+
bucketName = process.env.BUCKET_NAME,
87+
getRequestField = 'getRequest',
88+
getResponseField = 'getResponse',
89+
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
90+
step = 'get',
91+
...opt
92+
} = {}) => {
93+
const connector = new Connector({
94+
pipelineId, debug, bucketName, ...opt,
95+
});
96+
97+
const getObject = (uow) => {
98+
if (!uow[getRequestField]) return _(Promise.resolve(uow));
99+
100+
const p = () => connector.getObjectAsByteArray(uow[getRequestField], uow)
101+
.then((getResponse) => ({ ...uow, [getResponseField]: getResponse })) // TODO decompress
102+
.catch(rejectWithFault(uow));
103+
104+
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
105+
};
106+
107+
return (s) => s
108+
.map(getObject)
109+
.parallel(parallel);
110+
};
111+
83112
export const splitS3Object = ({
84113
delimiter = '\n',
85114
getResponseField = 'getResponse',

test/unit/connectors/s3.test.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ describe('connectors/s3.js', () => {
9595
});
9696
expect(data).to.deep.equal({ Body: 'b' });
9797
});
98+
it('should get object as byte array', async () => {
99+
const arr = new Uint8Array([104, 101, 108, 108, 111]);
100+
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from('hello')) }));
101+
mockS3.on(GetObjectCommand).callsFake(spy);
102+
103+
const inputParams = {
104+
Key: 'k1',
105+
};
106+
107+
const data = await new Connector({
108+
debug: debug('s3'),
109+
bucketName: 'b1',
110+
}).getObjectAsByteArray(inputParams);
111+
112+
expect(spy).to.have.been.calledWith({
113+
Bucket: 'b1',
114+
Key: 'k1',
115+
});
116+
expect(data).to.deep.equal({ Body: arr });
117+
});
98118

99119
it('should get object as stream', (done) => {
100120
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from(Buffer.from('data'))) }));

test/unit/queries/s3.test.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
toGetObjectRequest2,
1414
getObjectFromS3AsStream,
1515
headS3Object,
16+
getObjectFromS3AsByteArray,
1617
} from '../../../src/queries/s3';
1718

1819
import Connector from '../../../src/connectors/s3';
@@ -54,6 +55,63 @@ describe('queries/s3.js', () => {
5455
.done(done);
5556
});
5657

58+
it('should get object as byte array', (done) => {
59+
const hello = new Uint8Array([104, 101, 108, 108, 111]);
60+
const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({
61+
Body: hello,
62+
});
63+
64+
const uows = [{
65+
getRequest: {
66+
Key: 'k1',
67+
},
68+
}];
69+
70+
_(uows)
71+
.through(getObjectFromS3AsByteArray())
72+
.collect()
73+
.tap((collected) => {
74+
// console.log(JSON.stringify(collected, null, 2));
75+
76+
expect(stub).to.have.been.calledWith({
77+
Key: 'k1',
78+
});
79+
80+
expect(collected.length).to.equal(1);
81+
expect(collected[0]).to.deep.equal({
82+
getRequest: {
83+
Key: 'k1',
84+
},
85+
getResponse: {
86+
Body: hello,
87+
},
88+
});
89+
})
90+
.done(done);
91+
});
92+
it('should get object as byte array - missing get request field', (done) => {
93+
const hello = new Uint8Array([104, 101, 108, 108, 111]);
94+
const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({
95+
Body: hello,
96+
});
97+
98+
const uows = [{
99+
// missing get request
100+
}];
101+
102+
_(uows)
103+
.through(getObjectFromS3AsByteArray())
104+
.collect()
105+
.tap((collected) => {
106+
// console.log(JSON.stringify(collected, null, 2));
107+
108+
expect(stub).to.have.not.been.called;
109+
expect(collected.length).to.equal(1);
110+
expect(collected[0]).to.deep.equal({});
111+
})
112+
.done(done);
113+
});
114+
57115
it('should get and split object', (done) => {
58116
sinon.stub(Connector.prototype, 'getObject').resolves(GET_RESPONSE);
59117

0 commit comments

Comments
 (0)