|
6 | 6 | "strings" |
7 | 7 | "time" |
8 | 8 |
|
| 9 | + "github.com/golang/protobuf/protoc-gen-go/descriptor" |
9 | 10 | "github.com/jhump/protoreflect/desc" |
10 | 11 | "github.com/jhump/protoreflect/dynamic" |
11 | 12 | sink "github.com/streamingfast/substreams-sink" |
@@ -107,8 +108,13 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes |
107 | 108 | d.logger.Debug("Walking message descriptor", zap.String("message_descriptor_name", md.GetName()), zap.Any("table_info", tableInfo)) |
108 | 109 | // Keep the actual PK value handy so we don't rely on slice indexes later |
109 | 110 | var primaryKeyValue any |
| 111 | + var columnDescriptors map[*desc.FieldDescriptor]struct{} |
110 | 112 | if tableInfo != nil { |
111 | 113 | if table := dialect.GetTable(tableInfo.Name); table != nil { |
| 114 | + columnDescriptors = make(map[*desc.FieldDescriptor]struct{}, len(table.Columns)) |
| 115 | + for _, col := range table.Columns { |
| 116 | + columnDescriptors[col.FieldDescriptor] = struct{}{} |
| 117 | + } |
112 | 118 | if table.PrimaryKey != nil { |
113 | 119 | // Robust PK retrieval: always use the original protobuf field name |
114 | 120 | // from the descriptor (not the possibly renamed SQL column name). |
@@ -143,6 +149,14 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes |
143 | 149 | } |
144 | 150 | } |
145 | 151 | } |
| 152 | + |
| 153 | + if fd.GetType() != descriptor.FieldDescriptorProto_TYPE_MESSAGE { |
| 154 | + if columnDescriptors != nil { |
| 155 | + if _, ok := columnDescriptors[fd]; !ok { |
| 156 | + continue |
| 157 | + } |
| 158 | + } |
| 159 | + } |
146 | 160 | fv := dm.GetField(fd) |
147 | 161 | if v, ok := fv.([]interface{}); ok { |
148 | 162 | // Check if this is an array of messages or native values |
|
0 commit comments