Skip to content

Commit 22bff86

Browse files
committed
license: implement vCPU audit background writer for self-hosted clusters

Add background ticker infrastructure for tracking vCPU consumption in self-hosted clusters. A background goroutine fires hourly to collect vCPU data from status.GetVCPUs() and prepare it for writing to system.vcpu_hours_audit (table creation in follow-up).

The writer detects license rotation and triggers an immediate write to prevent gaps in consumption tracking. It handles graceful shutdown with a 5-second timeout for a final audit record write.

The actual SQL INSERT logic is stubbed with log.Dev.Infof() until the system table is created in a follow-up commit.

Epic: CC-35515
Release note: None
1 parent 8cf7d10 commit 22bff86

7 files changed

Lines changed: 577 additions & 5 deletions

File tree

pkg/server/license/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ go_library(
88
"enforcer.go",
99
"license.go",
1010
"opts.go",
11+
"vcpu_audit.go",
1112
":gen-lictype-stringer", # keep
1213
],
1314
importpath = "github.com/cockroachdb/cockroach/pkg/server/license",
1415
visibility = ["//visibility:public"],
1516
deps = [
1617
"//pkg/keys",
18+
"//pkg/roachpb",
1719
"//pkg/server/license/licensepb",
20+
"//pkg/server/status",
1821
"//pkg/settings",
1922
"//pkg/settings/cluster",
2023
"//pkg/sql/isql",
@@ -25,6 +28,7 @@ go_library(
2528
"//pkg/util/envutil",
2629
"//pkg/util/log",
2730
"//pkg/util/metric",
31+
"//pkg/util/stop",
2832
"//pkg/util/syncutil",
2933
"//pkg/util/timeutil",
3034
"@com_github_cockroachdb_errors//:errors",
@@ -40,6 +44,7 @@ go_test(
4044
"license_decode_test.go",
4145
"license_enforcer_integration_test.go",
4246
"main_test.go",
47+
"vcpu_audit_test.go",
4348
],
4449
embed = [":license"],
4550
deps = [

pkg/server/license/enforcer.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,20 @@ type Enforcer struct {
9696
// in the metadata accessor. This is set to true during initialization if the
9797
// latest timestamp was not received.
9898
continueToPollMetadataAccessor atomic.Bool
99+
100+
// currentLicenseID is the license ID for the currently active license.
101+
// Used to write vCPU audit records and detect license rotation.
102+
// Stores sentinel value if no license is installed.
103+
currentLicenseID atomic.Value
104+
105+
// auditMu serializes license ID updates and rotation detection in
106+
// updateLicenseIDAndMaybeWrite.
107+
auditMu syncutil.Mutex
108+
109+
// nodeID is the node ID used in vCPU audit records. It is set once when
110+
// the audit writer starts via StartVCPUAuditWriter. A non-zero value
111+
// indicates the writer is active and audit records should be emitted.
112+
nodeID atomic.Int32
99113
}
100114

101115
type TestingKnobs struct {
@@ -124,6 +138,20 @@ type TestingKnobs struct {
124138
// OverrideTelemetryStatusReporter, if set, will set the telemetry status
125139
// reporter in Start().
126140
OverrideTelemetryStatusReporter TelemetryStatusReporter
141+
142+
// OverrideVCPUAuditInterval if set, overrides the 1-hour vCPU audit interval.
143+
// Used for testing to accelerate ticker firing.
144+
OverrideVCPUAuditInterval *time.Duration
145+
146+
// OverrideVCPUCount if set, overrides the value returned by status.GetVCPUs().
147+
// Used for testing to provide deterministic vCPU counts.
148+
OverrideVCPUCount *float64
149+
150+
// OnAuditRecordWritten if set, is called each time writeVCPUAuditRecord
151+
// fires, both on normal ticker intervals and on immediate writes triggered
152+
// by license rotation. Intended for use with an atomic counter or channel
153+
// to verify write behavior in tests.
154+
OnAuditRecordWritten func()
127155
}
128156

129157
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
@@ -197,7 +225,7 @@ func (e *Enforcer) Start(ctx context.Context, st *cluster.Settings, opts ...Opti
197225
// must be done after setting the cluster init grace period timestamp. And it
198226
// is needed for testing that may be running this in isolation to the license
199227
// ccl package.
200-
e.RefreshForLicenseChange(ctx, LicTypeNone, time.Time{})
228+
e.RefreshForLicenseChange(ctx, LicTypeNone, time.Time{}, nil /* licenseID */)
201229

202230
// Register a callback so that we refresh our state whenever the license
203231
// changes. This will also update the state for the current license if not
@@ -455,8 +483,11 @@ func (e *Enforcer) MaybeFailIfThrottled(
455483
// information to optimize enforcement. Instead of reading the license from the
456484
// settings, unmarshaling it, and checking its type and expiry each time,
457485
// caching the information improves efficiency since licenses change infrequently.
486+
//
487+
// The licenseID parameter is used to detect license rotation for vCPU audit purposes.
488+
// Pass nil if no license is installed.
458489
func (e *Enforcer) RefreshForLicenseChange(
459-
ctx context.Context, licType LicType, licenseExpiry time.Time,
490+
ctx context.Context, licType LicType, licenseExpiry time.Time, licenseID []byte,
460491
) {
461492
e.hasLicense.Store(licType != LicTypeNone)
462493
e.licenseExpiryTS.Store(licenseExpiry.Unix())
@@ -491,6 +522,9 @@ func (e *Enforcer) RefreshForLicenseChange(
491522
}
492523
sb.Printf("telemetry required: %t", e.licenseRequiresTelemetry.Load())
493524
log.Dev.Infof(ctx, "%s", sb.RedactableString())
525+
526+
// Detect license rotation and trigger immediate vCPU audit write if needed.
527+
e.updateLicenseIDAndMaybeWrite(ctx, licenseID)
494528
}
495529

496530
// UpdateTrialLicenseExpiry updates the expiration timestamp of trial license

pkg/server/license/enforcer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ func TestThrottle(t *testing.T) {
288288
license.WithTelemetryStatusReporter(&mockTelemetryStatusReporter{lastPingTime: &tc.lastTelemetryPingTime}),
289289
)
290290
require.NoError(t, err)
291-
e.RefreshForLicenseChange(ctx, tc.licType, tc.licExpiry)
291+
e.RefreshForLicenseChange(ctx, tc.licType, tc.licExpiry, nil /* licenseID */)
292292
notice, err := e.TestingMaybeFailIfThrottled(ctx, tc.openTxnsCount)
293293
if tc.expectedErrRegex == "" {
294294
require.NoError(t, err)
@@ -371,7 +371,7 @@ func TestThrottleErrorMsg(t *testing.T) {
371371

372372
// Set up a free license that will expire in 30 days
373373
licenseEnforcer := srv.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).LicenseEnforcer
374-
licenseEnforcer.RefreshForLicenseChange(ctx, license.LicTypeFree, t30d)
374+
licenseEnforcer.RefreshForLicenseChange(ctx, license.LicTypeFree, t30d, nil /* licenseID */)
375375

376376
for _, tc := range []struct {
377377
desc string

pkg/server/license/license.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,12 @@ func registerCallbackOnLicenseChange(
177177
}
178178
var licenseType LicType
179179
var licenseExpiry time.Time
180+
var licenseID []byte
180181
if lic == nil {
181182
licenseType = LicTypeNone
182183
} else {
183184
licenseExpiry = timeutil.Unix(lic.ValidUntilUnixSec, 0)
185+
licenseID = lic.LicenseId
184186
switch lic.Type {
185187
case licensepb.License_Free:
186188
licenseType = LicTypeFree
@@ -192,7 +194,7 @@ func registerCallbackOnLicenseChange(
192194
licenseType = LicTypeEnterprise
193195
}
194196
}
195-
licenseEnforcer.RefreshForLicenseChange(ctx, licenseType, licenseExpiry)
197+
licenseEnforcer.RefreshForLicenseChange(ctx, licenseType, licenseExpiry, licenseID)
196198

197199
err = licenseEnforcer.UpdateTrialLicenseExpiry(
198200
ctx, licenseType, isChange, licenseExpiry.Unix())

pkg/server/license/vcpu_audit.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// This file implements vCPU consumption tracking for self-hosted clusters.
7+
//
8+
// Background: CockroachDB's usage-based licensing model requires tracking
9+
// vCPU-hours consumed by each node. This package writes hourly audit records
10+
// to system.vcpu_hours_audit for later export via the `cockroach license audit` CLI.
11+
//
12+
// The audit system operates independently of license enforcement - it records
13+
// consumption data even when no license is installed or when licenses expire.
14+
15+
package license
16+
17+
import (
18+
"bytes"
19+
"context"
20+
"time"
21+
22+
"github.com/cockroachdb/cockroach/pkg/roachpb"
23+
"github.com/cockroachdb/cockroach/pkg/server/status"
24+
"github.com/cockroachdb/cockroach/pkg/util/log"
25+
"github.com/cockroachdb/cockroach/pkg/util/stop"
26+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
27+
"github.com/cockroachdb/errors"
28+
)
29+
30+
const (
31+
// defaultVCPUAuditInterval is the audit ticker period returned by getVCPUAuditInterval unless the OverrideVCPUAuditInterval testing knob is set.
32+
defaultVCPUAuditInterval = 1 * time.Hour
33+
34+
// vcpuAuditShutdownTimeout bounds the final audit record write that
35+
// StartVCPUAuditWriter attempts once the stopper begins quiescing.
36+
vcpuAuditShutdownTimeout = 5 * time.Second
37+
38+
// noLicenseID is the sentinel license ID recorded when no license ID is available (no license installed).
39+
// This matches the CLI behavior matrix in the design doc.
40+
noLicenseID = "no-license"
41+
)
42+
43+
// vcpuAuditData is the snapshot captured for one vCPU audit record; it is built by getVCPUAuditData.
44+
type vcpuAuditData struct {
45+
// licenseID is the cached license ID the record is attributed to, or the noLicenseID sentinel bytes when none has been cached.
46+
licenseID []byte
47+
48+
// vcpuCount is the value reported by status.GetVCPUs, or the OverrideVCPUCount testing knob when that is set.
49+
vcpuCount float64
50+
}
51+
52+
// getVCPUAuditData collects the current vCPU count and license ID.
53+
// This is the data needed to write one audit record.
54+
func (e *Enforcer) getVCPUAuditData(ctx context.Context) vcpuAuditData {
55+
data := vcpuAuditData{}
56+
57+
// Get vCPU count from status package (cgroup-aware).
58+
data.vcpuCount = status.GetVCPUs(ctx)
59+
// Allow test override for deterministic vCPU counts.
60+
if tk := e.GetTestingKnobs(); tk != nil && tk.OverrideVCPUCount != nil {
61+
data.vcpuCount = *tk.OverrideVCPUCount
62+
}
63+
64+
licIDRaw := e.currentLicenseID.Load()
65+
if licIDRaw != nil {
66+
data.licenseID = licIDRaw.([]byte)
67+
} else {
68+
data.licenseID = []byte(noLicenseID)
69+
}
70+
71+
return data
72+
}
73+
74+
// getVCPUAuditInterval returns the interval for writing vCPU audit records.
75+
// Can be overridden for testing.
76+
func (e *Enforcer) getVCPUAuditInterval() time.Duration {
77+
if tk := e.GetTestingKnobs(); tk != nil && tk.OverrideVCPUAuditInterval != nil {
78+
return *tk.OverrideVCPUAuditInterval
79+
}
80+
return defaultVCPUAuditInterval
81+
}
82+
83+
// TestingGetCurrentLicenseID returns the currently cached license ID.
84+
// Used for testing license rotation detection.
85+
func (e *Enforcer) TestingGetCurrentLicenseID() []byte {
86+
val := e.currentLicenseID.Load()
87+
if val == nil {
88+
return nil
89+
}
90+
return val.([]byte)
91+
}
92+
93+
// VCPUAuditDataForTest is the exported mirror of vcpuAuditData returned by GetVCPUAuditDataForTest so tests can inspect audit data.
94+
type VCPUAuditDataForTest struct {
95+
// LicenseID mirrors vcpuAuditData.licenseID.
LicenseID []byte
96+
// VCPUCount mirrors vcpuAuditData.vcpuCount.
VCPUCount float64
97+
}
98+
99+
// GetVCPUAuditDataForTest returns the current vCPU audit data for testing.
100+
func (e *Enforcer) GetVCPUAuditDataForTest(ctx context.Context) VCPUAuditDataForTest {
101+
data := e.getVCPUAuditData(ctx)
102+
return VCPUAuditDataForTest{
103+
LicenseID: data.licenseID,
104+
VCPUCount: data.vcpuCount,
105+
}
106+
}
107+
108+
// writeVCPUAuditRecord writes a single vCPU consumption record for the given hour.
109+
// TODO(sadaf-crl): Replace log.Infof with actual SQL INSERT when
110+
// system.vcpu_hours_audit table is available.
111+
func (e *Enforcer) writeVCPUAuditRecord(ctx context.Context, hourTimestamp time.Time) {
112+
nodeID := roachpb.NodeID(e.nodeID.Load())
113+
data := e.getVCPUAuditData(ctx)
114+
115+
// Round timestamp to hour boundary.
116+
hourTimestamp = hourTimestamp.Truncate(time.Hour)
117+
118+
// TODO(sadaf-crl): Replace this log statement with SQL INSERT:
119+
// INSERT INTO system.vcpu_hours_audit (node_id, license_id, hour_timestamp, num_vcpu)
120+
// VALUES ($1, $2, $3, $4)
121+
log.Dev.Infof(ctx,
122+
"TODO: write vcpu audit record: node_id=%d license_id=%x hour=%s vcpu_count=%.2f",
123+
nodeID, data.licenseID, hourTimestamp.UTC().Format(time.RFC3339), data.vcpuCount)
124+
125+
if tk := e.GetTestingKnobs(); tk != nil && tk.OnAuditRecordWritten != nil {
126+
tk.OnAuditRecordWritten()
127+
}
128+
}
129+
130+
// updateLicenseIDAndMaybeWrite checks if the license ID changed and triggers
131+
// an immediate vCPU audit write to prevent gaps in consumption tracking.
132+
// This is called from RefreshForLicenseChange when the license is updated.
133+
func (e *Enforcer) updateLicenseIDAndMaybeWrite(ctx context.Context, newLicenseID []byte) {
134+
var licenseIDCopy []byte
135+
if newLicenseID != nil {
136+
licenseIDCopy = append([]byte(nil), newLicenseID...)
137+
} else {
138+
licenseIDCopy = []byte(noLicenseID)
139+
}
140+
141+
// Serialize the read-update-compare cycle so that each license change
142+
// triggers at most one immediate write.
143+
var shouldWrite bool
144+
func() {
145+
e.auditMu.Lock()
146+
defer e.auditMu.Unlock()
147+
148+
prevLicenseIDRaw := e.currentLicenseID.Load()
149+
var prevLicenseID []byte
150+
if prevLicenseIDRaw != nil {
151+
prevLicenseID = prevLicenseIDRaw.([]byte)
152+
}
153+
154+
e.currentLicenseID.Store(licenseIDCopy)
155+
156+
shouldWrite = !bytes.Equal(prevLicenseID, licenseIDCopy) && e.nodeID.Load() != 0
157+
}()
158+
159+
if shouldWrite {
160+
log.Dev.Infof(ctx, "license rotation detected, writing immediate vCPU audit record")
161+
e.writeVCPUAuditRecord(ctx, timeutil.Now())
162+
}
163+
}
164+
165+
// StartVCPUAuditWriter starts a background goroutine that writes vCPU audit
166+
// records at regular intervals (default: hourly). The writer will:
167+
// - Write one record per hour to system.vcpu_hours_audit
168+
// - Attempt a final write on graceful shutdown (with timeout)
169+
//
170+
// This should be called once during enforcer initialization, after Start().
171+
// Returns an error if called multiple times or with invalid nodeID.
172+
func (e *Enforcer) StartVCPUAuditWriter(
173+
ctx context.Context, stopper *stop.Stopper, nodeID roachpb.NodeID,
174+
) error {
175+
if nodeID == 0 {
176+
return errors.AssertionFailedf("invalid nodeID: %d", nodeID)
177+
}
178+
179+
// CAS 0 → nodeID atomically marks the writer as started and publishes the
180+
// node ID for concurrent license rotation handlers.
181+
if !e.nodeID.CompareAndSwap(0, int32(nodeID)) {
182+
return errors.New("vCPU audit writer already started")
183+
}
184+
185+
return stopper.RunAsyncTask(ctx, "vcpu-audit-writer", func(ctx context.Context) {
186+
// Write an initial record immediately so the first partial hour
187+
// is not lost (e.g. on node restart during a rolling upgrade).
188+
e.writeVCPUAuditRecord(ctx, timeutil.Now())
189+
ticker := time.NewTicker(e.getVCPUAuditInterval())
190+
defer ticker.Stop()
191+
192+
for {
193+
select {
194+
case <-ticker.C:
195+
e.writeVCPUAuditRecord(ctx, timeutil.Now())
196+
197+
case <-stopper.ShouldQuiesce():
198+
// Attempt final write with timeout on shutdown.
199+
_ = timeutil.RunWithTimeout(
200+
context.Background(), "vcpu-audit-shutdown", vcpuAuditShutdownTimeout,
201+
func(ctx context.Context) error {
202+
e.writeVCPUAuditRecord(ctx, timeutil.Now())
203+
return nil
204+
})
205+
log.Dev.Infof(ctx, "vcpu audit writer stopped")
206+
return
207+
}
208+
}
209+
})
210+
}

0 commit comments

Comments
 (0)