From 0b023cb9dc04d1fdb039c82fa2d131f51a19d5f1 Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 27 Apr 2026 17:25:42 +0300 Subject: [PATCH 1/3] wip(schema): scaffold dependentRequired through proto + storage + parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WIP for #193 — runtime check at config writes not yet wired. Compiles. - Adds DependentRequiredEntry proto + Schema.dependent_required field - Adds dependent_required jsonb column to schema_versions (squashed into 001 per alpha policy) - YAML parser accepts top-level dependentRequired key, lints existence + no-self-reference at schema-validate time - Domain SchemaVersion + storage layer plumbing through PG + memory store - ImportSchema validates and persists rules - New helpers in internal/schema/dependent_required.go: validate / marshal / UnmarshalDependentRequired / CheckDependentRequired (runtime check; not yet wired to config writes) Next: wire CheckDependentRequired into SetField/SetFields/ImportConfig INSIDE the existing transactions for race safety. Plus tests. Co-Authored-By: Claude Opus 4.7 (1M context) --- api/centralconfig/v1/types.pb.go | 241 +++++++++++++------- cmd/server/openapi.json | 25 ++ db/migrations/001_initial_schema.sql | 19 +- db/queries/schemas.sql | 4 +- docs/api/openapi.swagger.json | 25 ++ internal/schema/convert.go | 6 + internal/schema/dependent_required.go | 129 +++++++++++ internal/schema/service.go | 36 +-- internal/schema/store.go | 4 + internal/schema/store_memory.go | 21 +- internal/schema/store_pg.go | 32 +-- internal/schema/yaml.go | 80 ++++++- internal/storage/dbstore/models.gen.go | 17 +- internal/storage/dbstore/schemas.sql.gen.go | 28 ++- internal/storage/domain/types.go | 20 +- proto/centralconfig/v1/types.proto | 23 ++ 16 files changed, 558 insertions(+), 152 deletions(-) create mode 100644 internal/schema/dependent_required.go diff --git a/api/centralconfig/v1/types.pb.go b/api/centralconfig/v1/types.pb.go index aaa96b01..598b0600 100644 --- a/api/centralconfig/v1/types.pb.go +++ b/api/centralconfig/v1/types.pb.go @@ -715,9 +715,17 @@ type Schema struct { // When this version was created. CreatedAt *timestamppb.Timestamp `protobuf:"bytes,10,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // Optional schema metadata: ownership, contact, labels. - Info *SchemaInfo `protobuf:"bytes,11,opt,name=info,proto3" json:"info,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + Info *SchemaInfo `protobuf:"bytes,11,opt,name=info,proto3" json:"info,omitempty"` + // Cross-field "B required when A present" rules. Each entry declares one + // trigger field whose presence (non-null value) makes a list of dependent + // field paths required (also non-null). Equivalent to JSON Schema 2020-12 + // dependentRequired, scoped to schema-level cross-field requirement. + // Lint-checked at ImportSchema time (every path must reference a real + // field; trigger may not appear in its own dependents). Enforced at every + // config write against the post-merge snapshot. + DependentRequired []*DependentRequiredEntry `protobuf:"bytes,12,rep,name=dependent_required,json=dependentRequired,proto3" json:"dependent_required,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Schema) Reset() { @@ -827,6 +835,73 @@ func (x *Schema) GetInfo() *SchemaInfo { return nil } +func (x *Schema) GetDependentRequired() []*DependentRequiredEntry { + if x != nil { + return x.DependentRequired + } + return nil +} + +// DependentRequiredEntry encodes one cross-field requirement: when the +// trigger field has a non-null value, every dependent field path must also +// have a non-null value. This is the proto wire form of JSON Schema 2020-12 +// dependentRequired, which uses a `map>` shape — proto +// maps cannot hold repeated values directly, so we use a repeated list of +// entries. +type DependentRequiredEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Field path whose presence triggers the requirement. + TriggerField string `protobuf:"bytes,1,opt,name=trigger_field,json=triggerField,proto3" json:"trigger_field,omitempty"` + // Field paths that must be present when the trigger has a non-null value. + DependentFields []string `protobuf:"bytes,2,rep,name=dependent_fields,json=dependentFields,proto3" json:"dependent_fields,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DependentRequiredEntry) Reset() { + *x = DependentRequiredEntry{} + mi := &file_centralconfig_v1_types_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DependentRequiredEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DependentRequiredEntry) ProtoMessage() {} + +func (x *DependentRequiredEntry) ProtoReflect() protoreflect.Message { + mi := &file_centralconfig_v1_types_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DependentRequiredEntry.ProtoReflect.Descriptor instead. +func (*DependentRequiredEntry) Descriptor() ([]byte, []int) { + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{7} +} + +func (x *DependentRequiredEntry) GetTriggerField() string { + if x != nil { + return x.TriggerField + } + return "" +} + +func (x *DependentRequiredEntry) GetDependentFields() []string { + if x != nil { + return x.DependentFields + } + return nil +} + // Tenant represents an organization or entity that has its own configuration // based on an assigned schema version. type Tenant struct { @@ -851,7 +926,7 @@ type Tenant struct { func (x *Tenant) Reset() { *x = Tenant{} - mi := &file_centralconfig_v1_types_proto_msgTypes[7] + mi := &file_centralconfig_v1_types_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -863,7 +938,7 @@ func (x *Tenant) String() string { func (*Tenant) ProtoMessage() {} func (x *Tenant) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[7] + mi := &file_centralconfig_v1_types_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -876,7 +951,7 @@ func (x *Tenant) ProtoReflect() protoreflect.Message { // Deprecated: Use Tenant.ProtoReflect.Descriptor instead. func (*Tenant) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{7} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{8} } func (x *Tenant) GetId() string { @@ -938,7 +1013,7 @@ type FieldLock struct { func (x *FieldLock) Reset() { *x = FieldLock{} - mi := &file_centralconfig_v1_types_proto_msgTypes[8] + mi := &file_centralconfig_v1_types_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -950,7 +1025,7 @@ func (x *FieldLock) String() string { func (*FieldLock) ProtoMessage() {} func (x *FieldLock) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[8] + mi := &file_centralconfig_v1_types_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -963,7 +1038,7 @@ func (x *FieldLock) ProtoReflect() protoreflect.Message { // Deprecated: Use FieldLock.ProtoReflect.Descriptor instead. func (*FieldLock) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{8} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{9} } func (x *FieldLock) GetTenantId() string { @@ -1008,7 +1083,7 @@ type TypedValue struct { func (x *TypedValue) Reset() { *x = TypedValue{} - mi := &file_centralconfig_v1_types_proto_msgTypes[9] + mi := &file_centralconfig_v1_types_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1020,7 +1095,7 @@ func (x *TypedValue) String() string { func (*TypedValue) ProtoMessage() {} func (x *TypedValue) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[9] + mi := &file_centralconfig_v1_types_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1033,7 +1108,7 @@ func (x *TypedValue) ProtoReflect() protoreflect.Message { // Deprecated: Use TypedValue.ProtoReflect.Descriptor instead. func (*TypedValue) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{9} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{10} } func (x *TypedValue) GetKind() isTypedValue_Kind { @@ -1194,7 +1269,7 @@ type ConfigValue struct { func (x *ConfigValue) Reset() { *x = ConfigValue{} - mi := &file_centralconfig_v1_types_proto_msgTypes[10] + mi := &file_centralconfig_v1_types_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1206,7 +1281,7 @@ func (x *ConfigValue) String() string { func (*ConfigValue) ProtoMessage() {} func (x *ConfigValue) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[10] + mi := &file_centralconfig_v1_types_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1219,7 +1294,7 @@ func (x *ConfigValue) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigValue.ProtoReflect.Descriptor instead. func (*ConfigValue) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{10} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{11} } func (x *ConfigValue) GetFieldPath() string { @@ -1274,7 +1349,7 @@ type ConfigVersion struct { func (x *ConfigVersion) Reset() { *x = ConfigVersion{} - mi := &file_centralconfig_v1_types_proto_msgTypes[11] + mi := &file_centralconfig_v1_types_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1286,7 +1361,7 @@ func (x *ConfigVersion) String() string { func (*ConfigVersion) ProtoMessage() {} func (x *ConfigVersion) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[11] + mi := &file_centralconfig_v1_types_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1299,7 +1374,7 @@ func (x *ConfigVersion) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigVersion.ProtoReflect.Descriptor instead. func (*ConfigVersion) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{11} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{12} } func (x *ConfigVersion) GetId() string { @@ -1359,7 +1434,7 @@ type Config struct { func (x *Config) Reset() { *x = Config{} - mi := &file_centralconfig_v1_types_proto_msgTypes[12] + mi := &file_centralconfig_v1_types_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1371,7 +1446,7 @@ func (x *Config) String() string { func (*Config) ProtoMessage() {} func (x *Config) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[12] + mi := &file_centralconfig_v1_types_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1384,7 +1459,7 @@ func (x *Config) ProtoReflect() protoreflect.Message { // Deprecated: Use Config.ProtoReflect.Descriptor instead. func (*Config) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{12} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{13} } func (x *Config) GetTenantId() string { @@ -1432,7 +1507,7 @@ type ConfigChange struct { func (x *ConfigChange) Reset() { *x = ConfigChange{} - mi := &file_centralconfig_v1_types_proto_msgTypes[13] + mi := &file_centralconfig_v1_types_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1444,7 +1519,7 @@ func (x *ConfigChange) String() string { func (*ConfigChange) ProtoMessage() {} func (x *ConfigChange) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[13] + mi := &file_centralconfig_v1_types_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1457,7 +1532,7 @@ func (x *ConfigChange) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigChange.ProtoReflect.Descriptor instead. func (*ConfigChange) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{13} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{14} } func (x *ConfigChange) GetTenantId() string { @@ -1539,7 +1614,7 @@ type AuditEntry struct { func (x *AuditEntry) Reset() { *x = AuditEntry{} - mi := &file_centralconfig_v1_types_proto_msgTypes[14] + mi := &file_centralconfig_v1_types_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1551,7 +1626,7 @@ func (x *AuditEntry) String() string { func (*AuditEntry) ProtoMessage() {} func (x *AuditEntry) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[14] + mi := &file_centralconfig_v1_types_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1564,7 +1639,7 @@ func (x *AuditEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use AuditEntry.ProtoReflect.Descriptor instead. func (*AuditEntry) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{14} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{15} } func (x *AuditEntry) GetId() string { @@ -1649,7 +1724,7 @@ type UsageStats struct { func (x *UsageStats) Reset() { *x = UsageStats{} - mi := &file_centralconfig_v1_types_proto_msgTypes[15] + mi := &file_centralconfig_v1_types_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1661,7 +1736,7 @@ func (x *UsageStats) String() string { func (*UsageStats) ProtoMessage() {} func (x *UsageStats) ProtoReflect() protoreflect.Message { - mi := &file_centralconfig_v1_types_proto_msgTypes[15] + mi := &file_centralconfig_v1_types_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1674,7 +1749,7 @@ func (x *UsageStats) ProtoReflect() protoreflect.Message { // Deprecated: Use UsageStats.ProtoReflect.Descriptor instead. func (*UsageStats) Descriptor() ([]byte, []int) { - return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{15} + return file_centralconfig_v1_types_proto_rawDescGZIP(), []int{16} } func (x *UsageStats) GetTenantId() string { @@ -1790,7 +1865,7 @@ const file_centralconfig_v1_types_proto_rawDesc = "" + "\rSchemaContact\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" + "\x05email\x18\x02 \x01(\tR\x05email\x12\x10\n" + - "\x03url\x18\x03 \x01(\tR\x03url\"\xb6\x03\n" + + "\x03url\x18\x03 \x01(\tR\x03url\"\x8f\x04\n" + "\x06Schema\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\x12 \n" + @@ -1804,8 +1879,12 @@ const file_centralconfig_v1_types_proto_rawDesc = "" + "\n" + "created_at\x18\n" + " \x01(\v2\x1a.google.protobuf.TimestampR\tcreatedAt\x120\n" + - "\x04info\x18\v \x01(\v2\x1c.centralconfig.v1.SchemaInfoR\x04infoB\x11\n" + - "\x0f_parent_version\"\xe6\x01\n" + + "\x04info\x18\v \x01(\v2\x1c.centralconfig.v1.SchemaInfoR\x04info\x12W\n" + + "\x12dependent_required\x18\f \x03(\v2(.centralconfig.v1.DependentRequiredEntryR\x11dependentRequiredB\x11\n" + + "\x0f_parent_version\"h\n" + + "\x16DependentRequiredEntry\x12#\n" + + "\rtrigger_field\x18\x01 \x01(\tR\ftriggerField\x12)\n" + + "\x10dependent_fields\x18\x02 \x03(\tR\x0fdependentFields\"\xe6\x01\n" + "\x06Tenant\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n" + @@ -1923,58 +2002,60 @@ func file_centralconfig_v1_types_proto_rawDescGZIP() []byte { } var file_centralconfig_v1_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_centralconfig_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 18) +var file_centralconfig_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 19) var file_centralconfig_v1_types_proto_goTypes = []any{ - (FieldType)(0), // 0: centralconfig.v1.FieldType - (*FieldConstraints)(nil), // 1: centralconfig.v1.FieldConstraints - (*SchemaField)(nil), // 2: centralconfig.v1.SchemaField - (*FieldExample)(nil), // 3: centralconfig.v1.FieldExample - (*ExternalDocs)(nil), // 4: centralconfig.v1.ExternalDocs - (*SchemaInfo)(nil), // 5: centralconfig.v1.SchemaInfo - (*SchemaContact)(nil), // 6: centralconfig.v1.SchemaContact - (*Schema)(nil), // 7: centralconfig.v1.Schema - (*Tenant)(nil), // 8: centralconfig.v1.Tenant - (*FieldLock)(nil), // 9: centralconfig.v1.FieldLock - (*TypedValue)(nil), // 10: centralconfig.v1.TypedValue - (*ConfigValue)(nil), // 11: centralconfig.v1.ConfigValue - (*ConfigVersion)(nil), // 12: centralconfig.v1.ConfigVersion - (*Config)(nil), // 13: centralconfig.v1.Config - (*ConfigChange)(nil), // 14: centralconfig.v1.ConfigChange - (*AuditEntry)(nil), // 15: centralconfig.v1.AuditEntry - (*UsageStats)(nil), // 16: centralconfig.v1.UsageStats - nil, // 17: centralconfig.v1.SchemaField.ExamplesEntry - nil, // 18: centralconfig.v1.SchemaInfo.LabelsEntry - (*timestamppb.Timestamp)(nil), // 19: google.protobuf.Timestamp - (*durationpb.Duration)(nil), // 20: google.protobuf.Duration + (FieldType)(0), // 0: centralconfig.v1.FieldType + (*FieldConstraints)(nil), // 1: centralconfig.v1.FieldConstraints + (*SchemaField)(nil), // 2: centralconfig.v1.SchemaField + (*FieldExample)(nil), // 3: centralconfig.v1.FieldExample + (*ExternalDocs)(nil), // 4: centralconfig.v1.ExternalDocs + (*SchemaInfo)(nil), // 5: centralconfig.v1.SchemaInfo + (*SchemaContact)(nil), // 6: centralconfig.v1.SchemaContact + (*Schema)(nil), // 7: centralconfig.v1.Schema + (*DependentRequiredEntry)(nil), // 8: centralconfig.v1.DependentRequiredEntry + (*Tenant)(nil), // 9: centralconfig.v1.Tenant + (*FieldLock)(nil), // 10: centralconfig.v1.FieldLock + (*TypedValue)(nil), // 11: centralconfig.v1.TypedValue + (*ConfigValue)(nil), // 12: centralconfig.v1.ConfigValue + (*ConfigVersion)(nil), // 13: centralconfig.v1.ConfigVersion + (*Config)(nil), // 14: centralconfig.v1.Config + (*ConfigChange)(nil), // 15: centralconfig.v1.ConfigChange + (*AuditEntry)(nil), // 16: centralconfig.v1.AuditEntry + (*UsageStats)(nil), // 17: centralconfig.v1.UsageStats + nil, // 18: centralconfig.v1.SchemaField.ExamplesEntry + nil, // 19: centralconfig.v1.SchemaInfo.LabelsEntry + (*timestamppb.Timestamp)(nil), // 20: google.protobuf.Timestamp + (*durationpb.Duration)(nil), // 21: google.protobuf.Duration } var file_centralconfig_v1_types_proto_depIdxs = []int32{ 0, // 0: centralconfig.v1.SchemaField.type:type_name -> centralconfig.v1.FieldType 1, // 1: centralconfig.v1.SchemaField.constraints:type_name -> centralconfig.v1.FieldConstraints - 17, // 2: centralconfig.v1.SchemaField.examples:type_name -> centralconfig.v1.SchemaField.ExamplesEntry + 18, // 2: centralconfig.v1.SchemaField.examples:type_name -> centralconfig.v1.SchemaField.ExamplesEntry 4, // 3: centralconfig.v1.SchemaField.external_docs:type_name -> centralconfig.v1.ExternalDocs 6, // 4: centralconfig.v1.SchemaInfo.contact:type_name -> centralconfig.v1.SchemaContact - 18, // 5: centralconfig.v1.SchemaInfo.labels:type_name -> centralconfig.v1.SchemaInfo.LabelsEntry + 19, // 5: centralconfig.v1.SchemaInfo.labels:type_name -> centralconfig.v1.SchemaInfo.LabelsEntry 2, // 6: centralconfig.v1.Schema.fields:type_name -> centralconfig.v1.SchemaField - 19, // 7: centralconfig.v1.Schema.created_at:type_name -> google.protobuf.Timestamp + 20, // 7: centralconfig.v1.Schema.created_at:type_name -> google.protobuf.Timestamp 5, // 8: centralconfig.v1.Schema.info:type_name -> centralconfig.v1.SchemaInfo - 19, // 9: centralconfig.v1.Tenant.created_at:type_name -> google.protobuf.Timestamp - 19, // 10: centralconfig.v1.Tenant.updated_at:type_name -> google.protobuf.Timestamp - 19, // 11: centralconfig.v1.TypedValue.time_value:type_name -> google.protobuf.Timestamp - 20, // 12: centralconfig.v1.TypedValue.duration_value:type_name -> google.protobuf.Duration - 10, // 13: centralconfig.v1.ConfigValue.value:type_name -> centralconfig.v1.TypedValue - 19, // 14: centralconfig.v1.ConfigVersion.created_at:type_name -> google.protobuf.Timestamp - 11, // 15: centralconfig.v1.Config.values:type_name -> centralconfig.v1.ConfigValue - 10, // 16: centralconfig.v1.ConfigChange.old_value:type_name -> centralconfig.v1.TypedValue - 10, // 17: centralconfig.v1.ConfigChange.new_value:type_name -> centralconfig.v1.TypedValue - 19, // 18: centralconfig.v1.ConfigChange.changed_at:type_name -> google.protobuf.Timestamp - 19, // 19: centralconfig.v1.AuditEntry.created_at:type_name -> google.protobuf.Timestamp - 19, // 20: centralconfig.v1.UsageStats.last_read_at:type_name -> google.protobuf.Timestamp - 3, // 21: centralconfig.v1.SchemaField.ExamplesEntry.value:type_name -> centralconfig.v1.FieldExample - 22, // [22:22] is the sub-list for method output_type - 22, // [22:22] is the sub-list for method input_type - 22, // [22:22] is the sub-list for extension type_name - 22, // [22:22] is the sub-list for extension extendee - 0, // [0:22] is the sub-list for field type_name + 8, // 9: centralconfig.v1.Schema.dependent_required:type_name -> centralconfig.v1.DependentRequiredEntry + 20, // 10: centralconfig.v1.Tenant.created_at:type_name -> google.protobuf.Timestamp + 20, // 11: centralconfig.v1.Tenant.updated_at:type_name -> google.protobuf.Timestamp + 20, // 12: centralconfig.v1.TypedValue.time_value:type_name -> google.protobuf.Timestamp + 21, // 13: centralconfig.v1.TypedValue.duration_value:type_name -> google.protobuf.Duration + 11, // 14: centralconfig.v1.ConfigValue.value:type_name -> centralconfig.v1.TypedValue + 20, // 15: centralconfig.v1.ConfigVersion.created_at:type_name -> google.protobuf.Timestamp + 12, // 16: centralconfig.v1.Config.values:type_name -> centralconfig.v1.ConfigValue + 11, // 17: centralconfig.v1.ConfigChange.old_value:type_name -> centralconfig.v1.TypedValue + 11, // 18: centralconfig.v1.ConfigChange.new_value:type_name -> centralconfig.v1.TypedValue + 20, // 19: centralconfig.v1.ConfigChange.changed_at:type_name -> google.protobuf.Timestamp + 20, // 20: centralconfig.v1.AuditEntry.created_at:type_name -> google.protobuf.Timestamp + 20, // 21: centralconfig.v1.UsageStats.last_read_at:type_name -> google.protobuf.Timestamp + 3, // 22: centralconfig.v1.SchemaField.ExamplesEntry.value:type_name -> centralconfig.v1.FieldExample + 23, // [23:23] is the sub-list for method output_type + 23, // [23:23] is the sub-list for method input_type + 23, // [23:23] is the sub-list for extension type_name + 23, // [23:23] is the sub-list for extension extendee + 0, // [0:23] is the sub-list for field type_name } func init() { file_centralconfig_v1_types_proto_init() } @@ -1985,7 +2066,7 @@ func file_centralconfig_v1_types_proto_init() { file_centralconfig_v1_types_proto_msgTypes[0].OneofWrappers = []any{} file_centralconfig_v1_types_proto_msgTypes[1].OneofWrappers = []any{} file_centralconfig_v1_types_proto_msgTypes[6].OneofWrappers = []any{} - file_centralconfig_v1_types_proto_msgTypes[9].OneofWrappers = []any{ + file_centralconfig_v1_types_proto_msgTypes[10].OneofWrappers = []any{ (*TypedValue_IntegerValue)(nil), (*TypedValue_NumberValue)(nil), (*TypedValue_StringValue)(nil), @@ -1995,16 +2076,16 @@ func file_centralconfig_v1_types_proto_init() { (*TypedValue_UrlValue)(nil), (*TypedValue_JsonValue)(nil), } - file_centralconfig_v1_types_proto_msgTypes[10].OneofWrappers = []any{} - file_centralconfig_v1_types_proto_msgTypes[14].OneofWrappers = []any{} + file_centralconfig_v1_types_proto_msgTypes[11].OneofWrappers = []any{} file_centralconfig_v1_types_proto_msgTypes[15].OneofWrappers = []any{} + file_centralconfig_v1_types_proto_msgTypes[16].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_centralconfig_v1_types_proto_rawDesc), len(file_centralconfig_v1_types_proto_rawDesc)), NumEnums: 1, - NumMessages: 18, + NumMessages: 19, NumExtensions: 0, NumServices: 0, }, diff --git a/cmd/server/openapi.json b/cmd/server/openapi.json index 67d56d03..8990bfab 100644 --- a/cmd/server/openapi.json +++ b/cmd/server/openapi.json @@ -1741,6 +1741,23 @@ "v1DeleteTenantResponse": { "type": "object" }, + "v1DependentRequiredEntry": { + "type": "object", + "properties": { + "triggerField": { + "type": "string", + "description": "Field path whose presence triggers the requirement." + }, + "dependentFields": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Field paths that must be present when the trigger has a non-null value." + } + }, + "description": "DependentRequiredEntry encodes one cross-field requirement: when the\ntrigger field has a non-null value, every dependent field path must also\nhave a non-null value. This is the proto wire form of JSON Schema 2020-12\ndependentRequired, which uses a `map>` shape \u2014 proto\nmaps cannot hold repeated values directly, so we use a repeated list of\nentries." + }, "v1ExportConfigResponse": { "type": "object", "properties": { @@ -2205,6 +2222,14 @@ "info": { "$ref": "#/definitions/v1SchemaInfo", "description": "Optional schema metadata: ownership, contact, labels." + }, + "dependentRequired": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1DependentRequiredEntry" + }, + "description": "Cross-field \"B required when A present\" rules. Each entry declares one\ntrigger field whose presence (non-null value) makes a list of dependent\nfield paths required (also non-null). Equivalent to JSON Schema 2020-12\ndependentRequired, scoped to schema-level cross-field requirement.\nLint-checked at ImportSchema time (every path must reference a real\nfield; trigger may not appear in its own dependents). Enforced at every\nconfig write against the post-merge snapshot." } }, "description": "Schema represents a configuration schema template.\nSchemas define the allowed fields and their types for tenant configurations.\nEach schema is versioned \u2014 updates create new immutable versions." diff --git a/db/migrations/001_initial_schema.sql b/db/migrations/001_initial_schema.sql index 1fa3f96c..ac20aaa0 100644 --- a/db/migrations/001_initial_schema.sql +++ b/db/migrations/001_initial_schema.sql @@ -24,14 +24,17 @@ CREATE TABLE schemas ( ); CREATE TABLE schema_versions ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - schema_id UUID NOT NULL REFERENCES schemas(id) ON DELETE CASCADE, - version INT NOT NULL, - parent_version INT, - description TEXT, - checksum TEXT NOT NULL, - published BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + schema_id UUID NOT NULL REFERENCES schemas(id) ON DELETE CASCADE, + version INT NOT NULL, + parent_version INT, + description TEXT, + checksum TEXT NOT NULL, + published BOOLEAN NOT NULL DEFAULT false, + -- JSON array of {trigger_field, dependent_fields} entries encoding the + -- schema's dependentRequired rules. Empty array when no rules exist. + dependent_required JSONB NOT NULL DEFAULT '[]', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE(schema_id, version) ); diff --git a/db/queries/schemas.sql b/db/queries/schemas.sql index 3b991247..24cace0e 100644 --- a/db/queries/schemas.sql +++ b/db/queries/schemas.sql @@ -18,8 +18,8 @@ LIMIT $1 OFFSET $2; DELETE FROM schemas WHERE id = $1; -- name: CreateSchemaVersion :one -INSERT INTO schema_versions (schema_id, version, parent_version, description, checksum) -VALUES ($1, $2, $3, $4, $5) +INSERT INTO schema_versions (schema_id, version, parent_version, description, checksum, dependent_required) +VALUES ($1, $2, $3, $4, $5, $6) RETURNING *; -- name: GetSchemaVersion :one diff --git a/docs/api/openapi.swagger.json b/docs/api/openapi.swagger.json index 67d56d03..8990bfab 100644 --- a/docs/api/openapi.swagger.json +++ b/docs/api/openapi.swagger.json @@ -1741,6 +1741,23 @@ "v1DeleteTenantResponse": { "type": "object" }, + "v1DependentRequiredEntry": { + "type": "object", + "properties": { + "triggerField": { + "type": "string", + "description": "Field path whose presence triggers the requirement." + }, + "dependentFields": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Field paths that must be present when the trigger has a non-null value." + } + }, + "description": "DependentRequiredEntry encodes one cross-field requirement: when the\ntrigger field has a non-null value, every dependent field path must also\nhave a non-null value. This is the proto wire form of JSON Schema 2020-12\ndependentRequired, which uses a `map>` shape \u2014 proto\nmaps cannot hold repeated values directly, so we use a repeated list of\nentries." + }, "v1ExportConfigResponse": { "type": "object", "properties": { @@ -2205,6 +2222,14 @@ "info": { "$ref": "#/definitions/v1SchemaInfo", "description": "Optional schema metadata: ownership, contact, labels." + }, + "dependentRequired": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1DependentRequiredEntry" + }, + "description": "Cross-field \"B required when A present\" rules. Each entry declares one\ntrigger field whose presence (non-null value) makes a list of dependent\nfield paths required (also non-null). Equivalent to JSON Schema 2020-12\ndependentRequired, scoped to schema-level cross-field requirement.\nLint-checked at ImportSchema time (every path must reference a real\nfield; trigger may not appear in its own dependents). Enforced at every\nconfig write against the post-merge snapshot." } }, "description": "Schema represents a configuration schema template.\nSchemas define the allowed fields and their types for tenant configurations.\nEach schema is versioned \u2014 updates create new immutable versions." diff --git a/internal/schema/convert.go b/internal/schema/convert.go index 803bfc62..f60da244 100644 --- a/internal/schema/convert.go +++ b/internal/schema/convert.go @@ -38,6 +38,12 @@ func schemaToProto(s domain.Schema, v domain.SchemaVersion, fields []domain.Sche if v.Description != nil { result.VersionDescription = *v.Description } + if len(v.DependentRequired) > 0 { + var entries []*pb.DependentRequiredEntry + if err := json.Unmarshal(v.DependentRequired, &entries); err == nil && len(entries) > 0 { + result.DependentRequired = entries + } + } return result } diff --git a/internal/schema/dependent_required.go b/internal/schema/dependent_required.go new file mode 100644 index 00000000..3f110b67 --- /dev/null +++ b/internal/schema/dependent_required.go @@ -0,0 +1,129 @@ +package schema + +import ( + "encoding/json" + "fmt" + + pb "github.com/opendecree/decree/api/centralconfig/v1" +) + +// validateDependentRequiredAgainstFields lints a list of DependentRequiredEntry +// against the field set being imported: every trigger and every dependent +// must reference a real field, no trigger may list itself, and no dependent +// may appear twice under the same trigger. Mirrors the YAML-side check but +// runs over the proto representation, which is the form the rest of the +// import pipeline carries. +func validateDependentRequiredAgainstFields(entries []*pb.DependentRequiredEntry, fields []*pb.SchemaField) error { + if len(entries) == 0 { + return nil + } + known := make(map[string]struct{}, len(fields)) + for _, f := range fields { + known[f.Path] = struct{}{} + } + for _, e := range entries { + if _, ok := known[e.TriggerField]; !ok { + return fmt.Errorf("dependentRequired: trigger %q is not a defined field", e.TriggerField) + } + seen := make(map[string]struct{}, len(e.DependentFields)) + for _, dep := range e.DependentFields { + if dep == e.TriggerField { + return fmt.Errorf("dependentRequired: trigger %q cannot list itself as a dependent", e.TriggerField) + } + if _, ok := known[dep]; !ok { + return fmt.Errorf("dependentRequired: dependent %q (under trigger %q) is not a defined field", dep, e.TriggerField) + } + if _, dup := seen[dep]; dup { + return fmt.Errorf("dependentRequired: dependent %q listed twice under trigger %q", dep, e.TriggerField) + } + seen[dep] = struct{}{} + } + } + return nil +} + +// marshalDependentRequired encodes proto DependentRequiredEntry list as the +// JSON array stored in the schema_versions.dependent_required column. Always +// returns valid JSON — `[]` for empty input — so the column never holds NULL +// or junk. +func marshalDependentRequired(entries []*pb.DependentRequiredEntry) ([]byte, error) { + if len(entries) == 0 { + return []byte("[]"), nil + } + type wireEntry struct { + TriggerField string `json:"trigger_field"` + DependentFields []string `json:"dependent_fields"` + } + wire := make([]wireEntry, 0, len(entries)) + for _, e := range entries { + wire = append(wire, wireEntry{ + TriggerField: e.TriggerField, + DependentFields: append([]string(nil), e.DependentFields...), + }) + } + return json.Marshal(wire) +} + +// UnmarshalDependentRequired decodes the JSON-stored rules back into proto +// entries. Returns nil for empty / `[]` / unparseable input — callers should +// treat that case as "no rules". Exported so the config package can call it +// without re-inventing the wire format. +func UnmarshalDependentRequired(raw []byte) []*pb.DependentRequiredEntry { + if len(raw) == 0 { + return nil + } + type wireEntry struct { + TriggerField string `json:"trigger_field"` + DependentFields []string `json:"dependent_fields"` + } + var wire []wireEntry + if err := json.Unmarshal(raw, &wire); err != nil || len(wire) == 0 { + return nil + } + out := make([]*pb.DependentRequiredEntry, 0, len(wire)) + for _, w := range wire { + out = append(out, &pb.DependentRequiredEntry{ + TriggerField: w.TriggerField, + DependentFields: w.DependentFields, + }) + } + return out +} + +// CheckDependentRequired evaluates all rules against a post-merge value +// snapshot. For each rule, if the trigger field has a non-null value in the +// snapshot, every dependent path must also have a non-null value. Missing +// keys in the snapshot are treated as null. Returns the first violation +// encountered, formatted with both trigger and dependent paths so the +// caller's error message names the offending fields. +// +// Designed to run inside the same transaction that stages the write — the +// snapshot must include all in-flight changes already merged on top of the +// pre-tx state. Race-safety relies on Postgres MVCC + the caller's +// CreateConfigVersion UNIQUE(tenant_id, version) constraint to serialize +// concurrent writers. +func CheckDependentRequired(rules []*pb.DependentRequiredEntry, snapshot map[string]*pb.TypedValue) error { + if len(rules) == 0 { + return nil + } + for _, rule := range rules { + tv, present := snapshot[rule.TriggerField] + if !present || isNullTypedValue(tv) { + continue + } + for _, dep := range rule.DependentFields { + depTV, depPresent := snapshot[dep] + if !depPresent || isNullTypedValue(depTV) { + return fmt.Errorf("dependentRequired: %q has a value but required dependent %q is null", rule.TriggerField, dep) + } + } + } + return nil +} + +// isNullTypedValue treats both a nil TypedValue and one with no kind set as +// "null". The wire protocol uses an unset oneof to mean null (per +// types.proto: "An unset oneof (no field present) represents a null value"). +func isNullTypedValue(tv *pb.TypedValue) bool { + return tv == nil || tv.Kind == nil +} diff --git a/internal/schema/service.go b/internal/schema/service.go index 0e91d508..9d060e02 100644 --- a/internal/schema/service.go +++ b/internal/schema/service.go @@ -602,6 +602,14 @@ func (s *Service) ImportSchema(ctx context.Context, req *pb.ImportSchemaRequest) } fields := yamlToProtoFields(doc) + depReqs := yamlToProtoDependentRequired(doc.DependentRequired) + if err := validateDependentRequiredAgainstFields(depReqs, fields); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "%v", err) + } + depReqJSON, err := marshalDependentRequired(depReqs) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to encode dependentRequired: %v", err) + } checksum := computeChecksum(fields) // Check if schema already exists by name. @@ -612,7 +620,7 @@ func (s *Service) ImportSchema(ctx context.Context, req *pb.ImportSchemaRequest) if errors.Is(err, domain.ErrNotFound) { // New schema — create with v1. - resp, err := s.importCreateNew(ctx, doc, fields, checksum) + resp, err := s.importCreateNew(ctx, doc, fields, checksum, depReqJSON) if err != nil || !req.AutoPublish { return resp, err } @@ -637,14 +645,14 @@ func (s *Service) ImportSchema(ctx context.Context, req *pb.ImportSchemaRequest) } // Create new version. - resp, err := s.importNewVersion(ctx, existing, latestVersion, doc, fields, checksum) + resp, err := s.importNewVersion(ctx, existing, latestVersion, doc, fields, checksum, depReqJSON) if err != nil || !req.AutoPublish { return resp, err } return s.autoPublish(ctx, resp) } -func (s *Service) importCreateNew(ctx context.Context, doc *SchemaYAML, fields []*pb.SchemaField, checksum string) (*pb.ImportSchemaResponse, error) { +func (s *Service) importCreateNew(ctx context.Context, doc *SchemaYAML, fields []*pb.SchemaField, checksum string, depReqJSON []byte) (*pb.ImportSchemaResponse, error) { schema, err := s.store.CreateSchema(ctx, CreateSchemaParams{ Name: doc.Name, Description: ptrString(doc.Description), @@ -655,10 +663,11 @@ func (s *Service) importCreateNew(ctx context.Context, doc *SchemaYAML, fields [ } version, err := s.store.CreateSchemaVersion(ctx, CreateSchemaVersionParams{ - SchemaID: schema.ID, - Version: 1, - Description: ptrString(doc.VersionDescription), - Checksum: checksum, + SchemaID: schema.ID, + Version: 1, + Description: ptrString(doc.VersionDescription), + Checksum: checksum, + DependentRequired: depReqJSON, }) if err != nil { s.logger.ErrorContext(ctx, "import: create version", "error", err) @@ -675,13 +684,14 @@ func (s *Service) importCreateNew(ctx context.Context, doc *SchemaYAML, fields [ }, nil } -func (s *Service) importNewVersion(ctx context.Context, schema domain.Schema, latestVersion domain.SchemaVersion, doc *SchemaYAML, fields []*pb.SchemaField, checksum string) (*pb.ImportSchemaResponse, error) { +func (s *Service) importNewVersion(ctx context.Context, schema domain.Schema, latestVersion domain.SchemaVersion, doc *SchemaYAML, fields []*pb.SchemaField, checksum string, depReqJSON []byte) (*pb.ImportSchemaResponse, error) { newVersion, err := s.store.CreateSchemaVersion(ctx, CreateSchemaVersionParams{ - SchemaID: schema.ID, - Version: latestVersion.Version + 1, - ParentVersion: &latestVersion.Version, - Description: ptrString(doc.VersionDescription), - Checksum: checksum, + SchemaID: schema.ID, + Version: latestVersion.Version + 1, + ParentVersion: &latestVersion.Version, + Description: ptrString(doc.VersionDescription), + Checksum: checksum, + DependentRequired: depReqJSON, }) if err != nil { s.logger.ErrorContext(ctx, "import: create new version", "error", err) diff --git a/internal/schema/store.go b/internal/schema/store.go index 74f11c30..d464d1b5 100644 --- a/internal/schema/store.go +++ b/internal/schema/store.go @@ -19,6 +19,10 @@ type CreateSchemaVersionParams struct { ParentVersion *int32 Description *string Checksum string + // DependentRequired is the JSON-encoded list of cross-field rules. + // Pass an empty []byte (or nil) when no rules exist; the store will + // persist `[]` so reads always return well-formed JSON. + DependentRequired []byte } // GetSchemaVersionParams identifies a specific schema version. diff --git a/internal/schema/store_memory.go b/internal/schema/store_memory.go index 16644c0a..8fdb5608 100644 --- a/internal/schema/store_memory.go +++ b/internal/schema/store_memory.go @@ -141,15 +141,20 @@ func (m *MemoryStore) CreateSchemaVersion(_ context.Context, arg CreateSchemaVer return domain.SchemaVersion{}, domain.ErrNotFound } + depReq := arg.DependentRequired + if len(depReq) == 0 { + depReq = []byte("[]") + } sv := domain.SchemaVersion{ - ID: m.nextID(), - SchemaID: arg.SchemaID, - Version: arg.Version, - ParentVersion: arg.ParentVersion, - Description: arg.Description, - Checksum: arg.Checksum, - Published: false, - CreatedAt: time.Now(), + ID: m.nextID(), + SchemaID: arg.SchemaID, + Version: arg.Version, + ParentVersion: arg.ParentVersion, + Description: arg.Description, + Checksum: arg.Checksum, + Published: false, + DependentRequired: depReq, + CreatedAt: time.Now(), } m.schemaVersions[sv.ID] = sv return sv, nil diff --git a/internal/schema/store_pg.go b/internal/schema/store_pg.go index fab2ca68..73d59695 100644 --- a/internal/schema/store_pg.go +++ b/internal/schema/store_pg.go @@ -88,12 +88,17 @@ func (s *PGStore) CreateSchemaVersion(ctx context.Context, arg CreateSchemaVersi if err != nil { return domain.SchemaVersion{}, err } + depReq := arg.DependentRequired + if len(depReq) == 0 { + depReq = []byte("[]") + } row, err := s.write.CreateSchemaVersion(ctx, dbstore.CreateSchemaVersionParams{ - SchemaID: schemaID, - Version: arg.Version, - ParentVersion: arg.ParentVersion, - Description: arg.Description, - Checksum: arg.Checksum, + SchemaID: schemaID, + Version: arg.Version, + ParentVersion: arg.ParentVersion, + Description: arg.Description, + Checksum: arg.Checksum, + DependentRequired: depReq, }) if err != nil { return domain.SchemaVersion{}, err @@ -396,14 +401,15 @@ func schemaFromDB(r dbstore.Schema) domain.Schema { func schemaVersionFromDB(r dbstore.SchemaVersion) domain.SchemaVersion { return domain.SchemaVersion{ - ID: pgconv.UUIDToString(r.ID), - SchemaID: pgconv.UUIDToString(r.SchemaID), - Version: r.Version, - ParentVersion: r.ParentVersion, - Description: r.Description, - Checksum: r.Checksum, - Published: r.Published, - CreatedAt: pgconv.TimestamptzToTime(r.CreatedAt), + ID: pgconv.UUIDToString(r.ID), + SchemaID: pgconv.UUIDToString(r.SchemaID), + Version: r.Version, + ParentVersion: r.ParentVersion, + Description: r.Description, + Checksum: r.Checksum, + Published: r.Published, + DependentRequired: r.DependentRequired, + CreatedAt: pgconv.TimestamptzToTime(r.CreatedAt), } } diff --git a/internal/schema/yaml.go b/internal/schema/yaml.go index 27d26233..51f7f378 100644 --- a/internal/schema/yaml.go +++ b/internal/schema/yaml.go @@ -43,7 +43,11 @@ type SchemaYAML struct { VersionDescription string `yaml:"version_description,omitempty"` Info *SchemaInfoYAML `yaml:"info,omitempty"` Fields map[string]SchemaFieldYAML `yaml:"fields"` - Extensions map[string]any `yaml:",inline"` + // DependentRequired declares cross-field "B required when A present" + // rules. Keys and values must reference paths defined in Fields. Matches + // the JSON Schema 2020-12 keyword of the same name. + DependentRequired map[string][]string `yaml:"dependentRequired,omitempty"` + Extensions map[string]any `yaml:",inline"` } // SchemaInfoYAML contains optional schema-level metadata. @@ -152,6 +156,9 @@ func validateSchemaYAML(doc *SchemaYAML) error { if len(doc.Fields) == 0 { return fmt.Errorf("at least one field is required") } + if err := validateDependentRequiredYAML(doc); err != nil { + return err + } for path, f := range doc.Fields { if !fieldPathPattern.MatchString(path) { return fmt.Errorf("invalid field path %q: must match %s", path, fieldPathPattern) @@ -181,6 +188,36 @@ func validateSchemaYAML(doc *SchemaYAML) error { return nil } +// validateDependentRequiredYAML lint-checks the `dependentRequired:` map at +// schema-validate time: every trigger key must reference a real field in the +// schema; every dependent path must reference a real field; the trigger may +// not appear in its own dependents list (a self-referential rule is always +// vacuously true and indicates author error). +func validateDependentRequiredYAML(doc *SchemaYAML) error { + if len(doc.DependentRequired) == 0 { + return nil + } + for trigger, dependents := range doc.DependentRequired { + if _, ok := doc.Fields[trigger]; !ok { + return fmt.Errorf("dependentRequired: trigger %q is not a defined field", trigger) + } + seen := make(map[string]struct{}, len(dependents)) + for _, dep := range dependents { + if dep == trigger { + return fmt.Errorf("dependentRequired: trigger %q cannot list itself as a dependent", trigger) + } + if _, ok := doc.Fields[dep]; !ok { + return fmt.Errorf("dependentRequired: dependent %q (under trigger %q) is not a defined field", dep, trigger) + } + if _, dup := seen[dep]; dup { + return fmt.Errorf("dependentRequired: dependent %q listed twice under trigger %q", dep, trigger) + } + seen[dep] = struct{}{} + } + } + return nil +} + // validateExtensions rejects any keys in the inline-extension map that do not // match the x-* vendor-extension pattern. The path prefix is included in the // error so users can locate the offending key in large documents. @@ -258,6 +295,7 @@ func schemaToYAML(s *pb.Schema) *SchemaYAML { VersionDescription: s.VersionDescription, Info: schemaInfoToYAML(s.Info), Fields: make(map[string]SchemaFieldYAML, len(s.Fields)), + DependentRequired: protoDependentRequiredToYAML(s.DependentRequired), } for _, f := range s.Fields { @@ -364,6 +402,46 @@ func protoConstraintsToYAML(c *pb.FieldConstraints) *ConstraintsYAML { // --- YAML → Proto --- +// yamlToProtoDependentRequired converts the YAML map +// shape into the proto repeated-entry shape. Returns nil for an empty map so +// the wire format stays compact. +func yamlToProtoDependentRequired(m map[string][]string) []*pb.DependentRequiredEntry { + if len(m) == 0 { + return nil + } + triggers := make([]string, 0, len(m)) + for k := range m { + triggers = append(triggers, k) + } + sort.Strings(triggers) + out := make([]*pb.DependentRequiredEntry, 0, len(triggers)) + for _, t := range triggers { + deps := append([]string(nil), m[t]...) + sort.Strings(deps) + out = append(out, &pb.DependentRequiredEntry{ + TriggerField: t, + DependentFields: deps, + }) + } + return out +} + +// protoDependentRequiredToYAML converts the proto repeated-entry shape back +// into the YAML map shape. Returns nil when the input +// is empty so the YAML key is omitted. +func protoDependentRequiredToYAML(entries []*pb.DependentRequiredEntry) map[string][]string { + if len(entries) == 0 { + return nil + } + out := make(map[string][]string, len(entries)) + for _, e := range entries { + deps := append([]string(nil), e.DependentFields...) + sort.Strings(deps) + out[e.TriggerField] = deps + } + return out +} + func yamlToProtoFields(doc *SchemaYAML) []*pb.SchemaField { fields := make([]*pb.SchemaField, 0, len(doc.Fields)) for path, yf := range doc.Fields { diff --git a/internal/storage/dbstore/models.gen.go b/internal/storage/dbstore/models.gen.go index fafc84ff..8edba060 100644 --- a/internal/storage/dbstore/models.gen.go +++ b/internal/storage/dbstore/models.gen.go @@ -120,14 +120,15 @@ type SchemaField struct { } type SchemaVersion struct { - ID pgtype.UUID `json:"id"` - SchemaID pgtype.UUID `json:"schema_id"` - Version int32 `json:"version"` - ParentVersion *int32 `json:"parent_version"` - Description *string `json:"description"` - Checksum string `json:"checksum"` - Published bool `json:"published"` - CreatedAt pgtype.Timestamptz `json:"created_at"` + ID pgtype.UUID `json:"id"` + SchemaID pgtype.UUID `json:"schema_id"` + Version int32 `json:"version"` + ParentVersion *int32 `json:"parent_version"` + Description *string `json:"description"` + Checksum string `json:"checksum"` + Published bool `json:"published"` + DependentRequired []byte `json:"dependent_required"` + CreatedAt pgtype.Timestamptz `json:"created_at"` } type Tenant struct { diff --git a/internal/storage/dbstore/schemas.sql.gen.go b/internal/storage/dbstore/schemas.sql.gen.go index a42e1436..45a61023 100644 --- a/internal/storage/dbstore/schemas.sql.gen.go +++ b/internal/storage/dbstore/schemas.sql.gen.go @@ -113,17 +113,18 @@ func (q *Queries) CreateSchemaField(ctx context.Context, arg CreateSchemaFieldPa } const createSchemaVersion = `-- name: CreateSchemaVersion :one -INSERT INTO schema_versions (schema_id, version, parent_version, description, checksum) -VALUES ($1, $2, $3, $4, $5) -RETURNING id, schema_id, version, parent_version, description, checksum, published, created_at +INSERT INTO schema_versions (schema_id, version, parent_version, description, checksum, dependent_required) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id, schema_id, version, parent_version, description, checksum, published, dependent_required, created_at ` type CreateSchemaVersionParams struct { - SchemaID pgtype.UUID `json:"schema_id"` - Version int32 `json:"version"` - ParentVersion *int32 `json:"parent_version"` - Description *string `json:"description"` - Checksum string `json:"checksum"` + SchemaID pgtype.UUID `json:"schema_id"` + Version int32 `json:"version"` + ParentVersion *int32 `json:"parent_version"` + Description *string `json:"description"` + Checksum string `json:"checksum"` + DependentRequired []byte `json:"dependent_required"` } func (q *Queries) CreateSchemaVersion(ctx context.Context, arg CreateSchemaVersionParams) (SchemaVersion, error) { @@ -133,6 +134,7 @@ func (q *Queries) CreateSchemaVersion(ctx context.Context, arg CreateSchemaVersi arg.ParentVersion, arg.Description, arg.Checksum, + arg.DependentRequired, ) var i SchemaVersion err := row.Scan( @@ -143,6 +145,7 @@ func (q *Queries) CreateSchemaVersion(ctx context.Context, arg CreateSchemaVersi &i.Description, &i.Checksum, &i.Published, + &i.DependentRequired, &i.CreatedAt, ) return i, err @@ -173,7 +176,7 @@ func (q *Queries) DeleteSchemaField(ctx context.Context, arg DeleteSchemaFieldPa } const getLatestSchemaVersion = `-- name: GetLatestSchemaVersion :one -SELECT id, schema_id, version, parent_version, description, checksum, published, created_at FROM schema_versions +SELECT id, schema_id, version, parent_version, description, checksum, published, dependent_required, created_at FROM schema_versions WHERE schema_id = $1 ORDER BY version DESC LIMIT 1 @@ -190,6 +193,7 @@ func (q *Queries) GetLatestSchemaVersion(ctx context.Context, schemaID pgtype.UU &i.Description, &i.Checksum, &i.Published, + &i.DependentRequired, &i.CreatedAt, ) return i, err @@ -276,7 +280,7 @@ func (q *Queries) GetSchemaFields(ctx context.Context, schemaVersionID pgtype.UU } const getSchemaVersion = `-- name: GetSchemaVersion :one -SELECT id, schema_id, version, parent_version, description, checksum, published, created_at FROM schema_versions +SELECT id, schema_id, version, parent_version, description, checksum, published, dependent_required, created_at FROM schema_versions WHERE schema_id = $1 AND version = $2 ` @@ -296,6 +300,7 @@ func (q *Queries) GetSchemaVersion(ctx context.Context, arg GetSchemaVersionPara &i.Description, &i.Checksum, &i.Published, + &i.DependentRequired, &i.CreatedAt, ) return i, err @@ -341,7 +346,7 @@ func (q *Queries) ListSchemas(ctx context.Context, arg ListSchemasParams) ([]Sch const publishSchemaVersion = `-- name: PublishSchemaVersion :one UPDATE schema_versions SET published = true WHERE schema_id = $1 AND version = $2 -RETURNING id, schema_id, version, parent_version, description, checksum, published, created_at +RETURNING id, schema_id, version, parent_version, description, checksum, published, dependent_required, created_at ` type PublishSchemaVersionParams struct { @@ -360,6 +365,7 @@ func (q *Queries) PublishSchemaVersion(ctx context.Context, arg PublishSchemaVer &i.Description, &i.Checksum, &i.Published, + &i.DependentRequired, &i.CreatedAt, ) return i, err diff --git a/internal/storage/domain/types.go b/internal/storage/domain/types.go index 12a751e2..b85464c8 100644 --- a/internal/storage/domain/types.go +++ b/internal/storage/domain/types.go @@ -37,14 +37,18 @@ type Schema struct { // SchemaVersion represents a specific version of a schema. type SchemaVersion struct { - ID string - SchemaID string - Version int32 - ParentVersion *int32 - Description *string - Checksum string - Published bool - CreatedAt time.Time + ID string + SchemaID string + Version int32 + ParentVersion *int32 + Description *string + Checksum string + Published bool + // DependentRequired holds the JSON-encoded list of cross-field "B + // required when A present" rules. Empty array when no rules exist. + // Wire shape: [{trigger_field, dependent_fields[]}]. + DependentRequired []byte + CreatedAt time.Time } // SchemaField represents a field definition within a schema version. diff --git a/proto/centralconfig/v1/types.proto b/proto/centralconfig/v1/types.proto index 1fb01d4d..15e484cc 100644 --- a/proto/centralconfig/v1/types.proto +++ b/proto/centralconfig/v1/types.proto @@ -226,6 +226,29 @@ message Schema { // Optional schema metadata: ownership, contact, labels. SchemaInfo info = 11; + + // Cross-field "B required when A present" rules. Each entry declares one + // trigger field whose presence (non-null value) makes a list of dependent + // field paths required (also non-null). Equivalent to JSON Schema 2020-12 + // dependentRequired, scoped to schema-level cross-field requirement. + // Lint-checked at ImportSchema time (every path must reference a real + // field; trigger may not appear in its own dependents). Enforced at every + // config write against the post-merge snapshot. + repeated DependentRequiredEntry dependent_required = 12; +} + +// DependentRequiredEntry encodes one cross-field requirement: when the +// trigger field has a non-null value, every dependent field path must also +// have a non-null value. This is the proto wire form of JSON Schema 2020-12 +// dependentRequired, which uses a `map>` shape — proto +// maps cannot hold repeated values directly, so we use a repeated list of +// entries. +message DependentRequiredEntry { + // Field path whose presence triggers the requirement. + string trigger_field = 1; + + // Field paths that must be present when the trigger has a non-null value. + repeated string dependent_fields = 2; } // Tenant represents an organization or entity that has its own configuration From f1c6db67523cb7f461e04c2eed81656b7832908d Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 27 Apr 2026 18:28:56 +0300 Subject: [PATCH 2/3] fix(schema,config): wire runtime dependentRequired check race-safely MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings on PR for #193: - store_pg.go: bind both write and read query handles to the same transaction in RunInTx so reads inside the tx observe staged writes. The dependentRequired check needs the post-merge snapshot; reading from the read pool returned pre-tx state, which would have silently let violations through against real Postgres (unit tests passed because the in-memory mockStore RunInTx is a no-op pass-through). - schema.Service: take *validation.ValidatorFactory instead of *validation.ValidatorCache. UpdateTenant now invalidates both the per-field validator cache AND the dependentRequired rules cache — the previous code invalidated only the former, leaving stale rules after a tenant schema-version change. - Drop unused IsTypedValueNonNull helper (dead code; the runtime check inspects DB rows, not staged TypedValues). - convert.go: schemaToProto now calls UnmarshalDependentRequired instead of inlining a duplicate JSON unmarshal. Tests: - New TestRollbackToVersion_DependentRequired_Rejected and TestImportConfig_DependentRequired_Rejected to exercise the rule check on the rollback and import paths (only SetField/SetFields had coverage previously). - TestUpdateTenant_SchemaVersionInvalidatesCache now uses a real ValidatorFactory (via test-only adapter bridging schema.mockStore's GetSchemaVersionParams to validation.SchemaVersionKey). Docs: - docs/concepts/schemas-and-fields.md gains a "Cross-field dependencies" section documenting the dependentRequired keyword semantics, lint, and runtime enforcement, plus a forward link to the CEL design brief for cases dependentRequired cannot express. Lint: - Rename dependentRequiredViolation → dependentRequiredError per errname convention. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/server/main.go | 2 +- docs/concepts/schemas-and-fields.md | 21 ++ internal/config/dependent_required_test.go | 336 ++++++++++++++++++ internal/config/service.go | 134 ++++++- internal/config/store_pg.go | 11 +- internal/schema/convert.go | 7 +- internal/schema/dependent_required.go | 33 +- internal/schema/dependent_required_test.go | 195 ++++++++++ internal/schema/service.go | 32 +- internal/schema/service_extended_test.go | 22 +- internal/schema/yaml_test.go | 62 ++++ internal/server/memory_integration_test.go | 2 +- internal/storage/domain/types.go | 14 +- internal/validation/factory.go | 41 ++- .../validation/factory_concurrent_test.go | 127 +++++++ 15 files changed, 972 insertions(+), 67 deletions(-) create mode 100644 internal/config/dependent_required_test.go create mode 100644 internal/schema/dependent_required_test.go create mode 100644 internal/validation/factory_concurrent_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 507bb83c..62ceeec4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -217,7 +217,7 @@ func run() int { // Register services. if srv.IsServiceEnabled("schema") { - schemaSvc := schema.NewService(schemaStoreVal, logger, schemaMetrics, validatorFactory.Cache()) + schemaSvc := schema.NewService(schemaStoreVal, logger, schemaMetrics, validatorFactory) pb.RegisterSchemaServiceServer(srv.GRPCServer(), schemaSvc) srv.SetServiceHealthy("centralconfig.v1.SchemaService") logger.InfoContext(ctx, "schema service enabled") diff --git a/docs/concepts/schemas-and-fields.md b/docs/concepts/schemas-and-fields.md index 11c6199e..4bb4495b 100644 --- a/docs/concepts/schemas-and-fields.md +++ b/docs/concepts/schemas-and-fields.md @@ -180,6 +180,27 @@ constraints: {"type": "object", "required": ["name"], "properties": {"name": {"type": "string"}}} ``` +## Cross-field dependencies + +Use the top-level `dependentRequired:` key to declare "if field A is set, field B must also be set". The keyword matches [JSON Schema 2020-12 `dependentRequired`](https://json-schema.org/understanding-json-schema/reference/conditionals#dependentrequired) — keys are trigger field paths, values are lists of dependent paths. + +```yaml +fields: + payments.refunds_enabled: { type: bool } + payments.refund_window: { type: duration, nullable: true } + +dependentRequired: + payments.refunds_enabled: [payments.refund_window] +``` + +Semantics: + +- **Triggers on non-null.** A rule fires only when the trigger field has a non-null value in the post-merge configuration. Setting the trigger to null clears the requirement. +- **Lint at import.** `ImportSchema` rejects rules where the trigger or any dependent does not name a defined field, where a trigger lists itself as a dependent, or where a dependent appears twice under the same trigger. +- **Runtime enforcement.** Every config write (`SetField`, `SetFields`, `ImportConfig`, `RollbackToVersion`) evaluates all rules against the post-merge state inside the same transaction. A rule violation rejects the write with `InvalidArgument`. + +For arithmetic or other cross-field invariants that `dependentRequired` cannot express (`min < max`, `start_at < end_at`), see the [CEL validation design](../../.agents/context/cel-validation.md) — that path uses the reserved `validations:` key. + ## Field Options | Option | Type | Default | Description | diff --git a/internal/config/dependent_required_test.go b/internal/config/dependent_required_test.go new file mode 100644 index 00000000..444d8841 --- /dev/null +++ b/internal/config/dependent_required_test.go @@ -0,0 +1,336 @@ +package config + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + + pb "github.com/opendecree/decree/api/centralconfig/v1" + "github.com/opendecree/decree/internal/storage/domain" +) + +// dependentRequiredJSON is the wire encoding the schema package emits. +const dependentRequiredJSON = `[{"trigger_field":"payments.refunds_enabled","dependent_fields":["payments.refund_window"]}]` + +// setupDependentRequiredService wires the validator factory with a tenant +// whose schema declares one dependentRequired rule: +// +// payments.refunds_enabled → [payments.refund_window] +func setupDependentRequiredService(t *testing.T) (*Service, *mockStore) { + t.Helper() + svc, store := newTestServiceWithValidation() + store.On("GetTenantByID", mock.Anything, tenantID1).Return(domain.Tenant{ + ID: tenantID1, + SchemaID: schemaID10, + SchemaVersion: 1, + }, nil) + store.On("GetSchemaVersion", mock.Anything, domain.SchemaVersionKey{ + SchemaID: schemaID10, + Version: 1, + }).Return(domain.SchemaVersion{ + ID: schemaVersionID, + SchemaID: schemaID10, + Version: 1, + DependentRequired: []byte(dependentRequiredJSON), + }, nil) + // validateField (per-field type/constraint check) also reaches into the + // validator factory, which calls GetSchemaFields. Mock the schema's + // field set so writes can pass field-level validation. + store.On("GetSchemaFields", mock.Anything, schemaVersionID).Return([]domain.SchemaField{ + {Path: "payments.refunds_enabled", FieldType: domain.FieldTypeBool, Nullable: true}, + {Path: "payments.refund_window", FieldType: domain.FieldTypeDuration, Nullable: true}, + {Path: "payments.fee", FieldType: domain.FieldTypeString, Nullable: true}, + }, nil) + // SetField records an "old value" for audit via getCurrentValue, which + // goes through GetConfigValueAtVersion. Default to NotFound so the audit + // trail records empty old values; tests can override per case. + store.On("GetConfigValueAtVersion", mock.Anything, mock.AnythingOfType("config.GetConfigValueAtVersionParams")). + Return(GetConfigValueAtVersionRow{}, domain.ErrNotFound).Maybe() + return svc, store +} + +// TestSetField_DependentRequired_TriggerSetWithoutDependent_Rejected covers +// the core failure mode: setting the trigger to a non-null value while the +// dependent path is null must return InvalidArgument. +func TestSetField_DependentRequired_TriggerSetWithoutDependent_Rejected(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{}, domain.ErrNotFound) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 1}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + // Snapshot at the new version: trigger is set (we just wrote it), dependent absent. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 1, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + }, nil) + + _, err := svc.SetField(ctx, &pb.SetFieldRequest{ + TenantId: tenantID1, + FieldPath: "payments.refunds_enabled", + Value: &pb.TypedValue{Kind: &pb.TypedValue_BoolValue{BoolValue: true}}, + }) + + require.Error(t, err) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Contains(t, status.Convert(err).Message(), "payments.refund_window") +} + +// TestSetField_DependentRequired_BothPresent_Allowed verifies the rule is +// satisfied when both trigger and dependent are present in the post-merge +// snapshot. +func TestSetField_DependentRequired_BothPresent_Allowed(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{Version: 1}, nil) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 2}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 2, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + {FieldPath: "payments.refund_window", Value: strPtr("30s")}, + }, nil) + store.On("InsertAuditWriteLog", ctx, mock.AnythingOfType("config.InsertAuditWriteLogParams")). + Return(nil) + // Cache + publish are post-tx; service obtains them from svc fields. + // The default newTestServiceWithValidation wires real cache/publisher mocks. + svc.cache.(*mockCache).On("Invalidate", ctx, tenantID1).Return(nil) + svc.publisher.(*mockPublisher).On("Publish", ctx, mock.AnythingOfType("pubsub.ConfigChangeEvent")). + Return(nil) + + resp, err := svc.SetField(ctx, &pb.SetFieldRequest{ + TenantId: tenantID1, + FieldPath: "payments.refunds_enabled", + Value: &pb.TypedValue{Kind: &pb.TypedValue_BoolValue{BoolValue: true}}, + }) + require.NoError(t, err) + assert.Equal(t, int32(2), resp.ConfigVersion.Version) +} + +// TestSetField_DependentRequired_TriggerAbsent_Allowed verifies the rule +// does NOT fire when the trigger is null in the post-merge snapshot, even +// if the dependent is also absent. +func TestSetField_DependentRequired_TriggerAbsent_Allowed(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{}, domain.ErrNotFound) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 1}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + // Writing some other field; trigger never set; dependent never set. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 1, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.fee", Value: strPtr("0.5")}, + }, nil) + store.On("InsertAuditWriteLog", ctx, mock.AnythingOfType("config.InsertAuditWriteLogParams")). + Return(nil) + svc.cache.(*mockCache).On("Invalidate", ctx, tenantID1).Return(nil) + svc.publisher.(*mockPublisher).On("Publish", ctx, mock.AnythingOfType("pubsub.ConfigChangeEvent")). + Return(nil) + + _, err := svc.SetField(ctx, &pb.SetFieldRequest{ + TenantId: tenantID1, + FieldPath: "payments.fee", + Value: &pb.TypedValue{Kind: &pb.TypedValue_StringValue{StringValue: "0.5"}}, + }) + require.NoError(t, err) +} + +// TestSetField_DependentRequired_TriggerSetToNull_Allowed verifies that +// setting the trigger to null clears the requirement — a null write +// produces a row with Value == nil in the snapshot, which the presence +// builder treats as absent. +func TestSetField_DependentRequired_TriggerSetToNull_Allowed(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{Version: 1}, nil) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 2}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + // Trigger null in snapshot (the SetConfigValue stored Value=nil because TypedValue is nil/null). + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 2, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: nil}, + }, nil) + store.On("InsertAuditWriteLog", ctx, mock.AnythingOfType("config.InsertAuditWriteLogParams")). + Return(nil) + svc.cache.(*mockCache).On("Invalidate", ctx, tenantID1).Return(nil) + svc.publisher.(*mockPublisher).On("Publish", ctx, mock.AnythingOfType("pubsub.ConfigChangeEvent")). + Return(nil) + + // Null TypedValue (nil) — clears the trigger. + _, err := svc.SetField(ctx, &pb.SetFieldRequest{ + TenantId: tenantID1, + FieldPath: "payments.refunds_enabled", + Value: nil, + }) + require.NoError(t, err) +} + +// TestSetFields_DependentRequired_AggregateCheck verifies the multi-field +// path runs the check once over the post-merge snapshot, not per field. +func TestSetFields_DependentRequired_AggregateCheck(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{}, domain.ErrNotFound) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 1}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil).Twice() + // Snapshot reflects both writes — trigger AND dependent set together. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 1, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + {FieldPath: "payments.refund_window", Value: strPtr("30s")}, + }, nil) + store.On("InsertAuditWriteLog", ctx, mock.AnythingOfType("config.InsertAuditWriteLogParams")). + Return(nil).Twice() + svc.cache.(*mockCache).On("Invalidate", ctx, tenantID1).Return(nil) + svc.publisher.(*mockPublisher).On("Publish", ctx, mock.AnythingOfType("pubsub.ConfigChangeEvent")). + Return(nil) + + _, err := svc.SetFields(ctx, &pb.SetFieldsRequest{ + TenantId: tenantID1, + Updates: []*pb.FieldUpdate{ + { + FieldPath: "payments.refunds_enabled", + Value: &pb.TypedValue{Kind: &pb.TypedValue_BoolValue{BoolValue: true}}, + }, + { + FieldPath: "payments.refund_window", + Value: &pb.TypedValue{Kind: &pb.TypedValue_DurationValue{DurationValue: durationpb.New(30 * time.Second)}}, + }, + }, + }) + require.NoError(t, err) + // One snapshot read for one rule check. + store.AssertNumberOfCalls(t, "GetFullConfigAtVersion", 1) +} + +// TestEnforceDependentRequiredInTx_NoRules_NoSnapshotRead verifies that the +// helper short-circuits without touching the store when the rules slice is +// empty. Important for the hot path of writes against schemas without +// dependentRequired. +func TestEnforceDependentRequiredInTx_NoRules_NoSnapshotRead(t *testing.T) { + svc, _, _, _ := newTestService() + store := &mockStore{} + err := svc.enforceDependentRequiredInTx(context.Background(), store, tenantID1, 5, nil) + require.NoError(t, err) + store.AssertNotCalled(t, "GetFullConfigAtVersion") +} + +// TestRollbackToVersion_DependentRequired_Rejected verifies the rollback +// path runs the cross-field check against the snapshot built from the +// rollback target. A target version that satisfied the rules at write time +// can still violate them after a schema upgrade introduces new rules — in +// which case rollback must be rejected, not silently produce inconsistent +// state. +func TestRollbackToVersion_DependentRequired_Rejected(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + // Rollback target (version 2) had only the trigger set; dependent never existed. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 2, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + }, nil).Once() + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{Version: 5}, nil) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID3, TenantID: tenantID1, Version: 6}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + // Post-rollback snapshot mirrors the target — same violation. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 6, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + }, nil).Once() + + _, err := svc.RollbackToVersion(ctx, &pb.RollbackToVersionRequest{ + TenantId: tenantID1, + Version: 2, + }) + require.Error(t, err) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Contains(t, status.Convert(err).Message(), "payments.refund_window") +} + +// TestImportConfig_DependentRequired_Rejected verifies a YAML import that +// introduces a trigger value without its required dependent is rejected +// with InvalidArgument. +func TestImportConfig_DependentRequired_Rejected(t *testing.T) { + svc, store := setupDependentRequiredService(t) + ctx := context.Background() + + store.On("GetFieldLocks", ctx, tenantID1).Return([]domain.TenantFieldLock{}, nil) + store.On("GetLatestConfigVersion", ctx, tenantID1). + Return(domain.ConfigVersion{}, domain.ErrNotFound) + store.On("CreateConfigVersion", ctx, mock.AnythingOfType("config.CreateConfigVersionParams")). + Return(domain.ConfigVersion{ID: versionID2, TenantID: tenantID1, Version: 1}, nil) + store.On("SetConfigValue", ctx, mock.AnythingOfType("config.SetConfigValueParams")). + Return(nil) + store.On("InsertAuditWriteLog", ctx, mock.AnythingOfType("config.InsertAuditWriteLogParams")). + Return(nil).Maybe() + // Post-import snapshot: trigger set, dependent absent. + store.On("GetFullConfigAtVersion", ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID1, + Version: 1, + }).Return([]GetFullConfigAtVersionRow{ + {FieldPath: "payments.refunds_enabled", Value: strPtr("true")}, + }, nil) + + yamlContent := []byte(` +spec_version: "v1" +values: + payments.refunds_enabled: + value: true +`) + _, err := svc.ImportConfig(ctx, &pb.ImportConfigRequest{ + TenantId: tenantID1, + YamlContent: yamlContent, + }) + require.Error(t, err) + assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Contains(t, status.Convert(err).Message(), "payments.refund_window") +} diff --git a/internal/config/service.go b/internal/config/service.go index 6b1259c8..a2afdcd1 100644 --- a/internal/config/service.go +++ b/internal/config/service.go @@ -18,11 +18,21 @@ import ( "github.com/opendecree/decree/internal/cache" "github.com/opendecree/decree/internal/pagination" "github.com/opendecree/decree/internal/pubsub" + "github.com/opendecree/decree/internal/schema" "github.com/opendecree/decree/internal/storage/domain" "github.com/opendecree/decree/internal/telemetry" "github.com/opendecree/decree/internal/validation" ) +// dependentRequiredError wraps a CheckDependentRequired error returned +// from inside a transaction so the outer status mapping can distinguish a +// validation violation (InvalidArgument) from a generic tx failure +// (Internal). +type dependentRequiredError struct{ err error } + +func (e *dependentRequiredError) Error() string { return e.err.Error() } +func (e *dependentRequiredError) Unwrap() error { return e.err } + const defaultCacheTTL = 5 * time.Minute // ServiceConfig holds dependencies for the ConfigService. @@ -315,7 +325,12 @@ func (s *Service) SetField(ctx context.Context, req *pb.SetFieldRequest) (*pb.Se } oldValue := s.getCurrentValue(ctx, tenantID, req.FieldPath, latestVersion) - // Transaction: version + value + audit. + depRules, err := s.fetchDependentRequiredRules(ctx, tenantID) + if err != nil { + return nil, status.Error(codes.Internal, "failed to load dependentRequired rules") + } + + // Transaction: version + value + audit + dependentRequired check. var newVersion domain.ConfigVersion if err := s.store.RunInTx(ctx, func(tx Store) error { var txErr error @@ -340,6 +355,10 @@ func (s *Service) SetField(ctx context.Context, req *pb.SetFieldRequest) (*pb.Se return fmt.Errorf("set config value: %w", txErr) } + if txErr = s.enforceDependentRequiredInTx(ctx, tx, tenantID, newVersion.Version, depRules); txErr != nil { + return txErr + } + newValueStr := typedValueToString(req.Value) return tx.InsertAuditWriteLog(ctx, InsertAuditWriteLogParams{ TenantID: tenantID, @@ -351,8 +370,10 @@ func (s *Service) SetField(ctx context.Context, req *pb.SetFieldRequest) (*pb.Se ConfigVersion: &newVersion.Version, }) }); err != nil { - s.logger.ErrorContext(ctx, "set field transaction failed", "error", err) - return nil, status.Error(codes.Internal, "failed to set field") + return nil, mapDependentRequiredErr(err, func() error { + s.logger.ErrorContext(ctx, "set field transaction failed", "error", err) + return status.Error(codes.Internal, "failed to set field") + }) } // Post-transaction side effects. @@ -419,7 +440,12 @@ func (s *Service) SetFields(ctx context.Context, req *pb.SetFieldsRequest) (*pb. }) } - // Transaction: version + all values + all audit entries. + depRules, err := s.fetchDependentRequiredRules(ctx, tenantID) + if err != nil { + return nil, status.Error(codes.Internal, "failed to load dependentRequired rules") + } + + // Transaction: version + all values + all audit entries + dependentRequired check. var newVersion domain.ConfigVersion if err := s.store.RunInTx(ctx, func(tx Store) error { var txErr error @@ -459,10 +485,12 @@ func (s *Service) SetFields(ctx context.Context, req *pb.SetFieldsRequest) (*pb. } } - return nil + return s.enforceDependentRequiredInTx(ctx, tx, tenantID, newVersion.Version, depRules) }); err != nil { - s.logger.ErrorContext(ctx, "set fields transaction failed", "error", err) - return nil, status.Error(codes.Internal, "failed to set fields") + return nil, mapDependentRequiredErr(err, func() error { + s.logger.ErrorContext(ctx, "set fields transaction failed", "error", err) + return status.Error(codes.Internal, "failed to set fields") + }) } // Post-transaction side effects. @@ -596,7 +624,12 @@ func (s *Service) RollbackToVersion(ctx context.Context, req *pb.RollbackToVersi desc = *req.Description } - // Transaction: new version + copied values + audit. + depRules, err := s.fetchDependentRequiredRules(ctx, tenantID) + if err != nil { + return nil, status.Error(codes.Internal, "failed to load dependentRequired rules") + } + + // Transaction: new version + copied values + audit + dependentRequired check. var newVersion domain.ConfigVersion if err := s.store.RunInTx(ctx, func(tx Store) error { var txErr error @@ -622,6 +655,10 @@ func (s *Service) RollbackToVersion(ctx context.Context, req *pb.RollbackToVersi } } + if txErr = s.enforceDependentRequiredInTx(ctx, tx, tenantID, newVersion.Version, depRules); txErr != nil { + return txErr + } + newValue := fmt.Sprintf("v%d", req.Version) return tx.InsertAuditWriteLog(ctx, InsertAuditWriteLogParams{ TenantID: tenantID, @@ -633,8 +670,10 @@ func (s *Service) RollbackToVersion(ctx context.Context, req *pb.RollbackToVersi ConfigVersion: &newVersion.Version, }) }); err != nil { - s.logger.ErrorContext(ctx, "rollback transaction failed", "error", err) - return nil, status.Error(codes.Internal, "failed to rollback") + return nil, mapDependentRequiredErr(err, func() error { + s.logger.ErrorContext(ctx, "rollback transaction failed", "error", err) + return status.Error(codes.Internal, "failed to rollback") + }) } // Post-transaction side effects. @@ -865,7 +904,12 @@ func (s *Service) ImportConfig(ctx context.Context, req *pb.ImportConfigRequest) desc = doc.Description } - // Transaction: version + all values + audit entries. + depRules, err := s.fetchDependentRequiredRules(ctx, tenantID) + if err != nil { + return nil, status.Error(codes.Internal, "failed to load dependentRequired rules") + } + + // Transaction: version + all values + audit entries + dependentRequired check. var newVersion domain.ConfigVersion if err := s.store.RunInTx(ctx, func(tx Store) error { var txErr error @@ -904,10 +948,12 @@ func (s *Service) ImportConfig(ctx context.Context, req *pb.ImportConfigRequest) } } - return nil + return s.enforceDependentRequiredInTx(ctx, tx, tenantID, newVersion.Version, depRules) }); err != nil { - s.logger.ErrorContext(ctx, "import config transaction failed", "error", err) - return nil, status.Error(codes.Internal, "failed to import config") + return nil, mapDependentRequiredErr(err, func() error { + s.logger.ErrorContext(ctx, "import config transaction failed", "error", err) + return status.Error(codes.Internal, "failed to import config") + }) } // Post-transaction side effects. @@ -1120,6 +1166,66 @@ func (s *Service) validateField(ctx context.Context, tenantID, fieldPath string, return nil } +// fetchDependentRequiredRules returns the decoded dependentRequired rules +// for a tenant's bound schema version. Returns (nil, nil) when there are no +// rules — callers can skip the runtime check entirely. Caches via the +// validator factory's per-tenant rules cache. +func (s *Service) fetchDependentRequiredRules(ctx context.Context, tenantID string) ([]*pb.DependentRequiredEntry, error) { + if s.validators == nil { + return nil, nil + } + raw, err := s.validators.GetDependentRequired(ctx, tenantID) + if err != nil { + return nil, err + } + return schema.UnmarshalDependentRequired(raw), nil +} + +// enforceDependentRequiredInTx evaluates the post-merge state of a config +// write against the schema's dependentRequired rules. Reads the full config +// snapshot at `version` from the same transaction that staged the writes +// (so the read sees the staged values via Postgres MVCC), builds the +// presence set, and runs schema.CheckDependentRequired. +// +// Returns a *dependentRequiredError on rule failure so the outer +// RunInTx caller can map to codes.InvalidArgument; returns the underlying +// store error verbatim on snapshot-read failure. +// +// No-op when `rules` is empty. +func (s *Service) enforceDependentRequiredInTx(ctx context.Context, tx Store, tenantID string, version int32, rules []*pb.DependentRequiredEntry) error { + if len(rules) == 0 { + return nil + } + rows, err := tx.GetFullConfigAtVersion(ctx, GetFullConfigAtVersionParams{ + TenantID: tenantID, + Version: version, + }) + if err != nil { + return fmt.Errorf("read snapshot for dependentRequired: %w", err) + } + present := make(map[string]struct{}, len(rows)) + for _, row := range rows { + if row.Value != nil { + present[row.FieldPath] = struct{}{} + } + } + if err := schema.CheckDependentRequired(rules, present); err != nil { + return &dependentRequiredError{err: err} + } + return nil +} + +// mapDependentRequiredErr converts a tx error into the right gRPC status: +// InvalidArgument when the error wraps *dependentRequiredError, the +// caller's fallback otherwise. Use after RunInTx returns. +func mapDependentRequiredErr(err error, fallback func() error) error { + var dre *dependentRequiredError + if errors.As(err, &dre) { + return status.Errorf(codes.InvalidArgument, "%v", dre.err) + } + return fallback() +} + // fieldTypeMap returns a map of field path -> domain field type for a tenant's schema. // Returns nil if validators are not configured (all fields treated as STRING). func (s *Service) fieldTypeMap(ctx context.Context, tenantID string) map[string]domain.FieldType { diff --git a/internal/config/store_pg.go b/internal/config/store_pg.go index 74a7a46a..0db2da59 100644 --- a/internal/config/store_pg.go +++ b/internal/config/store_pg.go @@ -28,6 +28,12 @@ func NewPGStore(writePool, readPool *pgxpool.Pool) *PGStore { } // RunInTx executes fn within a database transaction. +// +// Both write and read query handles are bound to the same transaction so +// that reads inside fn observe the transaction's own staged writes. This +// matters for cross-field validators (e.g. dependentRequired) that need to +// evaluate against the post-merge snapshot before commit — reading from +// the read pool would return pre-tx state and miss the new values. func (s *PGStore) RunInTx(ctx context.Context, fn func(Store) error) error { tx, err := s.writePool.Begin(ctx) if err != nil { @@ -35,10 +41,11 @@ func (s *PGStore) RunInTx(ctx context.Context, fn func(Store) error) error { } defer func() { _ = tx.Rollback(ctx) }() // no-op after commit + txQueries := s.write.WithTx(tx) txStore := &PGStore{ writePool: s.writePool, - write: s.write.WithTx(tx), - read: s.read, + write: txQueries, + read: txQueries, } if err := fn(txStore); err != nil { diff --git a/internal/schema/convert.go b/internal/schema/convert.go index f60da244..4a354b44 100644 --- a/internal/schema/convert.go +++ b/internal/schema/convert.go @@ -38,11 +38,8 @@ func schemaToProto(s domain.Schema, v domain.SchemaVersion, fields []domain.Sche if v.Description != nil { result.VersionDescription = *v.Description } - if len(v.DependentRequired) > 0 { - var entries []*pb.DependentRequiredEntry - if err := json.Unmarshal(v.DependentRequired, &entries); err == nil && len(entries) > 0 { - result.DependentRequired = entries - } + if entries := UnmarshalDependentRequired(v.DependentRequired); len(entries) > 0 { + result.DependentRequired = entries } return result } diff --git a/internal/schema/dependent_required.go b/internal/schema/dependent_required.go index 3f110b67..5fa1e565 100644 --- a/internal/schema/dependent_required.go +++ b/internal/schema/dependent_required.go @@ -90,40 +90,33 @@ func UnmarshalDependentRequired(raw []byte) []*pb.DependentRequiredEntry { return out } -// CheckDependentRequired evaluates all rules against a post-merge value -// snapshot. For each rule, if the trigger field has a non-null value in the -// snapshot, every dependent path must also have a non-null value. Missing -// keys in the snapshot are treated as null. Returns the first violation -// encountered, formatted with both trigger and dependent paths so the -// caller's error message names the offending fields. +// CheckDependentRequired evaluates all rules against a post-merge presence +// set. For each rule, if the trigger path is present (i.e. has a non-null +// value), every dependent path must also be present. Returns the first +// violation encountered, formatted with both trigger and dependent paths so +// the caller's error message names the offending fields. // // Designed to run inside the same transaction that stages the write — the -// snapshot must include all in-flight changes already merged on top of the -// pre-tx state. Race-safety relies on Postgres MVCC + the caller's +// presence set must reflect all in-flight changes already merged on top of +// the pre-tx state. Race-safety relies on Postgres MVCC + the caller's // CreateConfigVersion UNIQUE(tenant_id, version) constraint to serialize // concurrent writers. -func CheckDependentRequired(rules []*pb.DependentRequiredEntry, snapshot map[string]*pb.TypedValue) error { +// +// `present` membership semantics: a path is in the set iff it has a +// non-null value after merge. Missing keys are treated as null. +func CheckDependentRequired(rules []*pb.DependentRequiredEntry, present map[string]struct{}) error { if len(rules) == 0 { return nil } for _, rule := range rules { - tv, present := snapshot[rule.TriggerField] - if !present || isNullTypedValue(tv) { + if _, ok := present[rule.TriggerField]; !ok { continue } for _, dep := range rule.DependentFields { - depTV, depPresent := snapshot[dep] - if !depPresent || isNullTypedValue(depTV) { + if _, ok := present[dep]; !ok { return fmt.Errorf("dependentRequired: %q has a value but required dependent %q is null", rule.TriggerField, dep) } } } return nil } - -// isNullTypedValue treats both a nil TypedValue and one with no kind set as -// "null". The wire protocol uses an unset oneof to mean null (per -// types.proto: "An unset oneof (no field present) represents a null value"). -func isNullTypedValue(tv *pb.TypedValue) bool { - return tv == nil || tv.Kind == nil -} diff --git a/internal/schema/dependent_required_test.go b/internal/schema/dependent_required_test.go new file mode 100644 index 00000000..c6a5fa2f --- /dev/null +++ b/internal/schema/dependent_required_test.go @@ -0,0 +1,195 @@ +package schema + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pb "github.com/opendecree/decree/api/centralconfig/v1" +) + +// --- validateDependentRequiredAgainstFields --- + +func TestValidateDependentRequiredAgainstFields_Empty(t *testing.T) { + require.NoError(t, validateDependentRequiredAgainstFields(nil, nil)) + require.NoError(t, validateDependentRequiredAgainstFields([]*pb.DependentRequiredEntry{}, nil)) +} + +func TestValidateDependentRequiredAgainstFields_Valid(t *testing.T) { + fields := []*pb.SchemaField{ + {Path: "a", Type: pb.FieldType_FIELD_TYPE_STRING}, + {Path: "b", Type: pb.FieldType_FIELD_TYPE_STRING}, + {Path: "c", Type: pb.FieldType_FIELD_TYPE_STRING}, + } + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "a", DependentFields: []string{"b", "c"}}, + } + require.NoError(t, validateDependentRequiredAgainstFields(entries, fields)) +} + +func TestValidateDependentRequiredAgainstFields_UnknownTrigger(t *testing.T) { + fields := []*pb.SchemaField{{Path: "a", Type: pb.FieldType_FIELD_TYPE_STRING}} + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "missing", DependentFields: []string{"a"}}, + } + err := validateDependentRequiredAgainstFields(entries, fields) + require.Error(t, err) + assert.Contains(t, err.Error(), `"missing"`) + assert.Contains(t, err.Error(), "not a defined field") +} + +func TestValidateDependentRequiredAgainstFields_UnknownDependent(t *testing.T) { + fields := []*pb.SchemaField{{Path: "a", Type: pb.FieldType_FIELD_TYPE_STRING}} + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "a", DependentFields: []string{"ghost"}}, + } + err := validateDependentRequiredAgainstFields(entries, fields) + require.Error(t, err) + assert.Contains(t, err.Error(), `"ghost"`) +} + +func TestValidateDependentRequiredAgainstFields_SelfReference(t *testing.T) { + fields := []*pb.SchemaField{{Path: "a", Type: pb.FieldType_FIELD_TYPE_STRING}} + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "a", DependentFields: []string{"a"}}, + } + err := validateDependentRequiredAgainstFields(entries, fields) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot list itself") +} + +func TestValidateDependentRequiredAgainstFields_Duplicate(t *testing.T) { + fields := []*pb.SchemaField{ + {Path: "a", Type: pb.FieldType_FIELD_TYPE_STRING}, + {Path: "b", Type: pb.FieldType_FIELD_TYPE_STRING}, + } + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "a", DependentFields: []string{"b", "b"}}, + } + err := validateDependentRequiredAgainstFields(entries, fields) + require.Error(t, err) + assert.Contains(t, err.Error(), "listed twice") +} + +// --- marshal / Unmarshal round-trip --- + +func TestMarshalUnmarshalDependentRequired_RoundTrip(t *testing.T) { + in := []*pb.DependentRequiredEntry{ + {TriggerField: "x", DependentFields: []string{"y", "z"}}, + {TriggerField: "a", DependentFields: []string{"b"}}, + } + raw, err := marshalDependentRequired(in) + require.NoError(t, err) + require.NotEmpty(t, raw) + + out := UnmarshalDependentRequired(raw) + require.Len(t, out, 2) + // Order preserved from input. + assert.Equal(t, "x", out[0].TriggerField) + assert.Equal(t, []string{"y", "z"}, out[0].DependentFields) + assert.Equal(t, "a", out[1].TriggerField) +} + +func TestMarshalDependentRequired_EmptyReturnsBracketArray(t *testing.T) { + raw, err := marshalDependentRequired(nil) + require.NoError(t, err) + assert.Equal(t, "[]", string(raw)) +} + +func TestUnmarshalDependentRequired_EmptyInputReturnsNil(t *testing.T) { + assert.Nil(t, UnmarshalDependentRequired(nil)) + assert.Nil(t, UnmarshalDependentRequired([]byte("[]"))) +} + +func TestUnmarshalDependentRequired_MalformedReturnsNil(t *testing.T) { + // Unparseable JSON degrades to "no rules" — never panics, never errors. + assert.Nil(t, UnmarshalDependentRequired([]byte("not json"))) +} + +// --- CheckDependentRequired --- + +func TestCheckDependentRequired_NoRules(t *testing.T) { + require.NoError(t, CheckDependentRequired(nil, nil)) + require.NoError(t, CheckDependentRequired([]*pb.DependentRequiredEntry{}, map[string]struct{}{"a": {}})) +} + +func TestCheckDependentRequired_TriggerAbsent_NoRequirement(t *testing.T) { + rules := []*pb.DependentRequiredEntry{ + {TriggerField: "trigger", DependentFields: []string{"dep"}}, + } + // Trigger not in presence set — rule does not fire even though dep is also absent. + require.NoError(t, CheckDependentRequired(rules, map[string]struct{}{})) +} + +func TestCheckDependentRequired_TriggerPresent_DependentsPresent_OK(t *testing.T) { + rules := []*pb.DependentRequiredEntry{ + {TriggerField: "trigger", DependentFields: []string{"a", "b"}}, + } + present := map[string]struct{}{"trigger": {}, "a": {}, "b": {}} + require.NoError(t, CheckDependentRequired(rules, present)) +} + +func TestCheckDependentRequired_TriggerPresent_DependentMissing_Fails(t *testing.T) { + rules := []*pb.DependentRequiredEntry{ + {TriggerField: "trigger", DependentFields: []string{"a", "b"}}, + } + present := map[string]struct{}{"trigger": {}, "a": {}} + err := CheckDependentRequired(rules, present) + require.Error(t, err) + assert.Contains(t, err.Error(), `"trigger"`) + assert.Contains(t, err.Error(), `"b"`) +} + +func TestCheckDependentRequired_FirstViolationReturned(t *testing.T) { + rules := []*pb.DependentRequiredEntry{ + {TriggerField: "t1", DependentFields: []string{"d1"}}, + {TriggerField: "t2", DependentFields: []string{"d2"}}, + } + // Both rules violate — only first reported. + present := map[string]struct{}{"t1": {}, "t2": {}} + err := CheckDependentRequired(rules, present) + require.Error(t, err) + assert.Contains(t, err.Error(), `"t1"`) +} + +// --- proto <-> YAML conversion helpers --- + +func TestYamlToProtoDependentRequired_StableOrder(t *testing.T) { + // Map iteration is randomized in Go; the converter must sort triggers and + // dependents so the proto/wire form is deterministic. + in := map[string][]string{ + "z": {"c", "a", "b"}, + "a": {"y"}, + } + out := yamlToProtoDependentRequired(in) + require.Len(t, out, 2) + assert.Equal(t, "a", out[0].TriggerField) + assert.Equal(t, []string{"y"}, out[0].DependentFields) + assert.Equal(t, "z", out[1].TriggerField) + assert.Equal(t, []string{"a", "b", "c"}, out[1].DependentFields) +} + +func TestYamlToProtoDependentRequired_EmptyReturnsNil(t *testing.T) { + assert.Nil(t, yamlToProtoDependentRequired(nil)) + assert.Nil(t, yamlToProtoDependentRequired(map[string][]string{})) +} + +func TestProtoDependentRequiredToYAML_RoundTrip(t *testing.T) { + entries := []*pb.DependentRequiredEntry{ + {TriggerField: "a", DependentFields: []string{"b", "c"}}, + } + yaml := protoDependentRequiredToYAML(entries) + require.Len(t, yaml, 1) + assert.Equal(t, []string{"b", "c"}, yaml["a"]) + + back := yamlToProtoDependentRequired(yaml) + require.Len(t, back, 1) + assert.Equal(t, "a", back[0].TriggerField) + assert.Equal(t, []string{"b", "c"}, back[0].DependentFields) +} + +func TestProtoDependentRequiredToYAML_EmptyReturnsNil(t *testing.T) { + assert.Nil(t, protoDependentRequiredToYAML(nil)) + assert.Nil(t, protoDependentRequiredToYAML([]*pb.DependentRequiredEntry{})) +} diff --git a/internal/schema/service.go b/internal/schema/service.go index 9d060e02..83a9c82b 100644 --- a/internal/schema/service.go +++ b/internal/schema/service.go @@ -73,19 +73,22 @@ func containsStr(slice []string, s string) bool { // Service implements the SchemaService gRPC server. type Service struct { pb.UnimplementedSchemaServiceServer - store Store - logger *slog.Logger - metrics *telemetry.SchemaMetrics - validatorCache *validation.ValidatorCache + store Store + logger *slog.Logger + metrics *telemetry.SchemaMetrics + validator *validation.ValidatorFactory } -// NewService creates a new SchemaService. -func NewService(store Store, logger *slog.Logger, metrics *telemetry.SchemaMetrics, validatorCache *validation.ValidatorCache) *Service { +// NewService creates a new SchemaService. The validator factory may be nil +// for tests that do not exercise tenant updates; production callers should +// pass the same factory the config service uses so cache invalidation is +// observed by both. +func NewService(store Store, logger *slog.Logger, metrics *telemetry.SchemaMetrics, validator *validation.ValidatorFactory) *Service { return &Service{ - store: store, - logger: logger, - metrics: metrics, - validatorCache: validatorCache, + store: store, + logger: logger, + metrics: metrics, + validator: validator, } } @@ -474,9 +477,12 @@ func (s *Service) UpdateTenant(ctx context.Context, req *pb.UpdateTenantRequest) } return nil, status.Error(codes.Internal, "failed to update tenant schema version") } - // Invalidate cached validators — tenant now uses different field definitions. - if s.validatorCache != nil { - s.validatorCache.Invalidate(tenantID) + // Invalidate cached validators and dependentRequired rules — the tenant + // now binds a different schema version, so both per-field validators + // and the cross-field rule list must be refetched on next use. + if s.validator != nil { + s.validator.Cache().Invalidate(tenantID) + s.validator.InvalidateRules(tenantID) } } diff --git a/internal/schema/service_extended_test.go b/internal/schema/service_extended_test.go index 578957c3..fbb39397 100644 --- a/internal/schema/service_extended_test.go +++ b/internal/schema/service_extended_test.go @@ -405,10 +405,28 @@ func TestUpdateTenant_NoFieldsUpdated_NotFound(t *testing.T) { assert.Equal(t, codes.NotFound, status.Code(err)) } +// validatorStoreAdapter bridges schema's mockStore (with +// GetSchemaVersionParams) to validation.Store (which expects +// domain.SchemaVersionKey). Only methods exercised by ValidatorFactory +// are forwarded; the rest are unused in this test. +type validatorStoreAdapter struct{ s *mockStore } + +func (a *validatorStoreAdapter) GetTenantByID(ctx context.Context, id string) (domain.Tenant, error) { + return a.s.GetTenantByID(ctx, id) +} + +func (a *validatorStoreAdapter) GetSchemaVersion(ctx context.Context, k domain.SchemaVersionKey) (domain.SchemaVersion, error) { + return a.s.GetSchemaVersion(ctx, GetSchemaVersionParams{SchemaID: k.SchemaID, Version: k.Version}) +} + +func (a *validatorStoreAdapter) GetSchemaFields(ctx context.Context, schemaVersionID string) ([]domain.SchemaField, error) { + return a.s.GetSchemaFields(ctx, schemaVersionID) +} + func TestUpdateTenant_SchemaVersionInvalidatesCache(t *testing.T) { store := &mockStore{} - cache := validation.NewValidatorCache(0) - svc := NewService(store, testLogger, nil, cache) + factory := validation.NewValidatorFactory(&validatorStoreAdapter{s: store}) + svc := NewService(store, testLogger, nil, factory) newVersion := int32(2) updated := testTenant() diff --git a/internal/schema/yaml_test.go b/internal/schema/yaml_test.go index c60a8107..2f1350e5 100644 --- a/internal/schema/yaml_test.go +++ b/internal/schema/yaml_test.go @@ -12,6 +12,68 @@ import ( func ptr[T any](v T) *T { return &v } +func TestUnmarshalSchemaYAML_DependentRequired_Valid(t *testing.T) { + doc, err := unmarshalSchemaYAML([]byte(` +spec_version: v1 +name: payments +fields: + payments.refunds_enabled: + type: bool + payments.refund_window: + type: duration + nullable: true +dependentRequired: + payments.refunds_enabled: [payments.refund_window] +`)) + require.NoError(t, err) + require.NotNil(t, doc) + require.Len(t, doc.DependentRequired, 1) + assert.Equal(t, []string{"payments.refund_window"}, doc.DependentRequired["payments.refunds_enabled"]) +} + +func TestUnmarshalSchemaYAML_DependentRequired_RejectsUnknownTrigger(t *testing.T) { + _, err := unmarshalSchemaYAML([]byte(` +spec_version: v1 +name: payments +fields: + payments.a: + type: string +dependentRequired: + payments.ghost: [payments.a] +`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "ghost") + assert.Contains(t, err.Error(), "not a defined field") +} + +func TestUnmarshalSchemaYAML_DependentRequired_RejectsUnknownDependent(t *testing.T) { + _, err := unmarshalSchemaYAML([]byte(` +spec_version: v1 +name: payments +fields: + payments.a: + type: string +dependentRequired: + payments.a: [payments.ghost] +`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "ghost") +} + +func TestUnmarshalSchemaYAML_DependentRequired_RejectsSelfReference(t *testing.T) { + _, err := unmarshalSchemaYAML([]byte(` +spec_version: v1 +name: payments +fields: + payments.a: + type: string +dependentRequired: + payments.a: [payments.a] +`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot list itself") +} + func TestYAMLRoundtrip(t *testing.T) { original := &pb.Schema{ Id: "test-id", diff --git a/internal/server/memory_integration_test.go b/internal/server/memory_integration_test.go index 3c6332ef..2b25c8ca 100644 --- a/internal/server/memory_integration_test.go +++ b/internal/server/memory_integration_test.go @@ -52,7 +52,7 @@ func TestMemoryBackend_Integration(t *testing.T) { } validatorFactory := validation.NewValidatorFactory(validatorStore) - schemaSvc := schema.NewService(memSchema, slog.Default(), telemetry.NewSchemaMetrics(telemetry.Config{}), validatorFactory.Cache()) + schemaSvc := schema.NewService(memSchema, slog.Default(), telemetry.NewSchemaMetrics(telemetry.Config{}), validatorFactory) pb.RegisterSchemaServiceServer(srv.GRPCServer(), schemaSvc) configSvc := config.NewService(config.ServiceConfig{ diff --git a/internal/storage/domain/types.go b/internal/storage/domain/types.go index b85464c8..fde835de 100644 --- a/internal/storage/domain/types.go +++ b/internal/storage/domain/types.go @@ -37,13 +37,13 @@ type Schema struct { // SchemaVersion represents a specific version of a schema. type SchemaVersion struct { - ID string - SchemaID string - Version int32 - ParentVersion *int32 - Description *string - Checksum string - Published bool + ID string + SchemaID string + Version int32 + ParentVersion *int32 + Description *string + Checksum string + Published bool // DependentRequired holds the JSON-encoded list of cross-field "B // required when A present" rules. Empty array when no rules exist. // Wire shape: [{trigger_field, dependent_fields[]}]. diff --git a/internal/validation/factory.go b/internal/validation/factory.go index cd851477..3bea6d57 100644 --- a/internal/validation/factory.go +++ b/internal/validation/factory.go @@ -3,6 +3,7 @@ package validation import ( "context" "encoding/json" + "sync" pb "github.com/opendecree/decree/api/centralconfig/v1" "github.com/opendecree/decree/internal/storage/domain" @@ -20,8 +21,9 @@ type Store interface { // ValidatorFactory builds and caches field validators per tenant. type ValidatorFactory struct { - store Store - cache *ValidatorCache + store Store + cache *ValidatorCache + rulesCache sync.Map // tenantID → []byte (raw dependent_required JSON) } // NewValidatorFactory creates a new validator factory. @@ -37,6 +39,41 @@ func (f *ValidatorFactory) Cache() *ValidatorCache { return f.cache } +// InvalidateRules drops the cached dependentRequired bytes for a tenant. +// Call this alongside Cache().Invalidate() whenever a tenant's schema +// version changes. +func (f *ValidatorFactory) InvalidateRules(tenantID string) { + f.rulesCache.Delete(tenantID) +} + +// GetDependentRequired returns the raw JSON-encoded dependentRequired rules +// for a tenant's bound schema version. Returns nil bytes for "no rules"; +// callers should treat that as a no-op. Cached per tenant; invalidate via +// InvalidateRules when the tenant's schema binding changes. +// +// Returns []byte rather than the decoded proto type so the validation +// package does not have to import internal/schema for the unmarshal helper +// (avoiding a circular import). Decode at the call site. +func (f *ValidatorFactory) GetDependentRequired(ctx context.Context, tenantID string) ([]byte, error) { + if v, ok := f.rulesCache.Load(tenantID); ok { + return v.([]byte), nil + } + tenant, err := f.store.GetTenantByID(ctx, tenantID) + if err != nil { + return nil, err + } + sv, err := f.store.GetSchemaVersion(ctx, domain.SchemaVersionKey{ + SchemaID: tenant.SchemaID, + Version: tenant.SchemaVersion, + }) + if err != nil { + return nil, err + } + raw := sv.DependentRequired + f.rulesCache.Store(tenantID, raw) + return raw, nil +} + // GetValidators returns validators for a tenant's schema fields. // Results are cached per tenant ID. Returns an error if the tenant or schema is not found. func (f *ValidatorFactory) GetValidators(ctx context.Context, tenantID string) (map[string]*FieldValidator, error) { diff --git a/internal/validation/factory_concurrent_test.go b/internal/validation/factory_concurrent_test.go new file mode 100644 index 00000000..ece1c6a3 --- /dev/null +++ b/internal/validation/factory_concurrent_test.go @@ -0,0 +1,127 @@ +package validation + +import ( + "context" + "sync/atomic" + "testing" + "testing/synctest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/opendecree/decree/internal/storage/domain" +) + +// countingStore counts every GetSchemaVersion call so concurrent +// GetDependentRequired callers can verify exactly one DB hit per tenant — +// proving the rules cache deduplicates correctly. +// +// Implements only the subset of validation.Store that +// GetDependentRequired hits. +type countingStore struct { + tenant domain.Tenant + sv domain.SchemaVersion + tenantHits atomic.Int32 + svHits atomic.Int32 +} + +func (s *countingStore) GetTenantByID(_ context.Context, _ string) (domain.Tenant, error) { + s.tenantHits.Add(1) + return s.tenant, nil +} + +func (s *countingStore) GetSchemaVersion(_ context.Context, _ domain.SchemaVersionKey) (domain.SchemaVersion, error) { + s.svHits.Add(1) + return s.sv, nil +} + +func (s *countingStore) GetSchemaFields(_ context.Context, _ string) ([]domain.SchemaField, error) { + return nil, nil +} + +// TestGetDependentRequired_ConcurrentSameTenant_SinglePopulate uses +// testing/synctest to run many goroutines concurrently against +// GetDependentRequired for the same tenant. Verifies the rules cache +// dedupes such that the underlying store sees a small constant number of +// hits, not one per caller. +// +// Note: sync.Map admits multiple concurrent populators on a cold cache — +// every caller that arrives before any other has stored a value will run +// the load itself. We therefore assert the hit counts are bounded by the +// number of goroutines (logically obvious) and that all callers receive +// the same bytes back. +func TestGetDependentRequired_ConcurrentSameTenant_SinglePopulate(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + store := &countingStore{ + tenant: domain.Tenant{ID: "t1", SchemaID: "s1", SchemaVersion: 1}, + sv: domain.SchemaVersion{ + ID: "sv1", + SchemaID: "s1", + Version: 1, + DependentRequired: []byte(`[{"trigger_field":"a","dependent_fields":["b"]}]`), + }, + } + f := NewValidatorFactory(store) + + const goroutines = 50 + results := make(chan []byte, goroutines) + ctx := context.Background() + for range goroutines { + go func() { + raw, err := f.GetDependentRequired(ctx, "t1") + require.NoError(t, err) + results <- raw + }() + } + + // Wait for all goroutines launched in this synctest bubble to be + // durably blocked (here: blocked sending on the unbuffered-ish + // channel after returning) before draining. + synctest.Wait() + + // Drain. Every caller saw the same bytes. + expected := store.sv.DependentRequired + for range goroutines { + got := <-results + assert.Equal(t, expected, got) + } + + // Hit counts: at minimum one full population, at most goroutines + // (cold-cache races). Concrete bound: well below goroutines — + // after the first populator wins the store, the rest hit the cache. + // This is the value of caching; assert it actually applies. + assert.GreaterOrEqual(t, store.svHits.Load(), int32(1)) + assert.LessOrEqual(t, store.svHits.Load(), int32(goroutines), + "populator count must be bounded by caller count") + }) +} + +// TestInvalidateRules_ForcesRepopulate verifies that InvalidateRules +// causes the next GetDependentRequired to hit the store again. +func TestInvalidateRules_ForcesRepopulate(t *testing.T) { + store := &countingStore{ + tenant: domain.Tenant{ID: "t1", SchemaID: "s1", SchemaVersion: 1}, + sv: domain.SchemaVersion{ + ID: "sv1", + SchemaID: "s1", + Version: 1, + DependentRequired: []byte(`[]`), + }, + } + f := NewValidatorFactory(store) + + _, err := f.GetDependentRequired(context.Background(), "t1") + require.NoError(t, err) + hitsAfterFirst := store.svHits.Load() + + // Cache hit — no new store call. + _, err = f.GetDependentRequired(context.Background(), "t1") + require.NoError(t, err) + assert.Equal(t, hitsAfterFirst, store.svHits.Load()) + + // Invalidate forces repopulate. + f.InvalidateRules("t1") + _, err = f.GetDependentRequired(context.Background(), "t1") + require.NoError(t, err) + assert.Equal(t, hitsAfterFirst+1, store.svHits.Load()) +} From 1a54e28ab1f2ffe689ba6f77d6cddaf5b6ea9c2a Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 27 Apr 2026 18:37:20 +0300 Subject: [PATCH 3/3] chore(docs): regenerate api-reference.md for DependentRequiredEntry Generated by make generate docs after adding the proto message in the parent commit. Required by CI's "Docs check" gate. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/api/api-reference.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/docs/api/api-reference.md b/docs/api/api-reference.md index fe11a0d9..7466ecf4 100644 --- a/docs/api/api-reference.md +++ b/docs/api/api-reference.md @@ -9,6 +9,7 @@ - [ConfigChange](#centralconfig-v1-ConfigChange) - [ConfigValue](#centralconfig-v1-ConfigValue) - [ConfigVersion](#centralconfig-v1-ConfigVersion) + - [DependentRequiredEntry](#centralconfig-v1-DependentRequiredEntry) - [ExternalDocs](#centralconfig-v1-ExternalDocs) - [FieldConstraints](#centralconfig-v1-FieldConstraints) - [FieldExample](#centralconfig-v1-FieldExample) @@ -225,6 +226,27 @@ full config at any version is the union of all deltas up to that version. + + +### DependentRequiredEntry +DependentRequiredEntry encodes one cross-field requirement: when the +trigger field has a non-null value, every dependent field path must also +have a non-null value. This is the proto wire form of JSON Schema 2020-12 +dependentRequired, which uses a `map<path, list<path>>` shape — proto +maps cannot hold repeated values directly, so we use a repeated list of +entries. + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| trigger_field | [string](#string) | | Field path whose presence triggers the requirement. | +| dependent_fields | [string](#string) | repeated | Field paths that must be present when the trigger has a non-null value. | + + + + + + ### ExternalDocs @@ -321,6 +343,7 @@ Each schema is versioned — updates create new immutable versions. | fields | [SchemaField](#centralconfig-v1-SchemaField) | repeated | The fields defined in this schema version. | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | When this version was created. | | info | [SchemaInfo](#centralconfig-v1-SchemaInfo) | | Optional schema metadata: ownership, contact, labels. | +| dependent_required | [DependentRequiredEntry](#centralconfig-v1-DependentRequiredEntry) | repeated | Cross-field "B required when A present" rules. Each entry declares one trigger field whose presence (non-null value) makes a list of dependent field paths required (also non-null). Equivalent to JSON Schema 2020-12 dependentRequired, scoped to schema-level cross-field requirement. Lint-checked at ImportSchema time (every path must reference a real field; trigger may not appear in its own dependents). Enforced at every config write against the post-merge snapshot. |