@@ -120,31 +120,97 @@ where
120120 /// working table unconditionally once it has been created, and
121121 /// propagate failures in causal order (insert error first).
122122 pub async fn run ( & self ) -> Result < ( ) > {
123- let config = DriverConfig :: from_env ( ) ?;
123+ let mut direct = self . connect ( ) . await ?;
124+ // Encrypt exactly the spec's curated values — no random/duplicate
125+ // augmentation. Used by the document fixtures (`v3_ste_vec`) and as the
126+ // shared pipeline for `run_sampled`.
127+ let result = self . render_corpus ( & mut direct, self . values ( ) ) . await ;
128+ let _ = direct. close ( ) . await ;
129+ let lines = result?;
130+ self . write_script ( None , & lines, self . values ( ) . len ( ) )
131+ }
124132
125- let mut direct = config
133+ /// Like [`run`](Self::run), but the encrypted corpus is
134+ /// `mandatory ∪ random ∪ duplicates` (see [`super::random`]). The mandatory
135+ /// floor is `self.values()` (the curated catalog values); the random and
136+ /// duplicate portions come from [`super::random::CorpusConfig::from_env`].
137+ /// The resolved seed is logged and embedded in the generated SQL so the
138+ /// corpus is reproducible.
139+ ///
140+ /// This is the generation entry for catalog **scalar** fixtures (wired
141+ /// through the `scalar_fixture!` macro and the `fixture_dispatch`). The
142+ /// document fixtures (`v3_ste_vec`, `v3_doc_int4`) deliberately keep using
143+ /// `run` / `run_with_payloads` — they carry hand-built payloads, not a
144+ /// sampled scalar corpus.
145+ pub async fn run_sampled ( & self ) -> Result < ( ) >
146+ where
147+ T : super :: random:: RandomPlaintext ,
148+ {
149+ let cfg = super :: random:: CorpusConfig :: from_env ( ) ;
150+ let corpus = super :: random:: compose_corpus ( self . values ( ) , & cfg) ;
151+
152+ println ! (
153+ "fixture {}: corpus = {} mandatory + {} random + {} duplicate rows (FIXTURE_SEED={})" ,
154+ self . name( ) ,
155+ self . values( ) . len( ) ,
156+ cfg. samples,
157+ cfg. duplicates,
158+ cfg. seed,
159+ ) ;
160+
161+ let header = format ! (
162+ "-- Randomized corpus: {floor} mandatory + {samples} random + {dups} duplicate \
163+ plaintext rows.\n \
164+ -- Reproduce with: FIXTURE_SEED={seed} FIXTURE_SAMPLES={samples} \
165+ FIXTURE_DUPLICATES={dups}\n \n ",
166+ floor = self . values( ) . len( ) ,
167+ samples = cfg. samples,
168+ dups = cfg. duplicates,
169+ seed = cfg. seed,
170+ ) ;
171+
172+ let mut direct = self . connect ( ) . await ?;
173+ let result = self . render_corpus ( & mut direct, & corpus) . await ;
174+ let _ = direct. close ( ) . await ;
175+ let lines = result?;
176+ self . write_script ( Some ( & header) , & lines, corpus. len ( ) )
177+ }
178+
179+ /// Open the single direct Postgres connection the pipeline uses for
180+ /// schema / insert / render / drop. Encryption happens in Rust
181+ /// (cipherstash-client), so there is no second connection.
182+ async fn connect ( & self ) -> Result < PgConnection > {
183+ let config = DriverConfig :: from_env ( ) ?;
184+ config
126185 . direct
127186 . clone ( )
128187 . connect ( )
129188 . await
130- . context ( "connecting to Postgres (direct)" ) ?;
189+ . context ( "connecting to Postgres (direct)" )
190+ }
131191
192+ /// The shared generation pipeline: apply the working schema, encrypt + insert
193+ /// `corpus`, render the committed INSERT lines, then drop the working table.
194+ ///
195+ /// Honours the teardown contract: once the working table exists it is dropped
196+ /// unconditionally (success *or* error), and failures propagate in causal
197+ /// order — insert error first (root cause), then render, then drop. Returns
198+ /// the rendered INSERT lines in `id` order.
199+ async fn render_corpus ( & self , direct : & mut PgConnection , corpus : & [ T ] ) -> Result < Vec < String > > {
132200 self . check_complete ( ) . context ( "invalid FixtureSpec" ) ?;
133201
134202 sqlx:: raw_sql ( & self . working_schema_sql ( ) )
135- . execute ( & mut direct)
203+ . execute ( & mut * direct)
136204 . await
137205 . context ( "applying working-table schema" ) ?;
138206
139- // Insert directly on the same connection used for schema/render/drop.
140- // The earlier two-connection design existed because `run_with` borrows
141- // `direct` mutably across the closure call; production has no such
142- // need — `insert_direct` is the only caller of cipherstash-client and
143- // can hold the same `&mut direct` for its duration.
144- let insert_result = self . insert_direct ( & mut direct) . await ;
207+ // Insert on the same connection used for schema/render/drop. `run_with`'s
208+ // two-connection shape exists only for the test seam; production holds a
209+ // single `&mut direct` for the whole pipeline.
210+ let insert_result = self . insert_values ( & mut * direct, corpus) . await ;
145211 let render_result = if insert_result. is_ok ( ) {
146212 sqlx:: query ( & self . render_rows_sql ( ) )
147- . fetch_all ( & mut direct)
213+ . fetch_all ( & mut * direct)
148214 . await
149215 . context ( "rendering fixture rows" )
150216 } else {
@@ -153,63 +219,70 @@ where
153219
154220 let working = self . working_table ( ) ;
155221 let drop_result = sqlx:: raw_sql ( & format ! ( "DROP TABLE IF EXISTS public.{working};" ) )
156- . execute ( & mut direct)
222+ . execute ( & mut * direct)
157223 . await ;
158224
159225 insert_result?;
160226 let rows = render_result?;
161227 drop_result. context ( "dropping the working table" ) ?;
162228
163- let lines: Vec < String > = rows
164- . iter ( )
229+ rows. iter ( )
165230 . map ( |r| r. try_get :: < String , _ > ( 0 ) . context ( "reading rendered INSERT" ) )
166- . collect :: < Result < _ > > ( ) ?;
167-
168- let _ = direct. close ( ) . await ;
231+ . collect ( )
232+ }
169233
234+ /// Compose the committed script (preamble + optional extra header + the
235+ /// rendered INSERT lines) and write it to `tests/sqlx/fixtures/<name>.sql`.
236+ fn write_script (
237+ & self ,
238+ extra_header : Option < & str > ,
239+ lines : & [ String ] ,
240+ row_count : usize ,
241+ ) -> Result < ( ) > {
170242 let mut script = self . fixture_script_preamble ( ) ;
171- for line in & lines {
243+ if let Some ( header) = extra_header {
244+ script. push_str ( header) ;
245+ }
246+ for line in lines {
172247 script. push_str ( line) ;
173248 script. push ( '\n' ) ;
174249 }
175250
176251 let path = fixture_script_path ( & self . script_filename ( ) ) ;
177252 std:: fs:: write ( & path, script)
178253 . with_context ( || format ! ( "writing fixture script {}" , path. display( ) ) ) ?;
179- println ! ( "wrote {} ({} rows)" , path. display( ) , self . values ( ) . len ( ) ) ;
254+ println ! ( "wrote {} ({} rows)" , path. display( ) , row_count ) ;
180255 Ok ( ( ) )
181256 }
182257
183- /// Encrypt every plaintext value via cipherstash-client in **one
184- /// batched call**, then INSERT each ciphertext into the working
185- /// table as plain JSONB. The committed `ColumnConfig` is built once
186- /// from the spec's indexes + cast — the fixture name is fed as the
187- /// table identifier so the resulting payload's `i.t` field matches
188- /// the working table, preserving the shape Proxy used to emit.
258+ /// Encrypt every value in `values` via cipherstash-client in **one batched
259+ /// call**, then INSERT each ciphertext into the working table as plain JSONB.
260+ /// The committed `ColumnConfig` is built once from the spec's indexes + cast
261+ /// — the fixture name is fed as the table identifier so the resulting
262+ /// payload's `i.t` field matches the working table, preserving the shape
263+ /// Proxy used to emit.
189264 ///
190- /// Batching means one ZeroKMS round trip per fixture run regardless
191- /// of value count; the INSERT loop is per-row because the working
192- /// table is local Postgres and the per-row execute cost is in
193- /// microseconds.
194- async fn insert_direct ( & self , direct : & mut PgConnection ) -> Result < ( ) > {
265+ /// Batching means one ZeroKMS round trip per run regardless of value count;
266+ /// the INSERT loop is per-row because the working table is local Postgres and
267+ /// the per-row execute cost is in microseconds. A repeated plaintext in
268+ /// `values` is encrypted independently here, so it lands as a distinct
269+ /// ciphertext row sharing the plaintext — the cross-ciphertext-equality
270+ /// coverage `run_sampled` depends on.
271+ async fn insert_values ( & self , direct : & mut PgConnection , values : & [ T ] ) -> Result < ( ) > {
195272 let config = cipherstash:: column_config_for ( self . indexes ( ) , T :: CAST )
196273 . context ( "building ColumnConfig from FixtureSpec indexes" ) ?;
197274
198275 let working = self . working_table ( ) ;
199- let payloads = cipherstash:: encrypt_store (
200- & working,
201- cipherstash:: PAYLOAD_COLUMN ,
202- self . values ( ) ,
203- & config,
204- )
205- . await
206- . context ( "encrypting fixture values" ) ?;
276+ let payloads =
277+ cipherstash:: encrypt_store ( & working, cipherstash:: PAYLOAD_COLUMN , values, & config)
278+ . await
279+ . context ( "encrypting fixture values" ) ?;
207280
208281 let insert = format ! (
209282 "INSERT INTO public.{working} (id, plaintext, {col}) VALUES ($1, $2, $3)" ,
210283 col = cipherstash:: PAYLOAD_COLUMN
211284 ) ;
212- for ( i, ( value, payload) ) in self . values ( ) . iter ( ) . zip ( payloads) . enumerate ( ) {
285+ for ( i, ( value, payload) ) in values. iter ( ) . zip ( payloads) . enumerate ( ) {
213286 let id = ( i as i64 ) + 1 ;
214287 sqlx:: query ( & insert)
215288 . bind ( id)
0 commit comments