@@ -49,13 +49,119 @@ pub async fn get(
4949 AuthUser ( _claims) : AuthUser ,
5050 Path ( id) : Path < Uuid > ,
5151) -> AppResult < Json < Dataset > > {
52- let dataset: Dataset = sqlx:: query_as ( "SELECT * FROM datasets WHERE id = $1" )
52+ let mut dataset: Dataset = sqlx:: query_as ( "SELECT * FROM datasets WHERE id = $1" )
5353 . bind ( id)
5454 . fetch_one ( & state. db )
5555 . await ?;
56+
57+ // Lazy backfill: if schema is missing but we have a stored CSV file, extract it now
58+ if dataset. schema . is_none ( ) && dataset. format . eq_ignore_ascii_case ( "csv" ) {
59+ if let Some ( ref key) = dataset. s3_key {
60+ let path = key. strip_prefix ( "local:" ) . unwrap_or ( key) ;
61+ if let Ok ( bytes) = std:: fs:: read ( path) {
62+ if let Some ( ( schema, row_count) ) = extract_csv_schema ( & bytes) {
63+ let _ = sqlx:: query (
64+ "UPDATE datasets SET schema = $1, row_count = COALESCE(row_count, $2), updated_at = NOW() WHERE id = $3"
65+ )
66+ . bind ( & schema)
67+ . bind ( row_count)
68+ . bind ( dataset. id )
69+ . execute ( & state. db )
70+ . await ;
71+ dataset. schema = Some ( schema) ;
72+ if dataset. row_count . is_none ( ) {
73+ dataset. row_count = Some ( row_count) ;
74+ }
75+ }
76+ }
77+ }
78+ }
79+
5680 Ok ( Json ( dataset) )
5781}
5882
/// Infer the type of a CSV cell value by attempting numeric/bool parsing.
///
/// Checks are ordered int64 → float64 → boolean; anything unparseable —
/// including the empty string — falls back to "string".
fn infer_cell_type(val: &str) -> &'static str {
    if val.is_empty() {
        return "string";
    }
    match val {
        v if v.parse::<i64>().is_ok() => "int64",
        v if v.parse::<f64>().is_ok() => "float64",
        v if v.eq_ignore_ascii_case("true") || v.eq_ignore_ascii_case("false") => "boolean",
        _ => "string",
    }
}
99+
100+ /// Parse a CSV byte slice and return (schema JSON, row_count).
101+ fn extract_csv_schema ( bytes : & [ u8 ] ) -> Option < ( serde_json:: Value , i64 ) > {
102+ let mut rdr = csv:: ReaderBuilder :: new ( )
103+ . has_headers ( true )
104+ . from_reader ( bytes) ;
105+
106+ let headers = rdr. headers ( ) . ok ( ) ?. clone ( ) ;
107+ if headers. is_empty ( ) {
108+ return None ;
109+ }
110+
111+ let num_cols = headers. len ( ) ;
112+ // Track best type per column: start with unknown, refine by sampling rows
113+ let mut col_types: Vec < Option < & ' static str > > = vec ! [ None ; num_cols] ;
114+ let mut row_count: i64 = 0 ;
115+ let sample_limit = 100 ; // sample first 100 rows for type inference
116+
117+ for result in rdr. records ( ) {
118+ let record = match result {
119+ Ok ( r) => r,
120+ Err ( _) => continue ,
121+ } ;
122+ row_count += 1 ;
123+
124+ if row_count <= sample_limit {
125+ for ( i, field) in record. iter ( ) . enumerate ( ) {
126+ if i >= num_cols {
127+ break ;
128+ }
129+ let cell_type = infer_cell_type ( field. trim ( ) ) ;
130+ col_types[ i] = Some ( match col_types[ i] {
131+ None => cell_type,
132+ Some ( prev) => {
133+ if prev == cell_type {
134+ prev
135+ } else if ( prev == "int64" && cell_type == "float64" )
136+ || ( prev == "float64" && cell_type == "int64" )
137+ {
138+ "float64" // promote int ↔ float
139+ } else {
140+ "string" // fall back to string on conflict
141+ }
142+ }
143+ } ) ;
144+ }
145+ }
146+ }
147+ // Count remaining rows after sampling
148+ // (rdr already consumed all records in the loop above)
149+
150+ let columns: Vec < serde_json:: Value > = headers
151+ . iter ( )
152+ . enumerate ( )
153+ . map ( |( i, name) | {
154+ serde_json:: json!( {
155+ "name" : name,
156+ "type" : col_types. get( i) . and_then( |t| * t) . unwrap_or( "string" ) ,
157+ "nullable" : true
158+ } )
159+ } )
160+ . collect ( ) ;
161+
162+ Some ( ( serde_json:: Value :: Array ( columns) , row_count) )
163+ }
164+
59165pub async fn create (
60166 State ( state) : State < AppState > ,
61167 AuthUser ( claims) : AuthUser ,
@@ -64,7 +170,7 @@ pub async fn create(
64170 let dataset_id = Uuid :: new_v4 ( ) ;
65171
66172 // If file data is provided (base64), store it to local PVC
67- let ( s3_key, size_bytes) = if let Some ( ref data_b64) = req. data {
173+ let ( s3_key, size_bytes, inferred_schema , inferred_row_count ) = if let Some ( ref data_b64) = req. data {
68174 use base64:: Engine ;
69175 let bytes = base64:: engine:: general_purpose:: STANDARD
70176 . decode ( data_b64)
@@ -80,14 +186,24 @@ pub async fn create(
80186 std:: fs:: write ( & file_path, & bytes)
81187 . map_err ( |e| AppError :: Internal ( format ! ( "Failed to write file: {e}" ) ) ) ?;
82188
83- ( Some ( format ! ( "local:{}" , file_path) ) , Some ( size) )
189+ // Extract schema from CSV files
190+ let ( schema, row_count) = if ext == "csv" {
191+ extract_csv_schema ( & bytes) . unwrap_or ( ( serde_json:: Value :: Null , 0 ) )
192+ } else {
193+ ( serde_json:: Value :: Null , 0 )
194+ } ;
195+
196+ let schema_opt = if schema. is_null ( ) { None } else { Some ( schema) } ;
197+ let row_count_opt = if row_count > 0 { Some ( row_count) } else { req. row_count } ;
198+
199+ ( Some ( format ! ( "local:{}" , file_path) ) , Some ( size) , schema_opt, row_count_opt)
84200 } else {
85- ( None , None )
201+ ( None , None , None , req . row_count )
86202 } ;
87203
88204 let dataset: Dataset = sqlx:: query_as (
89- "INSERT INTO datasets (id, project_id, name, description, format, s3_key, size_bytes, row_count, version, created_by, created_at, updated_at)
90- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 1, $9, NOW(), NOW()) RETURNING *"
205+ "INSERT INTO datasets (id, project_id, name, description, format, s3_key, size_bytes, row_count, version, created_by, schema, created_at, updated_at)
206+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 1, $9, $10, NOW(), NOW()) RETURNING *"
91207 )
92208 . bind ( dataset_id)
93209 . bind ( req. project_id )
@@ -96,8 +212,9 @@ pub async fn create(
96212 . bind ( & req. format )
97213 . bind ( & s3_key)
98214 . bind ( size_bytes)
99- . bind ( req . row_count )
215+ . bind ( inferred_row_count )
100216 . bind ( claims. sub )
217+ . bind ( & inferred_schema)
101218 . fetch_one ( & state. db )
102219 . await ?;
103220 notify ( & state. db , claims. sub , "Dataset Created" , & format ! ( "Dataset '{}' ({}) uploaded" , dataset. name, dataset. format) , NotifyType :: Success , Some ( & format ! ( "/datasets/{}" , dataset. id) ) ) . await ;
0 commit comments