Skip to content

Commit c323848

Browse files
John Gilbert - HomeJohn Gilbert - Home
authored andcommitted
add athena from function
1 parent e3b4194 commit c323848

2 files changed

Lines changed: 389 additions & 0 deletions

File tree

src/from/athena.js

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
import { fromSqs } from './sqs';
2+
import { faulty } from '../utils';
3+
4+
/*
5+
6+
here is an example of the yaml to route athena and scheduler events
7+
to a queue and listener function using this fromAthena function
8+
9+
resources:
10+
Resources:
11+
EbListenerQueue:
12+
Type: AWS::SQS::Queue
13+
Properties:
14+
QueueName: ${self:service}-${opt:stage}-listener-eb
15+
16+
EbListenerQueuePolicy:
17+
Type: AWS::SQS::QueuePolicy
18+
Properties:
19+
Queues:
20+
- Ref: EbListenerQueue
21+
PolicyDocument:
22+
Statement:
23+
- Effect: Allow
24+
Principal:
25+
Service: events.amazonaws.com
26+
Action: sqs:SendMessage
27+
Resource:
28+
Fn::GetAtt: [ EbListenerQueue, Arn ]
29+
Condition:
30+
ArnEquals:
31+
aws:SourceArn:
32+
Fn::GetAtt: [ AthenaEventRule, Arn ]
33+
- Effect: Allow
34+
Principal:
35+
Service: events.amazonaws.com
36+
Action: sqs:SendMessage
37+
Resource:
38+
Fn::GetAtt: [ EbListenerQueue, Arn ]
39+
Condition:
40+
ArnEquals:
41+
aws:SourceArn:
42+
Fn::GetAtt: [ SchedulerEventRule, Arn ]
43+
44+
AthenaEventRule:
45+
Type: AWS::Events::Rule
46+
Properties:
47+
EventBusName: default
48+
EventPattern:
49+
source:
50+
- aws.athena
51+
detail:
52+
workgroupName:
53+
- ${self:provider.environment.WORK_GROUP}
54+
State: ENABLED
55+
Targets:
56+
- Id: Channel
57+
Arn:
58+
Fn::GetAtt: [ EbListenerQueue, Arn ]
59+
60+
SchedulerEventRule:
61+
Type: AWS::Events::Rule
62+
Properties:
63+
EventBusName: ${cf:${self:custom.sys}-${self:custom.subsys}-event-hub-${opt:stage}.busName}
64+
EventPattern:
65+
source:
66+
- ${self:service}-${opt:stage}
67+
State: ENABLED
68+
Targets:
69+
- Id: Channel
70+
Arn:
71+
Fn::GetAtt: [ EbListenerQueue, Arn ]
72+
73+
Outputs:
74+
EbListenerQueue:
75+
Value:
76+
Ref: EbListenerQueue
77+
*/
78+
79+
// https://docs.aws.amazon.com/athena/latest/ug/athena-events.html
80+
const TYPE_MAP = {
81+
QUEUED: 'athena-query-queued',
82+
RUNNING: 'athena-query-running',
83+
SUCCEEDED: 'athena-query-succeeded',
84+
FAILED: 'athena-query-failed',
85+
};
86+
87+
const toEventEnvelope = (uow) => ({
88+
...uow,
89+
event: {
90+
...uow.event,
91+
type: TYPE_MAP[uow.event.detail.currentState] || uow.event.detail.type,
92+
timestamp: (new Date(uow.event.time)).getTime(),
93+
partitionKey: uow.event.detail?.queryExecutionId || uow.event.detail?.pk,
94+
},
95+
});
96+
97+
export const fromAthena = (event, options = {}) => fromSqs(event)
98+
// .tap(console.log)
99+
.map((uow) => ({
100+
...uow,
101+
event: JSON.parse(uow.record.body),
102+
}))
103+
.map(faulty(toEventEnvelope));
104+
105+
// // uow.record.s3.s3.object.key.match(opt.pk || /table=(\d+)\/year=(\d+)\/month=(\d+)\/day=(\d+)\/hour=(\d+)\/minute=(\d+)/),
106+
107+
// export const decodeKey = (key) => decodeURIComponent(key)
108+
// .split('/')
109+
// .reduce((a, c, i, r) => {
110+
// if (c.includes('=')) {
111+
// const [k, v] = c.split('=');
112+
// return {
113+
// ...a,
114+
// [k]: v,
115+
// };
116+
// } else if (c.includes('.')) {
117+
// const [k, v] = c.split('.');
118+
// return {
119+
// ...a,
120+
// file: k,
121+
// contentType: v, // TODO .metadata ???
122+
// };
123+
// } else if (r[r.length - 1] === c) {
124+
// return {
125+
// ...a,
126+
// file: c,
127+
// };
128+
// } else {
129+
// return {
130+
// ...a,
131+
// prefix: [...a.prefix, c],
132+
// };
133+
// }
134+
// }, {
135+
// prefix: [],
136+
// });
137+
138+
// export const buildS3PartitionKey = (key, level = 1) => key.split('/').slice(0, -level).join('/');
139+
140+
// export const mapPartition = (uow) => ({
141+
// event: {
142+
// id: uow.record.sqs.messageId, // `${uow.record.s3.s3.object.key}-${uow.record.s3.s3.object.eTag}`, // sequencer
143+
// type: uow.record.s3.eventName,
144+
// timestamp: (new Date(uow.record.s3.eventTime)).getTime(),
145+
// partitionKey: buildS3PartitionKey(decodeURIComponent(uow.record.s3.s3.object.key)),
146+
// // tags: {
147+
// // // account
148+
// // // region
149+
// // },
150+
// notification: {
151+
// type: uow.record.s3.eventName,
152+
// bucket: uow.record.s3.s3.bucket.name,
153+
// key: decodeURIComponent(uow.record.s3.s3.object.key),
154+
// partition: decodeKey(uow.record.s3.s3.object.key),
155+
// },
156+
// raw: uow.record.s3,
157+
// },
158+
// });
159+
160+
// export const mapJobRequest = (uow) => ({
161+
// getJobRequest: {
162+
// Bucket: uow.record.s3.s3.bucket.name,
163+
// Key: uow.record.s3.s3.object.key,
164+
// VersionId: uow.record.s3.s3.object.versionId,
165+
// },
166+
// });
167+
168+
// export const fromS3 = (event, options = {}) => fromSqsSnsS3(event)
169+
// .map((uow) => ({
170+
// ...uow,
171+
// isJob: uow.record.s3.s3.object.key.includes('/jobs/'),
172+
// }))
173+
// .map((uow) => ({
174+
// ...uow,
175+
// ...(!uow.isJob && mapPartition(uow)),
176+
// ...(uow.isJob && mapJobRequest(uow)),
177+
// }))
178+
// .through(getObjectFromS3({
179+
// id: 'handler:fromS3',
180+
// getRequestField: 'getJobRequest',
181+
// getResponseField: 'getJobResponse',
182+
// additionalClientOpts: {
183+
// followRegionRedirects: true,
184+
// },
185+
// ...options,
186+
// }))
187+
// .map(faulty((uow) => {
188+
// if (!uow.getJobResponse) return uow;
189+
190+
// const job = JSON.parse(Buffer.from(uow.getJobResponse.Body));
191+
// return ({
192+
// ...uow,
193+
// event: {
194+
// id: uow.record.sqs.messageId,
195+
// type: job.type,
196+
// timestamp: (new Date(uow.record.s3.eventTime)).getTime(),
197+
// job: {
198+
// id: uow.record.s3.s3.object.versionId,
199+
// ...job,
200+
// },
201+
// raw: uow.event,
202+
// },
203+
// });
204+
// }));

test/unit/from/athena.test.js

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
4+
import { fromAthena } from '../../../src/from/athena';
5+
6+
describe('from/athena.js', () => {
7+
it('should parse records', (done) => {
8+
fromAthena(EB_EVENT)
9+
.collect()
10+
.tap((collected) => {
11+
// console.log(JSON.stringify(collected, null, 2));
12+
13+
expect(collected.length).to.equal(5);
14+
expect(collected[0].event.type).to.equal('athena-query-queued');
15+
expect(collected[1].event.type).to.equal('athena-query-running');
16+
expect(collected[2].event.type).to.equal('athena-query-succeeded');
17+
expect(collected[3].event.type).to.equal('athena-query-failed');
18+
expect(collected[4].event.type).to.equal('athena-query-schedule-expired');
19+
})
20+
.done(done);
21+
});
22+
});
23+
24+
export const EB_EVENT = {
25+
Records: [
26+
{ // QUEUED
27+
messageId: 'c9ebf5e9-db02-4afe-b564-971b911e9239',
28+
body: JSON.stringify({
29+
'version': '0',
30+
'id': '091ec135-eb3c-cbe2-c87f-82d91c1cc230',
31+
'detail-type': 'Athena Query State Change',
32+
'source': 'aws.athena',
33+
'account': '012345678912',
34+
'time': '2026-01-26T01:12:25Z',
35+
'region': 'us-west-2',
36+
'resources': [],
37+
'detail': {
38+
currentState: 'QUEUED',
39+
queryExecutionId: 'd3e31b4c-c3e9-4dbd-9cb2-38d0f33ca0f8',
40+
sequenceNumber: '1',
41+
statementType: 'DML',
42+
versionId: '0',
43+
workgroupName: 'my-lh-test-jobs-dev',
44+
},
45+
}),
46+
attributes: {
47+
ApproximateReceiveCount: '1',
48+
SentTimestamp: '1769389945297',
49+
SenderId: 'AIDAKLMT2IB5VGAZG7DSY',
50+
ApproximateFirstReceiveTimestamp: '1769389945306',
51+
},
52+
messageAttributes: {},
53+
eventSource: 'aws:sqs',
54+
eventSourceARN: 'arn:aws-us-gov:sqs:us-west-2:012345678912:my-lh-test-jobs-dev-listener-eb',
55+
awsRegion: 'us-west-2',
56+
},
57+
{ // RUNNING
58+
messageId: 'dbf9ed72-fa26-4061-9b46-bd55cbd2e0de',
59+
body: JSON.stringify({
60+
'version': '0',
61+
'id': '5e51fecf-a36b-237c-ce7a-7fb21dd6c4ac',
62+
'detail-type': 'Athena Query State Change',
63+
'source': 'aws.athena',
64+
'account': '012345678912',
65+
'time': '2026-01-26T01:12:26Z',
66+
'region': 'us-west-2',
67+
'resources': [],
68+
'detail': {
69+
currentState: 'RUNNING',
70+
previousState: 'QUEUED',
71+
queryExecutionId: 'd3e31b4c-c3e9-4dbd-9cb2-38d0f33ca0f8',
72+
sequenceNumber: '2',
73+
statementType: 'DML',
74+
versionId: '0',
75+
workgroupName: 'my-lh-test-jobs-dev',
76+
},
77+
}),
78+
attributes: {
79+
ApproximateReceiveCount: '1',
80+
SentTimestamp: '1769389946356',
81+
SenderId: 'AIDAKLMT2IB5VGAZG7DSY',
82+
ApproximateFirstReceiveTimestamp: '1769389946357',
83+
},
84+
messageAttributes: {},
85+
eventSource: 'aws:sqs',
86+
eventSourceARN: 'arn:aws-us-gov:sqs:us-west-2:012345678912:my-lh-test-jobs-dev-listener-eb',
87+
awsRegion: 'us-west-2',
88+
},
89+
{ // SUCCEEDED
90+
messageId: '8b9330ae-ccb2-40c6-a5f3-fc3338a3a9d9',
91+
body: JSON.stringify({
92+
'version': '0',
93+
'id': '8bce653a-9d74-7d7a-de8b-7b80db178467',
94+
'detail-type': 'Athena Query State Change',
95+
'source': 'aws.athena',
96+
'account': '012345678912',
97+
'time': '2026-01-26T01:12:35Z',
98+
'region': 'us-west-2',
99+
'resources': [],
100+
'detail': {
101+
currentState: 'SUCCEEDED',
102+
previousState: 'RUNNING',
103+
queryExecutionId: 'd3e31b4c-c3e9-4dbd-9cb2-38d0f33ca0f8',
104+
sequenceNumber: '3',
105+
statementType: 'DML',
106+
versionId: '0',
107+
workgroupName: 'my-lh-test-jobs-dev',
108+
},
109+
}),
110+
attributes: {
111+
ApproximateReceiveCount: '1',
112+
SentTimestamp: '1769389955415',
113+
SenderId: 'AIDAKLMT2IB5VGAZG7DSY',
114+
ApproximateFirstReceiveTimestamp: '1769389955417',
115+
},
116+
messageAttributes: {},
117+
eventSource: 'aws:sqs',
118+
eventSourceARN: 'arn:aws-us-gov:sqs:us-west-2:012345678912:my-lh-test-jobs-dev-listener-eb',
119+
awsRegion: 'us-west-2',
120+
},
121+
{ // FAILED
122+
messageId: '8b9330ae-ccb7-40c6-a5f3-fc3338a3a9d7',
123+
body: JSON.stringify({
124+
'version': '0',
125+
'id': 'abcdef00-7234-5678-9abc-def012345677',
126+
'detail-type': 'Athena Query State Change',
127+
'source': 'aws.athena',
128+
'account': '012345678912',
129+
'time': '2026-01-26T01:12:26Z',
130+
'region': 'us-west-2',
131+
'resources': [
132+
],
133+
'detail': {
134+
athenaError: {
135+
errorCategory: 2.0, // Value depends on nature of exception
136+
errorType: 1306.0, // Type depends on nature of exception
137+
errorMessage: 'Amazon S3 bucket not found', // Message depends on nature of exception
138+
retryable: false, // Retryable value depends on nature of exception
139+
},
140+
versionId: '0',
141+
currentState: 'FAILED',
142+
previousState: 'RUNNING',
143+
statementType: 'DML',
144+
queryExecutionId: '01234567-0123-0123-0123-012345678901',
145+
workgroupName: 'primary',
146+
sequenceNumber: '3',
147+
},
148+
}),
149+
attributes: {
150+
ApproximateReceiveCount: '1',
151+
SentTimestamp: '1769389955415',
152+
SenderId: 'AIDAKLMT2IB5VGAZG7DSY',
153+
ApproximateFirstReceiveTimestamp: '1769389955417',
154+
},
155+
messageAttributes: {},
156+
eventSource: 'aws:sqs',
157+
eventSourceARN: 'arn:aws-us-gov:sqs:us-west-2:012345678912:my-lh-test-jobs-dev-listener-eb',
158+
awsRegion: 'us-west-2',
159+
},
160+
{ // schedule expired
161+
messageId: 'd1c37cb3-0afa-43ac-83d7-1c045fa013c3',
162+
body: JSON.stringify({
163+
'version': '0',
164+
'id': '6e3e9f4d-8029-87d0-0e07-f3a4e0712225',
165+
'detail-type': 'athena-query-schedule-expired',
166+
'source': 'my-lh-test-jobs-dev',
167+
'account': '012345678912',
168+
'time': '2026-02-02T17:27:58Z',
169+
'region': 'us-west-2',
170+
'resources': [],
171+
'detail': { type: 'athena-query-schedule-expired', pk: 'nfc-exec-wf-age-group-hsi-report', sk: '1' },
172+
}),
173+
attributes: {
174+
ApproximateReceiveCount: '1',
175+
SentTimestamp: '1770053278876',
176+
SenderId: 'AIDAKLMT2IB5VGAZG7DSY',
177+
ApproximateFirstReceiveTimestamp: '1770053278879',
178+
},
179+
messageAttributes: {},
180+
eventSource: 'aws:sqs',
181+
eventSourceARN: 'arn:aws-us-gov:sqs:us-west-2:012345678912:my-lh-test-jobs-dev-listener-eb',
182+
awsRegion: 'us-west-2',
183+
},
184+
],
185+
};

0 commit comments

Comments
 (0)