4545import technology .tabula .Table ;
4646
4747import java .io .InputStream ;
48+ import java .text .ParseException ;
49+ import java .text .SimpleDateFormat ;
4850import java .time .Instant ;
51+ import java .time .LocalDate ;
52+ import java .time .LocalTime ;
53+ import java .time .format .DateTimeFormatter ;
4954import java .util .ArrayList ;
5055import java .util .Calendar ;
56+ import java .util .Date ;
5157import java .util .List ;
5258
5359public class PdfBatchReader implements ManagedReader <FileScanFramework .FileSchemaNegotiator > {
@@ -68,7 +74,9 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem
6874 private List <String > columnHeaders ;
6975 private int currentRowIndex ;
7076 private Table currentTable ;
71-
77+ private int currentTableIndex ;
78+ private int startingTableIndex ;
79+ private FileScanFramework .FileSchemaNegotiator negotiator ;
7280
7381 // Document Metadata Fields
7482 private int pageCount ;
@@ -86,7 +94,7 @@ public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchem
8694 private int tableCount ;
8795 private int rows ;
8896 private int metadataIndex ;
89- private int currentTableIndex ;
97+
9098
9199 // Tables
92100 private List <Table > tables ;
@@ -103,20 +111,23 @@ static class PdfReaderConfig {
/**
 * Creates a PDF batch reader and initializes its per-reader state.
 *
 * @param readerConfig reader configuration; its plugin config supplies the default table index
 * @param maxRecords value stored in the {@code maxRecords} field
 */
public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
  this.config = readerConfig;
  this.maxRecords = maxRecords;
  this.unregisteredColumnCount = 0;

  this.writers = new ArrayList<>();
  this.columnHeaders = new ArrayList<>();

  // A negative configured table index falls back to index 0.
  this.startingTableIndex = Math.max(0, readerConfig.plugin.getConfig().getDefaultTableIndex());
  this.currentTableIndex = this.startingTableIndex;
}
110120
111121 @ Override
112122 public boolean open (FileScanFramework .FileSchemaNegotiator negotiator ) {
113123 System .setProperty ("java.awt.headless" , "true" );
124+ this .negotiator = negotiator ;
114125
115126 split = negotiator .split ();
116127 errorContext = negotiator .parentErrorContext ();
117128 builder = new SchemaBuilder ();
118129
119- openFile (negotiator );
130+ openFile ();
120131
121132 // Get the tables
122133 tables = Utils .extractTablesFromPDF (document );
@@ -125,13 +136,13 @@ public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
125136
126137 // Support provided schema
127138 TupleMetadata schema = null ;
128- if (negotiator .hasProvidedSchema ()) {
129- schema = negotiator .providedSchema ();
130- negotiator .tableSchema (schema , false );
139+ if (this . negotiator .hasProvidedSchema ()) {
140+ schema = this . negotiator .providedSchema ();
141+ this . negotiator .tableSchema (schema , false );
131142 } else {
132- negotiator .tableSchema (buildSchema (), false );
143+ this . negotiator .tableSchema (buildSchema (), false );
133144 }
134- ResultSetLoader loader = negotiator .build ();
145+ ResultSetLoader loader = this . negotiator .build ();
135146 rowWriter = loader .writer ();
136147
137148 if (negotiator .hasProvidedSchema ()) {
@@ -143,7 +154,7 @@ public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
143154
144155 // Prepare for reading
145156 currentRowIndex = 1 ; // Skip the first line if there are headers
146- currentTable = tables .get (0 );
157+ currentTable = tables .get (startingTableIndex );
147158
148159 return true ;
149160 }
@@ -159,7 +170,6 @@ public boolean next() {
159170 } else if (currentRowIndex >= currentTable .getRows ().size () &&
160171 currentTableIndex < tables .size () &&
161172 config .plugin .getConfig ().getCombinePages ()) {
162- logger .debug ("Merging table {} with current table." , currentTableIndex );
163173 currentRowIndex = 0 ;
164174 currentTable = tables .get (currentTableIndex ++);
165175 } else if (currentRowIndex >= currentTable .getRows ().size ()) {
@@ -174,6 +184,10 @@ public boolean next() {
174184 }
175185
176186 private void processRow (List <RectangularTextContainer > row ) {
187+ if (row == null || row .size () == 0 ) {
188+ return ;
189+ }
190+
177191 String value ;
178192 rowWriter .start ();
179193 for (int i = 0 ; i < row .size (); i ++) {
@@ -204,9 +218,8 @@ public void close() {
204218
205219 /**
206220 * This method opens the PDF file, and finds the tables
207- * @param negotiator The Drill file negotiator object that represents the file system
208221 */
209- private void openFile (FileScanFramework . FileSchemaNegotiator negotiator ) {
222+ private void openFile () {
210223 try {
211224 fsStream = negotiator .fileSystem ().openPossiblyCompressedStream (split .getPath ());
212225 document = PDDocument .load (fsStream );
@@ -321,7 +334,7 @@ private void addUnknownColumnToSchemaAndCreateWriter (TupleWriter rowWriter, Str
321334 }
322335
323336 private TupleMetadata buildSchema () {
324- Table table = tables .get (0 );
337+ Table table = tables .get (startingTableIndex );
325338 columns = table .getColCount ();
326339 rows = table .getRowCount ();
327340
@@ -376,9 +389,18 @@ private void buildWriterListFromProvidedSchema(TupleMetadata schema) {
376389 case FLOAT8 :
377390 writers .add (new DoublePdfColumnWriter (counter , fieldName , rowWriter ));
378391 break ;
392+ case DATE :
393+ writers .add (new DatePdfColumnWriter (counter , fieldName , rowWriter , negotiator ));
394+ break ;
395+ case TIME :
396+ writers .add (new TimePdfColumnWriter (counter , fieldName , rowWriter , negotiator ));
397+ break ;
398+ case TIMESTAMP :
399+ writers .add (new TimestampPdfColumnWriter (counter , fieldName , rowWriter , negotiator ));
400+ break ;
379401 default :
380402 throw UserException .unsupportedError ()
381- .message ("PDF Reader with Provided Schema only supports String, and Numeric Types " )
403+ .message ("PDF Reader with provided schema does not support " + type . name () + " data type. " )
382404 .addContext (errorContext )
383405 .build (logger );
384406 }
@@ -446,4 +468,82 @@ public void load(RectangularTextContainer<?> cell) {
446468 writer .setString (cell .getText ());
447469 }
448470 }
471+
472+ public static class DatePdfColumnWriter extends PdfColumnWriter {
473+ private String dateFormat ;
474+
475+ DatePdfColumnWriter (int columnIndex , String columnName , RowSetLoader rowWriter , FileScanFramework .FileSchemaNegotiator negotiator ) {
476+ super (columnIndex , columnName , rowWriter .scalar (columnName ));
477+
478+ ColumnMetadata metadata = negotiator .providedSchema ().metadata (columnName );
479+ if (metadata != null ) {
480+ this .dateFormat = metadata .property ("drill.format" );
481+ }
482+ }
483+
484+ @ Override
485+ public void load (RectangularTextContainer <?> cell ) {
486+ LocalDate localDate ;
487+ if (Strings .isNullOrEmpty (this .dateFormat )) {
488+ localDate = LocalDate .parse (cell .getText ());
489+ } else {
490+ localDate = LocalDate .parse (cell .getText (), DateTimeFormatter .ofPattern (dateFormat ));
491+ }
492+ writer .setDate (localDate );
493+ }
494+ }
495+
496+ public static class TimePdfColumnWriter extends PdfColumnWriter {
497+ private String dateFormat ;
498+
499+ TimePdfColumnWriter (int columnIndex , String columnName , RowSetLoader rowWriter , FileScanFramework .FileSchemaNegotiator negotiator ) {
500+ super (columnIndex , columnName , rowWriter .scalar (columnName ));
501+
502+ ColumnMetadata metadata = negotiator .providedSchema ().metadata (columnName );
503+ if (metadata != null ) {
504+ this .dateFormat = metadata .property ("drill.format" );
505+ }
506+ }
507+
508+ @ Override
509+ public void load (RectangularTextContainer <?> cell ) {
510+ LocalTime localTime ;
511+ if (Strings .isNullOrEmpty (this .dateFormat )) {
512+ localTime = LocalTime .parse (cell .getText ());
513+ } else {
514+ localTime = LocalTime .parse (cell .getText (), DateTimeFormatter .ofPattern (dateFormat ));
515+ }
516+ writer .setTime (localTime );
517+ }
518+ }
519+
520+ public static class TimestampPdfColumnWriter extends PdfColumnWriter {
521+ private String dateFormat ;
522+
523+ TimestampPdfColumnWriter (int columnIndex , String columnName , RowSetLoader rowWriter , FileScanFramework .FileSchemaNegotiator negotiator ) {
524+ super (columnIndex , columnName , rowWriter .scalar (columnName ));
525+
526+ ColumnMetadata metadata = negotiator .providedSchema ().metadata (columnName );
527+ if (metadata != null ) {
528+ this .dateFormat = metadata .property ("drill.format" );
529+ }
530+ }
531+
532+ @ Override
533+ public void load (RectangularTextContainer <?> cell ) {
534+ Instant timestamp = null ;
535+ if (Strings .isNullOrEmpty (this .dateFormat )) {
536+ timestamp = Instant .parse (cell .getText ());
537+ } else {
538+ try {
539+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat (dateFormat );
540+ Date parsedDate = simpleDateFormat .parse (cell .getText ());
541+ timestamp = Instant .ofEpochMilli (parsedDate .getTime ());
542+ } catch (ParseException e ) {
543+ logger .error ("Error parsing timestamp: " + e .getMessage ());
544+ }
545+ }
546+ writer .setTimestamp (timestamp );
547+ }
548+ }
449549}
0 commit comments