@@ -120,31 +120,49 @@ where
120120 /// working table unconditionally once it has been created, and
121121 /// propagate failures in causal order (insert error first).
122122 pub async fn run ( & self ) -> Result < ( ) > {
123- let config = DriverConfig :: from_env ( ) ?;
123+ let mut direct = self . connect ( ) . await ?;
124+ // Encrypt exactly the spec's curated values.
125+ let result = self . render_values ( & mut direct, self . values ( ) ) . await ;
126+ let _ = direct. close ( ) . await ;
127+ let lines = result?;
128+ self . write_script ( None , & lines, self . values ( ) . len ( ) )
129+ }
124130
125- let mut direct = config
131+ /// Open the single direct Postgres connection the pipeline uses for
132+ /// schema / insert / render / drop. Encryption happens in Rust
133+ /// (cipherstash-client), so there is no second connection.
134+ async fn connect ( & self ) -> Result < PgConnection > {
135+ let config = DriverConfig :: from_env ( ) ?;
136+ config
126137 . direct
127138 . clone ( )
128139 . connect ( )
129140 . await
130- . context ( "connecting to Postgres (direct)" ) ?;
141+ . context ( "connecting to Postgres (direct)" )
142+ }
131143
144+ /// The shared generation pipeline: apply the working schema, encrypt + insert
145+ /// `values`, render the committed INSERT lines, then drop the working table.
146+ ///
147+ /// Honours the teardown contract: once the working table exists it is dropped
148+ /// unconditionally (success *or* error), and failures propagate in causal
149+ /// order — insert error first (root cause), then render, then drop. Returns
150+ /// the rendered INSERT lines in `id` order.
151+ async fn render_values ( & self , direct : & mut PgConnection , values : & [ T ] ) -> Result < Vec < String > > {
132152 self . check_complete ( ) . context ( "invalid FixtureSpec" ) ?;
133153
134154 sqlx:: raw_sql ( & self . working_schema_sql ( ) )
135- . execute ( & mut direct)
155+ . execute ( & mut * direct)
136156 . await
137157 . context ( "applying working-table schema" ) ?;
138158
139- // Insert directly on the same connection used for schema/render/drop.
140- // The earlier two-connection design existed because `run_with` borrows
141- // `direct` mutably across the closure call; production has no such
142- // need — `insert_direct` is the only caller of cipherstash-client and
143- // can hold the same `&mut direct` for its duration.
144- let insert_result = self . insert_direct ( & mut direct) . await ;
159+ // Insert on the same connection used for schema/render/drop. `run_with`'s
160+ // two-connection shape exists only for the test seam; production holds a
161+ // single `&mut direct` for the whole pipeline.
162+ let insert_result = self . insert_values ( & mut * direct, values) . await ;
145163 let render_result = if insert_result. is_ok ( ) {
146164 sqlx:: query ( & self . render_rows_sql ( ) )
147- . fetch_all ( & mut direct)
165+ . fetch_all ( & mut * direct)
148166 . await
149167 . context ( "rendering fixture rows" )
150168 } else {
@@ -153,63 +171,69 @@ where
153171
154172 let working = self . working_table ( ) ;
155173 let drop_result = sqlx:: raw_sql ( & format ! ( "DROP TABLE IF EXISTS public.{working};" ) )
156- . execute ( & mut direct)
174+ . execute ( & mut * direct)
157175 . await ;
158176
159177 insert_result?;
160178 let rows = render_result?;
161179 drop_result. context ( "dropping the working table" ) ?;
162180
163- let lines: Vec < String > = rows
164- . iter ( )
181+ rows. iter ( )
165182 . map ( |r| r. try_get :: < String , _ > ( 0 ) . context ( "reading rendered INSERT" ) )
166- . collect :: < Result < _ > > ( ) ?;
167-
168- let _ = direct. close ( ) . await ;
183+ . collect ( )
184+ }
169185
186+ /// Compose the committed script (preamble + optional extra header + the
187+ /// rendered INSERT lines) and write it to `tests/sqlx/fixtures/<name>.sql`.
188+ fn write_script (
189+ & self ,
190+ extra_header : Option < & str > ,
191+ lines : & [ String ] ,
192+ row_count : usize ,
193+ ) -> Result < ( ) > {
170194 let mut script = self . fixture_script_preamble ( ) ;
171- for line in & lines {
195+ if let Some ( header) = extra_header {
196+ script. push_str ( header) ;
197+ }
198+ for line in lines {
172199 script. push_str ( line) ;
173200 script. push ( '\n' ) ;
174201 }
175202
176203 let path = fixture_script_path ( & self . script_filename ( ) ) ;
177204 std:: fs:: write ( & path, script)
178205 . with_context ( || format ! ( "writing fixture script {}" , path. display( ) ) ) ?;
179- println ! ( "wrote {} ({} rows)" , path. display( ) , self . values ( ) . len ( ) ) ;
206+ println ! ( "wrote {} ({} rows)" , path. display( ) , row_count ) ;
180207 Ok ( ( ) )
181208 }
182209
183- /// Encrypt every plaintext value via cipherstash-client in **one
184- /// batched call**, then INSERT each ciphertext into the working
185- /// table as plain JSONB. The committed `ColumnConfig` is built once
186- /// from the spec's indexes + cast — the fixture name is fed as the
187- /// table identifier so the resulting payload's `i.t` field matches
188- /// the working table, preserving the shape Proxy used to emit.
210+ /// Encrypt every value in `values` via cipherstash-client in **one batched
211+ /// call**, then INSERT each ciphertext into the working table as plain JSONB.
212+ /// The committed `ColumnConfig` is built once from the spec's indexes + cast
213+ /// — the fixture name is fed as the table identifier so the resulting
214+ /// payload's `i.t` field matches the working table, preserving the shape
215+ /// Proxy used to emit.
189216 ///
190- /// Batching means one ZeroKMS round trip per fixture run regardless
191- /// of value count; the INSERT loop is per-row because the working
192- /// table is local Postgres and the per-row execute cost is in
193- /// microseconds.
194- async fn insert_direct ( & self , direct : & mut PgConnection ) -> Result < ( ) > {
217+ /// Batching means one ZeroKMS round trip per run regardless of value count;
218+ /// the INSERT loop is per-row because the working table is local Postgres and
219+ /// the per-row execute cost is in microseconds. A repeated plaintext in
220+ /// `values` is encrypted independently here, so a repeated plaintext lands as
221+ /// a distinct ciphertext row sharing that plaintext.
222+ async fn insert_values ( & self , direct : & mut PgConnection , values : & [ T ] ) -> Result < ( ) > {
195223 let config = cipherstash:: column_config_for ( self . indexes ( ) , T :: CAST )
196224 . context ( "building ColumnConfig from FixtureSpec indexes" ) ?;
197225
198226 let working = self . working_table ( ) ;
199- let payloads = cipherstash:: encrypt_store (
200- & working,
201- cipherstash:: PAYLOAD_COLUMN ,
202- self . values ( ) ,
203- & config,
204- )
205- . await
206- . context ( "encrypting fixture values" ) ?;
227+ let payloads =
228+ cipherstash:: encrypt_store ( & working, cipherstash:: PAYLOAD_COLUMN , values, & config)
229+ . await
230+ . context ( "encrypting fixture values" ) ?;
207231
208232 let insert = format ! (
209233 "INSERT INTO public.{working} (id, plaintext, {col}) VALUES ($1, $2, $3)" ,
210234 col = cipherstash:: PAYLOAD_COLUMN
211235 ) ;
212- for ( i, ( value, payload) ) in self . values ( ) . iter ( ) . zip ( payloads) . enumerate ( ) {
236+ for ( i, ( value, payload) ) in values. iter ( ) . zip ( payloads) . enumerate ( ) {
213237 let id = ( i as i64 ) + 1 ;
214238 sqlx:: query ( & insert)
215239 . bind ( id)
0 commit comments