@@ -52,28 +52,30 @@ var (
5252)
5353
5454type PluginServer struct {
55- nodeName string
56- registerAnno string
57- handshakeAnno string
58- allocAnno string
59- grpcServer * grpc.Server
60- mgr * manager.AscendManager
61- socket string
62- stopCh chan interface {}
63- healthCh chan int32
55+ nodeName string
56+ registerAnno string
57+ handshakeAnno string
58+ allocAnno string
59+ grpcServer * grpc.Server
60+ mgr * manager.AscendManager
61+ socket string
62+ stopCh chan interface {}
63+ healthCh chan int32
64+ checkIdleVNPUInterval int
6465}
6566
66- func NewPluginServer (mgr * manager.AscendManager , nodeName string ) (* PluginServer , error ) {
67+ func NewPluginServer (mgr * manager.AscendManager , nodeName string , checkIdleVNPUInterval int ) (* PluginServer , error ) {
6768 return & PluginServer {
68- nodeName : nodeName ,
69- registerAnno : fmt .Sprintf ("hami.io/node-register-%s" , mgr .CommonWord ()),
70- handshakeAnno : fmt .Sprintf ("hami.io/node-handshake-%s" , mgr .CommonWord ()),
71- allocAnno : fmt .Sprintf ("huawei.com/%s" , mgr .CommonWord ()),
72- grpcServer : grpc .NewServer (),
73- mgr : mgr ,
74- socket : path .Join (v1beta1 .DevicePluginPath , fmt .Sprintf ("%s.sock" , mgr .CommonWord ())),
75- stopCh : make (chan interface {}),
76- healthCh : make (chan int32 ),
69+ nodeName : nodeName ,
70+ registerAnno : fmt .Sprintf ("hami.io/node-register-%s" , mgr .CommonWord ()),
71+ handshakeAnno : fmt .Sprintf ("hami.io/node-handshake-%s" , mgr .CommonWord ()),
72+ allocAnno : fmt .Sprintf ("huawei.com/%s" , mgr .CommonWord ()),
73+ grpcServer : grpc .NewServer (),
74+ mgr : mgr ,
75+ socket : path .Join (v1beta1 .DevicePluginPath , fmt .Sprintf ("%s.sock" , mgr .CommonWord ())),
76+ stopCh : make (chan interface {}),
77+ healthCh : make (chan int32 ),
78+ checkIdleVNPUInterval : checkIdleVNPUInterval ,
7779 }, nil
7880}
7981
@@ -91,16 +93,42 @@ func (ps *PluginServer) Start() error {
9193 if err != nil {
9294 return err
9395 }
96+ go ps .startPeriodicCheckIdleVNPUs ()
9497 go ps .watchAndRegister ()
9598 return nil
9699}
97100
101+ func (ps * PluginServer ) startPeriodicCheckIdleVNPUs () {
102+ ticker := time .NewTicker (time .Duration (ps .checkIdleVNPUInterval ) * time .Second )
103+ defer ticker .Stop ()
104+ for {
105+ select {
106+ case <- ticker .C :
107+ klog .Info ("Running scheduled idle vNPU cleanup" )
108+ if err := ps .CleanupIdleVNPUs (); err != nil {
109+ klog .Errorf ("Failed to cleanup idle vNPUs: %v" , err )
110+ }
111+ case <- ps .stopCh :
112+ klog .Info ("Stopping cleanup goroutine" )
113+ return
114+ }
115+ }
116+ }
117+
98118func (ps * PluginServer ) Stop () error {
99119 close (ps .stopCh )
100120 ps .grpcServer .Stop ()
101121 return nil
102122}
103123
124+ func (ps * PluginServer ) StopCh () <- chan interface {} {
125+ return ps .stopCh
126+ }
127+
128+ func (ps * PluginServer ) CleanupIdleVNPUs () error {
129+ return ps .mgr .CleanupIdleVNPUs ()
130+ }
131+
104132func (ps * PluginServer ) dial (unixSocketPath string , timeout time.Duration ) (* grpc.ClientConn , error ) {
105133 ctx , cancel := context .WithTimeout (context .Background (), timeout )
106134 defer cancel ()
0 commit comments