Skip to content

Commit 5b24899

Browse files
committed
Implement blob manager
1 parent a970d0f commit 5b24899

5 files changed

Lines changed: 81 additions & 55 deletions

File tree

blob/exchange.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -132,38 +132,3 @@ func DownloadBlob(ip net.IP, tcpPort int, blobHash string) ([]byte, error) {
132132

133133
return blobData, nil
134134
}
135-
136-
// findJSONEnd finds the index of the closing '}' that ends the JSON object.
137-
// Handles nested braces.
138-
func findJSONEnd(data []byte) int {
139-
depth := 0
140-
inString := false
141-
escaped := false
142-
for i, b := range data {
143-
if escaped {
144-
escaped = false
145-
continue
146-
}
147-
if b == '\\' && inString {
148-
escaped = true
149-
continue
150-
}
151-
if b == '"' {
152-
inString = !inString
153-
continue
154-
}
155-
if inString {
156-
continue
157-
}
158-
if b == '{' {
159-
depth++
160-
}
161-
if b == '}' {
162-
depth--
163-
if depth == 0 {
164-
return i
165-
}
166-
}
167-
}
168-
return -1
169-
}

blob/manager.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package blob
2+
3+
import "fmt"
4+
5+
type BlobManager struct {
6+
Blobs map[string][]byte
7+
}
8+
9+
func (blobManager *BlobManager) Has(blobHash string) bool {
10+
_, ok := blobManager.Blobs[blobHash]
11+
return ok
12+
}
13+
14+
func (blobManager *BlobManager) Get(blobHash string) []byte {
15+
blobData, ok := blobManager.Blobs[blobHash]
16+
if ok {
17+
return blobData
18+
}
19+
return nil
20+
}
21+
22+
func (blobManager *BlobManager) Set(blobHash string, blobData []byte, isStreamDescriptor bool) error {
23+
if isStreamDescriptor {
24+
// TODO Process SD blob data
25+
blobManager.Blobs[blobHash] = blobData
26+
fmt.Printf("SD BLOB (%s) = %+v\n", blobHash, string(blobData))
27+
return nil
28+
}
29+
// TODO Process blob data
30+
blobManager.Blobs[blobHash] = blobData
31+
fmt.Printf("BLOB (%s) = %+v\n", blobHash, blobData)
32+
return nil
33+
}

main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import "fmt"
4+
import "lbry/daemon/blob"
45
import "lbry/daemon/dht"
56
import "lbry/daemon/peer"
67
import "lbry/daemon/stream"
@@ -13,6 +14,7 @@ import "sync"
1314
var wg sync.WaitGroup
1415

1516
func main() {
17+
blobManager := blob.BlobManager{}
1618
node, _ := dht.NewNode(4444)
1719

1820
rpcServer := rpc.CreateServer()
@@ -51,7 +53,7 @@ func main() {
5153
fmt.Println("Error when getting TCP listener.")
5254
}
5355
defer listener.Close()
54-
reflector.StartServer(listener)
56+
reflector.StartServer(blobManager, listener)
5557
})
5658

5759
wg.Go(func() {
@@ -61,7 +63,7 @@ func main() {
6163
fmt.Println("Error when getting TCP listener.")
6264
}
6365
defer listener.Close()
64-
peer.StartServer(listener)
66+
peer.StartServer(blobManager, listener)
6567
})
6668

6769
wg.Wait()

peer/server.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,22 @@ package peer
33
import "encoding/json"
44
import "fmt"
55
import "io"
6+
import "lbry/daemon/blob"
67
import "net"
78
import "time"
89

9-
func StartServer(listener net.Listener) {
10+
func StartServer(blobManager blob.BlobManager, listener net.Listener) {
1011
for {
1112
conn, err := listener.Accept()
1213
if err != nil {
1314
fmt.Println("Error accepting:", err)
1415
continue
1516
}
16-
go handleConnection(conn)
17+
go handleConnection(conn, blobManager)
1718
}
1819
}
1920

20-
func handleConnection(conn net.Conn) {
21+
func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
2122
defer conn.Close()
2223
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
2324

@@ -40,7 +41,7 @@ func handleConnection(conn net.Conn) {
4041

4142
requestedBlobsValue, hasRequestedBlobs := data["requested_blobs"]
4243
if hasRequestedBlobs {
43-
responseData["available_blobs"] = getAvailableBlobs(requestedBlobsValue.([]string))
44+
responseData["available_blobs"] = getAvailableBlobs(blobManager, requestedBlobsValue.([]string))
4445
}
4546

4647
blobDataPaymentRateValue, hasBlobDataPaymentRate := data["blob_data_payment_rate"]
@@ -53,7 +54,7 @@ func handleConnection(conn net.Conn) {
5354

5455
requestedBlobValue, hasRequestedBlob := data["requested_blob"]
5556
if hasRequestedBlob {
56-
incomingBlob, blobData = getRequestedBlob(requestedBlobValue.(string))
57+
incomingBlob, blobData = getRequestedBlob(blobManager, requestedBlobValue.(string))
5758
responseData["incoming_blob"] = incomingBlob
5859
}
5960

@@ -64,17 +65,35 @@ func handleConnection(conn net.Conn) {
6465
}
6566
}
6667

67-
func getAvailableBlobs(requestedBlobs []string) []string {
68-
// TODO
69-
return []string{}
68+
func getAvailableBlobs(blobManager blob.BlobManager, requestedBlobs []string) []string {
69+
var availableBlobs []string
70+
71+
for _, requestedBlob := range requestedBlobs {
72+
if blobManager.Has(requestedBlob) {
73+
availableBlobs = append(availableBlobs, requestedBlob)
74+
}
75+
}
76+
77+
return availableBlobs
7078
}
7179

7280
func getBlobDataPaymentRate(blobDataPaymentRate float64) string {
7381
// TODO
7482
return "RATE_UNSET"
7583
}
7684

77-
func getRequestedBlob(requestedBlob string) (map[string]any, []byte) {
78-
// TODO
79-
return map[string]any{}, []byte{}
85+
func getRequestedBlob(blobManager blob.BlobManager, requestedBlob string) (map[string]any, []byte) {
86+
if blobManager.Has(requestedBlob) {
87+
blobData := blobManager.Get(requestedBlob)
88+
return map[string]any{
89+
"blob_hash": requestedBlob,
90+
"length": len(blobData),
91+
}, blobData
92+
}
93+
94+
return map[string]any{
95+
"blob_hash": "",
96+
"length": 0,
97+
"error": "Blob not found",
98+
}, nil
8099
}

reflector/server.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,22 @@ package reflector
33
import "encoding/json"
44
import "fmt"
55
import "io"
6+
import "lbry/daemon/blob"
67
import "net"
78
import "time"
89

9-
func StartServer(listener net.Listener) {
10+
func StartServer(blobManager blob.BlobManager, listener net.Listener) {
1011
for {
1112
conn, err := listener.Accept()
1213
if err != nil {
1314
fmt.Println("Error accepting:", err)
1415
continue
1516
}
16-
go handleConnection(conn)
17+
go handleConnection(conn, blobManager)
1718
}
1819
}
1920

20-
func handleConnection(conn net.Conn) {
21+
func handleConnection(conn net.Conn, blobManager blob.BlobManager) {
2122
defer conn.Close()
2223
conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // Prevent hanging
2324

@@ -72,9 +73,12 @@ func handleConnection(conn net.Conn) {
7273

7374
blobData := make([]byte, blobSize)
7475
_, err := io.ReadFull(conn, blobData)
76+
if err != nil {
77+
conn.Close()
78+
return
79+
}
7580

76-
// TODO Process blob data
77-
fmt.Printf("BLOB [%d] (%s) = %+v\n", blobSize, blobHash, blobData)
81+
err = blobManager.Set(blobHash, blobData, false)
7882

7983
jsonEncoder.Encode(map[string]any{
8084
"received_blob": err == nil,
@@ -101,9 +105,12 @@ func handleConnection(conn net.Conn) {
101105

102106
sdBlobData := make([]byte, sdBlobSize)
103107
_, err := io.ReadFull(conn, sdBlobData)
108+
if err != nil {
109+
conn.Close()
110+
return
111+
}
104112

105-
// TODO Process SD blob data
106-
fmt.Printf("SD BLOB [%d] (%s) = %+v\n", sdBlobSize, sdBlobHash, string(sdBlobData))
113+
err = blobManager.Set(sdBlobHash, sdBlobData, true)
107114

108115
jsonEncoder.Encode(map[string]any{
109116
"received_sd_blob": err == nil,

0 commit comments

Comments
 (0)