Skip to content

Commit aa47f67

Browse files
authored
feat: Add node statistics (#450)
* add pending pods table * rename functions * rename remaining component functions * bump client version * add node statistics function * add set pod function * add test * add test and fix time check * add node statistics to cluster ping * update comment * add delete pod function * rename function * format * watch pods * sync pods * get node name * add node health * add pods to the cache even if the node name is empty to track pods that are not scheduled yet * revert last commit * fix time comparison * fix tests * rename tables
1 parent 0c44acc commit aa47f67

10 files changed

Lines changed: 365 additions & 118 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ require (
3636
github.com/opencost/opencost/core v0.0.0-20241216191657-30e5d9a27f41
3737
github.com/orcaman/concurrent-map/v2 v2.0.1
3838
github.com/pkg/errors v0.9.1
39-
github.com/pluralsh/console/go/client v1.44.0
39+
github.com/pluralsh/console/go/client v1.45.0
4040
github.com/pluralsh/controller-reconcile-helper v0.1.0
4141
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
4242
github.com/pluralsh/polly v0.2.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,8 +1802,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
18021802
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
18031803
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
18041804
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
1805-
github.com/pluralsh/console/go/client v1.44.0 h1:+CXCHhHIMKvzNl6do5nVwh1ffVzfEDgSPLhfXKvpOfI=
1806-
github.com/pluralsh/console/go/client v1.44.0/go.mod h1:8XlMMN3LLAN9JZo69f8X/XN7Qt1+aaKpgTvvQGfSiEU=
1805+
github.com/pluralsh/console/go/client v1.45.0 h1:eg6MlfD2NswPIM88Jhj4z//T86DoB8Ovl5QyNE8Gii0=
1806+
github.com/pluralsh/console/go/client v1.45.0/go.mod h1:8XlMMN3LLAN9JZo69f8X/XN7Qt1+aaKpgTvvQGfSiEU=
18071807
github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo=
18081808
github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA=
18091809
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=

pkg/cache/db/cache.go

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func GetComponentCache() *ComponentCache {
6262
return cache
6363
}
6464

65-
// Children retrieves all child components and their descendants (up to 4 levels deep) for a given component UID.
65+
// ComponentChildren retrieves all child components and their descendants (up to 4 levels deep) for a given component UID.
6666
// It returns a slice of ComponentChildAttributes containing information about each child component.
6767
//
6868
// Parameters:
@@ -71,7 +71,7 @@ func GetComponentCache() *ComponentCache {
7171
// Returns:
7272
// - []ComponentChildAttributes: A slice containing the child components and their attributes
7373
// - error: An error if the database operation fails or if the connection cannot be established
74-
func (in *ComponentCache) Children(uid string) (result []client.ComponentChildAttributes, err error) {
74+
func (in *ComponentCache) ComponentChildren(uid string) (result []client.ComponentChildAttributes, err error) {
7575
conn, err := in.pool.Take(context.Background())
7676
if err != nil {
7777
return result, err
@@ -98,11 +98,11 @@ func (in *ComponentCache) Children(uid string) (result []client.ComponentChildAt
9898
return result, err
9999
}
100100

101-
// Set stores or updates a component's attributes in the cache.
101+
// SetComponent stores or updates a component's attributes in the cache.
102102
// It takes a ComponentChildAttributes parameter containing the component's data.
103103
// If a component with the same UID exists, it will be updated; otherwise, a new entry is created.
104104
// Returns an error if the database operation fails or if the connection cannot be established.
105-
func (in *ComponentCache) Set(component client.ComponentChildAttributes) error {
105+
func (in *ComponentCache) SetComponent(component client.ComponentChildAttributes) error {
106106
conn, err := in.pool.Take(context.Background())
107107
if err != nil {
108108
return err
@@ -123,6 +123,22 @@ func (in *ComponentCache) Set(component client.ComponentChildAttributes) error {
123123
})
124124
}
125125

126+
// DeleteComponent removes a component from the cache by its unique identifier.
127+
// It takes a uid string parameter identifying the component to delete.
128+
// Returns an error if the operation fails or if the connection cannot be established.
129+
func (in *ComponentCache) DeleteComponent(uid string) error {
130+
conn, err := in.pool.Take(context.Background())
131+
if err != nil {
132+
return err
133+
}
134+
defer in.pool.Put(conn)
135+
136+
query := `DELETE FROM component WHERE uid = ?`
137+
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
138+
Args: []any{uid},
139+
})
140+
}
141+
126142
// HealthScore returns a percentage of healthy components to total components in the cluster.
127143
// The percentage is calculated as the number of healthy components divided by the total number of components.
128144
// Returns an int value between 0 and 100, where 100 indicates all components are healthy.
@@ -135,7 +151,7 @@ func (in *ComponentCache) HealthScore() (int64, error) {
135151
defer in.pool.Put(conn)
136152

137153
var ratio int64
138-
err = sqlitex.ExecuteTransient(conn, healthyComponentsRatio, &sqlitex.ExecOptions{
154+
err = sqlitex.ExecuteTransient(conn, clusterHealthScore, &sqlitex.ExecOptions{
139155
ResultFunc: func(stmt *sqlite.Stmt) error {
140156
ratio = stmt.ColumnInt64(0)
141157
return nil
@@ -144,22 +160,81 @@ func (in *ComponentCache) HealthScore() (int64, error) {
144160
return ratio, err
145161
}
146162

147-
// Delete removes a component from the cache by its unique identifier.
148-
// It takes a uid string parameter identifying the component to delete.
163+
// SetPod stores or updates a pod's attributes in the cache.
164+
// It takes pod name, namespace, uid, node name and creation timestamp as parameters.
165+
// If a pod with the same UID exists, it will be updated; otherwise, a new entry is created.
166+
// Returns an error if the database operation fails or if the connection cannot be established.
167+
func (in *ComponentCache) SetPod(name, namespace, uid, node string, createdAt int64) error {
168+
conn, err := in.pool.Take(context.Background())
169+
if err != nil {
170+
return err
171+
}
172+
defer in.pool.Put(conn)
173+
174+
return sqlitex.ExecuteTransient(conn, setPod, &sqlitex.ExecOptions{
175+
Args: []interface{}{
176+
name,
177+
namespace,
178+
uid,
179+
node,
180+
createdAt,
181+
},
182+
})
183+
}
184+
185+
// DeletePod removes a pod from the cache by its unique identifier.
186+
// It takes a uid string parameter identifying the pod to delete.
149187
// Returns an error if the operation fails or if the connection cannot be established.
150-
func (in *ComponentCache) Delete(uid string) error {
188+
func (in *ComponentCache) DeletePod(uid string) error {
151189
conn, err := in.pool.Take(context.Background())
152190
if err != nil {
153191
return err
154192
}
155193
defer in.pool.Put(conn)
156194

157-
query := `DELETE FROM Component WHERE uid = ?`
195+
query := `DELETE FROM pod WHERE uid = ?`
158196
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
159197
Args: []any{uid},
160198
})
161199
}
162200

201+
// NodeStatistics returns a list of node statistics including the node name and count of pending pods
202+
// that were created more than 5 minutes ago. Each NodeStatisticAttributes contains the node name and
203+
// the number of pending pods for that node. The health field is currently not implemented.
204+
// Returns an error if the database operation fails or if the connection cannot be established.
205+
func (in *ComponentCache) NodeStatistics() ([]*client.NodeStatisticAttributes, error) {
206+
conn, err := in.pool.Take(context.Background())
207+
if err != nil {
208+
return nil, err
209+
}
210+
defer in.pool.Put(conn)
211+
212+
result := make([]*client.NodeStatisticAttributes, 0)
213+
err = sqlitex.ExecuteTransient(conn, nodeStatistics, &sqlitex.ExecOptions{
214+
ResultFunc: func(stmt *sqlite.Stmt) error {
215+
pendingPods := stmt.ColumnInt64(1)
216+
result = append(result, &client.NodeStatisticAttributes{
217+
Name: lo.ToPtr(stmt.ColumnText(0)),
218+
PendingPods: &pendingPods,
219+
Health: nodeHealth(pendingPods),
220+
})
221+
return nil
222+
},
223+
})
224+
return result, err
225+
}
226+
227+
func nodeHealth(pendingPods int64) *client.NodeStatisticHealth {
228+
switch {
229+
case pendingPods == 0:
230+
return lo.ToPtr(client.NodeStatisticHealthHealthy)
231+
case pendingPods <= 3:
232+
return lo.ToPtr(client.NodeStatisticHealthWarning)
233+
default:
234+
return lo.ToPtr(client.NodeStatisticHealthFailed)
235+
}
236+
}
237+
163238
// Close closes the connection pool and cleans up temporary file if necessary
164239
func (in *ComponentCache) Close() error {
165240
mutex.Lock()
@@ -216,17 +291,17 @@ func (in *ComponentCache) init() error {
216291
}
217292

218293
in.pool = pool
219-
return in.initTable()
294+
return in.initTables()
220295
}
221296

222-
func (in *ComponentCache) initTable() error {
297+
func (in *ComponentCache) initTables() error {
223298
conn, err := in.pool.Take(context.Background())
224299
if err != nil {
225300
return err
226301
}
227302
defer in.pool.Put(conn)
228303

229-
return sqlitex.ExecuteScript(conn, createTable, nil)
304+
return sqlitex.ExecuteScript(conn, createTables, nil)
230305
}
231306

232307
// Option represents a function that configures the ComponentCache

pkg/cache/db/cache_bench_test.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ import (
1616
// deleting components, and a combined workload of setting and retrieving.
1717
//
1818
// Results show that the in-memory cache is generally faster than the file-based cache,
19-
// especially for write operations like Set. Here's a summary of the performance comparison:
19+
// especially for write operations like SetComponent. Here's a summary of the performance comparison:
2020
//
21-
// 1. Set: In-memory is ~60x faster than file-based
22-
// 2. Children: In-memory is ~1.1x faster than file-based
23-
// 3. Delete: In-memory is ~1.6x faster than file-based
24-
// 4. SetAndChildren: In-memory is ~3.4x faster than file-based
21+
// 1. SetComponent: In-memory is ~60x faster than file-based
22+
// 2. ComponentChildren: In-memory is ~1.1x faster than file-based
23+
// 3. DeleteComponent: In-memory is ~1.6x faster than file-based
24+
// 4. SetComponentAndComponentChildren AndChildren: In-memory is ~3.4x faster than file-based
2525
//
26-
// The performance difference is most significant for write operations (Set)
26+
// The performance difference is most significant for write operations (SetComponent)
2727
// because file-based operations involve disk I/O, which is much slower than memory access.
28-
// Read operations (Children) show a smaller performance gap because SQLite's query
28+
// Read operations (ComponentChildren) show a smaller performance gap because SQLite's query
2929
// optimization works well for both storage modes.
3030

3131
const (
@@ -50,7 +50,7 @@ func setupTestData(b *testing.B, cache *db.ComponentCache) {
5050
State: &state,
5151
}
5252

53-
err := cache.Set(rootComponent)
53+
err := cache.SetComponent(rootComponent)
5454
require.NoError(b, err)
5555

5656
// Create 10 first-level children
@@ -67,7 +67,7 @@ func setupTestData(b *testing.B, cache *db.ComponentCache) {
6767
State: &state,
6868
}
6969

70-
err := cache.Set(component)
70+
err := cache.SetComponent(component)
7171
require.NoError(b, err)
7272

7373
// Create 5 second-level children for each first-level child
@@ -84,7 +84,7 @@ func setupTestData(b *testing.B, cache *db.ComponentCache) {
8484
State: &state,
8585
}
8686

87-
err := cache.Set(childComponent)
87+
err := cache.SetComponent(childComponent)
8888
require.NoError(b, err)
8989

9090
// Create 3 third-level children for each second-level child
@@ -101,7 +101,7 @@ func setupTestData(b *testing.B, cache *db.ComponentCache) {
101101
State: &state,
102102
}
103103

104-
err := cache.Set(grandchildComponent)
104+
err := cache.SetComponent(grandchildComponent)
105105
require.NoError(b, err)
106106
}
107107
}
@@ -122,8 +122,8 @@ func BenchmarkMemoryCache(b *testing.B) {
122122
cache := db.GetComponentCache()
123123
defer cache.Close()
124124

125-
// Run the Set benchmark
126-
b.Run("Set", func(b *testing.B) {
125+
// Run the SetComponent benchmark
126+
b.Run("SetComponent", func(b *testing.B) {
127127
state := client.ComponentState("Healthy")
128128
group := testGroup
129129
namespace := testNamespace
@@ -143,39 +143,39 @@ func BenchmarkMemoryCache(b *testing.B) {
143143
}
144144
i++
145145

146-
err := cache.Set(component)
146+
err := cache.SetComponent(component)
147147
require.NoError(b, err)
148148
}
149149
})
150150

151151
// Setup test data for the remaining benchmarks
152152
setupTestData(b, cache)
153153

154-
// Run the Children benchmark
155-
b.Run("Children", func(b *testing.B) {
154+
// Run the ComponentChildren benchmark
155+
b.Run("ComponentChildren", func(b *testing.B) {
156156
b.ResetTimer()
157157
for b.Loop() {
158-
children, err := cache.Children("root-uid")
158+
children, err := cache.ComponentChildren("root-uid")
159159
require.NoError(b, err)
160160
require.NotEmpty(b, children)
161161
}
162162
})
163163

164-
// Run the Delete benchmark
165-
b.Run("Delete", func(b *testing.B) {
164+
// Run the DeleteComponent benchmark
165+
b.Run("DeleteComponent", func(b *testing.B) {
166166
b.ResetTimer()
167167
for i := 0; i < b.N; i++ {
168-
// Delete a level 1 component (which should cascade to its children)
169-
err := cache.Delete("level1-uid-" + string(rune('a'+i%10)))
168+
// DeleteComponent a level 1 component (which should cascade to its children)
169+
err := cache.DeleteComponent("level1-uid-" + string(rune('a'+i%10)))
170170
require.NoError(b, err)
171171
}
172172
})
173173

174174
// Setup test data again for the combined benchmark
175175
setupTestData(b, cache)
176176

177-
// Run the SetAndChildren benchmark
178-
b.Run("SetAndChildren", func(b *testing.B) {
177+
// Run the SetComponentAndComponentChildren benchmark
178+
b.Run("SetComponentAndComponentChildren", func(b *testing.B) {
179179
state := client.ComponentState("Healthy")
180180
group := testGroup
181181
namespace := testNamespace
@@ -196,11 +196,11 @@ func BenchmarkMemoryCache(b *testing.B) {
196196
State: &state,
197197
}
198198

199-
err := cache.Set(component)
199+
err := cache.SetComponent(component)
200200
require.NoError(b, err)
201201

202202
// Retrieve children
203-
children, err := cache.Children("root-uid")
203+
children, err := cache.ComponentChildren("root-uid")
204204
require.NoError(b, err)
205205
require.NotEmpty(b, children)
206206
}
@@ -227,8 +227,8 @@ func BenchmarkFileCache(b *testing.B) {
227227
_ = os.Remove(benchDBFile)
228228
}()
229229

230-
// Run the Set benchmark
231-
b.Run("Set", func(b *testing.B) {
230+
// Run the SetComponent benchmark
231+
b.Run("SetComponent", func(b *testing.B) {
232232
state := client.ComponentState("Healthy")
233233
group := testGroup
234234
namespace := testNamespace
@@ -246,30 +246,30 @@ func BenchmarkFileCache(b *testing.B) {
246246
State: &state,
247247
}
248248

249-
err := cache.Set(component)
249+
err := cache.SetComponent(component)
250250
require.NoError(b, err)
251251
}
252252
})
253253

254254
// Setup test data for the remaining benchmarks
255255
setupTestData(b, cache)
256256

257-
// Run the Children benchmark
258-
b.Run("Children", func(b *testing.B) {
257+
// Run the ComponentChildren benchmark
258+
b.Run("ComponentChildren", func(b *testing.B) {
259259
b.ResetTimer()
260260
for i := 0; i < b.N; i++ {
261-
children, err := cache.Children("root-uid")
261+
children, err := cache.ComponentChildren("root-uid")
262262
require.NoError(b, err)
263263
require.NotEmpty(b, children)
264264
}
265265
})
266266

267-
// Run the Delete benchmark
268-
b.Run("Delete", func(b *testing.B) {
267+
// Run the DeleteComponent benchmark
268+
b.Run("DeleteComponent", func(b *testing.B) {
269269
b.ResetTimer()
270270
for i := 0; i < b.N; i++ {
271-
// Delete a level 1 component (which should cascade to its children)
272-
err := cache.Delete("level1-uid-" + string(rune('a'+i%10)))
271+
// DeleteComponent a level 1 component (which should cascade to its children)
272+
err := cache.DeleteComponent("level1-uid-" + string(rune('a'+i%10)))
273273
require.NoError(b, err)
274274
}
275275
})
@@ -299,11 +299,11 @@ func BenchmarkFileCache(b *testing.B) {
299299
State: &state,
300300
}
301301

302-
err := cache.Set(component)
302+
err := cache.SetComponent(component)
303303
require.NoError(b, err)
304304

305305
// Retrieve children
306-
children, err := cache.Children("root-uid")
306+
children, err := cache.ComponentChildren("root-uid")
307307
require.NoError(b, err)
308308
require.NotEmpty(b, children)
309309
}

0 commit comments

Comments
 (0)