-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathScript.fsx
More file actions
67 lines (56 loc) · 2.2 KB
/
Script.fsx
File metadata and controls
67 lines (56 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Learn more about F# at http://fsharp.net. See the 'F# Tutorial' project
// for more guidance on F# programming.
#load "Messages.fs"
#load "Pipes.fs"
#load "Filters.fs"
#load "Runners.fs"
open System
open FSharp.DataProcessingPipelines.Core
open FSharp.DataProcessingPipelines.Core.Messages
open FSharp.DataProcessingPipelines.Core.Pipes
open FSharp.DataProcessingPipelines.Core.Filters
open FSharp.DataProcessingPipelines.Core.Runners
let testMessage = EventInformationMessage("This is a new test Message!", "This is the context of the new Test Message!")
let mutable messages = (testMessage::[])
messages <- (testMessage::messages)
// OutputPipe type example
type ServiceAOutputPipe () =
inherit IOutputPipe<EventInformationMessage> ()
override this.Publish (m) =
messages <- (List.append messages [m])
// InputPIpe type example
type ServiceBInputPipe () =
inherit IInputPipe<EventInformationMessage> ()
override this.Subscribe (handler:(EventInformationMessage -> unit)) =
match messages with
| [] -> printfn "Messages is empty"
| h::t ->
messages <- t
handler h
let outputPipe = new ServiceAOutputPipe()
let inputPipe = new ServiceBInputPipe()
type ServiceAFilter (pipe:ServiceAOutputPipe) =
inherit DataSource<EventInformationMessage>(pipe)
override this.Execute () =
try
try
let msg = EventInformationMessage("This is a new test Message!", "This is the context of the new Test Message!")
printfn "Publish msg to output pipe at Service A: %A" (msg.ToString())
this.OutputPipe.Publish msg
finally
// Dispose if needed
()
with
| ex ->
// log exception
()
type ServiceBFilter (pipe:ServiceBInputPipe) =
inherit DataSink<EventInformationMessage>(pipe)
override this.Execute () =
let handler (msg) = printfn "Service B Execute -> %A" (msg)
this.InputPipe.Subscribe (handler)
let myRunnerA = BaseRunner (ServiceAFilter (outputPipe))
let myRunnerB = BaseRunner (ServiceBFilter (inputPipe))
printfn "%d" (messages.Length)
myRunnerA.Start ()
myRunnerB.Start ()