Skip to content

Commit e9aaef6

Browse files
committed
Fix creating servers
1 parent 5b24899 commit e9aaef6

5 files changed

Lines changed: 80 additions & 49 deletions

File tree

main.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ func main() {
1919

2020
rpcServer := rpc.CreateServer()
2121
contentServer := stream.CreateServer(stream.NewManager(node))
22+
reflectorServer := reflector.CreateServer(blobManager)
23+
peerServer := peer.CreateServer(blobManager)
2224

2325
wg.Go(func() {
2426
fmt.Println("Starting DHT server on port 4444.")
@@ -33,7 +35,7 @@ func main() {
3335
fmt.Println("Error when getting TCP listener.")
3436
}
3537
defer listener.Close()
36-
rpc.StartServer(rpcServer, listener)
38+
rpcServer.StartServer(listener)
3739
})
3840

3941
wg.Go(func() {
@@ -43,7 +45,7 @@ func main() {
4345
fmt.Println("Error when getting TCP listener.")
4446
}
4547
defer listener.Close()
46-
stream.StartServer(contentServer, listener)
48+
contentServer.StartServer(listener)
4749
})
4850

4951
wg.Go(func() {
@@ -53,7 +55,7 @@ func main() {
5355
fmt.Println("Error when getting TCP listener.")
5456
}
5557
defer listener.Close()
56-
reflector.StartServer(blobManager, listener)
58+
reflectorServer.StartServer(listener)
5759
})
5860

5961
wg.Go(func() {
@@ -63,7 +65,7 @@ func main() {
6365
fmt.Println("Error when getting TCP listener.")
6466
}
6567
defer listener.Close()
66-
peer.StartServer(blobManager, listener)
68+
peerServer.StartServer(listener)
6769
})
6870

6971
wg.Wait()

peer/server.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,28 @@ import "lbry/daemon/blob"
77
import "net"
88
import "time"
99

10-
func StartServer(blobManager blob.BlobManager, listener net.Listener) {
10+
type PeerServer struct {
11+
blobManager blob.BlobManager
12+
}
13+
14+
func CreateServer(blobManager blob.BlobManager) PeerServer {
15+
return PeerServer{
16+
blobManager: blobManager,
17+
}
18+
}
19+
20+
func (peerServer PeerServer) StartServer(listener net.Listener) {
1121
for {
1222
conn, err := listener.Accept()
1323
if err != nil {
1424
fmt.Println("Error accepting:", err)
1525
continue
1626
}
17-
go handleConnection(conn, blobManager)
27+
go peerServer.handleConnection(conn)
1828
}
1929
}
2030

21-
func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
31+
func (peerServer PeerServer) handleConnection(conn net.Conn) {
2232
defer conn.Close()
2333
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
2434

@@ -41,7 +51,7 @@ func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
4151

4252
requestedBlobsValue, hasRequestedBlobs := data["requested_blobs"]
4353
if hasRequestedBlobs {
44-
responseData["available_blobs"] = getAvailableBlobs(blobManager, requestedBlobsValue.([]string))
54+
responseData["available_blobs"] = peerServer.getAvailableBlobs(requestedBlobsValue.([]string))
4555
}
4656

4757
blobDataPaymentRateValue, hasBlobDataPaymentRate := data["blob_data_payment_rate"]
@@ -54,7 +64,7 @@ func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
5464

5565
requestedBlobValue, hasRequestedBlob := data["requested_blob"]
5666
if hasRequestedBlob {
57-
incomingBlob, blobData = getRequestedBlob(blobManager, requestedBlobValue.(string))
67+
incomingBlob, blobData = peerServer.getRequestedBlob(requestedBlobValue.(string))
5868
responseData["incoming_blob"] = incomingBlob
5969
}
6070

@@ -65,11 +75,11 @@ func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
6575
}
6676
}
6777

68-
func getAvailableBlobs(blobManager blob.BlobManager, requestedBlobs []string) []string {
78+
func (peerServer PeerServer) getAvailableBlobs(requestedBlobs []string) []string {
6979
var availableBlobs []string
7080

7181
for _, requestedBlob := range requestedBlobs {
72-
if blobManager.Has(requestedBlob) {
82+
if peerServer.blobManager.Has(requestedBlob) {
7383
availableBlobs = append(availableBlobs, requestedBlob)
7484
}
7585
}
@@ -82,9 +92,9 @@ func getBlobDataPaymentRate(blobDataPaymentRate float64) string {
8292
return "RATE_UNSET"
8393
}
8494

85-
func getRequestedBlob(blobManager blob.BlobManager, requestedBlob string) (map[string]any, []byte) {
86-
if blobManager.Has(requestedBlob) {
87-
blobData := blobManager.Get(requestedBlob)
95+
func (peerServer PeerServer) getRequestedBlob(requestedBlob string) (map[string]any, []byte) {
96+
if peerServer.blobManager.Has(requestedBlob) {
97+
blobData := peerServer.blobManager.Get(requestedBlob)
8898
return map[string]any{
8999
"blob_hash": requestedBlob,
90100
"length": len(blobData),

reflector/server.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,28 @@ import "lbry/daemon/blob"
77
import "net"
88
import "time"
99

10-
func StartServer(blobManager blob.BlobManager, listener net.Listener) {
10+
type ReflectorServer struct {
11+
blobManager blob.BlobManager
12+
}
13+
14+
func CreateServer(blobManager blob.BlobManager) ReflectorServer {
15+
return ReflectorServer{
16+
blobManager: blobManager,
17+
}
18+
}
19+
20+
func (reflectorServer ReflectorServer) StartServer(listener net.Listener) {
1121
for {
1222
conn, err := listener.Accept()
1323
if err != nil {
1424
fmt.Println("Error accepting:", err)
1525
continue
1626
}
17-
go handleConnection(conn, blobManager)
27+
go reflectorServer.handleConnection(conn)
1828
}
1929
}
2030

21-
func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
31+
func (reflectorServer ReflectorServer) handleConnection(conn net.Conn) {
2232
defer conn.Close()
2333
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
2434

@@ -78,7 +88,7 @@ func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
7888
return
7989
}
8090

81-
err = blobManager.Set(blobHash, blobData, false)
91+
err = reflectorServer.blobManager.Set(blobHash, blobData, false)
8292

8393
jsonEncoder.Encode(map[string]any{
8494
"received_blob": err == nil,
@@ -110,7 +120,7 @@ func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
110120
return
111121
}
112122

113-
err = blobManager.Set(sdBlobHash, sdBlobData, true)
123+
err = reflectorServer.blobManager.Set(sdBlobHash, sdBlobData, true)
114124

115125
jsonEncoder.Encode(map[string]any{
116126
"received_sd_blob": err == nil,

rpc/server.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@ import "strings"
1313

1414
import "google.golang.org/protobuf/encoding/protowire"
1515

16-
func CreateServer() *http.Server {
16+
type RPCServer struct {
17+
httpServer http.Server
18+
}
19+
20+
func CreateServer() RPCServer {
1721
rpcServeMux := http.NewServeMux()
1822
rpcServeMux.HandleFunc("/", handleJSONRPC)
1923

20-
return &http.Server{Handler: rpcServeMux}
24+
return RPCServer{
25+
httpServer: http.Server{Handler: rpcServeMux},
26+
}
2127
}
2228

23-
func StartServer(rpcServer *http.Server, listener net.Listener) {
24-
err := rpcServer.Serve(listener)
29+
func (rpcServer RPCServer) StartServer(listener net.Listener) {
30+
err := rpcServer.httpServer.Serve(listener)
2531
if err != nil && err != http.ErrServerClosed {
2632
fmt.Println("Error when starting RPC server.")
2733
}

stream/server.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,21 @@ type Manager struct {
2121
sdCacheMu sync.RWMutex
2222
}
2323

24-
func CreateServer(m *Manager) *http.Server {
24+
type StreamServer struct {
25+
httpServer http.Server
26+
}
27+
28+
func CreateServer(m *Manager) StreamServer {
2529
contentServeMux := http.NewServeMux()
26-
contentServeMux.HandleFunc("/", m.handleStream)
30+
contentServeMux.HandleFunc("/stream/{sd_hash}", m.handleStream)
2731

28-
return &http.Server{Handler: contentServeMux}
32+
return StreamServer{
33+
httpServer: http.Server{Handler: contentServeMux},
34+
}
2935
}
3036

31-
func StartServer(contentServer *http.Server, listener net.Listener) {
32-
err := contentServer.Serve(listener)
37+
func (contentServer StreamServer) StartServer(listener net.Listener) {
38+
err := contentServer.httpServer.Serve(listener)
3339
if err != nil && err != http.ErrServerClosed {
3440
fmt.Println("Error when starting Stream server.")
3541
}
@@ -43,41 +49,39 @@ func NewManager(dhtNode *dht.Node) *Manager {
4349
}
4450
}
4551

46-
// GetStreamingURL returns the local streaming URL for a given SD hash.
47-
func (m *Manager) GetStreamingURL(sdHash string, port int) string {
48-
return fmt.Sprintf("http://localhost:%d/stream/%s", port, sdHash)
49-
}
50-
51-
func (m *Manager) handleStream(w http.ResponseWriter, r *http.Request) {
52-
fmt.Println("Incoming stream request")
53-
54-
sdHash := strings.TrimPrefix(r.URL.Path, "/stream/")
55-
if sdHash == "" || len(sdHash) != blob.BlobHashLength {
56-
http.Error(w, "invalid sd_hash", http.StatusBadRequest)
57-
return
58-
}
59-
52+
func (m *Manager) handleStream(w http.ResponseWriter, req *http.Request) {
6053
info, _ := debug.ReadBuildInfo()
6154

62-
// CORS for frontend
63-
w.Header().Set("Access-Control-Allow-Origin", "*")
55+
w.Header().Set("Accept-Ranges", "bytes")
6456
w.Header().Set("Access-Control-Allow-Headers", "Range")
65-
w.Header().Set("Access-Control-Expose-Headers", "Content-Range, Content-Length, Accept-Ranges")
57+
w.Header().Set("Access-Control-Allow-Origin", "*")
58+
w.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Content-Length, Content-Range")
6659
w.Header().Set("Server", "LBRYd/"+info.Main.Version)
6760

68-
if r.Method == "OPTIONS" {
61+
if strings.EqualFold(req.Method, "OPTIONS") {
6962
w.WriteHeader(http.StatusNoContent)
7063
return
7164
}
7265

66+
if strings.EqualFold(req.Method, "GET") {
67+
sdHash := req.PathValue("sd_hash")
68+
69+
m.handleSDHash(w, req, sdHash)
70+
return
71+
}
72+
73+
http.Error(w, "HTTP method not allowed.", http.StatusMethodNotAllowed)
74+
}
75+
76+
func (m *Manager) handleSDHash(w http.ResponseWriter, req *http.Request, sdHash string) {
7377
// Get or download stream descriptor
7478
sd, err := m.getDescriptor(sdHash)
7579
if err != nil {
7680
log.Printf("P2P STREAM: failed to get descriptor %s: %v", sdHash[:12], err)
7781
http.Error(w, "failed to load stream", http.StatusBadGateway)
7882
return
7983
}
80-
log.Printf("P2P STREAM: serving %s (%s)", sdHash[:12], r.Header.Get("Range"))
84+
log.Printf("P2P STREAM: serving %s (%s)", sdHash[:12], req.Header.Get("Range"))
8185

8286
contentBlobs := sd.ContentBlobs()
8387
if len(contentBlobs) == 0 {
@@ -95,10 +99,9 @@ func (m *Manager) handleStream(w http.ResponseWriter, r *http.Request) {
9599
// Determine MIME type from file extension
96100
mimeType := guessMIME(sd.SuggestedFileName, sd.StreamName)
97101
w.Header().Set("Content-Type", mimeType)
98-
w.Header().Set("Accept-Ranges", "bytes")
99102

100103
// Parse range header
101-
rangeHeader := r.Header.Get("Range")
104+
rangeHeader := req.Header.Get("Range")
102105
var start, end int64
103106
if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") {
104107
rangeParts := strings.TrimPrefix(rangeHeader, "bytes=")

0 commit comments

Comments
 (0)