Skip to content

Commit 7f6e0e5

Browse files
committed
[draft] - Flexible update expressions.
1 parent 39a7f9b commit 7f6e0e5

3 files changed

Lines changed: 570 additions & 180 deletions

File tree

src/sinks/dynamodb.js

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,171 @@ export const pkCondition = (fieldName = 'pk') => ({
7272
ConditionExpression: `attribute_not_exists(${fieldName})`,
7373
});
7474

75+
/**
76+
* A more flexible (but verbose) variant of updateExpression.
77+
* Requires providing an Item in the form of 'item fragments' which are
78+
* functions supporting the varying DDB operations. This function makes no assumptions
79+
* about your incoming values (except for removing undefined). If you need to remove
80+
* a value for example, you need to specify remove.
81+
*/
82+
export const updateExpressionFromFragments = (ItemFragments) => {
83+
const exprAttributes = Object.entries(ItemFragments)
84+
.filter(([, fragmentGenerator]) => fragmentGenerator !== undefined)
85+
.reduce((acc, [key, fragmentGenerator]) => {
86+
const {
87+
nameFragment,
88+
valueFragment,
89+
setFragment,
90+
removeFragment,
91+
addFragment,
92+
deleteFragment,
93+
} = fragmentGenerator(key);
94+
95+
return {
96+
ExpressionAttributeNames: {
97+
...acc.ExpressionAttributeNames,
98+
...nameFragment,
99+
},
100+
ExpressionAttributeValues: {
101+
...acc.ExpressionAttributeValues,
102+
...valueFragment,
103+
},
104+
setFragments: setFragment ? [...acc.setFragments, setFragment] : acc.setFragments,
105+
addFragments: addFragment ? [...acc.addFragments, addFragment] : acc.addFragments,
106+
deleteFragments: deleteFragment ? [...acc.deleteFragments, deleteFragment] : acc.deleteFragments,
107+
removeFragments: removeFragment ? [...acc.removeFragments, removeFragment] : acc.removeFragments,
108+
};
109+
}, {
110+
ExpressionAttributeNames: {},
111+
ExpressionAttributeValues: {},
112+
setFragments: [],
113+
addFragments: [],
114+
deleteFragments: [],
115+
removeFragments: [],
116+
});
117+
118+
// Construct UpdateExpression
119+
const updateExpressionParts = [];
120+
if (exprAttributes.setFragments.length) updateExpressionParts.push(`SET ${exprAttributes.setFragments.join(', ')}`);
121+
if (exprAttributes.removeFragments.length) updateExpressionParts.push(`REMOVE ${exprAttributes.removeFragments.join(', ')}`);
122+
if (exprAttributes.addFragments.length) updateExpressionParts.push(`ADD ${exprAttributes.addFragments.join(', ')}`);
123+
if (exprAttributes.deleteFragments.length) updateExpressionParts.push(`DELETE ${exprAttributes.deleteFragments.join(', ')}`);
124+
const UpdateExpression = updateExpressionParts.join(' ');
125+
126+
return {
127+
ExpressionAttributeNames: exprAttributes.ExpressionAttributeNames,
128+
ExpressionAttributeValues: exprAttributes.ExpressionAttributeValues,
129+
UpdateExpression,
130+
ReturnValues: 'ALL_NEW',
131+
};
132+
};
133+
134+
/**
135+
* Fragment generators.
136+
* Only set and setNested support operand resolvers.
137+
*/
138+
export const setValue = (value, { atIndex } = {}) => (attributeKey) => {
139+
const isResolver = value.__isResolver__;
140+
const resolvedValue = isResolver ? value.resolvedValue : value;
141+
const setFragmentValue = isResolver ? value.top(attributeKey) : `:${attributeKey}`;
142+
143+
return {
144+
nameFragment: {
145+
[`#${attributeKey}`]: attributeKey,
146+
},
147+
valueFragment: {
148+
[`:${attributeKey}`]: resolvedValue,
149+
},
150+
setFragment: `#${attributeKey}${atIndex !== undefined ? `[${atIndex}]` : ''} = ${setFragmentValue}`,
151+
};
152+
};
153+
154+
export const setNestedValue = (value, { atIndex } = {}) => (attributeKey) => {
155+
const isResolver = value.__isResolver__;
156+
const resolvedValue = isResolver ? value.resolvedValue : value;
157+
const setFragmentValue = isResolver ? value.nested(attributeKey) : `:${attributeKey}`;
158+
159+
return {
160+
nameFragment: Object.fromEntries(attributeKey.split('.').map((kp) => [`#${kp}`, kp])),
161+
valueFragment: {
162+
[`:${attributeKey}`]: resolvedValue,
163+
},
164+
setFragment: `${attributeKey.split('.').map((kp) => `#${kp}`).join('.')}${atIndex !== undefined ? `[${atIndex}]` : ''} = ${setFragmentValue}`,
165+
};
166+
};
167+
168+
export const removeValue = ({ atIndex } = {}) => (attributeKey) => ({
169+
nameFragment: { [`#${attributeKey}`]: attributeKey },
170+
removeFragment: `#${attributeKey}${atIndex !== undefined ? `[${atIndex}]` : ''}`,
171+
});
172+
173+
export const removeNestedValue = ({ atIndex } = {}) => (attributeKey) => ({
174+
nameFragment: Object.fromEntries(attributeKey.split('.').map((ak) => [`#${ak}`, ak])),
175+
removeFragment: `${attributeKey.split('.').map((ak) => `#${ak}`).join('.')}${atIndex !== undefined ? `[${atIndex}]` : ''}`,
176+
});
177+
178+
export const addToSet = (value) => (attributeKey) => ({
179+
nameFragment: { [`#${attributeKey}`]: attributeKey },
180+
valueFragment: { [`:${attributeKey}__add`]: value },
181+
addFragment: `#${attributeKey} :${attributeKey}__add`,
182+
});
183+
184+
export const deleteFromSet = (value) => (attributeKey) => ({
185+
nameFragment: { [`#${attributeKey}`]: attributeKey },
186+
valueFragment: { [`:${attributeKey}__delete`]: value },
187+
deleteFragment: `#${attributeKey} :${attributeKey}__delete`,
188+
});
189+
190+
export const addAndDeleteFromSet = (addValues, deleteValues) => (attributeKey) => {
191+
const { addFragment, valueFragment: vfAdd } = addToSet(addValues)(attributeKey);
192+
const { deleteFragment, valueFragment: vfDelete, nameFragment } = deleteFromSet(deleteValues)(attributeKey);
193+
return {
194+
nameFragment,
195+
addFragment,
196+
deleteFragment,
197+
valueFragment: {
198+
...vfAdd,
199+
...vfDelete,
200+
},
201+
};
202+
};
203+
204+
/* Operand resolvers */
205+
export const ifNotExists = (value) => ({
206+
__isResolver__: true,
207+
resolvedValue: value,
208+
top: (attributeKey) => `if_not_exists(#${attributeKey}, :${attributeKey})`,
209+
nested: (attributeKey) => `if_not_exists(${attributeKey.split('.').map((ak) => `#${ak}`).join('.')}), :${attributeKey}`,
210+
});
211+
212+
export const incrementBy = (value) => ({
213+
__isResolver__: true,
214+
resolvedValue: value,
215+
top: (attributeKey) => `#${attributeKey} + :${attributeKey}`,
216+
nested: (attributeKey) => `${attributeKey.split('.').map((ak) => `#${ak}`).join('.')} + :${attributeKey}`,
217+
});
218+
219+
export const decrementBy = (value) => ({
220+
__isResolver__: true,
221+
resolvedValue: value,
222+
top: (attributeKey) => `#${attributeKey} - :${attributeKey}`,
223+
nested: (attributeKey) => `${attributeKey.split('.').map((ak) => `#${ak}`).join('.')} - :${attributeKey}`,
224+
});
225+
226+
export const append = (value) => ({
227+
__isResolver__: true,
228+
resolvedValue: [].concat(value),
229+
top: (attributeKey) => `list_append(#${attributeKey}, :${attributeKey}`,
230+
nested: (attributeKey) => `list_append(${attributeKey.split('.').map((ak) => `#${ak}`).join('.')}, :${attributeKey})`,
231+
});
232+
233+
export const prepend = (value) => ({
234+
__isResolver__: true,
235+
resolvedValue: [].concat(value),
236+
top: (attributeKey) => `list_append(:${attributeKey}, #${attributeKey})`,
237+
nested: (attributeKey) => `list_append(:${attributeKey}, ${attributeKey.split('.').map((ak) => `#${ak}`).join('.')})`,
238+
});
239+
75240
export const updateDynamoDB = ({
76241
id: pipelineId,
77242
debug = d('dynamodb'),

test/unit/flavors/materializeTimestream.test.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import sinon from 'sinon';
44

55
import {
66
initialize, initializeFrom,
7-
ttl,
87
} from '../../../src';
98

109
import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';

0 commit comments

Comments
 (0)