-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathindex.js
More file actions
121 lines (111 loc) · 3.5 KB
/
index.js
File metadata and controls
121 lines (111 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import _ from 'highland';
import {
now, trimAndRedact, uuid, compress, FAULT_COMPRESSION_IGNORE,
} from '../utils';
// value of the `type` field on every fault event created by faults()
export const FAULT_EVENT_TYPE = 'fault';
// collect faults until the end so that an unhandled error
// does not cause handler faults to be repeatedly published
// (module-level queue: faults() appends to it and flushFaults() drains it)
const theFaults = [];
// Creates a stream error handler: (err, push) => void.
//
// Every error is logged first. An error that carries a uow and is not
// reported as retryable by opt.retryable (when that function is provided)
// is turned into a fault event and queued in theFaults for later publishing.
// Any other error is pushed back downstream unchanged.
export const faults = (opt) => (err, push) => {
  logErr(err);

  // no uow means the exception is unhandled/unexpected, and a retryable
  // error must not become a fault either: rethrow both to stop processing
  if (!err.uow || (opt.retryable && opt.retryable(err, opt))) {
    push(err);
    return;
  }

  // handled exceptions are adorned with the uow in error, so queue a fault
  // event that will be published at the very end by flushFaults
  const id = uuid.v1();
  const partitionKey = uuid.v4();
  const timestamp = now();
  const tags = {
    functionname: process.env.AWS_LAMBDA_FUNCTION_NAME || 'undefined',
    pipeline: err.uow.pipeline || 'undefined',
  };
  const { name, message, stack } = err;
  const uow = trimAndRedact(err.uow);

  theFaults.push({
    id,
    partitionKey,
    type: FAULT_EVENT_TYPE,
    timestamp,
    tags,
    err: { name, message, stack },
    uow,
  });
};
// Creates a stream transform: (s) => stream.
//
// Use it at the very end with through(). Values and errors from the source
// stream are passed along untouched; when the source ends, the stream
// continues with the publishing of all the accumulated faults.
export const flushFaults = (opt) => (s) => {
  // generator that drains theFaults, emitting one size-limited fault per pull
  const drainFaults = (push, next) => {
    const fault = theFaults.shift();
    if (!fault) {
      // the queue is empty, so end the fault stream
      push(null, _.nil);
      return;
    }
    push(null, limitFaultSize(fault, opt));
    next();
  };

  // builds the stream that batches and publishes the queued fault events
  const publishFaults = () => {
    const uows = _(drainFaults)
      .map((fault) => ({ event: fault })); // map to uow format

    const publishOpt = {
      ...opt,
      ...opt.faultOpt, // override options specific for faults
      handleErrors: false, // don't publish faults for faults
      batchSize: Number(process.env.FAULTS_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 4,
      parallel: Number(process.env.FAULTS_PARALLEL) || Number(process.env.PARALLEL) || 4,
    };

    return uows.through(opt.publish(publishOpt));
  };

  return s.consume((err, x, push, next) => {
    /* istanbul ignore if */
    if (err) {
      push(err);
      next();
      return;
    }

    if (x === _.nil) {
      // this is the purpose of this consume step:
      // publish all accumulated faults at the very end
      next(publishFaults());
      return;
    }

    push(null, x);
    next();
  });
};
// Writes the error to console.error, but only when the
// AWS_LAMBDA_LOG_GROUP_NAME environment variable is set.
// Error instances are logged as a single JSON string; any other
// value is handed to console.error as is.
const logErr = (err) => {
  /* istanbul ignore if */
  if (process.env.AWS_LAMBDA_LOG_GROUP_NAME) {
    const payload = err instanceof Error
      ? JSON.stringify({
        errorMessage: err.message,
        errorType: err.name,
        pipeline: err.uow?.pipeline || 'undefined',
        // an error adorned with a uow counts as handled
        handled: err.uow !== undefined,
        retryable: err.retryable,
        stackTrace: err.stack,
      })
      : err;

    console.error(payload);
  }
};
// Returns the fault unchanged unless its serialized size exceeds
// opt.maxRequestSize, in which case a copy is returned whose uow is cut
// down to the record(s) only.
//
// The size is the byte length of the fault serialized with JSON.stringify,
// using the replacer produced by compress() with FAULT_COMPRESSION_IGNORE.
export const limitFaultSize = (fault, opt) => {
  const replacer = compress({ ...opt, compressionIgnore: FAULT_COMPRESSION_IGNORE });
  const serialized = JSON.stringify(fault, replacer);
  const bytes = Buffer.byteLength(serialized);

  if (bytes > opt.maxRequestSize) {
    // just include what is essential to resubmit faults
    const { uow } = fault;
    const essentialUow = uow.batch
      ? { batch: uow.batch.map(({ record }) => ({ record })) }
      : { record: uow.record };

    // if it is still too big there is not a lot we can do
    // maybe the original event is too big with unnecessary data
    // for a fault with uow.batch maybe reduce the batch size temporarily
    // TODO add-source-side-claim-check-support - https://github.com/jgilbert01/aws-lambda-stream/issues/355
    return { ...fault, uow: essentialUow };
  }

  return fault;
};