Skip to content

Commit 2ce414b

Browse files
committed
storage/splitfdstream: add lookup_digest RPC method
Add a new lookup_digest JSON-RPC method that allows retrieving a file or chunk by its digest. This enables efficient content-addressed lookup using the chunked package's cache infrastructure. Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent 2011be4 commit 2ce414b

5 files changed

Lines changed: 137 additions & 17 deletions

File tree

storage/pkg/splitfdstream/protocol.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
const (
1515
MethodApplySplitFDStream = "apply_splitfdstream"
1616
MethodGetSplitFDStream = "get_splitfdstream"
17+
MethodLookupDigest = "lookup_digest"
1718
MethodPing = "ping"
1819
)
1920

@@ -28,6 +29,8 @@ const (
2829
// Application-specific error codes (range -32000 to -32099)
2930
ErrorCodeDriverNotSupported = -32000
3031
ErrorCodeFileDescriptorError = -32001
32+
ErrorCodeDigestNotFound = -32002
33+
ErrorCodeStoreNotAvailable = -32003
3134
)
3235

3336
// SplitFDStreamRequest represents a JSON-RPC request for splitfdstream operations.
@@ -59,6 +62,8 @@ type SplitFDStreamResult struct {
5962
FileDescriptors *int `json:"fileDescriptors,omitempty"`
6063
BatchSize int `json:"batchSize,omitempty"`
6164
NumBatches int `json:"numBatches,omitempty"`
65+
Offset int64 `json:"offset,omitempty"`
66+
ChunkSize int64 `json:"chunkSize,omitempty"`
6267
Message string `json:"message,omitempty"`
6368
}
6469

@@ -167,6 +172,8 @@ func (r *SplitFDStreamRequest) IsValid() error {
167172
Data: "layerId is required for get_splitfdstream",
168173
}
169174
}
175+
case MethodLookupDigest:
176+
// Validation is done in ParseLookupDigestParams
170177
case MethodPing:
171178
// No validation required for ping
172179
default:
@@ -270,3 +277,37 @@ func ParsePingParams(params SplitFDStreamParams) (*PingParams, error) {
270277

271278
return &pingParams, nil
272279
}
280+
281+
// LookupDigestParams represents parameters specific to lookup_digest method.
282+
type LookupDigestParams struct {
283+
Digest string `json:"digest"`
284+
}
285+
286+
// ParseLookupDigestParams parses the generic params as LookupDigestParams.
287+
func ParseLookupDigestParams(params SplitFDStreamParams) (*LookupDigestParams, error) {
288+
// Merge Options into a flat map
289+
flatMap := make(map[string]interface{})
290+
for k, v := range params.Options {
291+
flatMap[k] = v
292+
}
293+
294+
data, err := json.Marshal(flatMap)
295+
if err != nil {
296+
return nil, fmt.Errorf("failed to marshal params: %w", err)
297+
}
298+
299+
var lookupParams LookupDigestParams
300+
if err := json.Unmarshal(data, &lookupParams); err != nil {
301+
return nil, fmt.Errorf("failed to parse lookup params: %w", err)
302+
}
303+
304+
if lookupParams.Digest == "" {
305+
return nil, &SplitFDStreamError{
306+
Code: ErrorCodeInvalidParams,
307+
Message: "Invalid params",
308+
Data: "digest is required for lookup_digest",
309+
}
310+
}
311+
312+
return &lookupParams, nil
313+
}

storage/pkg/splitfdstream/server.go

Lines changed: 85 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,22 +70,30 @@ func (r *limitedSocketReader) Read(p []byte) (int, error) {
7070
return n, nil
7171
}
7272

73+
// DigestLookupFunc is a function that looks up a file by its digest.
74+
// It returns the open file, offset within the file, and chunk size.
75+
// A nil file means the digest was not found.
76+
// The caller is responsible for closing the returned file.
77+
type DigestLookupFunc func(digest string) (file *os.File, offset int64, size int64, err error)
78+
7379
// JSONRPCServer manages the JSON-RPC server for storage operations.
7480
type JSONRPCServer struct {
75-
driver any
76-
listener net.Listener
77-
socketPath string
78-
shutdown chan struct{}
79-
connections sync.WaitGroup
80-
mu sync.RWMutex
81-
running bool
81+
driver any
82+
digestLookup DigestLookupFunc
83+
listener net.Listener
84+
socketPath string
85+
shutdown chan struct{}
86+
connections sync.WaitGroup
87+
mu sync.RWMutex
88+
running bool
8289
}
8390

8491
// NewJSONRPCServer creates a new JSON-RPC server.
85-
func NewJSONRPCServer(driver any) *JSONRPCServer {
92+
func NewJSONRPCServer(driver any, digestLookup DigestLookupFunc) *JSONRPCServer {
8693
return &JSONRPCServer{
87-
driver: driver,
88-
shutdown: make(chan struct{}),
94+
driver: driver,
95+
digestLookup: digestLookup,
96+
shutdown: make(chan struct{}),
8997
}
9098
}
9199

@@ -271,6 +279,8 @@ func (s *JSONRPCServer) handleRequest(fdPasser *FDPasser, requestLine string) {
271279
s.handleApplySplitFDStream(fdPasser, req)
272280
case MethodGetSplitFDStream:
273281
s.handleGetSplitFDStream(fdPasser, req)
282+
case MethodLookupDigest:
283+
s.handleLookupDigest(fdPasser, req)
274284
default:
275285
resp := NewErrorResponse(ErrorCodeMethodNotFound, "Method not found", req.Method, req.ID)
276286
s.sendResponse(fdPasser, resp)
@@ -528,6 +538,71 @@ func (s *JSONRPCServer) handleGetSplitFDStream(fdPasser *FDPasser, req *SplitFDS
528538
}
529539
}
530540

541+
// handleLookupDigest handles lookup_digest requests.
542+
// Protocol:
543+
// 1. Client sends JSON-RPC request with digest
544+
// 2. Server looks up the digest in the cache
545+
// 3. Server sends response with offset and size, plus the file descriptor
546+
func (s *JSONRPCServer) handleLookupDigest(fdPasser *FDPasser, req *SplitFDStreamRequest) {
547+
// Parse the lookup-specific parameters
548+
lookupParams, err := ParseLookupDigestParams(req.Params)
549+
if err != nil {
550+
resp := NewErrorResponse(ErrorCodeInvalidParams, err.Error(), nil, req.ID)
551+
s.sendResponse(fdPasser, resp)
552+
return
553+
}
554+
555+
// Check if digest lookup is available
556+
if s.digestLookup == nil {
557+
resp := NewErrorResponse(
558+
ErrorCodeStoreNotAvailable,
559+
"digest lookup is not available",
560+
nil,
561+
req.ID,
562+
)
563+
s.sendResponse(fdPasser, resp)
564+
return
565+
}
566+
567+
// Look up the digest
568+
file, offset, size, err := s.digestLookup(lookupParams.Digest)
569+
if err != nil {
570+
resp := NewErrorResponse(ErrorCodeInternalError, fmt.Sprintf("failed to lookup digest: %v", err), nil, req.ID)
571+
s.sendResponse(fdPasser, resp)
572+
return
573+
}
574+
575+
if file == nil {
576+
resp := NewErrorResponse(
577+
ErrorCodeDigestNotFound,
578+
fmt.Sprintf("digest %s not found", lookupParams.Digest),
579+
nil,
580+
req.ID,
581+
)
582+
s.sendResponse(fdPasser, resp)
583+
return
584+
}
585+
586+
defer file.Close()
587+
588+
// Send success response with metadata
589+
responseResult := &SplitFDStreamResult{
590+
Offset: offset,
591+
ChunkSize: size,
592+
Message: "found",
593+
}
594+
numFDs := 1
595+
responseResult.FileDescriptors = &numFDs
596+
resp := NewSuccessResponse(responseResult, req.ID)
597+
s.sendResponse(fdPasser, resp)
598+
599+
// Send the file descriptor
600+
if err := fdPasser.SendFileDescriptors([]*os.File{file}, []byte{0}); err != nil {
601+
logrus.Errorf("splitfdstream: failed to send file descriptor for digest lookup: %v", err)
602+
return
603+
}
604+
}
605+
531606
// waitForContinue reads "continue\n" from the client.
532607
func (s *JSONRPCServer) waitForContinue(fdPasser *FDPasser) error {
533608
msg, err := fdPasser.ReadLine()

storage/pkg/splitfdstream/server_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (m *MockDriver) GetSplitFDStream(id, parent string, options *GetSplitFDStre
3131

3232
func TestSplitFDStreamServer_StartStop(t *testing.T) {
3333
mockDriver := &MockDriver{}
34-
server := NewJSONRPCServer(mockDriver)
34+
server := NewJSONRPCServer(mockDriver, nil)
3535

3636
tmpDir := t.TempDir()
3737
socketPath := filepath.Join(tmpDir, "test.sock")
@@ -63,7 +63,7 @@ func TestSplitFDStreamServer_StartStop(t *testing.T) {
6363

6464
func TestSplitFDStreamServer_DoubleStart(t *testing.T) {
6565
mockDriver := &MockDriver{}
66-
server := NewJSONRPCServer(mockDriver)
66+
server := NewJSONRPCServer(mockDriver, nil)
6767

6868
tmpDir := t.TempDir()
6969
socketPath := filepath.Join(tmpDir, "test.sock")
@@ -84,7 +84,7 @@ func TestSplitFDStreamServer_DoubleStart(t *testing.T) {
8484

8585
func TestSplitFDStreamServer_PingRequest(t *testing.T) {
8686
mockDriver := &MockDriver{}
87-
server := NewJSONRPCServer(mockDriver)
87+
server := NewJSONRPCServer(mockDriver, nil)
8888

8989
tmpDir := t.TempDir()
9090
socketPath := filepath.Join(tmpDir, "test.sock")
@@ -128,7 +128,7 @@ func TestSplitFDStreamServer_PingRequest(t *testing.T) {
128128
func TestSplitFDStreamServer_UnsupportedDriver(t *testing.T) {
129129
// Use mockBaseDriver which does NOT implement SplitFDStreamDriver
130130
baseDriver := &mockBaseDriver{}
131-
server := NewJSONRPCServer(baseDriver)
131+
server := NewJSONRPCServer(baseDriver, nil)
132132

133133
tmpDir := t.TempDir()
134134
socketPath := filepath.Join(tmpDir, "test.sock")
@@ -182,7 +182,7 @@ func TestSplitFDStreamServer_UnsupportedDriver(t *testing.T) {
182182

183183
func TestSplitFDStreamServer_InvalidRequest(t *testing.T) {
184184
mockDriver := &MockDriver{}
185-
server := NewJSONRPCServer(mockDriver)
185+
server := NewJSONRPCServer(mockDriver, nil)
186186

187187
tmpDir := t.TempDir()
188188
socketPath := filepath.Join(tmpDir, "test.sock")

storage/pkg/splitfdstream/server_unsupported.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import (
77
"os"
88
)
99

10+
// DigestLookupFunc is a function that looks up a file by its digest.
11+
// A nil file means the digest was not found.
12+
type DigestLookupFunc func(digest string) (file *os.File, offset int64, size int64, err error)
13+
1014
// JSONRPCServer manages the JSON-RPC server for storage operations.
1115
type JSONRPCServer struct{}
1216

1317
// NewJSONRPCServer creates a new JSON-RPC server.
14-
func NewJSONRPCServer(driver any) *JSONRPCServer {
18+
func NewJSONRPCServer(driver any, digestLookup DigestLookupFunc) *JSONRPCServer {
1519
return &JSONRPCServer{}
1620
}
1721

storage/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4139,7 +4139,7 @@ func (s *store) SplitFDStreamSocket() (*os.File, error) {
41394139

41404140
// Initialize server if not already created
41414141
if s.jsonRPCServer == nil {
4142-
s.jsonRPCServer = splitfdstream.NewJSONRPCServer(s.graphDriver)
4142+
s.jsonRPCServer = splitfdstream.NewJSONRPCServer(s.graphDriver, nil)
41434143
}
41444144

41454145
// Start handling the server connection in a goroutine

0 commit comments

Comments
 (0)