Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,12 @@ func main() {

// Initialize commitments API for LIQUID interface (Postgres-backed usage reporting).
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()
var commitmentsUsageDB commitments.UsageDBClient
var commitmentsVMSource reservations.VMSource
if commitmentsConfig.DatasourceName != "" {
commitmentsUsageDB = commitments.NewDBUsageClient(multiclusterClient, commitmentsConfig.DatasourceName)
commitmentsVMSource = reservations.NewPostgresVMSource(multiclusterClient, commitmentsConfig.DatasourceName)
}
if slices.Contains(mainConfig.EnabledControllers, "committed-resource-reservations-controller") {
commitmentsAPI := commitmentsapi.NewAPIWithConfig(multiclusterClient, commitmentsConfig.API, commitmentsUsageDB)
commitmentsAPI := commitmentsapi.NewAPIWithConfig(multiclusterClient, commitmentsConfig.API, commitmentsVMSource)
commitmentsAPI.Init(mux, metrics.Registry, ctrl.Log.WithName("commitments-api"))
}

Expand Down Expand Up @@ -597,16 +597,16 @@ func main() {

usageReconcilerMonitor := commitments.NewUsageReconcilerMonitor()
metrics.Registry.MustRegister(&usageReconcilerMonitor)
if commitmentsUsageDB == nil {
if commitmentsVMSource == nil {
setupLog.Error(nil, "UsageReconciler requires a datasource but commitments.datasourceName is not configured — skipping")
} else {
usageReconcilerConf := commitmentsConfig.UsageReconciler
usageReconcilerConf.ApplyDefaults()
if err := (&commitments.UsageReconciler{
Client: multiclusterClient,
Conf: usageReconcilerConf,
UsageDB: commitmentsUsageDB,
Monitor: usageReconcilerMonitor,
Client: multiclusterClient,
Conf: usageReconcilerConf,
VMSource: commitmentsVMSource,
Monitor: usageReconcilerMonitor,
}).SetupWithManager(mgr, multiclusterClient); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CommittedResourceUsage")
os.Exit(1)
Expand Down Expand Up @@ -701,7 +701,7 @@ func main() {

// Create NovaReader and DBVMSource
novaReader := external.NewNovaReader(postgresReader)
vmSource := failover.NewDBVMSource(novaReader)
vmSource := reservations.NewDBVMSource(novaReader)

// Create the unified failover controller
// It handles both:
Expand Down Expand Up @@ -794,7 +794,7 @@ func main() {

// Create NovaReader and DBVMSource
novaReader := external.NewNovaReader(postgresReader)
vmSource := failover.NewDBVMSource(novaReader)
vmSource := reservations.NewDBVMSource(novaReader)

// Create the quota controller
quotaController := quota.NewQuotaController(
Expand Down
12 changes: 12 additions & 0 deletions internal/scheduling/external/nova.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type NovaReaderInterface interface {
GetAllFlavors(ctx context.Context) ([]nova.Flavor, error)
GetServerByID(ctx context.Context, serverID string) (*nova.Server, error)
GetFlavorByName(ctx context.Context, flavorName string) (*nova.Flavor, error)
// GetServersByProject returns all servers belonging to a specific project.
GetServersByProject(ctx context.Context, projectID string) ([]nova.Server, error)
// GetDeletedServerByID returns a deleted server by its ID from the deleted_servers table.
// Returns nil, nil if the server is not found in the deleted_servers table.
GetDeletedServerByID(ctx context.Context, serverID string) (*nova.DeletedServer, error)
Expand Down Expand Up @@ -83,6 +85,16 @@ func (r *NovaReader) GetAllAggregates(ctx context.Context) ([]nova.Aggregate, er
return aggregates, nil
}

// GetServersByProject returns all Nova servers belonging to a specific project.
func (r *NovaReader) GetServersByProject(ctx context.Context, projectID string) ([]nova.Server, error) {
var servers []nova.Server
query := "SELECT * FROM " + nova.Server{}.TableName() + " WHERE tenant_id = $1"
if err := r.Select(ctx, &servers, query, projectID); err != nil {
return nil, fmt.Errorf("failed to query servers by project: %w", err)
}
return servers, nil
}

// GetServerByID returns a Nova server by its ID.
// Returns nil, nil if the server is not found.
func (r *NovaReader) GetServerByID(ctx context.Context, serverID string) (*nova.Server, error) {
Expand Down
7 changes: 4 additions & 3 deletions internal/scheduling/reservations/commitments/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
commitments "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -21,7 +22,7 @@ var apiLog = ctrl.Log.WithName("committed-resource-api").WithValues("module", "c
type HTTPAPI struct {
client client.Client
config commitments.APIConfig
usageDB commitments.UsageDBClient
usageDB reservations.VMSource
monitor ChangeCommitmentsAPIMonitor
usageMonitor ReportUsageAPIMonitor
capacityMonitor ReportCapacityAPIMonitor
Expand All @@ -36,11 +37,11 @@ func NewAPI(client client.Client) *HTTPAPI {
}

// NewAPIWithConfig creates an HTTPAPI with the given config and optional usageDB client.
func NewAPIWithConfig(k8sClient client.Client, config commitments.APIConfig, usageDB commitments.UsageDBClient) *HTTPAPI {
func NewAPIWithConfig(k8sClient client.Client, config commitments.APIConfig, vmSource reservations.VMSource) *HTTPAPI {
return &HTTPAPI{
client: k8sClient,
config: config,
usageDB: usageDB,
usageDB: vmSource,
monitor: NewChangeCommitmentsAPIMonitor(),
usageMonitor: NewReportUsageAPIMonitor(),
capacityMonitor: NewReportCapacityAPIMonitor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
commitments "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -572,54 +573,64 @@ type ExpectedVMUsage struct {
}

// ============================================================================
// Mock UsageDBClient
// Mock VMSource
// ============================================================================

type mockUsageDBClient struct {
rows map[string][]commitments.VMRow // projectID -> rows
err error
type mockVMSource struct {
vms map[string][]reservations.VM // projectID -> VMs
err error
}

func newMockUsageDBClient() *mockUsageDBClient {
return &mockUsageDBClient{
rows: make(map[string][]commitments.VMRow),
}
func newMockVMSource() *mockVMSource {
return &mockVMSource{vms: make(map[string][]reservations.VM)}
}

func (m *mockUsageDBClient) ListProjectVMs(_ context.Context, projectID string) ([]commitments.VMRow, error) {
func (m *mockVMSource) ListVMsByProject(_ context.Context, projectID string) ([]reservations.VM, error) {
if m.err != nil {
return nil, m.err
}
return m.rows[projectID], nil
return m.vms[projectID], nil
}

func (m *mockUsageDBClient) addVM(vm *TestVMUsage) {
func (m *mockVMSource) ListVMs(_ context.Context) ([]reservations.VM, error) { return nil, nil }
func (m *mockVMSource) ListVMsOnHypervisors(_ context.Context, _ *hv1.HypervisorList, _ bool) ([]reservations.VM, error) {
return nil, nil
}
func (m *mockVMSource) GetVM(_ context.Context, _ string) (*reservations.VM, error) { return nil, nil }
func (m *mockVMSource) IsServerActive(_ context.Context, _ string) (bool, error) { return false, nil }
func (m *mockVMSource) GetDeletedVMInfo(_ context.Context, _ string) (*reservations.DeletedVMInfo, error) {
return nil, nil
}

func (m *mockVMSource) addVM(vm *TestVMUsage) {
extraSpecs := map[string]string{
"quota:hw_version": vm.Flavor.Group,
}
if vm.Flavor.VideoRAMMiB != nil {
extraSpecs["hw_video:ram_max_mb"] = strconv.FormatUint(*vm.Flavor.VideoRAMMiB, 10)
}
extrasJSON, _ := json.Marshal(extraSpecs) //nolint:errcheck // test helper, always valid
osType := vm.OSType
if osType == "" {
osType = "unknown"
}
row := commitments.VMRow{
ID: vm.UUID,
Name: vm.UUID,
Status: "ACTIVE",
Created: vm.CreatedAt.Format(time.RFC3339),
AZ: vm.AZ,
Hypervisor: vm.Host,
FlavorName: vm.Flavor.Name,
FlavorRAM: uint64(vm.Flavor.MemoryMB), //nolint:gosec
FlavorVCPUs: uint64(vm.Flavor.VCPUs), //nolint:gosec
FlavorDisk: vm.Flavor.DiskGB,
FlavorExtras: string(extrasJSON),
OSType: osType,
v := reservations.VM{
UUID: vm.UUID,
Name: vm.UUID,
Status: "ACTIVE",
CreatedAt: vm.CreatedAt.Format(time.RFC3339),
AvailabilityZone: vm.AZ,
CurrentHypervisor: vm.Host,
FlavorName: vm.Flavor.Name,
FlavorExtraSpecs: extraSpecs,
DiskGB: vm.Flavor.DiskGB,
OSType: osType,
ProjectID: vm.ProjectID,
Resources: map[string]resource.Quantity{
"memory": *resource.NewQuantity(vm.Flavor.MemoryMB*1024*1024, resource.BinarySI),
"vcpus": *resource.NewQuantity(vm.Flavor.VCPUs, resource.DecimalSI),
},
}
m.rows[vm.ProjectID] = append(m.rows[vm.ProjectID], row)
m.vms[vm.ProjectID] = append(m.vms[vm.ProjectID], v)
}

// ============================================================================
Expand All @@ -630,7 +641,7 @@ type UsageTestEnv struct {
T *testing.T
Scheme *runtime.Scheme
K8sClient client.Client
DBClient *mockUsageDBClient
DBClient *mockVMSource
FlavorGroups FlavorGroupsKnowledge
HTTPServer *httptest.Server
API *HTTPAPI
Expand Down Expand Up @@ -725,7 +736,7 @@ func newUsageTestEnv(
Build()

// Create mock DB client with VMs
dbClient := newMockUsageDBClient()
dbClient := newMockVMSource()
for _, vm := range vms {
dbClient.addVM(vm)
}
Expand All @@ -734,10 +745,10 @@ func newUsageTestEnv(
// CalculateUsage reads from this status, so the API returns the correct commitment assignments.
if len(crObjects) > 0 {
rec := &commitments.UsageReconciler{
Client: k8sClient,
Conf: commitments.UsageReconcilerConfig{CooldownInterval: metav1.Duration{Duration: 0}},
UsageDB: dbClient,
Monitor: commitments.NewUsageReconcilerMonitor(),
Client: k8sClient,
Conf: commitments.UsageReconcilerConfig{CooldownInterval: metav1.Duration{Duration: 0}},
VMSource: dbClient,
Monitor: commitments.NewUsageReconcilerMonitor(),
}
ctx := context.Background()
for _, obj := range crObjects {
Expand Down
Loading