Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions lib/api/apiUtils/integrity/validateChecksums.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const { CrtCrc64Nvme } = require('@aws-sdk/crc64-nvme-crt');
const { errors: ArsenalErrors, errorInstances } = require('arsenal');
const { config } = require('../../../Config');

const defaultChecksumData = Object.freeze(
{ algorithm: 'crc64nvme', isTrailer: false, expected: undefined });

const errAlgoNotSupported = errorInstances.InvalidRequest.customizeDescription(
'The algorithm type you specified in x-amz-checksum- header is invalid.');
const errAlgoNotSupportedSDK = errorInstances.InvalidRequest.customizeDescription(
Expand Down Expand Up @@ -260,9 +263,8 @@ function getChecksumDataFromHeaders(headers) {
}

if (!checksumHeader) {
// There was no x-amz-checksum- or x-amz-trailer return crc64nvme.
// The calculated crc64nvme will be stored in the object metadata.
return { algorithm: 'crc64nvme', isTrailer: false, expected: undefined };
// No x-amz-checksum- or x-amz-trailer header.
return null;
}

// No x-amz-sdk-checksum-algorithm we expect one x-amz-checksum-[crc64nvme, crc32, crc32C, sha1, sha256].
Expand Down Expand Up @@ -476,6 +478,7 @@ function getChecksumDataFromMPUHeaders(headers) {

module.exports = {
ChecksumError,
defaultChecksumData,
validateChecksumsNoChunking,
validateMethodChecksumNoChunking,
getChecksumDataFromHeaders,
Expand Down
19 changes: 16 additions & 3 deletions lib/api/apiUtils/object/createAndStoreObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const validateWebsiteHeader = require('./websiteServing')
const applyZenkoUserMD = require('./applyZenkoUserMD');
const {
algorithms,
defaultChecksumData,
getChecksumDataFromHeaders,
arsenalErrorFromChecksumError,
} = require('../integrity/validateChecksums');
Expand All @@ -37,7 +38,7 @@ const externalVersioningErrorMessage = 'We do not currently support putting ' +
* @return {undefined}
*/
function zeroSizeBodyChecksumCheck(headers, metadataStoreParams, callback) {
const checksumData = getChecksumDataFromHeaders(headers);
const checksumData = getChecksumDataFromHeaders(headers) || defaultChecksumData;
if (checksumData.error) {
return callback(arsenalErrorFromChecksumError(checksumData));
}
Expand Down Expand Up @@ -291,8 +292,16 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
}
}

const headerChecksum = getChecksumDataFromHeaders(request.headers);
if (headerChecksum && headerChecksum.error) {
return next(arsenalErrorFromChecksumError(headerChecksum));
}
const checksums = {
primary: headerChecksum || defaultChecksumData,
secondary: null,
};
return dataStore(objectKeyContext, cipherBundle, request, size,
streamingV4Params, backendInfo, log, next);
streamingV4Params, backendInfo, checksums, log, next);
},
function processDataResult(dataGetInfo, calculatedHash, checksum, next) {
if (dataGetInfo === null || dataGetInfo === undefined) {
Expand Down Expand Up @@ -320,7 +329,11 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
dataGetInfoArr[0].size = mdOnlySize;
}
metadataStoreParams.contentMD5 = calculatedHash;
metadataStoreParams.checksum = checksum;
if (checksum) {
// eslint-disable-next-line no-param-reassign
checksum.type = 'FULL_OBJECT';
metadataStoreParams.checksum = checksum;
}
return next(null, dataGetInfoArr);
},
function getVersioningInfo(infoArr, next) {
Expand Down
109 changes: 61 additions & 48 deletions lib/api/apiUtils/object/prepareStream.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,33 @@
const V4Transform = require('../../../auth/streamingV4/V4Transform');
const TrailingChecksumTransform = require('../../../auth/streamingV4/trailingChecksumTransform');
const ChecksumTransform = require('../../../auth/streamingV4/ChecksumTransform');
const {
getChecksumDataFromHeaders,
arsenalErrorFromChecksumError,
} = require('../../apiUtils/integrity/validateChecksums');
const { errors, errorInstances, jsutil } = require('arsenal');
const { unsupportedSignatureChecksums } = require('../../../../constants');

/**
* Prepares the request stream for data storage by wrapping it in the
* appropriate transform pipeline based on the x-amz-content-sha256 header.
* Always returns a ChecksumTransform as the final stream.
* If no checksum was sent by the client a CRC64NVME ChecksumTransform is returned.
* The returned stream is always the primary ChecksumTransform (the stored
* checksum). When a secondary checksum is requested it is inserted upstream
* of the primary and exposed via secondaryChecksumStream.
*
* @param {object} request - incoming HTTP request with headers and body stream
* @param {object|null} streamingV4Params - v4 streaming auth params (accessKey,
* signatureFromRequest, region, scopeDate, timestamp, credentialScope), or
* null/undefined for non-v4 requests
* @param {object} checksums - checksum configuration
* @param {object} checksums.primary - primary checksum
* ({ algorithm, isTrailer, expected }) — validated and its digest returned
* @param {object|null} checksums.secondary - optional secondary checksum
* ({ algorithm, isTrailer, expected }) — only validated; used for MPU parts
* @param {RequestLogger} log - request logger
* @param {function} errCb - error callback invoked if a stream error occurs
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null }}
* error is set and stream is null if the request headers are invalid;
* otherwise error is null and stream is the ChecksumTransform to read from
* @return {{ error: Arsenal.Error|null, stream: ChecksumTransform|null,
* secondaryChecksumStream: ChecksumTransform|null }}
*/
function prepareStream(request, streamingV4Params, log, errCb) {
function prepareStream(request, streamingV4Params, checksums, log, errCb) {
const xAmzContentSHA256 = request.headers['x-amz-content-sha256'];

const checksumAlgo = getChecksumDataFromHeaders(request.headers);
if (checksumAlgo.error) {
log.debug('prepareStream invalid checksum headers', checksumAlgo);
return { error: arsenalErrorFromChecksumError(checksumAlgo), stream: null };
}
const { primary, secondary } = checksums;

switch (xAmzContentSHA256) {
case 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD': {
Expand All @@ -54,37 +50,49 @@ function prepareStream(request, streamingV4Params, log, errCb) {
request.pipe(v4Transform);
v4Transform.headers = request.headers;

const checksumedStream = new ChecksumTransform(
checksumAlgo.algorithm,
checksumAlgo.expected,
checksumAlgo.isTrailer,
log,
);
checksumedStream.on('error', onStreamError);
v4Transform.pipe(checksumedStream);
return { error: null, stream: checksumedStream };
let secondaryChecksumStream = null;
let stream = v4Transform;
if (secondary) {
secondaryChecksumStream = new ChecksumTransform(
secondary.algorithm, secondary.expected,
secondary.isTrailer, log);
secondaryChecksumStream.on('error', onStreamError);
stream = v4Transform.pipe(secondaryChecksumStream);
}

const primaryStream = new ChecksumTransform(
primary.algorithm, primary.expected, primary.isTrailer, log);
primaryStream.on('error', onStreamError);
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
}
case 'STREAMING-UNSIGNED-PAYLOAD-TRAILER': {
// Use a once-guard so that auto-destroying both piped streams
// when one errors does not result in errCb being called twice.
const onStreamError = jsutil.once(errCb);
const trailingChecksumTransform = new TrailingChecksumTransform(log);
trailingChecksumTransform.on('error', onStreamError);
request.pipe(trailingChecksumTransform);
trailingChecksumTransform.headers = request.headers;

const checksumedStream = new ChecksumTransform(
checksumAlgo.algorithm,
checksumAlgo.expected,
checksumAlgo.isTrailer,
log,
);
checksumedStream.on('error', onStreamError);
trailingChecksumTransform.on('trailer', (name, value) => {
checksumedStream.setExpectedChecksum(name, value);
});
trailingChecksumTransform.pipe(checksumedStream);
return { error: null, stream: checksumedStream };
let secondaryChecksumStream = null;
let stream = trailingChecksumTransform;
if (secondary) {
secondaryChecksumStream = new ChecksumTransform(
secondary.algorithm, secondary.expected,
secondary.isTrailer, log);
secondaryChecksumStream.on('error', onStreamError);
stream = trailingChecksumTransform.pipe(secondaryChecksumStream);
trailingChecksumTransform.on('trailer', (name, value) => {
secondaryChecksumStream.setExpectedChecksum(name, value);
});
}

const primaryStream = new ChecksumTransform(primary.algorithm, primary.expected, primary.isTrailer, log);
primaryStream.on('error', onStreamError);
if (!secondary) {
trailingChecksumTransform.on('trailer', (name, value) => {
primaryStream.setExpectedChecksum(name, value);
});
}
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
}
case 'UNSIGNED-PAYLOAD': // Fallthrough
default: {
Expand All @@ -95,15 +103,20 @@ function prepareStream(request, streamingV4Params, log, errCb) {
};
}

const checksumedStream = new ChecksumTransform(
checksumAlgo.algorithm,
checksumAlgo.expected,
checksumAlgo.isTrailer,
log,
);
checksumedStream.on('error', errCb);
request.pipe(checksumedStream);
return { error: null, stream: checksumedStream };
const onStreamError = secondary ? jsutil.once(errCb) : errCb;
let secondaryChecksumStream = null;
let stream = request;
if (secondary) {
secondaryChecksumStream = new ChecksumTransform(
secondary.algorithm, secondary.expected,
secondary.isTrailer, log);
secondaryChecksumStream.on('error', onStreamError);
stream = request.pipe(secondaryChecksumStream);
}

const primaryStream = new ChecksumTransform(primary.algorithm, primary.expected, primary.isTrailer, log);
primaryStream.on('error', onStreamError);
return { error: null, stream: stream.pipe(primaryStream), secondaryChecksumStream };
}
}
}
Expand Down
61 changes: 47 additions & 14 deletions lib/api/apiUtils/object/storeObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
return cb(errors.BadDigest);
});
}
const checksum = checksumStream.digest
? { algorithm: checksumStream.algoName, value: checksumStream.digest, type: 'FULL_OBJECT' }
: null;
const checksum = { algorithm: checksumStream.algoName, value: checksumStream.digest };
return cb(null, dataRetrievalInfo, completedHash, checksum);
}

Expand All @@ -56,77 +54,112 @@
* credentialScope (to be used for streaming v4 auth if applicable)
* @param {BackendInfo} backendInfo - info to determine which data
* backend to use
* @param {object} checksums - checksum configuration
* @param {object} checksums.primary - primary checksum data
* @param {object|null} checksums.secondary - secondary checksum data
* @param {RequestLogger} log - the current stream logger
* @param {function} cb - callback containing result for the next task
* @return {undefined}
*/
function dataStore(objectContext, cipherBundle, stream, size,
streamingV4Params, backendInfo, log, cb) {
function dataStore(objectContext, cipherBundle, stream, size, streamingV4Params, backendInfo, checksums, log, cb) {
const cbOnce = jsutil.once(cb);

// errCb is delegated through a mutable reference so it can be upgraded to
// include batchDelete once data.put has actually stored data.
let onStreamError = cbOnce;
const errCb = err => onStreamError(err);

const checksumedStream = prepareStream(stream, streamingV4Params, log, errCb);
const checksumedStream = prepareStream(stream, streamingV4Params, checksums, log, errCb);
if (checksumedStream.error) {
log.debug('dataStore failed to prepare stream', checksumedStream);
return process.nextTick(() => cbOnce(checksumedStream.error));
}
return data.put(
cipherBundle, checksumedStream.stream, size, objectContext, backendInfo, log,
(err, dataRetrievalInfo, hashedStream) => {
if (err) {
log.error('error in datastore', { error: err });
return cbOnce(err);
}
if (!dataRetrievalInfo) {
log.fatal('data put returned neither an error nor a key', { method: 'storeObject::dataStore' });
return cbOnce(errors.InternalError);
}
log.trace('dataStore: backend stored key', { dataRetrievalInfo });

// Data is now stored. Upgrade the error handler so that any stream
// error from this point on cleans up the stored data first.
onStreamError = streamErr => {
log.error('checksum stream error after data.put', { error: streamErr });
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
if (deleteErr) {
log.error('dataStore failed to delete old data', { error: deleteErr });
}
cbOnce(streamErr);
});
};

// stream is always the primary (end of pipe, stored checksum).
// secondaryChecksumStream is upstream and only validated.
const { secondaryChecksumStream } = checksumedStream;

const doValidate = () => {
const checksumErr = checksumedStream.stream.validateChecksum();
if (checksumErr) {
log.debug('failed checksum validation stream', { error: checksumErr });
// Validate the secondary (checked-only) checksum first.
if (secondaryChecksumStream) {
const secondaryErr = secondaryChecksumStream.validateChecksum();
if (secondaryErr) {
log.debug('failed secondary checksum validation', { error: secondaryErr });
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
if (deleteErr) {
log.error('dataStore failed to delete old data', { error: deleteErr });
}
return cbOnce(arsenalErrorFromChecksumError(secondaryErr));
});
}
}
// Validate the primary (stored) checksum.
const primaryErr = checksumedStream.stream.validateChecksum();
if (primaryErr) {
log.debug('failed primary checksum validation', { error: primaryErr });
return data.batchDelete([dataRetrievalInfo], null, null, log, deleteErr => {
if (deleteErr) {
// Failure of batch delete is only logged.
log.error('dataStore failed to delete old data', { error: deleteErr });
}
return cbOnce(arsenalErrorFromChecksumError(checksumErr));
return cbOnce(arsenalErrorFromChecksumError(primaryErr));
});
}
if (!secondaryChecksumStream) {
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo,
checksumedStream.stream, log, cbOnce);
}
// Dual-checksum: checkHashMatchMD5 returns the primary
// (storage) checksum. Swap it to the client-facing one
// from the secondary stream and attach the primary as
// storageChecksum.
return checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo,
checksumedStream.stream, log, cbOnce);
checksumedStream.stream, log, (err, dataInfo, hash, primaryChecksum) => {
if (err) {return cbOnce(err);}
const checksum = {
algorithm: secondaryChecksumStream.algoName, // Used for the response headers.
value: secondaryChecksumStream.digest,
storageChecksum: primaryChecksum,
};
return cbOnce(null, dataInfo, hash, checksum);
});
};

// ChecksumTransform._flush computes the digest asynchronously for
// some algorithms (e.g. crc64nvme). writableFinished is true once
// _flush has called its callback, guaranteeing this.digest is set.
// Stream piping ordering means this is virtually always true here,
// but we wait for 'finish' explicitly to be safe.
// stream is the primary (end of pipe) — when it finishes all
// upstream transforms (including the secondary) have flushed.
if (checksumedStream.stream.writableFinished) {
return doValidate();
}
checksumedStream.stream.once('finish', doValidate);
return undefined;
});
}

Check notice

Code scanning / CodeQL

Callback-style function (async migration) Note

This function uses a callback parameter ('cb'). Refactor to async/await.

module.exports = {
dataStore,
Expand Down
Loading
Loading