-
Notifications
You must be signed in to change notification settings - Fork 284
Expand file tree
/
Copy pathCsvRuntime.fs
More file actions
542 lines (448 loc) · 19.8 KB
/
CsvRuntime.fs
File metadata and controls
542 lines (448 loc) · 19.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
// --------------------------------------------------------------------------------------
// CSV type provider - runtime components (parsing and type representing CSV)
// --------------------------------------------------------------------------------------
namespace FSharp.Data.Runtime
#nowarn "10001"
open System
open System.ComponentModel
open System.Collections.Generic
open System.IO
open System.Runtime.InteropServices
open System.Text
// --------------------------------------------------------------------------------------
// Parser for the CSV format
module internal CsvReader =

    /// Splits the text supplied by `reader` into rows of string fields.
    ///
    /// Every character of `separators` acts as a field delimiter and `quote` is the
    /// quotation character. Inside a quoted section all characters (including separators
    /// and newlines) are taken literally, and two consecutive quote characters stand for
    /// one literal quote. An unterminated quoted section runs to the end of the input.
    ///
    /// Returns a sequence of (fields, rowNumber) pairs. Row numbers start at 0 and only
    /// count rows that are actually produced: bare '\r' / '\n' characters between rows
    /// are skipped without incrementing the number.
    ///
    /// Leading newline characters and the first character of the first row are consumed
    /// as soon as this function is called; everything else is read while the returned
    /// sequence is enumerated.
    let readCsvFile (reader: TextReader) (separators: string) quote =
        let separatorChars = separators.ToCharArray()

        // Views the integer returned by TextReader.Read() as a character
        let inline (|Char|) (code: int) = char code

        // Matches when the character code is the quotation character
        let inline (|Quote|_|) (code: int) =
            if char code = quote then Some() else None

        // Matches when the character code is one of the separators
        // (a single separator is compared directly instead of scanning the array)
        let inline (|Separator|_|) (code: int) =
            let isSeparator =
                if separatorChars.Length = 1 then
                    char code = separatorChars.[0]
                else
                    Array.exists ((=) (char code)) separatorChars

            if isSeparator then Some() else None

        /// Consumes a quoted section, appending its content to `buffer`.
        /// Stops at the closing quote or at the end of the input.
        let rec readQuoted (buffer: StringBuilder) =
            match reader.Read() with
            | -1 -> buffer
            | Quote ->
                if reader.Peek() = int quote then
                    // Doubled quote: swallow the second one and keep a single literal quote
                    reader.Read() |> ignore
                    readQuoted (buffer.Append quote)
                else
                    buffer
            | Char c -> readQuoted (buffer.Append c)

        /// Consumes one row starting with the already-read character code `code`.
        /// Completed fields are accumulated in `fields` in reverse order.
        let rec readRow fields (buffer: StringBuilder) code =
            match code with
            | -1
            | Char '\r'
            | Char '\n' -> buffer.ToString() :: fields
            | Separator ->
                let finished = buffer.ToString()
                readRow (finished :: fields) (StringBuilder()) (reader.Read())
            | Quote ->
                // The quoted section must be consumed before the next character is read
                let afterQuoted = readQuoted buffer
                readRow fields afterQuoted (reader.Read())
            | Char c -> readRow fields (buffer.Append c) (reader.Read())

        /// Produces the remaining rows, numbering them from `rowNumber`
        let rec readRows rowNumber =
            match reader.Read() with
            | -1 -> Seq.empty
            | Char '\r'
            | Char '\n' -> readRows rowNumber
            | firstCode ->
                seq {
                    let fields = readRow [] (StringBuilder()) firstCode |> List.rev |> List.toArray
                    yield fields, rowNumber
                    yield! readRows (rowNumber + 1)
                }

        readRows 0
// --------------------------------------------------------------------------------------
[<AutoOpen>]
module private CsvHelpers =

    /// State captured after a CSV source has been opened and its leading row(s) read
    type ParsedCsvLines =
        {
            /// The first row read once the skipped rows were discarded, with its row number
            FirstLine: string[] * int
            /// An extra row that was already pulled from LineIterator; when present it is
            /// yielded before the iterator is resumed
            SecondLine: (string[] * int) option
            /// Trimmed cells of the first row, or None when the source has no header row
            Headers: string[] option
            /// Iterator positioned just after the rows that were already read
            LineIterator: IEnumerator<string[] * int>
            /// Number of header cells, or number of cells of the first row when there are no headers
            ColumnCount: int
            HasHeaders: bool
            Separators: string
            Quote: char
            /// Number of leading rows that were discarded before FirstLine was read
            SkipRows: int
        }

    /// An enumerable that will return elements from the 'firstSeq' first time it
    /// is accessed and then will call 'nextSeq' each time for all future GetEnumerator calls
    type private ReentrantEnumerable<'T>(firstSeq: seq<'T>, nextSeq: unit -> seq<'T>) =
        let mutable first = true

        interface seq<'T> with
            member x.GetEnumerator() =
                if first then
                    first <- false
                    firstSeq.GetEnumerator()
                else
                    nextSeq().GetEnumerator()

        interface System.Collections.IEnumerable with
            member x.GetEnumerator() =
                (x :> seq<'T>).GetEnumerator() :> System.Collections.IEnumerator

    /// Opens a reader via `newReader`, discards `skipRows` rows and reads the first row.
    ///
    /// When `hasHeaders` is true the first row supplies the (trimmed) header names and the
    /// column count; otherwise the column count is the number of cells of the first row.
    /// Fails with "Invalid CSV file: ..." when no row is available after skipping.
    /// The returned record holds the still-open iterator so the caller can continue reading.
    let parseIntoLines newReader separators quote hasHeaders skipRows =

        // Get the first iterator and read the first line
        let firstReader: TextReader = newReader ()

        let linesIterator =
            (CsvReader.readCsvFile firstReader separators quote).GetEnumerator()

        // Running past the end of the input here is tolerated; it is reported just below
        for i = 1 to skipRows do
            linesIterator.MoveNext() |> ignore

        let firstLine =
            if linesIterator.MoveNext() then
                linesIterator.Current
            else
                // If it does not have any lines, that's wrong...
                linesIterator.Dispose()

                if hasHeaders then
                    failwithf "Invalid CSV file: header row not found"
                else
                    failwithf "Invalid CSV file: no data rows found"

        let headers =
            if not hasHeaders then
                None
            else
                firstLine |> fst |> Array.map (fun columnName -> columnName.Trim()) |> Some

        // If there are no headers, use the number of columns of the first line
        let numberOfColumns =
            match headers, firstLine with
            | Some headers, _ -> headers.Length
            | _, (columns, _) -> columns.Length

        { FirstLine = firstLine
          SecondLine = None
          Headers = headers
          LineIterator = linesIterator
          ColumnCount = numberOfColumns
          HasHeaders = hasHeaders
          Separators = separators
          Quote = quote
          SkipRows = skipRows }

    // Always ignore empty rows
    let inline ignoreRow untypedRow =
        Array.forall String.IsNullOrWhiteSpace untypedRow

    /// Turns the parsed lines into a re-enumerable sequence of typed rows.
    ///
    /// The first enumeration continues the iterator stored in the record (and disposes it
    /// when done); every later enumeration obtains a fresh reader from `newReader` and
    /// starts over, discarding the same leading rows as the first pass.
    ///
    /// A row is accepted when it has exactly `ColumnCount` cells, or one extra cell that is
    /// null/empty (a trailing separator); the extra cell is dropped. Rows consisting only of
    /// blank cells are dropped. Rows with a wrong cell count, or rows for which
    /// `stringArrayToRow` throws, are dropped when `ignoreErrors` is true and otherwise
    /// cause a failure "Couldn't parse row N according to schema: ...".
    let parseIntoTypedRows
        newReader
        ignoreErrors
        stringArrayToRow
        { FirstLine = firstLine
          SecondLine = secondLine
          LineIterator = linesIterator
          ColumnCount = numberOfColumns
          HasHeaders = hasHeaders
          Separators = separators
          Quote = quote
          SkipRows = skipRows }
        =

        // On the first read, finish reading the opened reader
        // On future reads, get a new reader (and skip headers)

        let firstSeq =
            seq {
                use linesIterator = linesIterator

                if not hasHeaders then
                    yield firstLine

                match secondLine with
                | Some line -> yield line
                | None -> ()

                while linesIterator.MoveNext() do
                    yield linesIterator.Current
            }

        let nextSeq () =
            let reader: TextReader = newReader ()
            let csv = CsvReader.readCsvFile reader separators quote

            // Mirror the first pass: drop the rows that were skipped up front and then the
            // header row, so that re-enumeration yields the same data rows as the first one.
            // A negative skipRows skipped nothing on the first pass, hence the clamp.
            let rowsToSkip = (max skipRows 0) + (if hasHeaders then 1 else 0)

            if rowsToSkip > 0 then Seq.skip rowsToSkip csv else csv

        let untypedRows = ReentrantEnumerable<_>(firstSeq, nextSeq)

        // Return data with parsed columns
        seq {
            for untypedRow, lineNumber in untypedRows do
                let hasCorrectNumberOfColumns, untypedRow =
                    match untypedRow.Length with
                    | length when length = numberOfColumns -> true, untypedRow
                    // row is also valid when it ends with single separator
                    | length when
                        length = numberOfColumns + 1
                        && String.IsNullOrEmpty(untypedRow.[untypedRow.Length - 1])
                        ->
                        true, untypedRow.[.. numberOfColumns - 1]
                    | _ -> false, untypedRow

                if not hasCorrectNumberOfColumns then
                    // Ignore rows with different number of columns when ignoreErrors is set to true
                    if not ignoreErrors then
                        // Row numbers are 0-based: with headers the header is row 0, so data
                        // rows are already 1-based; without headers shift by one
                        let lineNumber = if hasHeaders then lineNumber else lineNumber + 1

                        failwithf
                            "Couldn't parse row %d according to schema: Expected %d columns, got %d"
                            lineNumber
                            numberOfColumns
                            untypedRow.Length
                else if not (ignoreRow untypedRow) then
                    // Try to convert the untyped rows to 'RowType
                    let convertedRow =
                        try
                            Choice1Of2(stringArrayToRow untypedRow)
                        with exn ->
                            Choice2Of2 exn

                    match convertedRow, ignoreErrors with
                    | Choice1Of2 convertedRow, _ -> yield convertedRow
                    | Choice2Of2 _, true -> ()
                    | Choice2Of2 exn, false ->
                        let lineNumber = if hasHeaders then lineNumber else lineNumber + 1
                        failwithf "Couldn't parse row %d according to schema: %s" lineNumber exn.Message
        }
// --------------------------------------------------------------------------------------
// Runtime representation of a CSV document: a (usually lazy) sequence of typed rows plus
// the metadata needed to write it back out (headers, separators, quote character).
// The primary constructor is private; instances are obtained through the static factory
// members or the public secondary constructor below.
/// <exclude />
type CsvFile<'RowType>
    private
    (
        rowToStringArray: Func<'RowType, string[]>,
        disposer: IDisposable,
        rows: seq<'RowType>,
        headers,
        numberOfColumns,
        separators,
        quote
    ) =

    /// The rows with data
    member _.Rows = rows

    /// The names of the columns
    member _.Headers = headers

    /// The number of columns
    member _.NumberOfColumns = numberOfColumns

    /// The character(s) used as column separator(s)
    member _.Separators = separators

    /// The quotation mark used for surrounding values containing separator chars
    member _.Quote = quote

    interface IDisposable with
        member _.Dispose() = disposer.Dispose()

    // Builds a CsvFile directly from already-typed rows; its Dispose does nothing.
    /// <exclude />
    [<EditorBrowsableAttribute(EditorBrowsableState.Never)>]
    [<CompilerMessageAttribute("This method is intended for use in generated code only.",
                               10001,
                               IsHidden = true,
                               IsError = false)>]
    static member CreateEmpty(rowToStringArray, rows: seq<'RowType>, headers, numberOfColumns, separators, quote) =
        new CsvFile<'RowType>(
            rowToStringArray,
            { new IDisposable with
                member x.Dispose() = () },
            rows,
            headers,
            numberOfColumns,
            separators,
            quote
        )

    // Builds a CsvFile over a single, caller-supplied reader. The same reader instance is
    // handed out every time a reader is requested, so the rows can only be read from the
    // input once; pass cacheRows = true to make the parsed rows re-enumerable.
    /// <exclude />
    [<EditorBrowsableAttribute(EditorBrowsableState.Never)>]
    [<CompilerMessageAttribute("This method is intended for use in generated code only.",
                               10001,
                               IsHidden = true,
                               IsError = false)>]
    static member Create
        (
            stringArrayToRow,
            rowToStringArray,
            reader: TextReader,
            separators,
            quote,
            hasHeaders,
            ignoreErrors,
            skipRows,
            cacheRows
        ) =
        let uncachedCsv =
            new CsvFile<'RowType>(
                stringArrayToRow,
                rowToStringArray,
                Func<_>(fun _ -> reader),
                separators,
                quote,
                hasHeaders,
                ignoreErrors,
                skipRows
            )

        if cacheRows then uncachedCsv.Cache() else uncachedCsv

    // Parses header-less CSV text and returns all of its rows as an array.
    /// <exclude />
    [<EditorBrowsableAttribute(EditorBrowsableState.Never)>]
    [<CompilerMessageAttribute("This method is intended for use in generated code only.",
                               10001,
                               IsHidden = true,
                               IsError = false)>]
    static member ParseRows(text, stringArrayToRow: Func<obj, string[], 'RowType>, separators, quote, ignoreErrors) =
        let reader = new StringReader(text) :> TextReader

        let csv =
            CsvFile<_>
                .Create(
                    stringArrayToRow,
                    null,
                    reader,
                    separators,
                    quote,
                    hasHeaders = false,
                    ignoreErrors = ignoreErrors,
                    skipRows = 0,
                    cacheRows = false
                )

        csv.Rows |> Seq.toArray

    // Opens the CSV through readerFunc, reads the leading row(s) eagerly to determine
    // headers / column count / separators, and exposes the remaining rows lazily.
    // A null or empty `separators` means "," with automatic fallback to tab (see below).
    // Disposing the resulting CsvFile disposes every reader obtained from readerFunc.
    /// <exclude />
    new
        (
            stringArrayToRow: Func<obj, string[], 'RowType>,
            rowToStringArray,
            readerFunc: Func<TextReader>,
            separators,
            quote,
            hasHeaders,
            ignoreErrors,
            skipRows
        ) as this =

        // Track created Readers so that we can dispose of all of them
        let disposeFuncs = new ResizeArray<_>()
        let mutable disposed = false

        let disposer =
            { new IDisposable with
                member x.Dispose() =
                    if not disposed then
                        Seq.iter (fun f -> f ()) disposeFuncs
                        disposed <- true }

        // Hands out a reader and registers it for disposal; refuses once disposed
        let newReader () =
            if disposed then
                raise <| ObjectDisposedException(this.GetType().Name)

            let reader = readerFunc.Invoke()
            disposeFuncs.Add reader.Dispose
            reader

        let noSeparatorsSpecified = String.IsNullOrEmpty separators
        let separators = if noSeparatorsSpecified then "," else separators

        let parsedCsvLines = parseIntoLines newReader separators quote hasHeaders skipRows

        // Auto-Detect tab separated files that may not have .TSV extension when no explicit separators defined
        let probablyTabSeparated =
            parsedCsvLines.ColumnCount < 2
            && noSeparatorsSpecified
            && fst parsedCsvLines.FirstLine |> Array.exists (fun c -> c.IndexOf('\t') >= 0)

        let parsedCsvLines =
            if probablyTabSeparated then
                // The comma-based pass is abandoned: release its iterator before re-parsing
                parsedCsvLines.LineIterator.Dispose()
                parseIntoLines newReader "\t" quote hasHeaders skipRows
            else
                parsedCsvLines

        // Detect header that has empty trailing column name that doesn't correspond to a column in
        // the following data lines. This is checked if headers exist and the last column in the header
        // is empty. The secondLine field of the parsedCsvLines record is used to store the second line
        // that is read when testing the length of the first data row following the header.
        let parsedCsvLines =
            match parsedCsvLines.Headers with
            | None -> parsedCsvLines
            | Some headers ->
                let columnCount = parsedCsvLines.ColumnCount

                if String.IsNullOrWhiteSpace headers.[columnCount - 1] then
                    let secondline =
                        if parsedCsvLines.LineIterator.MoveNext() then
                            Some(parsedCsvLines.LineIterator.Current)
                        else
                            None

                    match secondline with
                    | Some line ->
                        let linecontents = fst line

                        if linecontents.Length = columnCount - 1 then
                            // The data row is one cell shorter: drop the empty trailing header
                            { parsedCsvLines with
                                SecondLine = secondline
                                ColumnCount = columnCount - 1
                                Headers = Some headers.[.. columnCount - 2] }
                        else
                            // Keep the header as is, but remember the row that was consumed
                            { parsedCsvLines with
                                SecondLine = secondline }
                    | None -> parsedCsvLines
                else
                    parsedCsvLines

        let rows =
            parsedCsvLines
            |> parseIntoTypedRows newReader ignoreErrors (fun untypedRow -> stringArrayToRow.Invoke(this, untypedRow))

        new CsvFile<'RowType>(
            rowToStringArray,
            disposer,
            rows,
            parsedCsvLines.Headers,
            parsedCsvLines.ColumnCount,
            parsedCsvLines.Separators,
            parsedCsvLines.Quote
        )

    /// Saves CSV to the specified writer.
    /// The separator defaults to the first character of Separators and the quote to Quote.
    /// Header and data cells that contain the separator, the quote or a line break are
    /// wrapped in quotes, with embedded quotes doubled. Rows are terminated with CRLF.
    /// The writer is disposed when saving completes.
    member x.Save(writer: TextWriter, [<Optional>] ?separator, [<Optional>] ?quote) =

        let separator = (defaultArg separator x.Separators.[0]).ToString()
        let quote = (defaultArg quote x.Quote).ToString()
        let doubleQuote = quote + quote

        use writer = writer

        // RFC 4180 (https://www.rfc-editor.org/rfc/rfc4180)
        // 2. Definition of the CSV Format
        // Each record is located on a separate line, delimited by a line break CRLF
        writer.NewLine <- "\r\n"

        let nullSafeguard str =
            match str with
            | null -> String.Empty
            | _ -> str

        // Writes one cell, quoting it when it could otherwise be misread.
        // '\r' is checked as well as '\n' because the reader ends a row on either of them.
        let writeItem (item: string) =
            let item = item |> nullSafeguard

            if
                item.IndexOf(separator, StringComparison.Ordinal) >= 0
                || item.IndexOf(quote, StringComparison.Ordinal) >= 0
                || item.IndexOf('\n') >= 0
                || item.IndexOf('\r') >= 0
            then
                writer.Write quote
                writer.Write(item.Replace(quote, doubleQuote))
                writer.Write quote
            else
                writer.Write item

        // Writes the cells separated by the separator; an empty array yields an empty line
        let writeLine (items: string[]) =
            items
            |> Array.iteri (fun i item ->
                if i > 0 then
                    writer.Write separator

                writeItem item)

            writer.WriteLine()

        // Headers go through the same quoting as data cells so that a column name
        // containing a separator or quote does not corrupt the header row
        match x.Headers with
        | Some headers -> headers |> writeLine
        | None -> ()

        for row in x.Rows do
            row |> rowToStringArray.Invoke |> writeLine

    /// Saves CSV to the specified stream (UTF-8 without byte order mark); the stream is left open
    member x.Save(stream: Stream, [<Optional>] ?separator, [<Optional>] ?quote) =
        use writer =
            new StreamWriter(stream, System.Text.UTF8Encoding(false, true), 1024, true)

        x.Save(writer, ?separator = separator, ?quote = quote)

    /// Saves CSV to the specified file
    member x.Save(path: string, [<Optional>] ?separator, [<Optional>] ?quote) =
        use writer = new StreamWriter(path)
        x.Save(writer, ?separator = separator, ?quote = quote)

    /// Saves CSV to a string
    member x.SaveToString([<Optional>] ?separator, [<Optional>] ?quote) =
        use writer = new StringWriter()
        x.Save(writer, ?separator = separator, ?quote = quote)
        writer.ToString()

    // Creates a CsvFile that shares this instance's metadata and disposer but has other rows
    member inline private x.withRows rows =
        new CsvFile<'RowType>(rowToStringArray, disposer, rows, x.Headers, x.NumberOfColumns, x.Separators, x.Quote)

    // Applies a sequence transformation to the rows and wraps the result in a new CsvFile
    member inline private x.mapRows f = x.withRows (f x.Rows)

    /// Returns a new csv with the same rows as the original but which guarantees
    /// that each row will only be read and parsed from the input at most once.
    member x.Cache() = Seq.cache |> x.mapRows

    /// Returns a new csv where every row has been transformed by the provided mapping function.
    member x.Map(mapping: Func<_, _>) = Seq.map mapping.Invoke |> x.mapRows

    /// Returns a new csv containing only the rows for which the given predicate returns "true".
    member x.Filter(predicate: Func<_, _>) =
        Seq.filter predicate.Invoke |> x.mapRows

    /// Returns a new csv with only the first N rows of the underlying csv.
    member x.Take count = Seq.take count |> x.mapRows

    /// Returns a csv that, when iterated, yields rows while the given predicate
    /// returns <c>true</c>, and then returns no further rows.
    member x.TakeWhile(predicate: Func<_, _>) =
        Seq.takeWhile predicate.Invoke |> x.mapRows

    /// Returns a csv that skips N rows and then yields the remaining rows.
    member x.Skip count = Seq.skip count |> x.mapRows

    /// Returns a csv that, when iterated, skips rows while the given predicate returns
    /// <c>true</c>, and then yields the remaining rows.
    member x.SkipWhile(predicate: Func<_, _>) =
        Seq.skipWhile predicate.Invoke |> x.mapRows

    /// Returns a csv that when enumerated returns at most N rows.
    member x.Truncate count = Seq.truncate count |> x.mapRows

    /// Returns a csv with the same rows as the original plus the provided rows appended
    member x.Append rows = Seq.append x.Rows rows |> x.withRows