11"""Command-line interface for A4D pipeline."""
22
33from pathlib import Path
4+ from typing import Annotated
45
6+ import polars as pl
57import typer
68from rich .console import Console
79from rich .table import Table
810
9- from a4d .pipeline .patient import run_patient_pipeline
11+ from a4d .pipeline .patient import process_patient_tables , run_patient_pipeline
12+ from a4d .tables .logs import create_table_logs
1013
# Root Typer application; the commands in this module register themselves on it
# through the @app.command(...) decorator.
app = typer.Typer(name="a4d", help="A4D medical tracker data processing pipeline", no_args_is_help=True)

# Shared Rich console used by the commands and helpers below for terminal output.
console = Console()
1417
1518
def _parquet_record_count(path: Path) -> str:
    """Return the row count of a parquet file, formatted for display.

    The count is computed through a lazy scan so the file's data does not have
    to be loaded into memory just to report how many rows it has.

    Args:
        path: Location of the parquet file.

    Returns:
        The row count with thousands separators, or "?" if the file could not
        be read.
    """
    try:
        row_count = pl.scan_parquet(path).select(pl.len()).collect().item()
    except Exception:
        # Best-effort: the summary is purely informational, so an unreadable or
        # missing file must not abort the command that has already finished.
        return "?"
    return f"{row_count:,} "


def _display_tables_summary(tables: dict[str, Path]) -> None:
    """Display summary table of created tables with record counts.

    Rows are listed in a fixed order: the patient tables (static, monthly,
    annual) first, then any other tables in the order they appear in
    ``tables``, and the logs table last. Nothing is printed when ``tables``
    is empty.

    Args:
        tables: Dictionary mapping table name to output path
    """
    if not tables:
        return

    console.print("\n [bold green]Created Tables:[/bold green]")
    tables_table = Table(title="Created Tables")
    tables_table.add_column("Table", style="cyan")
    tables_table.add_column("Path", style="green")
    tables_table.add_column("Records", justify="right", style="magenta")

    leading = ("static", "monthly", "annual")
    trailing = ("logs",)
    # Tables outside the known names are still reported rather than silently
    # dropped from the summary.
    others = [name for name in tables if name not in leading and name not in trailing]

    for name in (*leading, *others, *trailing):
        if name not in tables:
            continue
        path = tables[name]
        # Only the file name is shown to keep the table narrow.
        tables_table.add_row(name, path.name, _parquet_record_count(path))

    console.print(tables_table)
    console.print()
57+
58+
1659@app .command ("process-patient" )
1760def process_patient_cmd (
18- file : Path | None = typer .Option (
19- None , "--file" , "-f" , help = "Process specific tracker file (if not set, processes all files in data_root)"
20- ),
21- workers : int = typer .Option (1 , "--workers" , "-w" , help = "Number of parallel workers (1 = sequential)" ),
22- skip_tables : bool = typer .Option (False , "--skip-tables" , help = "Skip table creation (only extract + clean)" ),
23- force : bool = typer .Option (False , "--force" , help = "Force reprocessing (ignore existing outputs)" ),
24- output_root : Path | None = typer .Option (None , "--output" , "-o" , help = "Output directory (default: from config)" ),
61+ file : Annotated [
62+ Path | None ,
63+ typer .Option (
64+ "--file" , "-f" , help = "Process specific tracker file (if not set, processes all files in data_root)"
65+ ),
66+ ] = None ,
67+ workers : Annotated [int , typer .Option ("--workers" , "-w" , help = "Number of parallel workers (1 = sequential)" )] = 1 ,
68+ skip_tables : Annotated [
69+ bool , typer .Option ("--skip-tables" , help = "Skip table creation (only extract + clean)" )
70+ ] = False ,
71+ force : Annotated [bool , typer .Option ("--force" , help = "Force reprocessing (ignore existing outputs)" )] = False ,
72+ output_root : Annotated [
73+ Path | None , typer .Option ("--output" , "-o" , help = "Output directory (default: from config)" )
74+ ] = None ,
2575):
2676 """Process patient data pipeline.
2777
@@ -126,7 +176,7 @@ def process_patient_cmd(
126176 files_by_errors = sorted (
127177 [(tr .tracker_file .name , tr .cleaning_errors ) for tr in result .tracker_results if tr .cleaning_errors > 0 ],
128178 key = lambda x : x [1 ],
129- reverse = True
179+ reverse = True ,
130180 )[:10 ]
131181
132182 errors_table = Table ()
@@ -139,16 +189,7 @@ def process_patient_cmd(
139189 console .print (errors_table )
140190
141191 # Show created tables
142- if result .tables :
143- console .print ("\n [bold green]Created Tables:[/bold green]" )
144- tables_table = Table ()
145- tables_table .add_column ("Table" , style = "cyan" )
146- tables_table .add_column ("Path" , style = "green" )
147-
148- for name , path in result .tables .items ():
149- tables_table .add_row (name , str (path ))
150-
151- console .print (tables_table )
192+ _display_tables_summary (result .tables )
152193
153194 # Exit status
154195 if result .success :
@@ -160,8 +201,74 @@ def process_patient_cmd(
160201
161202 except Exception as e :
162203 console .print (f"\n [bold red]Error: { e } [/bold red]\n " )
204+ raise typer .Exit (1 ) from e
205+
206+
@app.command("create-tables")
def create_tables_cmd(
    input_dir: Annotated[Path, typer.Option("--input", "-i", help="Directory containing cleaned parquet files")],
    output_dir: Annotated[
        Path | None,
        typer.Option(
            "--output", "-o", help="Output directory for tables (default: 'tables' directory next to input_dir)"
        ),
    ] = None,
):
    """Create final tables from existing cleaned parquet files.

    This command creates the patient tables (static, monthly, annual) and logs table
    from existing cleaned parquet files, without running the full pipeline.

    Useful for:
    - Re-creating tables after fixing table creation logic
    - Creating tables from manually cleaned data
    - Testing table creation independently

    \b
    Examples:
        # Create tables from existing output
        uv run a4d create-tables --input output/patient_data_cleaned

        # Specify custom output directory
        uv run a4d create-tables --input output/patient_data_cleaned --output custom_tables
    """
    # NOTE: the docstring above is rendered by Typer as the command's --help
    # text, so parameter documentation lives in the option help strings.
    console.print("\n [bold blue]A4D Table Creation[/bold blue]\n ")

    # Default output is a "tables" directory that is a sibling of input_dir
    # (same parent as the "logs" directory looked up below).
    if output_dir is None:
        output_dir = input_dir.parent / "tables"

    console.print(f"Input directory: {input_dir} ")
    console.print(f"Output directory: {output_dir} \n ")

    # Fail fast, before doing any work, if there is nothing to build from.
    cleaned_files = list(input_dir.glob("*_patient_cleaned.parquet"))
    if not cleaned_files:
        console.print(f"[bold red]Error: No cleaned parquet files found in {input_dir} [/bold red]\n ")
        raise typer.Exit(1)

    console.print(f"Found {len(cleaned_files)} cleaned parquet files\n ")

    try:
        console.print("[bold]Creating tables...[/bold]")

        # Create patient tables
        tables = process_patient_tables(input_dir, output_dir)

        # Create logs table separately (operational data); a missing logs
        # directory is reported as a warning, not an error.
        logs_dir = input_dir.parent / "logs"
        if logs_dir.exists():
            console.print(" • Creating logs table...")
            tables["logs"] = create_table_logs(logs_dir, output_dir)
        else:
            console.print(f" [yellow]Warning: Logs directory not found at {logs_dir} [/yellow]")

        # Display results
        console.print("\n [bold green]✓ Tables created successfully![/bold green]")
        _display_tables_summary(tables)

    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero while
        # keeping the original exception chained for debugging.
        console.print(f"\n [bold red]Error creating tables: {e} [/bold red]\n ")
        raise typer.Exit(1) from e
271+
165272
166273@app .command ("version" )
167274def version_cmd ():
0 commit comments