-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStorageNode.java
More file actions
59 lines (46 loc) · 1.93 KB
/
StorageNode.java
File metadata and controls
59 lines (46 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.*;
import java.util.*;
import org.apache.thrift.*;
import org.apache.thrift.server.*;
import org.apache.thrift.transport.*;
import org.apache.thrift.protocol.*;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.*;
import org.apache.curator.*;
import org.apache.curator.retry.*;
import org.apache.curator.framework.*;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.utils.*;
import org.apache.log4j.*;
/**
 * Command-line entry point for a key-value storage node.
 *
 * <p>The node binds a Thrift server socket, registers itself under a ZooKeeper
 * parent znode as an ephemeral-sequential child whose payload is
 * {@code host:port}, watches that parent's children through a
 * {@link PathChildrenCache}, and then serves {@code KeyValueService} requests.
 */
public class StorageNode {
    /** log4j logger for this class; assigned at the start of {@link #main(String[])}. */
    static Logger log;

    /**
     * Starts the storage node and blocks while the Thrift server is serving.
     *
     * @param args exactly four values: {@code host port zkconnectstring zknode}
     * @throws Exception if the ZooKeeper registration, the socket bind, or the
     *                   cache start-up fails
     */
    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        log = Logger.getLogger(StorageNode.class.getName());

        if (args.length != 4) {
            System.err.println("Usage: java StorageNode host port zkconnectstring zknode");
            System.exit(-1);
        }

        String host = args[0];
        String zkConnectString = args[2];
        String zkNode = args[3];

        // Validate the port once, before anything is published to ZooKeeper, so a
        // typo on the command line cannot leave a stale registration behind.
        int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            System.err.println("Invalid port '" + args[1] + "': expected an integer");
            System.err.println("Usage: java StorageNode host port zkconnectstring zknode");
            System.exit(-1);
            return; // not reached at runtime; gives the compiler definite assignment of 'port'
        }

        // The advertised address keeps the port exactly as it was typed.
        String serverAddress = host + ":" + args[1];

        // try-with-resources closes the Curator client (and with it the ZooKeeper
        // session) on every exit path from this method.
        try (CuratorFramework curClient = CuratorFrameworkFactory.builder()
                .connectString(zkConnectString)
                .retryPolicy(new RetryNTimes(10, 1000))
                .connectionTimeoutMs(1000)
                .sessionTimeoutMs(10000)
                .build()) {
            curClient.start();

            // Bind the listening socket BEFORE advertising the node: if the port is
            // unavailable we fail here and never publish an address nobody can reach.
            TServerSocket socket = new TServerSocket(port);
            try {
                // Explicit UTF-8 so the stored address does not depend on the JVM's
                // platform default charset.
                curClient.create()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(zkNode + "/s-",
                                serverAddress.getBytes(java.nio.charset.StandardCharsets.UTF_8));

                // Cache of zkNode's children (with their data) that is handed to the handler.
                try (PathChildrenCache cache = new PathChildrenCache(curClient, zkNode, true)) {
                    cache.start();

                    KeyValueService.Processor<KeyValueService.Iface> processor =
                            new KeyValueService.Processor<>(
                                    new KeyValueHandler(host, port, curClient, zkNode, cache));

                    TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(socket);
                    sargs.protocolFactory(new TBinaryProtocol.Factory());
                    sargs.transportFactory(new TFramedTransport.Factory());
                    sargs.processorFactory(new TProcessorFactory(processor));
                    sargs.maxWorkerThreads(64);

                    TServer server = new TThreadPoolServer(sargs);
                    server.serve();
                }
            } finally {
                // Release the listening port whether serve() returned or something above threw.
                socket.close();
            }
        }
    }
}