diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index 6e432996a3..6f6a8b705b 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -106,6 +106,10 @@ func (c *client) StreamEvents(_ context.Context, _ ...grpc.CallOption) (grpc.Bid return nil, fmt.Errorf("not implemented: StreamEvents is experimental and not supported yet") } +func (c *client) RegisterSchema(ctx context.Context, in *pb.RegisterSchemaRequest, opts ...grpc.CallOption) (*pb.RegisterSchemaResponse, error) { + return c.client.RegisterSchema(ctx, in, opts...) +} + func (c *client) Close() error { return c.conn.Close() } diff --git a/pkg/chipingress/mocks/client.go b/pkg/chipingress/mocks/client.go index a3829e4177..f8542e3882 100644 --- a/pkg/chipingress/mocks/client.go +++ b/pkg/chipingress/mocks/client.go @@ -294,6 +294,80 @@ func (_c *Client_PublishBatch_Call) RunAndReturn(run func(context.Context, *pb.C return _c } +// RegisterSchema provides a mock function with given fields: ctx, in, opts +func (_m *Client) RegisterSchema(ctx context.Context, in *pb.RegisterSchemaRequest, opts ...grpc.CallOption) (*pb.RegisterSchemaResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for RegisterSchema") + } + + var r0 *pb.RegisterSchemaResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *pb.RegisterSchemaRequest, ...grpc.CallOption) (*pb.RegisterSchemaResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *pb.RegisterSchemaRequest, ...grpc.CallOption) *pb.RegisterSchemaResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pb.RegisterSchemaResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *pb.RegisterSchemaRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_RegisterSchema_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterSchema' +type Client_RegisterSchema_Call struct { + *mock.Call +} + +// RegisterSchema is a helper method to define mock.On call +// - ctx context.Context +// - in *pb.RegisterSchemaRequest +// - opts ...grpc.CallOption +func (_e *Client_Expecter) RegisterSchema(ctx interface{}, in interface{}, opts ...interface{}) *Client_RegisterSchema_Call { + return &Client_RegisterSchema_Call{Call: _e.mock.On("RegisterSchema", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *Client_RegisterSchema_Call) Run(run func(ctx context.Context, in *pb.RegisterSchemaRequest, opts ...grpc.CallOption)) *Client_RegisterSchema_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*pb.RegisterSchemaRequest), variadicArgs...) + }) + return _c +} + +func (_c *Client_RegisterSchema_Call) Return(_a0 *pb.RegisterSchemaResponse, _a1 error) *Client_RegisterSchema_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_RegisterSchema_Call) RunAndReturn(run func(context.Context, *pb.RegisterSchemaRequest, ...grpc.CallOption) (*pb.RegisterSchemaResponse, error)) *Client_RegisterSchema_Call { + _c.Call.Return(run) + return _c +} + // StreamEvents provides a mock function with given fields: ctx, opts func (_m *Client) StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[pb.StreamEventsRequest, pb.StreamEventsResponse], error) { _va := make([]interface{}, len(opts)) diff --git a/pkg/chipingress/pb/chip_common.pb.go b/pkg/chipingress/pb/chip_common.pb.go new file mode 100644 index 0000000000..33be5ae2cb --- /dev/null +++ b/pkg/chipingress/pb/chip_common.pb.go @@ -0,0 +1,584 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v5.29.3 +// source: pb/chip_common.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Enum for different schema types +type SchemaType int32 + +const ( + SchemaType_PROTOBUF SchemaType = 0 + SchemaType_AVRO SchemaType = 1 + SchemaType_JSON SchemaType = 2 +) + +// Enum value maps for SchemaType. +var ( + SchemaType_name = map[int32]string{ + 0: "PROTOBUF", + 1: "AVRO", + 2: "JSON", + } + SchemaType_value = map[string]int32{ + "PROTOBUF": 0, + "AVRO": 1, + "JSON": 2, + } +) + +func (x SchemaType) Enum() *SchemaType { + p := new(SchemaType) + *p = x + return p +} + +func (x SchemaType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SchemaType) Descriptor() protoreflect.EnumDescriptor { + return file_pb_chip_common_proto_enumTypes[0].Descriptor() +} + +func (SchemaType) Type() protoreflect.EnumType { + return &file_pb_chip_common_proto_enumTypes[0] +} + +func (x SchemaType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SchemaType.Descriptor instead. +func (SchemaType) EnumDescriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{0} +} + +// Enum for different path types +type PathType int32 + +const ( + PathType_S3 PathType = 0 // S3 storage + PathType_LOCAL PathType = 1 // Local file system + PathType_GITHUB PathType = 3 // GitHub storage + PathType_OTHER PathType = 4 // Other storage types +) + +// Enum value maps for PathType. +var ( + PathType_name = map[int32]string{ + 0: "S3", + 1: "LOCAL", + 3: "GITHUB", + 4: "OTHER", + } + PathType_value = map[string]int32{ + "S3": 0, + "LOCAL": 1, + "GITHUB": 3, + "OTHER": 4, + } +) + +func (x PathType) Enum() *PathType { + p := new(PathType) + *p = x + return p +} + +func (x PathType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PathType) Descriptor() protoreflect.EnumDescriptor { + return file_pb_chip_common_proto_enumTypes[1].Descriptor() +} + +func (PathType) Type() protoreflect.EnumType { + return &file_pb_chip_common_proto_enumTypes[1] +} + +func (x PathType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PathType.Descriptor instead. +func (PathType) EnumDescriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{1} +} + +// Schema definition +type Schema struct { + state protoimpl.MessageState `protogen:"open.v1"` + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + Path *Path `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` // S3 location or other storage path + Format SchemaType `protobuf:"varint,4,opt,name=format,proto3,enum=chip_common.SchemaType" json:"format,omitempty"` // Format of the schema, e.g. PROTOBUF, AVRO, JSON + Schema string `protobuf:"bytes,5,opt,name=schema,proto3" json:"schema,omitempty"` // Raw proto/avro schema with annotations + References []*SchemaReference `protobuf:"bytes,6,rep,name=references,proto3" json:"references,omitempty"` // References to other schemas + Metadata *MetaData `protobuf:"bytes,7,opt,name=metadata,proto3" json:"metadata,omitempty"` // optional metadata information + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Schema) Reset() { + *x = Schema{} + mi := &file_pb_chip_common_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Schema) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Schema) ProtoMessage() {} + +func (x *Schema) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Schema.ProtoReflect.Descriptor instead. +func (*Schema) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{0} +} + +func (x *Schema) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *Schema) GetPath() *Path { + if x != nil { + return x.Path + } + return nil +} + +func (x *Schema) GetFormat() SchemaType { + if x != nil { + return x.Format + } + return SchemaType_PROTOBUF +} + +func (x *Schema) GetSchema() string { + if x != nil { + return x.Schema + } + return "" +} + +func (x *Schema) GetReferences() []*SchemaReference { + if x != nil { + return x.References + } + return nil +} + +func (x *Schema) GetMetadata() *MetaData { + if x != nil { + return x.Metadata + } + return nil +} + +// Message to reference other schemas +type SchemaReference struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Subject string `protobuf:"bytes,2,opt,name=subject,proto3" json:"subject,omitempty"` + Version int32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SchemaReference) Reset() { + *x = SchemaReference{} + mi := &file_pb_chip_common_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SchemaReference) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SchemaReference) ProtoMessage() {} + +func (x *SchemaReference) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SchemaReference.ProtoReflect.Descriptor instead. +func (*SchemaReference) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{1} +} + +func (x *SchemaReference) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *SchemaReference) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *SchemaReference) GetVersion() int32 { + if x != nil { + return x.Version + } + return 0 +} + +// Metadata for the schema, currently includes storage information +type MetaData struct { + state protoimpl.MessageState `protogen:"open.v1"` + Stores map[string]*Store `protobuf:"bytes,1,rep,name=stores,proto3" json:"stores,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // List of datastores the schema should be stored in. Map of store (hot, cold or warm) to Store objects. + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MetaData) Reset() { + *x = MetaData{} + mi := &file_pb_chip_common_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MetaData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetaData) ProtoMessage() {} + +func (x *MetaData) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetaData.ProtoReflect.Descriptor instead. +func (*MetaData) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{2} +} + +func (x *MetaData) GetStores() map[string]*Store { + if x != nil { + return x.Stores + } + return nil +} + +// Each store contains index and partition information +type Store struct { + state protoimpl.MessageState `protogen:"open.v1"` + Index []string `protobuf:"bytes,1,rep,name=index,proto3" json:"index,omitempty"` + Partition []string `protobuf:"bytes,2,rep,name=partition,proto3" json:"partition,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Store) Reset() { + *x = Store{} + mi := &file_pb_chip_common_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Store) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Store) ProtoMessage() {} + +func (x *Store) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Store.ProtoReflect.Descriptor instead. +func (*Store) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{3} +} + +func (x *Store) GetIndex() []string { + if x != nil { + return x.Index + } + return nil +} + +func (x *Store) GetPartition() []string { + if x != nil { + return x.Partition + } + return nil +} + +// Path definition +type Path struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` // S3 location or other storage path + Type PathType `protobuf:"varint,2,opt,name=type,proto3,enum=chip_common.PathType" json:"type,omitempty"` // Type of PATH storage (S3, LOCAL, OTHER) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Path) Reset() { + *x = Path{} + mi := &file_pb_chip_common_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Path) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Path) ProtoMessage() {} + +func (x *Path) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Path.ProtoReflect.Descriptor instead. +func (*Path) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{4} +} + +func (x *Path) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *Path) GetType() PathType { + if x != nil { + return x.Type + } + return PathType_S3 +} + +// Response message for schema registration +type RegisteredSchema struct { + state protoimpl.MessageState `protogen:"open.v1"` + Subject string `protobuf:"bytes,1,opt,name=subject,proto3" json:"subject,omitempty"` + Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisteredSchema) Reset() { + *x = RegisteredSchema{} + mi := &file_pb_chip_common_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisteredSchema) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisteredSchema) ProtoMessage() {} + +func (x *RegisteredSchema) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_common_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisteredSchema.ProtoReflect.Descriptor instead. +func (*RegisteredSchema) Descriptor() ([]byte, []int) { + return file_pb_chip_common_proto_rawDescGZIP(), []int{5} +} + +func (x *RegisteredSchema) GetSubject() string { + if x != nil { + return x.Subject + } + return "" +} + +func (x *RegisteredSchema) GetVersion() int32 { + if x != nil { + return x.Version + } + return 0 +} + +var File_pb_chip_common_proto protoreflect.FileDescriptor + +const file_pb_chip_common_proto_rawDesc = "" + + "\n" + + "\x14pb/chip_common.proto\x12\vchip_common\"\x83\x02\n" + + "\x06Schema\x12\x18\n" + + "\asubject\x18\x01 \x01(\tR\asubject\x12%\n" + + "\x04path\x18\x02 \x01(\v2\x11.chip_common.PathR\x04path\x12/\n" + + "\x06format\x18\x04 \x01(\x0e2\x17.chip_common.SchemaTypeR\x06format\x12\x16\n" + + "\x06schema\x18\x05 \x01(\tR\x06schema\x12<\n" + + "\n" + + "references\x18\x06 \x03(\v2\x1c.chip_common.SchemaReferenceR\n" + + "references\x121\n" + + "\bmetadata\x18\a \x01(\v2\x15.chip_common.MetaDataR\bmetadata\"Y\n" + + "\x0fSchemaReference\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x18\n" + + "\asubject\x18\x02 \x01(\tR\asubject\x12\x18\n" + + "\aversion\x18\x03 \x01(\x05R\aversion\"\x94\x01\n" + + "\bMetaData\x129\n" + + "\x06stores\x18\x01 \x03(\v2!.chip_common.MetaData.StoresEntryR\x06stores\x1aM\n" + + "\vStoresEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12(\n" + + "\x05value\x18\x02 \x01(\v2\x12.chip_common.StoreR\x05value:\x028\x01\";\n" + + "\x05Store\x12\x14\n" + + "\x05index\x18\x01 \x03(\tR\x05index\x12\x1c\n" + + "\tpartition\x18\x02 \x03(\tR\tpartition\"E\n" + + "\x04Path\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\x12)\n" + + "\x04type\x18\x02 \x01(\x0e2\x15.chip_common.PathTypeR\x04type\"F\n" + + "\x10RegisteredSchema\x12\x18\n" + + "\asubject\x18\x01 \x01(\tR\asubject\x12\x18\n" + + "\aversion\x18\x02 \x01(\x05R\aversion*.\n" + + "\n" + + "SchemaType\x12\f\n" + + "\bPROTOBUF\x10\x00\x12\b\n" + + "\x04AVRO\x10\x01\x12\b\n" + + "\x04JSON\x10\x02*4\n" + + "\bPathType\x12\x06\n" + + "\x02S3\x10\x00\x12\t\n" + + "\x05LOCAL\x10\x01\x12\n" + + "\n" + + "\x06GITHUB\x10\x03\x12\t\n" + + "\x05OTHER\x10\x04B\x06Z\x04./pbb\x06proto3" + +var ( + file_pb_chip_common_proto_rawDescOnce sync.Once + file_pb_chip_common_proto_rawDescData []byte +) + +func file_pb_chip_common_proto_rawDescGZIP() []byte { + file_pb_chip_common_proto_rawDescOnce.Do(func() { + file_pb_chip_common_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pb_chip_common_proto_rawDesc), len(file_pb_chip_common_proto_rawDesc))) + }) + return file_pb_chip_common_proto_rawDescData +} + +var file_pb_chip_common_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_pb_chip_common_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_pb_chip_common_proto_goTypes = []any{ + (SchemaType)(0), // 0: chip_common.SchemaType + (PathType)(0), // 1: chip_common.PathType + (*Schema)(nil), // 2: chip_common.Schema + (*SchemaReference)(nil), // 3: chip_common.SchemaReference + (*MetaData)(nil), // 4: chip_common.MetaData + (*Store)(nil), // 5: chip_common.Store + (*Path)(nil), // 6: chip_common.Path + (*RegisteredSchema)(nil), // 7: chip_common.RegisteredSchema + nil, // 8: chip_common.MetaData.StoresEntry +} +var file_pb_chip_common_proto_depIdxs = []int32{ + 6, // 0: chip_common.Schema.path:type_name -> chip_common.Path + 0, // 1: chip_common.Schema.format:type_name -> chip_common.SchemaType + 3, // 2: chip_common.Schema.references:type_name -> chip_common.SchemaReference + 4, // 3: chip_common.Schema.metadata:type_name -> chip_common.MetaData + 8, // 4: chip_common.MetaData.stores:type_name -> chip_common.MetaData.StoresEntry + 1, // 5: chip_common.Path.type:type_name -> chip_common.PathType + 5, // 6: chip_common.MetaData.StoresEntry.value:type_name -> chip_common.Store + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_pb_chip_common_proto_init() } +func file_pb_chip_common_proto_init() { + if File_pb_chip_common_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_common_proto_rawDesc), len(file_pb_chip_common_proto_rawDesc)), + NumEnums: 2, + NumMessages: 7, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_pb_chip_common_proto_goTypes, + DependencyIndexes: file_pb_chip_common_proto_depIdxs, + EnumInfos: file_pb_chip_common_proto_enumTypes, + MessageInfos: file_pb_chip_common_proto_msgTypes, + }.Build() + File_pb_chip_common_proto = out.File + file_pb_chip_common_proto_goTypes = nil + file_pb_chip_common_proto_depIdxs = nil +} diff --git a/pkg/chipingress/pb/chip_common.proto b/pkg/chipingress/pb/chip_common.proto new file mode 100644 index 0000000000..7a79f9933a --- /dev/null +++ b/pkg/chipingress/pb/chip_common.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package chip_common; + +option go_package = "./pb"; + +// Schema definition +message Schema { + string subject = 1; + Path path = 2; // S3 location or other storage path + SchemaType format = 4; // Format of the schema, e.g. PROTOBUF, AVRO, JSON + string schema = 5; // Raw proto/avro schema with annotations + repeated SchemaReference references = 6; // References to other schemas + MetaData metadata = 7; // optional metadata information +} + +// Enum for different schema types +enum SchemaType { + PROTOBUF = 0; + AVRO = 1; + JSON = 2; +} + +// Message to reference other schemas +message SchemaReference { + string name = 1; + string subject = 2; + int32 version = 3; +} + +// Metadata for the schema, currently includes storage information +message MetaData { + map stores = 1; // List of datastores the schema should be stored in. Map of store (hot, cold or warm) to Store objects. +} + +// Each store contains index and partition information +message Store { + repeated string index = 1; + repeated string partition = 2; +} + +// Path definition +message Path { + string path = 1; // S3 location or other storage path + PathType type = 2; // Type of PATH storage (S3, LOCAL, OTHER) +} + +// Enum for different path types +enum PathType { + S3 = 0; // S3 storage + LOCAL = 1; // Local file system + GITHUB = 2; // GitHub storage + OTHER = 3; // Other storage types +} + +// Response message for schema registration +message RegisteredSchema { + string subject = 1; + int32 version = 2; +} diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 596483ce45..7117f287c6 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.6 // protoc v5.29.3 -// source: chip_ingress.proto +// source: pb/chip_ingress.proto package pb @@ -32,7 +32,7 @@ type CloudEventBatch struct { func (x *CloudEventBatch) Reset() { *x = CloudEventBatch{} - mi := &file_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +44,7 @@ func (x *CloudEventBatch) String() string { func (*CloudEventBatch) ProtoMessage() {} func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +57,7 @@ func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use CloudEventBatch.ProtoReflect.Descriptor instead. func (*CloudEventBatch) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{0} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} } func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { @@ -76,7 +76,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} - mi := &file_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -88,7 +88,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -101,7 +101,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{1} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} } func (x *PublishResponse) GetResults() []*PublishResult { @@ -120,7 +120,7 @@ type PublishResult struct { func (x *PublishResult) Reset() { *x = PublishResult{} - mi := &file_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +132,7 @@ func (x *PublishResult) String() string { func (*PublishResult) ProtoMessage() {} func (x *PublishResult) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +145,7 @@ func (x *PublishResult) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResult.ProtoReflect.Descriptor instead. func (*PublishResult) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{2} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} } func (x *PublishResult) GetEventId() string { @@ -164,7 +164,7 @@ type EmptyRequest struct { func (x *EmptyRequest) Reset() { *x = EmptyRequest{} - mi := &file_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -176,7 +176,7 @@ func (x *EmptyRequest) String() string { func (*EmptyRequest) ProtoMessage() {} func (x *EmptyRequest) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -189,7 +189,7 @@ func (x *EmptyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use EmptyRequest.ProtoReflect.Descriptor instead. func (*EmptyRequest) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{3} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} } // PingResponse responds to pings @@ -202,7 +202,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -214,7 +214,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -227,7 +227,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{4} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} } func (x *PingResponse) GetMessage() string { @@ -247,7 +247,7 @@ type StreamEventsRequest struct { func (x *StreamEventsRequest) Reset() { *x = StreamEventsRequest{} - mi := &file_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -259,7 +259,7 @@ func (x *StreamEventsRequest) String() string { func (*StreamEventsRequest) ProtoMessage() {} func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -272,7 +272,7 @@ func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsRequest.ProtoReflect.Descriptor instead. func (*StreamEventsRequest) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{5} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} } func (x *StreamEventsRequest) GetEvent() *pb.CloudEvent { @@ -292,7 +292,7 @@ type StreamEventsResponse struct { func (x *StreamEventsResponse) Reset() { *x = StreamEventsResponse{} - mi := &file_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -304,7 +304,7 @@ func (x *StreamEventsResponse) String() string { func (*StreamEventsResponse) ProtoMessage() {} func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { - mi := &file_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -317,7 +317,7 @@ func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. func (*StreamEventsResponse) Descriptor() ([]byte, []int) { - return file_chip_ingress_proto_rawDescGZIP(), []int{6} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} } func (x *StreamEventsResponse) GetEventId() string { @@ -334,11 +334,101 @@ func (x *StreamEventsResponse) GetStatus() string { return "" } -var File_chip_ingress_proto protoreflect.FileDescriptor +// RegisterSchema request message +type RegisterSchemaRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Schemas []*Schema `protobuf:"bytes,1,rep,name=schemas,proto3" json:"schemas,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterSchemaRequest) Reset() { + *x = RegisterSchemaRequest{} + mi := &file_pb_chip_ingress_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} -const file_chip_ingress_proto_rawDesc = "" + +func (x *RegisterSchemaRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterSchemaRequest) ProtoMessage() {} + +func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterSchemaRequest.ProtoReflect.Descriptor instead. +func (*RegisterSchemaRequest) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} +} + +func (x *RegisterSchemaRequest) GetSchemas() []*Schema { + if x != nil { + return x.Schemas + } + return nil +} + +// RegisterSchema response message +type RegisterSchemaResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Registered []*RegisteredSchema `protobuf:"bytes,1,rep,name=registered,proto3" json:"registered,omitempty"` // List of registered schema subjects + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterSchemaResponse) Reset() { + *x = RegisterSchemaResponse{} + mi := &file_pb_chip_ingress_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterSchemaResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterSchemaResponse) ProtoMessage() {} + +func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterSchemaResponse.ProtoReflect.Descriptor instead. +func (*RegisterSchemaResponse) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} +} + +func (x *RegisterSchemaResponse) GetRegistered() []*RegisteredSchema { + if x != nil { + return x.Registered + } + return nil +} + +var File_pb_chip_ingress_proto protoreflect.FileDescriptor + +const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x12chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"H\n" + "\x0fCloudEventBatch\x125\n" + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + "\x0fPublishResponse\x127\n" + @@ -352,75 +442,91 @@ const file_chip_ingress_proto_rawDesc = "" + "\x05event\x18\x01 \x01(\v2\x1d.io.cloudevents.v1.CloudEventR\x05event\"H\n" + "\x14StreamEventsResponse\x12\x18\n" + "\aeventId\x18\x01 \x01(\tR\aeventId\x12\x16\n" + - "\x06status\x18\x02 \x01(\tR\x06status2\xd3\x02\n" + + "\x06status\x18\x02 \x01(\tR\x06status\"F\n" + + "\x15RegisterSchemaRequest\x12-\n" + + "\aschemas\x18\x01 \x03(\v2\x13.chip_common.SchemaR\aschemas\"W\n" + + "\x16RegisterSchemaResponse\x12=\n" + + "\n" + + "registered\x18\x01 \x03(\v2\x1d.chip_common.RegisteredSchemaR\n" + + "registered2\xb6\x03\n" + "\vChipIngress\x12K\n" + "\aPublish\x12\x1d.io.cloudevents.v1.CloudEvent\x1a\x1f.chipingress.pb.PublishResponse\"\x00\x12R\n" + "\fPublishBatch\x12\x1f.chipingress.pb.CloudEventBatch\x1a\x1f.chipingress.pb.PublishResponse\"\x00\x12B\n" + "\x04Ping\x12\x1c.chipingress.pb.EmptyRequest\x1a\x1c.chipingress.pb.PingResponse\x12_\n" + - "\fStreamEvents\x12#.chipingress.pb.StreamEventsRequest\x1a$.chipingress.pb.StreamEventsResponse\"\x00(\x010\x01B\x06Z\x04./pbb\x06proto3" + "\fStreamEvents\x12#.chipingress.pb.StreamEventsRequest\x1a$.chipingress.pb.StreamEventsResponse\"\x00(\x010\x01\x12a\n" + + "\x0eRegisterSchema\x12%.chipingress.pb.RegisterSchemaRequest\x1a&.chipingress.pb.RegisterSchemaResponse\"\x00B\x06Z\x04./pbb\x06proto3" var ( - file_chip_ingress_proto_rawDescOnce sync.Once - file_chip_ingress_proto_rawDescData []byte + file_pb_chip_ingress_proto_rawDescOnce sync.Once + file_pb_chip_ingress_proto_rawDescData []byte ) -func file_chip_ingress_proto_rawDescGZIP() []byte { - file_chip_ingress_proto_rawDescOnce.Do(func() { - file_chip_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_chip_ingress_proto_rawDesc), len(file_chip_ingress_proto_rawDesc))) +func file_pb_chip_ingress_proto_rawDescGZIP() []byte { + file_pb_chip_ingress_proto_rawDescOnce.Do(func() { + file_pb_chip_ingress_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc))) }) - return file_chip_ingress_proto_rawDescData -} - -var file_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 7) -var file_chip_ingress_proto_goTypes = []any{ - (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch - (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse - (*PublishResult)(nil), // 2: chipingress.pb.PublishResult - (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest - (*PingResponse)(nil), // 4: chipingress.pb.PingResponse - (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest - (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse - (*pb.CloudEvent)(nil), // 7: io.cloudevents.v1.CloudEvent -} -var file_chip_ingress_proto_depIdxs = []int32{ - 7, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent - 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 7, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 7, // 3: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 4: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 5: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 6: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 1, // 7: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 8: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse - 4, // 9: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 10: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 7, // [7:11] is the sub-list for method output_type - 3, // [3:7] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name -} - -func init() { file_chip_ingress_proto_init() } -func file_chip_ingress_proto_init() { - if File_chip_ingress_proto != nil { + return file_pb_chip_ingress_proto_rawDescData +} + +var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pb_chip_ingress_proto_goTypes = []any{ + (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch + (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse + (*PublishResult)(nil), // 2: chipingress.pb.PublishResult + (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest + (*PingResponse)(nil), // 4: chipingress.pb.PingResponse + (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest + (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse + (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest + (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse + (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent + (*Schema)(nil), // 10: chip_common.Schema + (*RegisteredSchema)(nil), // 11: chip_common.RegisteredSchema +} +var file_pb_chip_ingress_proto_depIdxs = []int32{ + 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent + 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult + 9, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 10, // 3: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 11, // 4: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 9, // 5: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 0, // 6: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 3, // 7: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 5, // 8: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 7, // 9: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 1, // 10: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 1, // 11: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse + 4, // 12: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 6, // 13: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 8, // 14: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 10, // [10:15] is the sub-list for method output_type + 5, // [5:10] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_pb_chip_ingress_proto_init() } +func file_pb_chip_ingress_proto_init() { + if File_pb_chip_ingress_proto != nil { return } + file_pb_chip_common_proto_init() type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_chip_ingress_proto_rawDesc), len(file_chip_ingress_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_chip_ingress_proto_goTypes, - DependencyIndexes: file_chip_ingress_proto_depIdxs, - MessageInfos: file_chip_ingress_proto_msgTypes, + GoTypes: file_pb_chip_ingress_proto_goTypes, + DependencyIndexes: file_pb_chip_ingress_proto_depIdxs, + MessageInfos: file_pb_chip_ingress_proto_msgTypes, }.Build() - File_chip_ingress_proto = out.File - file_chip_ingress_proto_goTypes = nil - file_chip_ingress_proto_depIdxs = nil + File_pb_chip_ingress_proto = out.File + file_pb_chip_ingress_proto_goTypes = nil + file_pb_chip_ingress_proto_depIdxs = nil } diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index 09c36b268e..1675f5d9c6 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto"; +import "pb/chip_common.proto"; package chipingress.pb; @@ -22,6 +23,9 @@ service ChipIngress { // StreamEvents; EXPERIMENTAL ~ allows clients to stream CloudEvents to the server. // This API is experimental and may change in the future. rpc StreamEvents (stream StreamEventsRequest) returns (stream StreamEventsResponse) {} // New streaming endpoint + +// RegisterSchema allows registering one or more schemas that define the structure of CloudEvent data. + rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse) {} } // CloudEventBatch is used to send many ChipIngress @@ -54,3 +58,13 @@ message StreamEventsResponse { string eventId = 1; string status = 2; // e.g., "success", "error" } + +// RegisterSchema request message +message RegisterSchemaRequest { + repeated chip_common.Schema schemas = 1; +} + +// RegisterSchema response message +message RegisterSchemaResponse { + repeated chip_common.RegisteredSchema registered = 1; // List of registered schema subjects +} \ No newline at end of file diff --git a/pkg/chipingress/pb/chip_ingress_grpc.pb.go b/pkg/chipingress/pb/chip_ingress_grpc.pb.go index 1121da5eca..7b68869f16 100644 --- a/pkg/chipingress/pb/chip_ingress_grpc.pb.go +++ b/pkg/chipingress/pb/chip_ingress_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.5.1 // - protoc v5.29.3 -// source: chip_ingress.proto +// source: pb/chip_ingress.proto package pb @@ -20,10 +20,11 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - ChipIngress_Publish_FullMethodName = "/chipingress.pb.ChipIngress/Publish" - ChipIngress_PublishBatch_FullMethodName = "/chipingress.pb.ChipIngress/PublishBatch" - ChipIngress_Ping_FullMethodName = "/chipingress.pb.ChipIngress/Ping" - ChipIngress_StreamEvents_FullMethodName = "/chipingress.pb.ChipIngress/StreamEvents" + ChipIngress_Publish_FullMethodName = "/chipingress.pb.ChipIngress/Publish" + ChipIngress_PublishBatch_FullMethodName = "/chipingress.pb.ChipIngress/PublishBatch" + ChipIngress_Ping_FullMethodName = "/chipingress.pb.ChipIngress/Ping" + ChipIngress_StreamEvents_FullMethodName = "/chipingress.pb.ChipIngress/StreamEvents" + ChipIngress_RegisterSchema_FullMethodName = "/chipingress.pb.ChipIngress/RegisterSchema" ) // ChipIngressClient is the client API for ChipIngress service. @@ -46,6 +47,8 @@ type ChipIngressClient interface { // StreamEvents; EXPERIMENTAL ~ allows clients to stream CloudEvents to the server. // This API is experimental and may change in the future. StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[StreamEventsRequest, StreamEventsResponse], error) + // RegisterSchema allows registering one or more schemas that define the structure of CloudEvent data. + RegisterSchema(ctx context.Context, in *RegisterSchemaRequest, opts ...grpc.CallOption) (*RegisterSchemaResponse, error) } type chipIngressClient struct { @@ -99,6 +102,16 @@ func (c *chipIngressClient) StreamEvents(ctx context.Context, opts ...grpc.CallO // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type ChipIngress_StreamEventsClient = grpc.BidiStreamingClient[StreamEventsRequest, StreamEventsResponse] +func (c *chipIngressClient) RegisterSchema(ctx context.Context, in *RegisterSchemaRequest, opts ...grpc.CallOption) (*RegisterSchemaResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RegisterSchemaResponse) + err := c.cc.Invoke(ctx, ChipIngress_RegisterSchema_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // ChipIngressServer is the server API for ChipIngress service. // All implementations must embed UnimplementedChipIngressServer // for forward compatibility. @@ -119,6 +132,8 @@ type ChipIngressServer interface { // StreamEvents; EXPERIMENTAL ~ allows clients to stream CloudEvents to the server. // This API is experimental and may change in the future. StreamEvents(grpc.BidiStreamingServer[StreamEventsRequest, StreamEventsResponse]) error + // RegisterSchema allows registering one or more schemas that define the structure of CloudEvent data. + RegisterSchema(context.Context, *RegisterSchemaRequest) (*RegisterSchemaResponse, error) mustEmbedUnimplementedChipIngressServer() } @@ -141,6 +156,9 @@ func (UnimplementedChipIngressServer) Ping(context.Context, *EmptyRequest) (*Pin func (UnimplementedChipIngressServer) StreamEvents(grpc.BidiStreamingServer[StreamEventsRequest, StreamEventsResponse]) error { return status.Errorf(codes.Unimplemented, "method StreamEvents not implemented") } +func (UnimplementedChipIngressServer) RegisterSchema(context.Context, *RegisterSchemaRequest) (*RegisterSchemaResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RegisterSchema not implemented") +} func (UnimplementedChipIngressServer) mustEmbedUnimplementedChipIngressServer() {} func (UnimplementedChipIngressServer) testEmbeddedByValue() {} @@ -223,6 +241,24 @@ func _ChipIngress_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type ChipIngress_StreamEventsServer = grpc.BidiStreamingServer[StreamEventsRequest, StreamEventsResponse] +func _ChipIngress_RegisterSchema_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterSchemaRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChipIngressServer).RegisterSchema(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ChipIngress_RegisterSchema_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChipIngressServer).RegisterSchema(ctx, req.(*RegisterSchemaRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ChipIngress_ServiceDesc is the grpc.ServiceDesc for ChipIngress service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -242,6 +278,10 @@ var ChipIngress_ServiceDesc = grpc.ServiceDesc{ MethodName: "Ping", Handler: _ChipIngress_Ping_Handler, }, + { + MethodName: "RegisterSchema", + Handler: _ChipIngress_RegisterSchema_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -251,5 +291,5 @@ var ChipIngress_ServiceDesc = grpc.ServiceDesc{ ClientStreams: true, }, }, - Metadata: "chip_ingress.proto", + Metadata: "pb/chip_ingress.proto", }