Skip to content

Commit 8300a57

Browse files
authored
Merge pull request #59 from docker/feat/adaptation-external-plugins
feat(sdk): plugin runtime to handle externally launched plugins
2 parents bdecbee + f6bd88b commit 8300a57

13 files changed

Lines changed: 392 additions & 28 deletions

File tree

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ FROM base AS lint
1818
COPY --from=lint-base /usr/bin/golangci-lint /usr/bin/golangci-lint
1919
ARG TARGETOS
2020
ARG TARGETARCH
21-
RUN --mount=target=. \
21+
RUN --mount=type=bind,target=. \
2222
--mount=type=cache,target=/go/pkg/mod \
2323
--mount=type=cache,target=/root/.cache/go-build \
2424
--mount=type=cache,target=/root/.cache/golangci-lint <<EOD

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ module github.com/docker/secrets-engine
22

33
go 1.24.3
44

5+
replace github.com/docker/secrets-engine/plugin => ./plugin/
6+
57
require (
68
connectrpc.com/connect v1.18.1
79
github.com/alecthomas/kong v1.11.0
810
github.com/containerd/nri v0.9.0
11+
github.com/docker/secrets-engine/plugin v0.0.0-00010101000000-000000000000
912
github.com/hashicorp/yamux v0.1.2
1013
github.com/keybase/go-keychain v0.0.1
1114
github.com/sirupsen/logrus v1.9.3

internal/ipc/design.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,32 @@ Yamux is a full-featured, multiplexing protocol that allows multiple streams to
5151
Using Yamux we get Go's `net/http` out-of-the-box.
5252

5353

54+
## Decisions
5455

56+
---
57+
58+
2025-07-02 IPC stack
59+
60+
The IPC stack consists of multiple parts that need to play well together:
61+
- socket multiplexing
62+
- API format (includes networking protocol + serialization format)
63+
64+
At this point in time we have decided to go with yamux + connect rpc.
65+
Connect rpc in itself uses protobuf for data serialization combined with gRPC over http for networking.
66+
A main advantage is that we can keep using Go's standard library's `net/http` stack for server and client.
67+
See [Connect: A better gRPC](https://buf.build/blog/connect-a-better-grpc) for a detailed comparison against e.g.`grpc-go`.
68+
Also connect rpc is part of CNCF ([source](https://www.cncf.io/projects/connect-rpc/)).
69+
70+
Potential drawbacks: Performance
71+
72+
Using [nri/net/multiplex](https://github.com/containerd/nri/tree/main/pkg/net/multiplex) with [ttrpc](https://github.com/containerd/ttrpc) probably would be the most performant solution.
73+
It re-uses one stream over the multiplexed socket per direction and does not have the overhead of the HTTP protocol as Protobuf gets streamed directly over the multiplexer.
74+
Although lightweight, it has stopped evolving and has not caught up to the latest improvements on Protobuf.
75+
Another major downside is that it's mainly Go only.
76+
Plugins written in a different language would come at a high cost.
77+
78+
We argue that in our use case since the networking only happens locally the overhead of GRPC over HTTP and the cost of opening a new yamux stream per API request are negligible.
79+
In addition, the main performance bottleneck will be within the actual plugins due to IO operations, additional upstream network requests and potentially authentication.
80+
81+
---
5582

internal/ipc/ipc.go

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88
"net"
99
"net/http"
1010
"sync"
11+
"syscall"
1112
"time"
1213

1314
"github.com/hashicorp/yamux"
15+
"github.com/sirupsen/logrus"
1416
)
1517

1618
const (
@@ -56,7 +58,7 @@ type ipcServer struct {
5658
err error
5759
}
5860

59-
func newIpcServer(l net.Listener, handler http.Handler, onClose func(error) error) *ipcServer {
61+
func newIpcServer(l net.Listener, handler http.Handler, afterClose func(error) error) *ipcServer {
6062
result := &ipcServer{
6163
done: make(chan struct{}),
6264
server: &http.Server{
@@ -68,7 +70,7 @@ func newIpcServer(l net.Listener, handler http.Handler, onClose func(error) erro
6870
if errors.Is(err, http.ErrServerClosed) { // not an error, client closed the connection
6971
err = nil
7072
}
71-
result.err = errors.Join(filterEOF(err), onClose(err)) // EOF: only forward to the onClose handler, but filter out internal forwarding
73+
result.err = errors.Join(filterBrokenPipe(filterEOF(err)), afterClose(err)) // EOF: only forward to the afterClose handler, but filter out internal forwarding
7274
close(result.done)
7375
}()
7476
return result
@@ -81,6 +83,13 @@ func filterEOF(err error) error {
8183
return err
8284
}
8385

86+
func filterBrokenPipe(err error) error {
87+
if errors.Is(err, syscall.EPIPE) {
88+
return nil
89+
}
90+
return err
91+
}
92+
8493
type ipcImpl struct {
8594
server *ipcServer
8695
teardown func() error
@@ -99,16 +108,34 @@ func newMuxedIPC(session *yamux.Session, handler http.Handler, onClose func(erro
99108
}
100109
return session.Close()
101110
})
111+
c := createYamuxedClient(session)
102112
return &ipcImpl{
103113
server: server,
104114
teardown: sync.OnceValue(func() error {
105-
ctx, cancel := context.WithTimeout(context.Background(), cfg.shutdownTimeout)
106-
defer cancel()
107-
err := server.server.Shutdown(ctx)
115+
_ = session.GoAway()
116+
c.CloseIdleConnections()
117+
waitForClientToDisconnect(session, cfg.shutdownTimeout)
118+
err := server.server.Close()
108119
<-server.done
109-
return errors.Join(err, session.Close(), server.err)
120+
return errors.Join(err, server.err)
110121
}),
111-
}, createYamuxedClient(session)
122+
}, c
123+
}
124+
125+
func waitForClientToDisconnect(s *yamux.Session, t time.Duration) {
126+
timeout := time.After(t)
127+
for {
128+
select {
129+
case <-time.After(50 * time.Millisecond):
130+
case <-timeout:
131+
logrus.Debugf("Timeout expired but %d streams still open, shutting down server...", s.NumStreams())
132+
return
133+
}
134+
streams := s.NumStreams()
135+
if streams <= 0 {
136+
return
137+
}
138+
}
112139
}
113140

114141
func (i *ipcImpl) Close() error {
@@ -120,6 +147,11 @@ func createYamuxedClient(session *yamux.Session) *http.Client {
120147
DialContext: func(context.Context, string, string) (net.Conn, error) {
121148
return session.Open()
122149
},
150+
// We don't want to use http keepalive because
151+
// - we keep re-using the underlying socket anyway
152+
// - it allows clean bookkeeping of yamux session / any yamux session means IPC happening and not stale keepalive connections
153+
// -> we can check for session.NumStreams() on shutdown
154+
DisableKeepAlives: true,
123155
}
124156
return &http.Client{Transport: transport}
125157
}

pkg/adaptation/plugin.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package adaptation
22

33
import (
4+
"context"
5+
"net"
46
"sync"
57
"time"
68

9+
"connectrpc.com/connect"
10+
"google.golang.org/protobuf/proto"
11+
712
"github.com/docker/secrets-engine/pkg/api"
13+
v1 "github.com/docker/secrets-engine/pkg/api/resolver/v1"
14+
"github.com/docker/secrets-engine/pkg/api/resolver/v1/resolverv1connect"
15+
"github.com/docker/secrets-engine/pkg/secrets"
816
)
917

1018
var (
@@ -24,3 +32,59 @@ func getPluginRegistrationTimeout() time.Duration {
2432
defer timeoutCfgLock.RUnlock()
2533
return pluginRegistrationTimeout
2634
}
35+
36+
var (
37+
_ secrets.Resolver = &plugin{}
38+
)
39+
40+
type plugin struct {
41+
sync.Mutex
42+
base string
43+
pattern secrets.Pattern
44+
version string
45+
pluginClient resolverv1connect.PluginServiceClient
46+
resolverClient resolverv1connect.ResolverServiceClient
47+
close func() error
48+
}
49+
50+
// newExternalPlugin creates a plugin (stub) for an accepted external plugin connection.
51+
func newExternalPlugin(conn net.Conn, v setupValidator) (*plugin, error) {
52+
r, err := setup(conn, v)
53+
if err != nil {
54+
return nil, err
55+
}
56+
return &plugin{
57+
base: r.cfg.name,
58+
pattern: r.cfg.pattern,
59+
version: r.cfg.version,
60+
pluginClient: resolverv1connect.NewPluginServiceClient(r.client, "http://unix"),
61+
resolverClient: resolverv1connect.NewResolverServiceClient(r.client, "http://unix"),
62+
close: r.close,
63+
}, nil
64+
}
65+
66+
func (p *plugin) GetSecret(ctx context.Context, request secrets.Request) (secrets.Envelope, error) {
67+
req := connect.NewRequest(v1.GetSecretRequest_builder{
68+
SecretId: proto.String(request.ID.String()),
69+
}.Build())
70+
resp, err := p.resolverClient.GetSecret(ctx, req)
71+
if err != nil {
72+
return envelopeErr(request, err), err
73+
}
74+
id, err := secrets.ParseID(resp.Msg.GetSecretId())
75+
if err != nil {
76+
return envelopeErr(request, err), err
77+
}
78+
if id != request.ID {
79+
return envelopeErr(request, secrets.ErrIDMismatch), secrets.ErrIDMismatch
80+
}
81+
return secrets.Envelope{
82+
ID: id,
83+
Value: []byte(resp.Msg.GetSecretValue()),
84+
Provider: p.base,
85+
}, nil
86+
}
87+
88+
func envelopeErr(req secrets.Request, err error) secrets.Envelope {
89+
return secrets.Envelope{ID: req.ID, ResolvedAt: time.Now(), Error: err.Error()}
90+
}

0 commit comments

Comments
 (0)