Skip to content

Commit 53049b5

Browse files
committed
Add peer and reflector servers
1 parent f5a4765 commit 53049b5

5 files changed

Lines changed: 136 additions & 13 deletions

File tree

main.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ package main
22

33
import "fmt"
44
import "lbry/daemon/dht"
5+
import "lbry/daemon/peer"
56
import "lbry/daemon/stream"
7+
import "lbry/daemon/reflector"
68
import "lbry/daemon/rpc"
79
import "net"
8-
import "net/http"
910
import "strconv"
1011
import "sync"
1112

1213
var wg sync.WaitGroup
1314

1415
func main() {
1516
node, _ := dht.NewNode(4444)
17+
1618
rpcServer := rpc.CreateServer()
1719
contentServer := stream.CreateServer(stream.NewManager(node))
1820

@@ -24,27 +26,49 @@ func main() {
2426

2527
wg.Go(func() {
2628
fmt.Println("Starting RPC server on port 5279.")
27-
startHTTPServer(rpcServer, 5279)
29+
listener, err := getTCPListener("", 5279)
30+
if err != nil {
31+
fmt.Println("Error when getting TCP listener.")
32+
}
33+
defer listener.Close()
34+
rpc.StartServer(rpcServer, listener)
2835
})
2936

3037
wg.Go(func() {
3138
fmt.Println("Starting content server on port 5280.")
32-
startHTTPServer(contentServer, 5280)
39+
listener, err := getTCPListener("", 5280)
40+
if err != nil {
41+
fmt.Println("Error when getting TCP listener.")
42+
}
43+
defer listener.Close()
44+
stream.StartServer(contentServer, listener)
45+
})
46+
47+
wg.Go(func() {
48+
fmt.Println("Starting reflector server on port 5566.")
49+
listener, err := getTCPListener("", 5566)
50+
if err != nil {
51+
fmt.Println("Error when getting TCP listener.")
52+
}
53+
defer listener.Close()
54+
reflector.StartServer(listener)
55+
})
56+
57+
wg.Go(func() {
58+
fmt.Println("Starting peer server on port 5567.")
59+
listener, err := getTCPListener("", 5567)
60+
if err != nil {
61+
fmt.Println("Error when getting TCP listener.")
62+
}
63+
defer listener.Close()
64+
peer.StartServer(listener)
3365
})
3466

3567
wg.Wait()
3668

3769
fmt.Println("All servers have stopped.")
3870
}
3971

40-
func startHTTPServer(rpcServer *http.Server, port int) {
41-
listener, err := net.Listen("tcp", net.JoinHostPort("", strconv.Itoa(port)))
42-
if err != nil && err != http.ErrServerClosed {
43-
fmt.Println("Error when starting listening.")
44-
}
45-
46-
err = rpcServer.Serve(listener)
47-
if err != nil && err != http.ErrServerClosed {
48-
fmt.Println("Error when starting HTTP server.")
49-
}
72+
func getTCPListener(hostname string, port int) (net.Listener, error) {
73+
return net.Listen("tcp", net.JoinHostPort(hostname, strconv.Itoa(port)))
5074
}

peer/server.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package peer
2+
3+
import "encoding/json"
4+
import "fmt"
5+
import "io"
6+
import "net"
7+
import "time"
8+
9+
func StartServer(listener net.Listener) {
10+
for {
11+
conn, err := listener.Accept()
12+
if err != nil {
13+
fmt.Println("Error accepting:", err)
14+
continue
15+
}
16+
go handleConnection(conn)
17+
}
18+
}
19+
20+
func handleConnection(conn net.Conn) {
21+
defer conn.Close()
22+
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
23+
24+
jsonDecoder := json.NewDecoder(conn)
25+
jsonEncoder := json.NewEncoder(conn)
26+
27+
for {
28+
var data map[string]any
29+
30+
err := jsonDecoder.Decode(&data)
31+
if err != nil {
32+
if err == io.EOF {
33+
fmt.Println("Client disconnected")
34+
return
35+
}
36+
// TODO Handle error (return OR continue)
37+
}
38+
39+
fmt.Printf("%+v\n", data)
40+
jsonEncoder.Encode(map[string]any{})
41+
}
42+
}

reflector/server.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package reflector
2+
3+
import "encoding/json"
4+
import "fmt"
5+
import "io"
6+
import "net"
7+
import "time"
8+
9+
func StartServer(listener net.Listener) {
10+
for {
11+
conn, err := listener.Accept()
12+
if err != nil {
13+
fmt.Println("Error accepting:", err)
14+
continue
15+
}
16+
go handleConnection(conn)
17+
}
18+
}
19+
20+
func handleConnection(conn net.Conn) {
21+
defer conn.Close()
22+
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
23+
24+
jsonDecoder := json.NewDecoder(conn)
25+
jsonEncoder := json.NewEncoder(conn)
26+
27+
for {
28+
var data map[string]any
29+
30+
err := jsonDecoder.Decode(&data)
31+
if err != nil {
32+
if err == io.EOF {
33+
fmt.Println("Client disconnected")
34+
return
35+
}
36+
// TODO Handle error (return OR continue)
37+
}
38+
39+
fmt.Printf("%+v\n", data)
40+
jsonEncoder.Encode(map[string]any{})
41+
}
42+
}

rpc/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ func CreateServer() *http.Server {
2020
return &http.Server{Handler: rpcServeMux}
2121
}
2222

23+
func StartServer(rpcServer *http.Server, listener net.Listener) {
24+
err := rpcServer.Serve(listener)
25+
if err != nil && err != http.ErrServerClosed {
26+
fmt.Println("Error when starting RPC server.")
27+
}
28+
}
29+
2330
func sendResultResponse(w http.ResponseWriter, result any) {
2431
json.NewEncoder(w).Encode(map[string]any{
2532
"jsonrpc": "2.0",

stream/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import "fmt"
44
import "log"
55
import "lbry/daemon/blob"
66
import "lbry/daemon/dht"
7+
import "net"
78
import "net/http"
89
import "runtime/debug"
910
import "strconv"
@@ -27,6 +28,13 @@ func CreateServer(m *Manager) *http.Server {
2728
return &http.Server{Handler: contentServeMux}
2829
}
2930

31+
func StartServer(contentServer *http.Server, listener net.Listener) {
32+
err := contentServer.Serve(listener)
33+
if err != nil && err != http.ErrServerClosed {
34+
fmt.Println("Error when starting Stream server.")
35+
}
36+
}
37+
3038
func NewManager(dhtNode *dht.Node) *Manager {
3139
return &Manager{
3240
dhtNode: dhtNode,

0 commit comments

Comments
 (0)