Skip to content

Commit 5dea795

Browse files
committed
CLDSRV-863: pipe requests to ChecksumTransform and validate checksum after upload
1 parent 85f6f78 commit 5dea795

File tree

4 files changed

+658
-57
lines changed

4 files changed

+658
-57
lines changed
Lines changed: 101 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,111 @@
11
const V4Transform = require('../../../auth/streamingV4/V4Transform');
22
const TrailingChecksumTransform = require('../../../auth/streamingV4/trailingChecksumTransform');
3+
const ChecksumTransform = require('../../../auth/streamingV4/ChecksumTransform');
4+
const {
5+
getChecksumDataFromHeaders,
6+
arsenalErrorFromChecksumError,
7+
} = require('../../apiUtils/integrity/validateChecksums');
8+
const { errors, jsutil } = require('arsenal');
9+
const { unsupportedSignatureChecksums } = require('../../../../constants');
310

411
/**
5-
* Prepares the stream if the chunks are sent in a v4 Auth request
6-
* @param {object} stream - stream containing the data
7-
* @param {object | null } streamingV4Params - if v4 auth, object containing
8-
* accessKey, signatureFromRequest, region, scopeDate, timestamp, and
9-
* credentialScope (to be used for streaming v4 auth if applicable)
10-
* @param {RequestLogger} log - the current request logger
11-
* @param {function} errCb - callback called if an error occurs
12-
* @return {object|null} - V4Transform object if v4 Auth request, or
13-
* the original stream, or null if the request has no V4 params but
14-
* the type of request requires them
12+
* Prepares the request stream for data storage by wrapping it in the
13+
* appropriate transform pipeline based on the x-amz-content-sha256 header.
14+
* Always returns a ChecksumTransform as the final stream.
15+
* If no checksum was sent by the client, a CRC64NVME ChecksumTransform is returned.
16+
*
17+
* @param {object} request - incoming HTTP request with headers and body stream
18+
* @param {object|null} streamingV4Params - v4 streaming auth params (accessKey,
19+
* signatureFromRequest, region, scopeDate, timestamp, credentialScope), or
20+
* null/undefined for non-v4 requests
21+
* @param {RequestLogger} log - request logger
22+
* @param {function} errCb - error callback invoked if a stream error occurs
23+
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null }}
24+
* error is set and stream is null if the request headers are invalid or the required v4 streaming params are missing;
25+
* otherwise error is null and stream is the ChecksumTransform to read from
1526
*/
16-
function prepareStream(stream, streamingV4Params, log, errCb) {
17-
if (stream.headers['x-amz-content-sha256'] ===
18-
'STREAMING-AWS4-HMAC-SHA256-PAYLOAD') {
19-
if (typeof streamingV4Params !== 'object') {
20-
// this might happen if the user provided a valid V2
21-
// Authentication header, while the chunked upload method
22-
// requires V4: in such case we don't get any V4 params
23-
// and we should return an error to the client.
24-
return null;
25-
}
26-
const v4Transform = new V4Transform(streamingV4Params, log, errCb);
27-
stream.pipe(v4Transform);
28-
v4Transform.headers = stream.headers;
29-
return v4Transform;
30-
}
31-
return stream;
32-
}
27+
function prepareStream(request, streamingV4Params, log, errCb) {
28+
const xAmzContentSHA256 = request.headers['x-amz-content-sha256'];
3329

34-
function stripTrailingChecksumStream(stream, log, errCb) {
35-
// don't do anything if we are not in the correct integrity check mode
36-
if (stream.headers['x-amz-content-sha256'] !== 'STREAMING-UNSIGNED-PAYLOAD-TRAILER') {
37-
return stream;
30+
const checksumAlgo = getChecksumDataFromHeaders(request.headers);
31+
if (checksumAlgo.error) {
32+
log.debug('prepareStream invalid checksum headers', checksumAlgo);
33+
return { error: arsenalErrorFromChecksumError(checksumAlgo), stream: null };
3834
}
3935

40-
const trailingChecksumTransform = new TrailingChecksumTransform(log);
41-
trailingChecksumTransform.on('error', errCb);
42-
stream.pipe(trailingChecksumTransform);
43-
trailingChecksumTransform.headers = stream.headers;
44-
return trailingChecksumTransform;
36+
let stream = request;
37+
switch (xAmzContentSHA256) {
38+
case 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD': {
39+
if (streamingV4Params === null || typeof streamingV4Params !== 'object') {
40+
// this might happen if the user provided a valid V2
41+
// Authentication header, while the chunked upload method
42+
// requires V4: in such case we don't get any V4 params
43+
// and we should return an error to the client.
44+
log.error('missing v4 streaming params for chunked upload', {
45+
method: 'prepareStream',
46+
streamingV4ParamsType: typeof streamingV4Params,
47+
streamingV4Params,
48+
});
49+
return { error: errors.InvalidArgument, stream: null };
50+
}
51+
const v4Transform = new V4Transform(streamingV4Params, log, errCb);
52+
request.pipe(v4Transform);
53+
v4Transform.headers = request.headers;
54+
stream = v4Transform;
55+
56+
const checksumedStream = new ChecksumTransform(
57+
checksumAlgo.algorithm,
58+
checksumAlgo.expected,
59+
checksumAlgo.isTrailer,
60+
log,
61+
);
62+
checksumedStream.on('error', errCb);
63+
stream.pipe(checksumedStream);
64+
return { error: null, stream: checksumedStream };
65+
}
66+
case 'STREAMING-UNSIGNED-PAYLOAD-TRAILER': {
67+
// Use a once-guard so that auto-destroying both piped streams
68+
// when one errors does not result in errCb being called twice.
69+
const onStreamError = jsutil.once(errCb);
70+
const trailingChecksumTransform = new TrailingChecksumTransform(log);
71+
trailingChecksumTransform.on('error', onStreamError);
72+
request.pipe(trailingChecksumTransform);
73+
trailingChecksumTransform.headers = request.headers;
74+
stream = trailingChecksumTransform;
75+
76+
const checksumedStream = new ChecksumTransform(
77+
checksumAlgo.algorithm,
78+
checksumAlgo.expected,
79+
checksumAlgo.isTrailer,
80+
log,
81+
);
82+
checksumedStream.on('error', onStreamError);
83+
trailingChecksumTransform.on('trailer', (name, value) => {
84+
checksumedStream.setExpectedChecksum(name, value);
85+
});
86+
stream.pipe(checksumedStream);
87+
return { error: null, stream: checksumedStream };
88+
}
89+
case 'UNSIGNED-PAYLOAD': // Fallthrough
90+
default: {
91+
if (unsupportedSignatureChecksums.has(xAmzContentSHA256)) {
92+
return {
93+
error: errors.BadRequest.customizeDescription(`${xAmzContentSHA256} is not supported`),
94+
stream: null,
95+
};
96+
}
97+
98+
const checksumedStream = new ChecksumTransform(
99+
checksumAlgo.algorithm,
100+
checksumAlgo.expected,
101+
checksumAlgo.isTrailer,
102+
log,
103+
);
104+
checksumedStream.on('error', errCb);
105+
stream.pipe(checksumedStream);
106+
return { error: null, stream: checksumedStream };
107+
}
108+
}
45109
}
46110

47-
module.exports = {
48-
prepareStream,
49-
stripTrailingChecksumStream,
50-
};
111+
module.exports = { prepareStream };

lib/api/apiUtils/object/storeObject.js

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
const { errors, jsutil } = require('arsenal');
22

33
const { data } = require('../../../data/wrapper');
4-
const { prepareStream, stripTrailingChecksumStream } = require('./prepareStream');
4+
const { prepareStream } = require('./prepareStream');
5+
const { arsenalErrorFromChecksumError } = require('../../apiUtils/integrity/validateChecksums');
56

67
/**
78
* Check that `hashedStream.completedHash` matches header `stream.contentMD5`
@@ -58,31 +59,67 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
5859
function dataStore(objectContext, cipherBundle, stream, size,
5960
streamingV4Params, backendInfo, log, cb) {
6061
const cbOnce = jsutil.once(cb);
61-
const dataStreamTmp = prepareStream(stream, streamingV4Params, log, cbOnce);
62-
if (!dataStreamTmp) {
63-
return process.nextTick(() => cb(errors.InvalidArgument));
62+
63+
// errCb is delegated through a mutable reference so it can be upgraded to
64+
// include batchDelete once data.put has actually stored data.
65+
let onStreamError = cbOnce;
66+
const errCb = err => onStreamError(err);
67+
68+
const checksumedStream = prepareStream(stream, streamingV4Params, log, errCb);
69+
if (checksumedStream.error) {
70+
log.debug('dataStore failed to prepare stream', checksumedStream);
71+
return process.nextTick(() => cbOnce(checksumedStream.error));
6472
}
65-
const dataStream = stripTrailingChecksumStream(dataStreamTmp, log, cbOnce);
6673
return data.put(
67-
cipherBundle, dataStream, size, objectContext, backendInfo, log,
74+
cipherBundle, checksumedStream.stream, size, objectContext, backendInfo, log,
6875
(err, dataRetrievalInfo, hashedStream) => {
6976
if (err) {
70-
log.error('error in datastore', {
71-
error: err,
72-
});
77+
log.error('error in datastore', { error: err });
7378
return cbOnce(err);
7479
}
7580
if (!dataRetrievalInfo) {
76-
log.fatal('data put returned neither an error nor a key', {
77-
method: 'storeObject::dataStore',
78-
});
81+
log.fatal('data put returned neither an error nor a key', { method: 'storeObject::dataStore' });
7982
return cbOnce(errors.InternalError);
8083
}
81-
log.trace('dataStore: backend stored key', {
82-
dataRetrievalInfo,
83-
});
84-
return checkHashMatchMD5(stream, hashedStream,
85-
dataRetrievalInfo, log, cbOnce);
84+
log.trace('dataStore: backend stored key', { dataRetrievalInfo });
85+
86+
// Data is now stored. Upgrade the error handler so that any stream
87+
// error from this point on cleans up the stored data first.
88+
onStreamError = streamErr => {
89+
log.error('checksum stream error after data.put', { error: streamErr });
90+
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
91+
if (deleteErr) {
92+
log.error('dataStore failed to delete old data', { error: deleteErr });
93+
}
94+
cbOnce(streamErr);
95+
});
96+
};
97+
98+
const doValidate = () => {
99+
const checksumErr = checksumedStream.stream.validateChecksum();
100+
if (checksumErr) {
101+
log.debug('failed checksum validation stream', checksumErr);
102+
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
103+
if (deleteErr) {
104+
// Failure of batch delete is only logged.
105+
log.error('dataStore failed to delete old data', { error: deleteErr });
106+
}
107+
return cbOnce(arsenalErrorFromChecksumError(checksumErr));
108+
});
109+
}
110+
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cbOnce);
111+
};
112+
113+
// ChecksumTransform._flush computes the digest asynchronously for
114+
// some algorithms (e.g. crc64nvme). writableFinished is true once
115+
// _flush has called its callback, guaranteeing this.digest is set.
116+
// Stream piping ordering means this is virtually always true here,
117+
// but we wait for 'finish' explicitly to be safe.
118+
if (checksumedStream.stream.writableFinished) {
119+
return doValidate();
120+
}
121+
checksumedStream.stream.once('finish', doValidate);
122+
return undefined;
86123
});
87124
}
88125

0 commit comments

Comments
 (0)