1+ <%#
2+ # Elixir template variables:
3+ # schema - string
4+ # stream_id - integer
5+ # number_of_events - integer
6+
7+ # Bind variables
8+ # 1 - stream_id - integer
9+ # 2 - stream_version - integer
10+
11+ # Additionally, we multiply the index by 9 for each event:
12+ # 3 - event_id - uuid
13+ # 4 - event_type - text
14+ # 5 - causation_id - uuid
15+ # 6 - correlation_id - uuid
16+ # 7 - data - <serialized event data format (bytea, jsonb, etc.)>
17+ # 8 - metadata - jsonb (always?)
18+ # 9 - created_at - timestamp
19+ # 10 - index - integer
20+ # 11 - stream_version - integer
21+ %>
22+
123WITH
24+ <%#
25+ # create a table variable with:
26+ # event_id - uuid - the id for the new event
27+ # index - integer - the increase in the stream version for any stream it is linked to
28+ # stream_version - integer - the final stream version after all of the events have been inserted
29+ %>
230 new_events_indexes (event_id, index, stream_version) AS (
331 VALUES
432 <%= for i <- 0..(number_of_events - 1) do %>
735 <% end %>
836 ),
937 events AS (
38+ <%
39+ # insert the new events into the events table
40+ # using the 7 bind variables from 3 to 9 inclusive
41+ # n.b.: the bind for the event_id is re-generated here
42+ %>
1043 INSERT INTO "<%= schema %>".events
1144 (
1245 event_id,
2457 <% end %>
2558 ),
2659 stream AS (
60+ <% # Increase the version to the stream version given %>
2761 <%= cond do %>
2862 <% stream_id -> %>
2963 UPDATE "<%= schema %>".streams
3064 SET stream_version = stream_version + $2::bigint
3165 WHERE stream_id = $1::bigint
3266 returning stream_id
3367 <% created_at -> %>
68+ <%
69+ # the event created_at date has been provided as the last bind variable
70+ # use that instead of generating one
71+ %>
3472 INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at)
3573 VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>)
3674 returning stream_id
4179 <% end %>
4280 ),
4381 source_stream_events AS (
82+ <%
83+ # link the new events into it's source stream
84+ # we're using the passed in event_ids rather than reading/joining from tables
85+ # each insert uses the stream_version calculated for the corresponding event
86+ # we're joining here, so we'll get the product of:
87+ # the stream (1)
88+ # the rows in the table variable (number_of_events)
89+ %>
4490 INSERT INTO "<%= schema %>".stream_events
4591 (
4692 event_id,
@@ -58,12 +104,21 @@ WITH
58104 FROM new_events_indexes, stream
59105 ),
60106 linked_stream AS (
107+ <%
108+ # Update the all streams version by the number of events
109+ # This is the value of the expected version at append time + the number of events
110+ # Returns the version before the update
111+ %>
61112 UPDATE "<%= schema %>".streams
62113 SET stream_version = stream_version + $2::bigint
63114 WHERE stream_id = 0
64115 RETURNING stream_version - $2::bigint as initial_stream_version
65116 ),
66117 linked_stream_events AS (
118+ <%
119+ # Link the new events into the $all stream
120+ # 1 row for each event
121+ %>
67122 INSERT INTO "<%= schema %>".stream_events
68123 (
69124 event_id,
0 commit comments