-
Notifications
You must be signed in to change notification settings - Fork 208
Expand file tree
/
Copy path: DStreamStateSample.cs
More file actions
110 lines (91 loc) · 4.37 KB
/
DStreamStateSample.cs
File metadata and controls
110 lines (91 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Streaming;
namespace Microsoft.Spark.CSharp.Samples
{
class DStreamStateSample
{
    // Signals the background file-writer task to stop. Declared volatile because it is
    // written on the sample's main thread and read inside the Task.Run loop on a
    // thread-pool thread; without volatile the loop may never observe the write.
    private static volatile bool stopFileServer;

    /// <summary>
    /// Simulates a streaming file source: copies every file in <paramref name="directory"/>
    /// matching <paramref name="pattern"/> into a "test1" subdirectory every 10 seconds,
    /// so the job's TextFileStream sees a steady feed of newly created files.
    /// Runs as a fire-and-forget background task; unhandled exceptions in the task
    /// are unobserved, which is acceptable for this sample.
    /// </summary>
    /// <param name="directory">Root directory containing the source files to replicate.</param>
    /// <param name="pattern">Search pattern (e.g. "words.txt") selecting files to copy.</param>
    /// <param name="loop">Maximum number of copy iterations before the task stops.</param>
    private static void StartFileServer(string directory, string pattern, int loop)
    {
        string testDir = Path.Combine(directory, "test1");
        if (!Directory.Exists(testDir))
            Directory.CreateDirectory(testDir);

        stopFileServer = false;

        // Snapshot the source file list once; the copies get unique names below.
        string[] files = Directory.GetFiles(directory, pattern);

        Task.Run(() =>
        {
            // Honor both the requested iteration count and the stop flag.
            // (Previously the 'loop' parameter was unused and the loop ran until
            // stopFileServer was set — which nothing in this class ever does.)
            for (int i = 0; i < loop && !stopFileServer; i++)
            {
                // DateTime.ToBinary() gives each copy a unique filename prefix so
                // TextFileStream treats every iteration's files as new input.
                DateTime now = DateTime.Now;
                foreach (string path in files)
                {
                    string text = File.ReadAllText(path);
                    // Path.Combine instead of a hard-coded "\\" separator so the
                    // sample also works on non-Windows file systems.
                    string target = Path.Combine(testDir, now.ToBinary() + "_" + Path.GetFileName(path));
                    File.WriteAllText(target, text);
                }

                System.Threading.Thread.Sleep(10000);
            }

            // Give the streaming job a moment to pick up the final batch, then
            // remove the generated files.
            System.Threading.Thread.Sleep(3000);
            foreach (var file in Directory.GetFiles(testDir, "*"))
                File.Delete(file);
        });
    }

    /// <summary>
    /// Demonstrates stateful stream processing with MapWithState: counts words arriving
    /// via a TextFileStream, maintains a running per-word total in keyed state (seeded
    /// with an initial RDD and a 30-second idle timeout), and prints periodic snapshots
    /// of the full state. Checkpointing lets the context be rebuilt from
    /// the checkpoint directory on restart instead of re-running the factory.
    /// </summary>
    [Sample("experimental")]
    internal static void DStreamMapWithStateSample()
    {
        string directory = SparkCLRSamples.Configuration.SampleDataLocation;
        string checkpointPath = Path.Combine(directory, "checkpoint");

        // GetOrCreate restores the context from the checkpoint if one exists;
        // otherwise it invokes the factory lambda below to build a fresh one.
        StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath,
            () =>
            {
                SparkContext sc = SparkCLRSamples.SparkContext;
                StreamingContext context = new StreamingContext(sc, 10000L); // batch interval is in milliseconds
                context.Checkpoint(checkpointPath);

                // Watch the directory that StartFileServer writes into.
                var lines = context.TextFileStream(Path.Combine(directory, "test1"));
                // Union the stream with itself — doubles each batch, presumably to
                // increase data volume for the demo.
                lines = context.Union(lines, lines);

                var words = lines.FlatMap(l => l.Split(' '));
                var pairs = words.Map(w => new Tuple<string, int>(w, 1));
                var wordCounts = pairs.ReduceByKey((x, y) => x + y);

                // Seed state so snapshots show entries before any file data arrives.
                var initialState = sc.Parallelize(new[] { new Tuple<string, int>("NOT_A_WORD", 1024), new Tuple<string, int>("dog", 10000), }, 1);

                // State update function: accumulate counts per word; on timeout,
                // report the word without updating state (updating a timing-out
                // state is not allowed).
                var stateSpec = new StateSpec<string, int, int, Tuple<string, int>>((word, count, state) =>
                {
                    if (state.IsTimingOut())
                    {
                        Console.WriteLine("Found timing out word: {0}", word);
                        return new Tuple<string, int>(word, state.Get());
                    }

                    var sum = 0;
                    if (state.Exists())
                    {
                        sum = state.Get();
                    }
                    state.Update(sum + count);
                    Console.WriteLine("word: {0}, count: {1}", word, sum + count);
                    return new Tuple<string, int>(word, sum + count);
                }).NumPartitions(1).InitialState(initialState).Timeout(TimeSpan.FromSeconds(30));

                // StateSnapshots emits the entire current state every batch.
                var snapshots = wordCounts.MapWithState(stateSpec).StateSnapshots();
                snapshots.ForeachRDD((double time, RDD<dynamic> rdd) =>
                {
                    Console.WriteLine("-------------------------------------------");
                    Console.WriteLine("Snapshots @ Time: {0}", time);
                    Console.WriteLine("-------------------------------------------");

                    foreach (Tuple<string, int> record in rdd.Collect())
                    {
                        Console.WriteLine("[{0}, {1}]", record.Item1, record.Item2);
                    }
                    Console.WriteLine();
                });

                return context;
            });

        ssc.Start();

        // Feed files into the watched directory while the streaming job runs.
        StartFileServer(directory, "words.txt", 100);

        // Blocks until the streaming context is terminated externally.
        ssc.AwaitTermination();
        ssc.Stop();
    }
}
}