Skip to content

Commit d872c27

Browse files
committed
fix(sdk): replaces nri/net/multiplex with yamux
1 parent 3ac7bbc commit d872c27

21 files changed

Lines changed: 2535 additions & 115 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
connectrpc.com/connect v1.18.1
77
github.com/alecthomas/kong v1.11.0
88
github.com/containerd/nri v0.9.0
9+
github.com/hashicorp/yamux v0.1.2
910
github.com/keybase/go-keychain v0.0.1
1011
github.com/sirupsen/logrus v1.9.3
1112
github.com/stretchr/testify v1.10.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
3030
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
3131
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
3232
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
33+
github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8=
34+
github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns=
3335
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
3436
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
3537
github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU=

internal/ipc/design.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# IPC and multiplexing a socket
2+
The plugin system has two parts:
3+
- the runtime: launches plugins or allows manual launched plugins to connect
4+
- the plugin(s): registers to the runtime and is domain expert for a specific secret provider
5+
6+
From the plugin system perspective, the runtime is a lifecycle management server for plugins.
7+
However, from the secrets engine perspective, the runtime is a client that can request secrets from a plugin.
8+
9+
To avoid having a socket per plugin, we multiplex the socket.
10+
```mermaid
11+
flowchart TD
12+
Socket[(Multiplexed<br>Socket)]
13+
14+
subgraph PluginBlock [Plugin]
15+
direction TB
16+
PSrv["Provider (Server)"]
17+
PC["Plugin (Client)"]
18+
end
19+
20+
subgraph ResolverBlock [Resolver]
21+
direction TB
22+
RPC["Resolver (Provider Client)"]
23+
PServ[Plugin Server/Runtime]
24+
end
25+
26+
Socket -->|register plugin| PServ
27+
Socket -->|get secret| PSrv
28+
PC -->|register plugin| Socket
29+
RPC -->|get secret| Socket
30+
31+
```
32+
33+
## Choosing a multiplexer
34+
The multiplexer adds a custom layer on top of the socket that allows running servers on both ends of the socket.
35+
36+
### nri/net/multiplex - a minimal multiplexer
37+
The plugin system in [containerd/nr](https://github.com/containerd/nri) implements its own simple frame-based multiplexer [nri/net/multiplex](https://github.com/containerd/nri/tree/main/pkg/net/multiplex).
38+
It provides two streams on each side that need to be re-used for all communication.
39+
This works well for [ttrpc](https://github.com/containerd/ttrpc) which uses its own length-prefixed framing.
40+
However, the standard Go HTTP server inside `Server(net.Listener)` does one `Accept()`, gets one `net.Conn`, and then loops inside serveConn to decode requests.
41+
But because that sub-connection isn’t a real socket—and because the mux delivers no further “new connections” and any pipelined data after the first request can get lost or stuck in the mux’s framing, and the server never sees it.
42+
43+
Alternatively, HTTP/2 without TLS could be used as it has gives control over the framing.
44+
Unfortunately, Go's `net/http` package does not easily support HTTP/2 without TLS and getting it work comes with its own set of challenges, such as requiring a custom `net.Listener` implementation that handles the HTTP/2 framing.
45+
46+
TLDR: [nri/net/multiplex](https://github.com/containerd/nri/tree/main/pkg/net/multiplex) is not ideal for general HTTP servers.
47+
48+
### Yamux
49+
50+
Yamux is a full-featured, multiplexing protocol that allows multiple streams to be sent over a single TCP connection, actively maintained by Hashicorp and e.g. used in Hashicorp's Nomad.
51+
I.e., using Yamux, Go's `net/http` works out-of-the-box.
52+
53+
54+
55+

internal/ipc/ipc.go

Lines changed: 29 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,33 @@ import (
88
"net/http"
99
"sync"
1010

11-
"github.com/containerd/nri/pkg/net/multiplex"
11+
"github.com/hashicorp/yamux"
1212
)
1313

14-
type PluginIPC interface {
15-
// Conn returns a connection that can be used to reach the runtime server on the other end of
16-
// the multiplexed connection (this is not the original net.Conn, but a multiplexed connection!).
17-
Conn() net.Conn
18-
// Wait blocks forever until the server is closed or an error occurs.
19-
// Cancelling the context will not close the server, but will return nil.
20-
Wait(ctx context.Context) error
21-
// Close shuts down the server and closes the multiplexer, and its connection/listener.
22-
Close() error
23-
}
24-
25-
func NewPluginIPC(sockConn net.Conn, handler http.Handler) (PluginIPC, error) {
26-
return newMuxedIPC(sockConn, handler, multiplex.PluginServiceConn, multiplex.RuntimeServiceConn)
14+
func NewPluginIPC(sockConn net.Conn, handler http.Handler) (IPC, *http.Client, error) {
15+
session, err := yamux.Client(sockConn, nil)
16+
if err != nil {
17+
return nil, nil, fmt.Errorf("creating yamux client: %w", err)
18+
}
19+
i, c := newMuxedIPC(session, handler)
20+
return i, c, nil
2721
}
2822

29-
type RuntimeIPC interface {
30-
// Conn returns a connection that can be used to reach the server running in the plugin on the other end of
31-
// the multiplexed connection (this is not the original net.Conn, but a multiplexed connection!).
32-
Conn() net.Conn
23+
type IPC interface {
3324
// Wait blocks forever until the server is closed or an error occurs.
3425
// Cancelling the context will not close the server, but will return nil.
3526
Wait(ctx context.Context) error
3627
// Close shuts down the server and closes the multiplexer, and its connection/listener.
3728
Close() error
38-
// Unblock unblocks the multiplexed connection, allowing it to read from the socket.
39-
Unblock()
4029
}
4130

42-
func NewRuntimeIPC(sockConn net.Conn, handler http.Handler) (RuntimeIPC, error) {
43-
return newMuxedIPC(sockConn, handler, multiplex.RuntimeServiceConn, multiplex.PluginServiceConn, multiplex.WithBlockedRead())
31+
func NewRuntimeIPC(sockConn net.Conn, handler http.Handler) (IPC, *http.Client, error) {
32+
session, err := yamux.Server(sockConn, nil)
33+
if err != nil {
34+
return nil, nil, fmt.Errorf("creating yamux server: %w", err)
35+
}
36+
i, c := newMuxedIPC(session, handler)
37+
return i, c, nil
4438
}
4539

4640
type ipcServer struct {
@@ -49,7 +43,7 @@ type ipcServer struct {
4943
err error
5044
}
5145

52-
func newIpcServer(l net.Listener, handler http.Handler, onError func()) *ipcServer {
46+
func newIpcServer(l net.Listener, handler http.Handler, onError func() error) *ipcServer {
5347
result := &ipcServer{
5448
done: make(chan struct{}),
5549
server: &http.Server{
@@ -59,48 +53,28 @@ func newIpcServer(l net.Listener, handler http.Handler, onError func()) *ipcServ
5953
go func() {
6054
err := result.server.Serve(l)
6155
if !errors.Is(err, http.ErrServerClosed) {
62-
onError()
63-
result.err = err
56+
result.err = errors.Join(err, onError())
6457
}
6558
close(result.done)
6659
}()
6760
return result
6861
}
6962

7063
type ipcImpl struct {
71-
mConn net.Conn
7264
server *ipcServer
7365
teardown func() error
74-
unblock func()
7566
}
7667

77-
func newMuxedIPC(sockConn net.Conn, handler http.Handler, listenerID, connID multiplex.ConnID, options ...multiplex.Option) (*ipcImpl, error) {
78-
mux := multiplex.Multiplex(sockConn, options...)
79-
listener, err := mux.Listen(listenerID)
80-
if err != nil {
81-
mux.Close()
82-
return nil, err
83-
}
84-
conn, err := mux.Open(connID)
85-
if err != nil {
86-
mux.Close()
87-
return nil, fmt.Errorf("failed to multiplex grcp client connection: %w", err)
88-
}
89-
server := newIpcServer(listener, handler, func() { mux.Close() })
68+
func newMuxedIPC(session *yamux.Session, handler http.Handler) (*ipcImpl, *http.Client) {
69+
server := newIpcServer(session, handler, session.Close)
9070
return &ipcImpl{
91-
mConn: conn,
9271
server: server,
9372
teardown: sync.OnceValue(func() error {
94-
err := errors.Join(server.server.Close(), mux.Close())
73+
err := errors.Join(server.server.Close(), session.Close())
9574
<-server.done
9675
return err
9776
}),
98-
unblock: mux.Unblock,
99-
}, nil
100-
}
101-
102-
func (i *ipcImpl) Conn() net.Conn {
103-
return i.mConn
77+
}, createYamuxedClient(session)
10478
}
10579

10680
func (i *ipcImpl) Wait(ctx context.Context) error {
@@ -116,6 +90,11 @@ func (i *ipcImpl) Close() error {
11690
return i.teardown()
11791
}
11892

119-
func (i *ipcImpl) Unblock() {
120-
i.unblock()
93+
func createYamuxedClient(session *yamux.Session) *http.Client {
94+
transport := &http.Transport{
95+
DialContext: func(context.Context, string, string) (net.Conn, error) {
96+
return session.Open()
97+
},
98+
}
99+
return &http.Client{Transport: transport}
121100
}

internal/ipc/ipc_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package ipc
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"log"
7+
"net"
8+
"net/http"
9+
"os"
10+
"path/filepath"
11+
"testing"
12+
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
const (
18+
mockPingPath = "/ping"
19+
)
20+
21+
func Test_newExternalPlugin(t *testing.T) {
22+
tests := []struct {
23+
name string
24+
test func(t *testing.T)
25+
}{
26+
{
27+
name: "ping pong on both sides",
28+
test: func(t *testing.T) {
29+
socketPath := filepath.Join(os.TempDir(), "secrets-engine-plugin.sock")
30+
os.Remove(socketPath)
31+
require.NoError(t, os.MkdirAll(filepath.Dir(socketPath), 0755))
32+
l, err := net.ListenUnix("unix", &net.UnixAddr{
33+
Name: socketPath,
34+
Net: "unix",
35+
})
36+
require.NoError(t, err)
37+
doneRuntime := make(chan struct{})
38+
donePlugin := make(chan struct{})
39+
go func() {
40+
sock, err := l.Accept()
41+
if err != nil {
42+
log.Fatalf("plugin accept: %v", err)
43+
}
44+
defer sock.Close()
45+
46+
pluginHandler := http.NewServeMux()
47+
pluginHandler.HandleFunc(mockPingPath, func(w http.ResponseWriter, _ *http.Request) {
48+
fmt.Fprint(w, "pong-runtime")
49+
})
50+
i, c, err := NewRuntimeIPC(sock, pluginHandler)
51+
if err != nil {
52+
log.Fatalf("plugin IPC setup error: %v", err)
53+
}
54+
defer i.Close()
55+
assertCommunicationToServer(t, c, "pong-plugin")
56+
assertCommunicationToServer(t, c, "pong-plugin") // run at least twice to ensure re-opening connection works
57+
58+
close(doneRuntime)
59+
<-donePlugin
60+
}()
61+
62+
sock, err := net.DialUnix("unix", nil, &net.UnixAddr{Name: socketPath, Net: "unix"})
63+
if err != nil {
64+
log.Fatalf("dial error: %v", err)
65+
}
66+
t.Cleanup(func() { sock.Close() })
67+
68+
runtimeHandler := http.NewServeMux()
69+
runtimeHandler.HandleFunc(mockPingPath, func(w http.ResponseWriter, _ *http.Request) {
70+
fmt.Fprint(w, "pong-plugin")
71+
})
72+
i, c, err := NewPluginIPC(sock, runtimeHandler)
73+
require.NoError(t, err)
74+
t.Cleanup(func() { i.Close() })
75+
76+
assertCommunicationToServer(t, c, "pong-runtime")
77+
assertCommunicationToServer(t, c, "pong-runtime") // run at least twice to ensure re-opening connection works
78+
close(donePlugin)
79+
<-doneRuntime
80+
},
81+
},
82+
}
83+
for _, tt := range tests {
84+
t.Run(tt.name, func(t *testing.T) {
85+
tt.test(t)
86+
})
87+
}
88+
}
89+
90+
func assertCommunicationToServer(t *testing.T, c *http.Client, response string) {
91+
req, _ := http.NewRequest("GET", "http://unused"+mockPingPath, nil)
92+
resp, err := c.Do(req)
93+
require.NoError(t, err)
94+
body, _ := io.ReadAll(resp.Body)
95+
resp.Body.Close()
96+
assert.Equal(t, http.StatusOK, resp.StatusCode)
97+
assert.Equal(t, response, string(body))
98+
}

pkg/adaptation/setup.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
)
1515

1616
type SetupResult struct {
17-
conn net.Conn
18-
cfg pluginCfgIn
19-
close func() error
17+
client *http.Client
18+
cfg pluginCfgIn
19+
close func() error
2020
}
2121

2222
var _ pluginCfgInValidator = &setupValidator{}
@@ -36,11 +36,10 @@ func Setup(conn net.Conn, v setupValidator) (*SetupResult, error) {
3636
})
3737
registrator := newRegistrationLogic(v, chRegistrationResult)
3838
httpMux.Handle(resolverv1connect.NewEngineServiceHandler(&RegisterService{registrator}))
39-
i, err := ipc.NewRuntimeIPC(conn, httpMux)
39+
i, c, err := ipc.NewRuntimeIPC(conn, httpMux)
4040
if err != nil {
4141
return nil, err
4242
}
43-
i.Unblock()
4443
ctx, cancel := context.WithCancel(context.Background())
4544
defer cancel()
4645
chIpcErr := make(chan error, 1)
@@ -63,9 +62,9 @@ func Setup(conn net.Conn, v setupValidator) (*SetupResult, error) {
6362
return nil, errors.New("plugin registration timed out")
6463
}
6564
return &SetupResult{
66-
conn: conn,
67-
cfg: out,
68-
close: i.Close,
65+
client: c,
66+
cfg: out,
67+
close: i.Close,
6968
}, nil
7069
}
7170

plugin/go.mod

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,12 @@ require (
1414
)
1515

1616
require (
17-
github.com/containerd/log v0.1.0 // indirect
18-
github.com/containerd/ttrpc v1.2.6-0.20240827082320-b5cd6e4b3287 // indirect
1917
github.com/davecgh/go-spew v1.1.1 // indirect
20-
github.com/golang/protobuf v1.5.3 // indirect
18+
github.com/hashicorp/yamux v0.1.2 // indirect
2119
github.com/kr/text v0.2.0 // indirect
2220
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
2321
github.com/pmezard/go-difflib v1.0.0 // indirect
2422
golang.org/x/sys v0.29.0 // indirect
25-
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
26-
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect
27-
google.golang.org/grpc v1.57.1 // indirect
2823
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
2924
gopkg.in/yaml.v3 v3.0.1 // indirect
3025
)

0 commit comments

Comments
 (0)