Skip to content

Commit e3b4194

Browse files
John Gilbert - Home
authored and committed
add athena sink
1 parent 73264db commit e3b4194

2 files changed

Lines changed: 118 additions & 0 deletions

File tree

src/sinks/athena.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import _ from 'highland';
2+
import { isServerError, isThrottlingError, isTransientError } from '@smithy/service-error-classification';
3+
import {
4+
rejectWithFault,
5+
ratelimit,
6+
} from '../utils';
7+
8+
import Connector from '../connectors/athena';
9+
10+
/**
 * Builds a stream step that starts an Athena query for each unit of work
 * (uow) carrying a query request, and attaches the connector's response to
 * the uow.
 *
 * @param {object} options - required; there is no default, so calling this
 *   with no argument throws while destructuring.
 * @param {string} options.id - pipeline id; handed to the connector and used
 *   as the suffix of the ClientRequestToken.
 * @param {Function} [options.debug] - passed through to the connector.
 * @param {string} [options.queryRequestField='queryRequest'] - uow field
 *   holding request params; its keys are spread last, so they override the
 *   defaults assembled here.
 * @param {string} [options.queryResponseField='queryResponse'] - uow field
 *   that receives the response (or the retry marker).
 * @param {number} [options.parallel] - value given to `.parallel()`; falls
 *   back to ATHENA_PARALLEL, then PARALLEL, then 1.
 * @param {string} [options.outputLocation='outputLocationRequest'] - NAME of
 *   the uow field that holds the output location (not the location itself).
 * @param {boolean} [options.encryptionEnabled=true] - when false no
 *   EncryptionConfiguration is sent.
 * @param {string} [options.encryptionOption='CSE_KMS']
 * @param {string} [options.kmsKey] - defaults to REGIONAL_MASTER_KEY_ARN,
 *   then MASTER_KEY_ARN.
 * @param {string} [options.database] - defaults to GLUE_DATABASE.
 * @param {string} [options.catalog] - defaults to GLUE_CATALOG.
 * @param {boolean} [options.skipRetry] - true when SKIP_RETRY is 'true' or
 *   NODE_ENV is 'test'; disables the retry marker and lets the error fault.
 * @returns {Function} `(s) => stream` that applies `ratelimit(opt)`, maps
 *   each uow through the query call and runs them via `.parallel(parallel)`.
 */
export const startQueryExecution = ({
  id: pipelineId,
  debug,
  queryRequestField = 'queryRequest',
  queryResponseField = 'queryResponse',
  parallel = Number(process.env.ATHENA_PARALLEL) || Number(process.env.PARALLEL) || 1,
  outputLocation = 'outputLocationRequest',
  encryptionEnabled = true,
  encryptionOption = 'CSE_KMS',
  kmsKey = process.env.REGIONAL_MASTER_KEY_ARN || /* istanbul ignore next */ process.env.MASTER_KEY_ARN,
  database = process.env.GLUE_DATABASE,
  catalog = process.env.GLUE_CATALOG,
  skipRetry = process.env.SKIP_RETRY === 'true' || process.env.NODE_ENV === 'test',
  ...opt
} /* = {} */) => {
  const connector = new Connector({ pipelineId, debug, ...opt });

  // A fresh encryption config per request, or undefined when disabled.
  const toEncryptionConfiguration = () => {
    /* istanbul ignore if */
    if (!encryptionEnabled) return undefined;
    return { EncryptionOption: encryptionOption, KmsKey: kmsKey };
  };

  // Defaults first, then the uow's own request so the caller's keys win.
  const toParams = (uow) => ({
    ClientRequestToken: `${uow.record?.dynamodb?.Keys?.sk?.S}-${pipelineId}`,
    WorkGroup: process.env.WORK_GROUP,
    ResultConfiguration: {
      OutputLocation: uow[outputLocation],
      EncryptionConfiguration: toEncryptionConfiguration(),
    },
    QueryExecutionContext: {
      Database: database,
      Catalog: catalog,
    },
    ...uow[queryRequestField],
  });

  // Throttling, server and transient errors are flagged for retry instead of
  // faulting, unless retries are switched off.
  /* istanbul ignore next */
  const onError = (uow) => (err) => { // TODO potentially check retry count
    if (skipRetry) return Promise.reject(err);

    const retryable = isThrottlingError(err) || isServerError(err) || isTransientError(err);
    if (!retryable) return Promise.reject(err);

    return {
      ...uow,
      [queryResponseField]: { retry: uow.record.eventID },
    };
  };

  const invoke = (uow) => {
    // Nothing to run for this uow: pass it along untouched.
    /* istanbul ignore if */
    if (!uow[queryRequestField]) {
      return _(Promise.resolve(uow));
    }

    // Kept as a thunk so the request is only built and sent when invoked,
    // either directly or by the metrics wrapper.
    const send = () => connector
      .startQueryExecution(toParams(uow), uow)
      .then((queryResponse) => ({ ...uow, [queryResponseField]: queryResponse }))
      .catch(onError(uow))
      .catch(rejectWithFault(uow));

    const pending = uow.metrics?.w(send, 'query') || send();

    return _(pending); // wrap promise in a stream
  };

  return (s) => {
    const limited = s.through(ratelimit(opt));
    return limited.map(invoke).parallel(parallel);
  };
};

test/unit/sinks/athena.test.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
import _ from 'highland';
5+
6+
import { startQueryExecution } from '../../../src/sinks/athena';
7+
8+
import Connector from '../../../src/connectors/athena';
9+
10+
// Unit test for the athena sink: stubs the connector and checks both the
// request sent to it and the uow emitted downstream.
describe('sinks/athena.js', () => {
  afterEach(sinon.restore);

  it('should start a query', (done) => {
    const stub = sinon.stub(Connector.prototype, 'startQueryExecution').resolves({});

    const queryRequest = { X: 'y' };

    // The uow has no record and no output location, so those derived values
    // come through as undefined; the query request keys are merged in last.
    // NOTE(review): KmsKey 'kms-arn' presumably comes from an env var set by
    // the shared test setup - confirm.
    const expectedParams = {
      ClientRequestToken: 'undefined-p1',
      WorkGroup: undefined,
      ResultConfiguration: {
        OutputLocation: undefined,
        EncryptionConfiguration: { EncryptionOption: 'CSE_KMS', KmsKey: 'kms-arn' },
      },
      QueryExecutionContext: { Database: undefined, Catalog: undefined },
      X: 'y',
    };

    // The stub resolves with {}, which lands in the default response field.
    const expectedUow = {
      queryRequest: { X: 'y' },
      queryResponse: {},
    };

    const assertOutcome = (collected) => {
      expect(stub).to.have.been.calledWith(expectedParams);

      expect(collected.length).to.equal(1);
      expect(collected[0]).to.deep.equal(expectedUow);
    };

    _([{ queryRequest }])
      .through(startQueryExecution({ id: 'p1' }))
      .collect()
      .tap(assertOutcome)
      .done(done);
  });
});

0 commit comments

Comments
 (0)