Skip to content

Commit b3e28bd

Browse files
committed
CLDSRV-863: pipe requests to ChecksumTransform and validate checksum after upload
1 parent 77f5f1d commit b3e28bd

File tree

2 files changed

+150
-57
lines changed

2 files changed

+150
-57
lines changed
Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,108 @@
11
const V4Transform = require('../../../auth/streamingV4/V4Transform');
22
const TrailingChecksumTransform = require('../../../auth/streamingV4/trailingChecksumTransform');
3+
const ChecksumTransform = require('../../../auth/streamingV4/ChecksumTransform');
4+
const {
5+
getChecksumDataFromHeaders,
6+
arsenalErrorFromChecksumError,
7+
} = require('../../apiUtils/integrity/validateChecksums');
8+
const { errors } = require('arsenal');
9+
const { unsupportedSignatureChecksums } = require('../../../../constants');
310

411
/**
5-
* Prepares the stream if the chunks are sent in a v4 Auth request
6-
* @param {object} stream - stream containing the data
7-
* @param {object | null } streamingV4Params - if v4 auth, object containing
8-
* accessKey, signatureFromRequest, region, scopeDate, timestamp, and
9-
* credentialScope (to be used for streaming v4 auth if applicable)
10-
* @param {RequestLogger} log - the current request logger
11-
* @param {function} errCb - callback called if an error occurs
12-
* @return {object|null} - V4Transform object if v4 Auth request, or
13-
* the original stream, or null if the request has no V4 params but
14-
* the type of request requires them
12+
* Prepares the request stream for data storage by wrapping it in the
13+
* appropriate transform pipeline based on the x-amz-content-sha256 header.
14+
* Always returns a ChecksumTransform as the final stream.
15+
* If no checksum was sent by the client, a CRC64NVME ChecksumTransform is returned.
16+
*
17+
* @param {object} request - incoming HTTP request with headers and body stream
18+
* @param {object|null} streamingV4Params - v4 streaming auth params (accessKey,
19+
* signatureFromRequest, region, scopeDate, timestamp, credentialScope), or
20+
* null/undefined for non-v4 requests
21+
* @param {RequestLogger} log - request logger
22+
* @param {function} errCb - error callback invoked if a stream error occurs
23+
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null }}
24+
* error is set and stream is null if the request headers are invalid;
25+
* otherwise error is null and stream is the ChecksumTransform to read from
1526
*/
16-
function prepareStream(stream, streamingV4Params, log, errCb) {
17-
if (stream.headers['x-amz-content-sha256'] ===
18-
'STREAMING-AWS4-HMAC-SHA256-PAYLOAD') {
19-
if (typeof streamingV4Params !== 'object') {
20-
// this might happen if the user provided a valid V2
21-
// Authentication header, while the chunked upload method
22-
// requires V4: in such case we don't get any V4 params
23-
// and we should return an error to the client.
24-
return null;
25-
}
26-
const v4Transform = new V4Transform(streamingV4Params, log, errCb);
27-
stream.pipe(v4Transform);
28-
v4Transform.headers = stream.headers;
29-
return v4Transform;
30-
}
31-
return stream;
32-
}
27+
function prepareStream(request, streamingV4Params, log, errCb) {
28+
const xAmzContentSHA256 = request.headers['x-amz-content-sha256'];
3329

34-
function stripTrailingChecksumStream(stream, log, errCb) {
35-
// don't do anything if we are not in the correct integrity check mode
36-
if (stream.headers['x-amz-content-sha256'] !== 'STREAMING-UNSIGNED-PAYLOAD-TRAILER') {
37-
return stream;
30+
const checksumAlgo = getChecksumDataFromHeaders(request.headers);
31+
if (checksumAlgo.error) {
32+
log.debug('prepareStream invalid checksum headers', checksumAlgo);
33+
return { error: arsenalErrorFromChecksumError(checksumAlgo), stream: null };
3834
}
3935

40-
const trailingChecksumTransform = new TrailingChecksumTransform(log);
41-
trailingChecksumTransform.on('error', errCb);
42-
stream.pipe(trailingChecksumTransform);
43-
trailingChecksumTransform.headers = stream.headers;
44-
return trailingChecksumTransform;
36+
let stream = request;
37+
switch (xAmzContentSHA256) {
38+
case 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD': {
39+
if (typeof streamingV4Params !== 'object') {
40+
// this might happen if the user provided a valid V2
41+
// Authentication header, while the chunked upload method
42+
// requires V4: in such case we don't get any V4 params
43+
// and we should return an error to the client.
44+
log.error('missing v4 streaming params for chunked upload', {
45+
method: 'prepareStream',
46+
streamingV4ParamsType: typeof streamingV4Params,
47+
streamingV4Params,
48+
});
49+
return { error: errors.InvalidArgument, stream: null };
50+
}
51+
const v4Transform = new V4Transform(streamingV4Params, log, errCb);
52+
request.pipe(v4Transform);
53+
v4Transform.headers = request.headers;
54+
stream = v4Transform;
55+
56+
const checksumedStream = new ChecksumTransform(
57+
checksumAlgo.algorithm,
58+
checksumAlgo.expected,
59+
checksumAlgo.isTrailer,
60+
log,
61+
);
62+
checksumedStream.on('error', errCb);
63+
stream.pipe(checksumedStream);
64+
return { error: null, stream: checksumedStream };
65+
}
66+
case 'STREAMING-UNSIGNED-PAYLOAD-TRAILER': {
67+
const trailingChecksumTransform = new TrailingChecksumTransform(log);
68+
trailingChecksumTransform.on('error', errCb);
69+
request.pipe(trailingChecksumTransform);
70+
trailingChecksumTransform.headers = request.headers;
71+
stream = trailingChecksumTransform;
72+
73+
const checksumedStream = new ChecksumTransform(
74+
checksumAlgo.algorithm,
75+
checksumAlgo.expected,
76+
checksumAlgo.isTrailer,
77+
log,
78+
);
79+
checksumedStream.on('error', errCb);
80+
stream.on('trailer', (name, value) => {
81+
checksumedStream.setExpectedChecksum(name, value);
82+
});
83+
stream.pipe(checksumedStream);
84+
return { error: null, stream: checksumedStream };
85+
}
86+
case 'UNSIGNED-PAYLOAD': // Fallthrough
87+
default: {
88+
if (unsupportedSignatureChecksums.has(xAmzContentSHA256)) {
89+
return {
90+
error: errors.BadRequest.customizeDescription(`${xAmzContentSHA256} is not supported`),
91+
stream: null,
92+
};
93+
}
94+
95+
const checksumedStream = new ChecksumTransform(
96+
checksumAlgo.algorithm,
97+
checksumAlgo.expected,
98+
checksumAlgo.isTrailer,
99+
log,
100+
);
101+
checksumedStream.on('error', errCb);
102+
stream.pipe(checksumedStream);
103+
return { error: null, stream: checksumedStream };
104+
}
105+
}
45106
}
46107

47-
module.exports = {
48-
prepareStream,
49-
stripTrailingChecksumStream,
50-
};
108+
module.exports = { prepareStream };

lib/api/apiUtils/object/storeObject.js

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
const { errors, jsutil } = require('arsenal');
22

33
const { data } = require('../../../data/wrapper');
4-
const { prepareStream, stripTrailingChecksumStream } = require('./prepareStream');
4+
const { prepareStream } = require('./prepareStream');
5+
const { arsenalErrorFromChecksumError } = require('../../apiUtils/integrity/validateChecksums');
56

67
/**
78
* Check that `hashedStream.completedHash` matches header `stream.contentMD5`
@@ -58,31 +59,65 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
5859
function dataStore(objectContext, cipherBundle, stream, size,
5960
streamingV4Params, backendInfo, log, cb) {
6061
const cbOnce = jsutil.once(cb);
61-
const dataStreamTmp = prepareStream(stream, streamingV4Params, log, cbOnce);
62-
if (!dataStreamTmp) {
63-
return process.nextTick(() => cb(errors.InvalidArgument));
62+
63+
const checksumedStream = prepareStream(stream, streamingV4Params, log, cbOnce);
64+
if (checksumedStream.error) {
65+
log.debug('dataStore failed to prepare stream', checksumedStream);
66+
return process.nextTick(() => cbOnce(checksumedStream.error));
6467
}
65-
const dataStream = stripTrailingChecksumStream(dataStreamTmp, log, cbOnce);
6668
return data.put(
67-
cipherBundle, dataStream, size, objectContext, backendInfo, log,
69+
cipherBundle, checksumedStream.stream, size, objectContext, backendInfo, log,
6870
(err, dataRetrievalInfo, hashedStream) => {
6971
if (err) {
70-
log.error('error in datastore', {
71-
error: err,
72-
});
72+
log.error('error in datastore', { error: err });
7373
return cbOnce(err);
7474
}
7575
if (!dataRetrievalInfo) {
76-
log.fatal('data put returned neither an error nor a key', {
77-
method: 'storeObject::dataStore',
78-
});
76+
log.fatal('data put returned neither an error nor a key', { method: 'storeObject::dataStore' });
7977
return cbOnce(errors.InternalError);
8078
}
81-
log.trace('dataStore: backend stored key', {
82-
dataRetrievalInfo,
83-
});
84-
return checkHashMatchMD5(stream, hashedStream,
85-
dataRetrievalInfo, log, cbOnce);
79+
log.trace('dataStore: backend stored key', { dataRetrievalInfo });
80+
81+
const doValidate = () => {
82+
const checksumErr = checksumedStream.stream.validateChecksum();
83+
if (checksumErr) {
84+
log.debug('failed checksum validation stream', checksumErr);
85+
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
86+
if (deleteErr) {
87+
// Failure of batch delete is only logged.
88+
log.error('dataStore failed to delete old data', { error: deleteErr });
89+
}
90+
return cbOnce(arsenalErrorFromChecksumError(checksumErr));
91+
});
92+
}
93+
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cbOnce);
94+
};
95+
96+
// ChecksumTransform._flush computes the digest asynchronously for
97+
// some algorithms (e.g. crc64nvme). writableFinished is true once
98+
// _flush has called its callback, guaranteeing this.digest is set.
99+
// The order in which the streams are piped means this is virtually always true here,
100+
// but we wait for 'finish' explicitly to be safe.
101+
if (checksumedStream.stream.writableFinished) {
102+
return doValidate();
103+
}
104+
const onFinish = () => {
105+
checksumedStream.stream.removeListener('error', onError);
106+
doValidate();
107+
};
108+
const onError = err => {
109+
checksumedStream.stream.removeListener('finish', onFinish);
110+
log.error('checksum stream error after data.put', { error: err });
111+
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
112+
if (deleteErr) {
113+
log.error('dataStore failed to delete old data', { error: deleteErr });
114+
}
115+
cbOnce(err);
116+
});
117+
};
118+
checksumedStream.stream.once('finish', onFinish);
119+
checksumedStream.stream.once('error', onError);
120+
return undefined;
86121
});
87122
}
88123

0 commit comments

Comments
 (0)