-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathDataTable.fs
More file actions
129 lines (109 loc) · 5.79 KB
/
DataTable.fs
File metadata and controls
129 lines (109 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
namespace FSharp.Data.SqlClient
open System
open System.Data
open System.Data.SqlClient
open System.Collections.Generic
open FSharp.Data.SqlClient.Internals
[<Sealed>]
[<CompilerMessageAttribute("This API supports the FSharp.Data.SqlClient infrastructure and is not intended to be used directly from your code.", 101, IsHidden = true)>]
[<RequireQualifiedAccessAttribute>]
type DataTable<'T when 'T :> DataRow>(selectCommand: SqlCommand, ?connectionString: Lazy<string>) =
inherit DataTable()
let rows = base.Rows
member __.Rows : IList<'T> = {
new IList<'T> with
member __.GetEnumerator() = rows.GetEnumerator()
member __.GetEnumerator() : IEnumerator<'T> = (Seq.cast<'T> rows).GetEnumerator()
member __.Count = rows.Count
member __.IsReadOnly = rows.IsReadOnly
member __.Item
with get index = downcast rows.[index]
and set index row =
rows.RemoveAt(index)
rows.InsertAt(row, index)
member __.Add row = rows.Add row
member __.Clear() = rows.Clear()
member __.Contains row = rows.Contains row
member __.CopyTo(dest, index) = rows.CopyTo(dest, index)
member __.IndexOf row = rows.IndexOf row
member __.Insert(index, row) = rows.InsertAt(row, index)
member __.Remove row = rows.Remove(row); true
member __.RemoveAt index = rows.RemoveAt(index)
}
member __.NewRow(): 'T = downcast base.NewRow()
member private this.IsDirectTable = not (isNull this.TableName)
member this.Update(?connection, ?transaction, ?batchSize, ?continueUpdateOnError, ?timeout: TimeSpan) =
// not supported on all DataTable instances
match selectCommand with
| null -> failwith "This command wasn't constructed from SqlProgrammabilityProvider, call to Update is not supported."
| _ -> ()
connection |> Option.iter selectCommand.set_Connection
transaction |> Option.iter selectCommand.set_Transaction
if isNull selectCommand.Connection && this.IsDirectTable
then
assert(connectionString.IsSome)
selectCommand.Connection <- new SqlConnection( connectionString.Value.Value)
use dataAdapter = new SqlDataAdapter(selectCommand)
use commandBuilder = new SqlCommandBuilder(dataAdapter)
use __ = dataAdapter.RowUpdating.Subscribe(fun args ->
timeout |> Option.iter (fun x -> args.Command.CommandTimeout <- int x.TotalSeconds)
if args.Errors = null && args.StatementType = StatementType.Insert
&& defaultArg batchSize dataAdapter.UpdateBatchSize = 1
then
let columnsToRefresh = ResizeArray()
for c in this.Columns do
if c.AutoIncrement
|| (c.AllowDBNull && args.Row.IsNull c.Ordinal)
then
columnsToRefresh.Add( "inserted." + commandBuilder.QuoteIdentifier c.ColumnName)
if columnsToRefresh.Count > 0
then
let outputClause = columnsToRefresh |> String.concat "," |> sprintf " OUTPUT %s"
let cmd = args.Command
let sql = cmd.CommandText
let insertOutputClauseAt =
match sql.IndexOf( " DEFAULT VALUES") with
| -1 -> sql.IndexOf( " VALUES")
| pos -> pos
cmd.CommandText <- sql.Insert(insertOutputClauseAt, outputClause)
cmd.UpdatedRowSource <- UpdateRowSource.FirstReturnedRecord
)
batchSize |> Option.iter dataAdapter.set_UpdateBatchSize
continueUpdateOnError |> Option.iter dataAdapter.set_ContinueUpdateOnError
dataAdapter.Update(this)
member this.BulkCopy(?connection, ?copyOptions, ?transaction, ?batchSize, ?timeout: TimeSpan) =
let conn', tran' =
match connection, transaction with
| _, Some(t: SqlTransaction) -> t.Connection, t
| Some c, None -> c, null
| None, None ->
match selectCommand with
| null -> failwith "To issue BulkCopy on this table, you need to provide your own connection or transaction"
| _ -> ()
if this.IsDirectTable
then
assert(connectionString.IsSome)
new SqlConnection(connectionString.Value.Value), null
else
selectCommand.Connection, selectCommand.Transaction
use __ = conn'.UseLocally()
let options = defaultArg copyOptions SqlBulkCopyOptions.Default
use bulkCopy = new SqlBulkCopy(conn', options, tran')
bulkCopy.DestinationTableName <- this.TableName
batchSize |> Option.iter bulkCopy.set_BatchSize
timeout |> Option.iter (fun x -> bulkCopy.BulkCopyTimeout <- int x.TotalSeconds)
// Exclude computed columns (ReadOnly=true, AutoIncrement=false) because SQL Server
// rejects writes to computed columns. Also exclude identity columns unless the
// caller explicitly opted in with KeepIdentity.
let keepIdentity = options.HasFlag(SqlBulkCopyOptions.KeepIdentity)
for col in this.Columns |> Seq.cast<DataColumn> do
if not col.ReadOnly || (col.AutoIncrement && keepIdentity) then
bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName) |> ignore
bulkCopy.WriteToServer this
#if WITH_LEGACY_NAMESPACE
namespace FSharp.Data
open System
open System.Data
[<Obsolete("use 'FSharp.Data.SqlClient.DataTable' instead");AutoOpen>]
type DataTable<'T when 'T :> DataRow> = FSharp.Data.SqlClient.DataTable<'T>
#endif