Skip to content

Commit 3138552

Browse files
committed
CLDSRV-892: dataStore takes checksum param instead of parsing headers
1 parent 72a7568 commit 3138552

9 files changed

Lines changed: 360 additions & 149 deletions

File tree

lib/api/apiUtils/integrity/validateChecksums.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ const { CrtCrc64Nvme } = require('@aws-sdk/crc64-nvme-crt');
55
const { errors: ArsenalErrors, errorInstances } = require('arsenal');
66
const { config } = require('../../../Config');
77

8+
const defaultChecksumData = Object.freeze(
9+
{ algorithm: 'crc64nvme', isTrailer: false, expected: undefined });
10+
811
const errAlgoNotSupported = errorInstances.InvalidRequest.customizeDescription(
912
'The algorithm type you specified in x-amz-checksum- header is invalid.');
1013
const errAlgoNotSupportedSDK = errorInstances.InvalidRequest.customizeDescription(
@@ -260,9 +263,8 @@ function getChecksumDataFromHeaders(headers) {
260263
}
261264

262265
if (!checksumHeader) {
263-
// There was no x-amz-checksum- or x-amz-trailer return crc64nvme.
264-
// The calculated crc64nvme will be stored in the object metadata.
265-
return { algorithm: 'crc64nvme', isTrailer: false, expected: undefined };
266+
// No x-amz-checksum- or x-amz-trailer header.
267+
return null;
266268
}
267269

268270
// No x-amz-sdk-checksum-algorithm we expect one x-amz-checksum-[crc64nvme, crc32, crc32C, sha1, sha256].
@@ -476,6 +478,7 @@ function getChecksumDataFromMPUHeaders(headers) {
476478

477479
module.exports = {
478480
ChecksumError,
481+
defaultChecksumData,
479482
validateChecksumsNoChunking,
480483
validateMethodChecksumNoChunking,
481484
getChecksumDataFromHeaders,

lib/api/apiUtils/object/createAndStoreObject.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const validateWebsiteHeader = require('./websiteServing')
1616
const applyZenkoUserMD = require('./applyZenkoUserMD');
1717
const {
1818
algorithms,
19+
defaultChecksumData,
1920
getChecksumDataFromHeaders,
2021
arsenalErrorFromChecksumError,
2122
} = require('../integrity/validateChecksums');
@@ -37,7 +38,7 @@ const externalVersioningErrorMessage = 'We do not currently support putting ' +
3738
* @return {undefined}
3839
*/
3940
function zeroSizeBodyChecksumCheck(headers, metadataStoreParams, callback) {
40-
const checksumData = getChecksumDataFromHeaders(headers);
41+
const checksumData = getChecksumDataFromHeaders(headers) || defaultChecksumData;
4142
if (checksumData.error) {
4243
return callback(arsenalErrorFromChecksumError(checksumData));
4344
}
@@ -291,8 +292,16 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
291292
}
292293
}
293294

295+
const headerChecksum = getChecksumDataFromHeaders(request.headers);
296+
if (headerChecksum && headerChecksum.error) {
297+
return next(arsenalErrorFromChecksumError(headerChecksum));
298+
}
299+
const checksums = {
300+
primary: headerChecksum || defaultChecksumData,
301+
secondary: null,
302+
};
294303
return dataStore(objectKeyContext, cipherBundle, request, size,
295-
streamingV4Params, backendInfo, log, next);
304+
streamingV4Params, backendInfo, checksums, log, next);
296305
},
297306
function processDataResult(dataGetInfo, calculatedHash, checksum, next) {
298307
if (dataGetInfo === null || dataGetInfo === undefined) {
@@ -320,7 +329,11 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
320329
dataGetInfoArr[0].size = mdOnlySize;
321330
}
322331
metadataStoreParams.contentMD5 = calculatedHash;
323-
metadataStoreParams.checksum = checksum;
332+
if (checksum) {
333+
// eslint-disable-next-line no-param-reassign
334+
checksum.type = 'FULL_OBJECT';
335+
metadataStoreParams.checksum = checksum;
336+
}
324337
return next(null, dataGetInfoArr);
325338
},
326339
function getVersioningInfo(infoArr, next) {

lib/api/apiUtils/object/prepareStream.js

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,33 @@
11
const V4Transform = require('../../../auth/streamingV4/V4Transform');
22
const TrailingChecksumTransform = require('../../../auth/streamingV4/trailingChecksumTransform');
33
const ChecksumTransform = require('../../../auth/streamingV4/ChecksumTransform');
4-
const {
5-
getChecksumDataFromHeaders,
6-
arsenalErrorFromChecksumError,
7-
} = require('../../apiUtils/integrity/validateChecksums');
84
const { errors, errorInstances, jsutil } = require('arsenal');
95
const { unsupportedSignatureChecksums } = require('../../../../constants');
106

117
/**
128
* Prepares the request stream for data storage by wrapping it in the
139
* appropriate transform pipeline based on the x-amz-content-sha256 header.
14-
* Always returns a ChecksumTransform as the final stream.
15-
* If no checksum was sent by the client a CRC64NVME ChecksumTransform is returned.
10+
* The returned stream is always the primary ChecksumTransform (the stored
11+
* checksum). When a secondary checksum is requested it is inserted upstream
12+
* of the primary and exposed via secondaryChecksumStream.
1613
*
1714
* @param {object} request - incoming HTTP request with headers and body stream
1815
* @param {object|null} streamingV4Params - v4 streaming auth params (accessKey,
1916
* signatureFromRequest, region, scopeDate, timestamp, credentialScope), or
2017
* null/undefined for non-v4 requests
18+
* @param {object} checksums - checksum configuration
19+
* @param {object} checksums.primary - primary checksum
20+
* ({ algorithm, isTrailer, expected }) — validated and its digest returned
21+
* @param {object|null} checksums.secondary - optional secondary checksum
22+
* ({ algorithm, isTrailer, expected }) — only validated; used for MPU parts
2123
* @param {RequestLogger} log - request logger
2224
* @param {function} errCb - error callback invoked if a stream error occurs
23-
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null }}
24-
* error is set and stream is null if the request headers are invalid;
25-
* otherwise error is null and stream is the ChecksumTransform to read from
25+
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null,
26+
* secondaryChecksumStream: ChecksumTransform|null }}
2627
*/
27-
function prepareStream(request, streamingV4Params, log, errCb) {
28+
function prepareStream(request, streamingV4Params, checksums, log, errCb) {
2829
const xAmzContentSHA256 = request.headers['x-amz-content-sha256'];
29-
30-
const checksumAlgo = getChecksumDataFromHeaders(request.headers);
31-
if (checksumAlgo.error) {
32-
log.debug('prepareStream invalid checksum headers', checksumAlgo);
33-
return { error: arsenalErrorFromChecksumError(checksumAlgo), stream: null };
34-
}
30+
const { primary, secondary } = checksums;
3531

3632
switch (xAmzContentSHA256) {
3733
case 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD': {
@@ -54,37 +50,49 @@ function prepareStream(request, streamingV4Params, log, errCb) {
5450
request.pipe(v4Transform);
5551
v4Transform.headers = request.headers;
5652

57-
const checksumedStream = new ChecksumTransform(
58-
checksumAlgo.algorithm,
59-
checksumAlgo.expected,
60-
checksumAlgo.isTrailer,
61-
log,
62-
);
63-
checksumedStream.on('error', onStreamError);
64-
v4Transform.pipe(checksumedStream);
65-
return { error: null, stream: checksumedStream };
53+
let secondaryChecksumStream = null;
54+
let stream = v4Transform;
55+
if (secondary) {
56+
secondaryChecksumStream = new ChecksumTransform(
57+
secondary.algorithm, secondary.expected,
58+
secondary.isTrailer, log);
59+
secondaryChecksumStream.on('error', onStreamError);
60+
stream = v4Transform.pipe(secondaryChecksumStream);
61+
}
62+
63+
const primaryStream = new ChecksumTransform(
64+
primary.algorithm, primary.expected, primary.isTrailer, log);
65+
primaryStream.on('error', onStreamError);
66+
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
6667
}
6768
case 'STREAMING-UNSIGNED-PAYLOAD-TRAILER': {
68-
// Use a once-guard so that auto-destroying both piped streams
69-
// when one errors does not result in errCb being called twice.
7069
const onStreamError = jsutil.once(errCb);
7170
const trailingChecksumTransform = new TrailingChecksumTransform(log);
7271
trailingChecksumTransform.on('error', onStreamError);
7372
request.pipe(trailingChecksumTransform);
7473
trailingChecksumTransform.headers = request.headers;
7574

76-
const checksumedStream = new ChecksumTransform(
77-
checksumAlgo.algorithm,
78-
checksumAlgo.expected,
79-
checksumAlgo.isTrailer,
80-
log,
81-
);
82-
checksumedStream.on('error', onStreamError);
83-
trailingChecksumTransform.on('trailer', (name, value) => {
84-
checksumedStream.setExpectedChecksum(name, value);
85-
});
86-
trailingChecksumTransform.pipe(checksumedStream);
87-
return { error: null, stream: checksumedStream };
75+
let secondaryChecksumStream = null;
76+
let stream = trailingChecksumTransform;
77+
if (secondary) {
78+
secondaryChecksumStream = new ChecksumTransform(
79+
secondary.algorithm, secondary.expected,
80+
secondary.isTrailer, log);
81+
secondaryChecksumStream.on('error', onStreamError);
82+
stream = trailingChecksumTransform.pipe(secondaryChecksumStream);
83+
trailingChecksumTransform.on('trailer', (name, value) => {
84+
secondaryChecksumStream.setExpectedChecksum(name, value);
85+
});
86+
}
87+
88+
const primaryStream = new ChecksumTransform(primary.algorithm, primary.expected, primary.isTrailer, log);
89+
primaryStream.on('error', onStreamError);
90+
if (!secondary) {
91+
trailingChecksumTransform.on('trailer', (name, value) => {
92+
primaryStream.setExpectedChecksum(name, value);
93+
});
94+
}
95+
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
8896
}
8997
case 'UNSIGNED-PAYLOAD': // Fallthrough
9098
default: {
@@ -95,15 +103,20 @@ function prepareStream(request, streamingV4Params, log, errCb) {
95103
};
96104
}
97105

98-
const checksumedStream = new ChecksumTransform(
99-
checksumAlgo.algorithm,
100-
checksumAlgo.expected,
101-
checksumAlgo.isTrailer,
102-
log,
103-
);
104-
checksumedStream.on('error', errCb);
105-
request.pipe(checksumedStream);
106-
return { error: null, stream: checksumedStream };
106+
const onStreamError = secondary ? jsutil.once(errCb) : errCb;
107+
let secondaryChecksumStream = null;
108+
let stream = request;
109+
if (secondary) {
110+
secondaryChecksumStream = new ChecksumTransform(
111+
secondary.algorithm, secondary.expected,
112+
secondary.isTrailer, log);
113+
secondaryChecksumStream.on('error', onStreamError);
114+
stream = request.pipe(secondaryChecksumStream);
115+
}
116+
117+
const primaryStream = new ChecksumTransform(primary.algorithm, primary.expected, primary.isTrailer, log);
118+
primaryStream.on('error', onStreamError);
119+
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
107120
}
108121
}
109122
}

lib/api/apiUtils/object/storeObject.js

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, checksumStre
3838
return cb(errors.BadDigest);
3939
});
4040
}
41-
const checksum = checksumStream.digest
42-
? { algorithm: checksumStream.algoName, value: checksumStream.digest, type: 'FULL_OBJECT' }
43-
: null;
41+
const checksum = { algorithm: checksumStream.algoName, value: checksumStream.digest };
4442
return cb(null, dataRetrievalInfo, completedHash, checksum);
4543
}
4644

@@ -56,20 +54,22 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, checksumStre
5654
* credentialScope (to be used for streaming v4 auth if applicable)
5755
* @param {BackendInfo} backendInfo - info to determine which data
5856
* backend to use
57+
* @param {object} checksums - checksum configuration
58+
* @param {object} checksums.primary - primary checksum data
59+
* @param {object|null} checksums.secondary - secondary checksum data
5960
* @param {RequestLogger} log - the current stream logger
6061
* @param {function} cb - callback containing result for the next task
6162
* @return {undefined}
6263
*/
63-
function dataStore(objectContext, cipherBundle, stream, size,
64-
streamingV4Params, backendInfo, log, cb) {
64+
function dataStore(objectContext, cipherBundle, stream, size, streamingV4Params, backendInfo, checksums, log, cb) {
6565
const cbOnce = jsutil.once(cb);
6666

6767
// errCb is delegated through a mutable reference so it can be upgraded to
6868
// include batchDelete once data.put has actually stored data.
6969
let onStreamError = cbOnce;
7070
const errCb = err => onStreamError(err);
7171

72-
const checksumedStream = prepareStream(stream, streamingV4Params, log, errCb);
72+
const checksumedStream = prepareStream(stream, streamingV4Params, checksums, log, errCb);
7373
if (checksumedStream.error) {
7474
log.debug('dataStore failed to prepare stream', checksumedStream);
7575
return process.nextTick(() => cbOnce(checksumedStream.error));
@@ -99,27 +99,60 @@ function dataStore(objectContext, cipherBundle, stream, size,
9999
});
100100
};
101101

102+
// stream is always the primary (end of pipe, stored checksum).
103+
// secondaryChecksumStream is upstream and only validated.
104+
const { secondaryChecksumStream } = checksumedStream;
105+
102106
const doValidate = () => {
103-
const checksumErr = checksumedStream.stream.validateChecksum();
104-
if (checksumErr) {
105-
log.debug('failed checksum validation stream', { error: checksumErr });
107+
// Validate the secondary (checked-only) checksum first.
108+
if (secondaryChecksumStream) {
109+
const secondaryErr = secondaryChecksumStream.validateChecksum();
110+
if (secondaryErr) {
111+
log.debug('failed secondary checksum validation', { error: secondaryErr });
112+
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
113+
if (deleteErr) {
114+
log.error('dataStore failed to delete old data', { error: deleteErr });
115+
}
116+
return cbOnce(arsenalErrorFromChecksumError(secondaryErr));
117+
});
118+
}
119+
}
120+
// Validate the primary (stored) checksum.
121+
const primaryErr = checksumedStream.stream.validateChecksum();
122+
if (primaryErr) {
123+
log.debug('failed primary checksum validation', { error: primaryErr });
106124
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
107125
if (deleteErr) {
108-
// Failure of batch delete is only logged.
109126
log.error('dataStore failed to delete old data', { error: deleteErr });
110127
}
111-
return cbOnce(arsenalErrorFromChecksumError(checksumErr));
128+
return cbOnce(arsenalErrorFromChecksumError(primaryErr));
112129
});
113130
}
131+
if (!secondaryChecksumStream) {
132+
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo,
133+
checksumedStream.stream, log, cbOnce);
134+
}
135+
// Dual-checksum: checkHashMatchMD5 returns the primary
136+
// (storage) checksum. Swap it to the client-facing one
137+
// from the secondary stream and attach the primary as
138+
// storageChecksum.
114139
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo,
115-
checksumedStream.stream, log, cbOnce);
140+
checksumedStream.stream, log, (err, dataInfo, hash, primaryChecksum) => {
141+
if (err) {return cbOnce(err);}
142+
const checksum = {
143+
algorithm: secondaryChecksumStream.algoName, // Used for the response headers.
144+
value: secondaryChecksumStream.digest,
145+
storageChecksum: primaryChecksum,
146+
};
147+
return cbOnce(null, dataInfo, hash, checksum);
148+
});
116149
};
117150

118151
// ChecksumTransform._flush computes the digest asynchronously for
119152
// some algorithms (e.g. crc64nvme). writableFinished is true once
120153
// _flush has called its callback, guaranteeing this.digest is set.
121-
// Stream piping ordering means this is virtually always true here,
122-
// but we wait for 'finish' explicitly to be safe.
154+
// stream is the primary (end of pipe) — when it finishes all
155+
// upstream transforms (including the secondary) have flushed.
123156
if (checksumedStream.stream.writableFinished) {
124157
return doValidate();
125158
}

0 commit comments

Comments
 (0)