Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit f719df9

Browse files
author
alishakawaguchi
authored
NEOS-1798: Adds passthrough as a new column addition strategy (#3454)
1 parent ce3bd6e commit f719df9

24 files changed

Lines changed: 4106 additions & 2818 deletions

File tree

backend/gen/go/protos/mgmt/v1alpha1/job.pb.go

Lines changed: 2607 additions & 2271 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/gen/go/protos/mgmt/v1alpha1/job.pb.json.go

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/protos/mgmt/v1alpha1/job.proto

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,15 @@ message PostgresSourceConnectionOptions {
168168
// If this doesn't exist, will fall back to configuring generators for supported datatypes.
169169
// If none of the criteria above can be met, the job run will fail to prevent leaking of PII.
170170
AutoMap auto_map = 2;
171+
// pass the new column through as is.
172+
Passthrough passthrough = 3;
171173
}
172174
// Configuration for the HaltJob strategy
173175
message HaltJob {}
174176
// Configuration for the AutoMap strategy
175177
message AutoMap {}
178+
// Configuration for the Passthrough strategy
179+
message Passthrough {}
176180
}
177181

178182
message ColumnRemovalStrategy {
@@ -245,11 +249,15 @@ message MysqlSourceConnectionOptions {
245249
// If this doesn't exist, will fall back to configuring generators for supported datatypes.
246250
// If none of the criteria above can be met, the job run will fail to prevent leaking of PII.
247251
AutoMap auto_map = 2;
252+
// pass the new column through as is.
253+
Passthrough passthrough = 3;
248254
}
249255
// Configuration for the HaltJob strategy
250256
message HaltJob {}
251257
// Configuration for the AutoMap strategy
252258
message AutoMap {}
259+
// Configuration for the Passthrough strategy
260+
message Passthrough {}
253261
}
254262
}
255263

@@ -269,7 +277,8 @@ message MysqlSourceTableOption {
269277

270278
message MssqlSourceConnectionOptions {
271279
// Whether to halt the job if a new column is added
272-
bool halt_on_new_column_addition = 1;
280+
// Deprecated: Use new_column_addition_strategy instead
281+
bool halt_on_new_column_addition = 1 [deprecated = true];
273282
// The list of schemas (and their tables) along with any configuration options that will be used.
274283
repeated MssqlSourceSchemaOption schemas = 2;
275284
// The unique connection id to a mssql connection configuration
@@ -278,6 +287,8 @@ message MssqlSourceConnectionOptions {
278287
bool subset_by_foreign_key_constraints = 4;
279288
// Provide a strategy of what to do in the event Neosync encounters a column that is removed from the source table.
280289
ColumnRemovalStrategy column_removal_strategy = 5;
290+
// Provide a strategy of what to do in the event Neosync encounters an unmapped column for the job's mapped tables.
291+
NewColumnAdditionStrategy new_column_addition_strategy = 6;
281292

282293
message ColumnRemovalStrategy {
283294
oneof strategy {
@@ -293,6 +304,19 @@ message MssqlSourceConnectionOptions {
293304
// Configuration for the ContinueJob strategy
294305
message ContinueJob {}
295306
}
307+
308+
message NewColumnAdditionStrategy {
309+
oneof strategy {
310+
// halt job if a new column is detected.
311+
HaltJob halt_job = 1;
312+
// pass the new column through as is.
313+
Passthrough passthrough = 2;
314+
}
315+
// Configuration for the HaltJob strategy
316+
message HaltJob {}
317+
// Configuration for the Passthrough strategy
318+
message Passthrough {}
319+
}
296320
}
297321

298322
message MssqlSourceSchemaOption {

backend/sql/postgresql/models/models.go

Lines changed: 92 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -945,13 +945,23 @@ type JobSourceOptions struct {
945945
}
946946

947947
type MssqlSourceOptions struct {
948-
HaltOnNewColumnAddition bool `json:"haltOnNewColumnAddition"`
949-
SubsetByForeignKeyConstraints bool `json:"subsetByForeignKeyConstraints"`
950-
Schemas []*MssqlSourceSchemaOption `json:"schemas"`
951-
ConnectionId string `json:"connectionId"`
952-
ColumnRemovalStrategy *MssqlColumnRemovalStrategy `json:"columnRemovalStrategy,omitempty"`
948+
// @deprecated
949+
HaltOnNewColumnAddition bool `json:"haltOnNewColumnAddition"`
950+
SubsetByForeignKeyConstraints bool `json:"subsetByForeignKeyConstraints"`
951+
Schemas []*MssqlSourceSchemaOption `json:"schemas"`
952+
ConnectionId string `json:"connectionId"`
953+
ColumnRemovalStrategy *MssqlColumnRemovalStrategy `json:"columnRemovalStrategy,omitempty"`
954+
NewColumnAdditionStrategy *MssqlNewColumnAdditionStrategy `json:"newColumnAdditionStrategy,omitempty"`
955+
}
956+
957+
type MssqlNewColumnAdditionStrategy struct {
958+
HaltJob *MssqlHaltJobNewColumnAdditionStrategy `json:"haltJob,omitempty"`
959+
Passthrough *MssqlPassthroughNewColumnAdditionStrategy `json:"passthrough,omitempty"`
953960
}
954961

962+
type MssqlHaltJobNewColumnAdditionStrategy struct{}
963+
type MssqlPassthroughNewColumnAdditionStrategy struct{}
964+
955965
type MssqlColumnRemovalStrategy struct {
956966
HaltJob *MssqlHaltJobColumnRemovalStrategy `json:"haltJob,omitempty"`
957967
ContinueJob *MssqlContinueJobColumnRemovalStrategy `json:"continueJob,omitempty"`
@@ -993,8 +1003,7 @@ type MssqlContinueJobColumnRemovalStrategy struct{}
9931003

9941004
func (m *MssqlSourceOptions) ToDto() *mgmtv1alpha1.MssqlSourceConnectionOptions {
9951005
dto := &mgmtv1alpha1.MssqlSourceConnectionOptions{
996-
HaltOnNewColumnAddition: m.HaltOnNewColumnAddition,
997-
ConnectionId: m.ConnectionId,
1006+
ConnectionId: m.ConnectionId,
9981007
Schemas: make(
9991008
[]*mgmtv1alpha1.MssqlSourceSchemaOption,
10001009
len(m.Schemas),
@@ -1009,13 +1018,23 @@ func (m *MssqlSourceOptions) ToDto() *mgmtv1alpha1.MssqlSourceConnectionOptions
10091018
dto.ColumnRemovalStrategy = m.ColumnRemovalStrategy.ToDto()
10101019
}
10111020

1021+
if m.NewColumnAdditionStrategy != nil {
1022+
dto.NewColumnAdditionStrategy = m.NewColumnAdditionStrategy.ToDto()
1023+
} else if m.HaltOnNewColumnAddition {
1024+
dto.NewColumnAdditionStrategy = &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy{
1025+
Strategy: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob_{
1026+
HaltJob: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob{},
1027+
},
1028+
}
1029+
}
1030+
10121031
return dto
10131032
}
1033+
10141034
func (m *MssqlSourceOptions) FromDto(dto *mgmtv1alpha1.MssqlSourceConnectionOptions) {
10151035
if dto == nil {
10161036
dto = &mgmtv1alpha1.MssqlSourceConnectionOptions{}
10171037
}
1018-
m.HaltOnNewColumnAddition = dto.GetHaltOnNewColumnAddition()
10191038
m.ConnectionId = dto.GetConnectionId()
10201039
m.SubsetByForeignKeyConstraints = dto.GetSubsetByForeignKeyConstraints()
10211040
m.Schemas = FromDtoMssqlSourceSchemaOptions(dto.GetSchemas())
@@ -1024,6 +1043,46 @@ func (m *MssqlSourceOptions) FromDto(dto *mgmtv1alpha1.MssqlSourceConnectionOpti
10241043
m.ColumnRemovalStrategy = &MssqlColumnRemovalStrategy{}
10251044
m.ColumnRemovalStrategy.FromDto(dto.GetColumnRemovalStrategy())
10261045
}
1046+
1047+
if dto.GetNewColumnAdditionStrategy().GetStrategy() != nil {
1048+
m.NewColumnAdditionStrategy = &MssqlNewColumnAdditionStrategy{}
1049+
m.NewColumnAdditionStrategy.FromDto(dto.GetNewColumnAdditionStrategy())
1050+
} else if m.HaltOnNewColumnAddition {
1051+
m.NewColumnAdditionStrategy = &MssqlNewColumnAdditionStrategy{
1052+
HaltJob: &MssqlHaltJobNewColumnAdditionStrategy{},
1053+
}
1054+
}
1055+
}
1056+
1057+
func (s *MssqlNewColumnAdditionStrategy) ToDto() *mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy {
1058+
if s.HaltJob != nil {
1059+
return &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy{
1060+
Strategy: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob_{
1061+
HaltJob: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob{},
1062+
},
1063+
}
1064+
}
1065+
if s.Passthrough != nil {
1066+
return &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy{
1067+
Strategy: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_{
1068+
Passthrough: &mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough{},
1069+
},
1070+
}
1071+
}
1072+
return nil
1073+
}
1074+
1075+
func (s *MssqlNewColumnAdditionStrategy) FromDto(
1076+
dto *mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy,
1077+
) {
1078+
if dto.GetStrategy() != nil {
1079+
switch dto.GetStrategy().(type) {
1080+
case *mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob_:
1081+
s.HaltJob = &MssqlHaltJobNewColumnAdditionStrategy{}
1082+
case *mgmtv1alpha1.MssqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_:
1083+
s.Passthrough = &MssqlPassthroughNewColumnAdditionStrategy{}
1084+
}
1085+
}
10271086
}
10281087

10291088
type MssqlSourceSchemaOption struct {
@@ -1260,13 +1319,14 @@ type MysqlSourceOptions struct {
12601319
}
12611320

12621321
type MysqlNewColumnAdditionStrategy struct {
1263-
HaltJob *MysqlHaltJobNewColumnAdditionStrategy `json:"haltJob,omitempty"`
1264-
AutoMap *MysqlAutoMapNewColumnAdditionStrategy `json:"autoMap,omitempty"`
1322+
HaltJob *MysqlHaltJobNewColumnAdditionStrategy `json:"haltJob,omitempty"`
1323+
AutoMap *MysqlAutoMapNewColumnAdditionStrategy `json:"autoMap,omitempty"`
1324+
Passthrough *MysqlPassthroughNewColumnAdditionStrategy `json:"passthrough,omitempty"`
12651325
}
12661326

12671327
type MysqlHaltJobNewColumnAdditionStrategy struct{}
12681328
type MysqlAutoMapNewColumnAdditionStrategy struct{}
1269-
1329+
type MysqlPassthroughNewColumnAdditionStrategy struct{}
12701330
type MysqlColumnRemovalStrategy struct {
12711331
HaltJob *MysqlHaltJobColumnRemovalStrategy `json:"haltJob,omitempty"`
12721332
ContinueJob *MysqlContinueJobColumnRemovalStrategy `json:"continueJob,omitempty"`
@@ -1317,8 +1377,9 @@ type PostgresSourceOptions struct {
13171377
}
13181378

13191379
type PostgresNewColumnAdditionStrategy struct {
1320-
HaltJob *PostgresHaltJobStrategy `json:"haltJob,omitempty"`
1321-
AutoMap *PostgresAutoMapStrategy `json:"autoMap,omitempty"`
1380+
HaltJob *PostgresHaltJobStrategy `json:"haltJob,omitempty"`
1381+
AutoMap *PostgresAutoMapStrategy `json:"autoMap,omitempty"`
1382+
Passthrough *PostgresPassthroughStrategy `json:"passthrough,omitempty"`
13221383
}
13231384

13241385
func (p *PostgresNewColumnAdditionStrategy) ToDto() *mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy {
@@ -1334,6 +1395,12 @@ func (p *PostgresNewColumnAdditionStrategy) ToDto() *mgmtv1alpha1.PostgresSource
13341395
AutoMap: &mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_AutoMap{},
13351396
},
13361397
}
1398+
} else if p.Passthrough != nil {
1399+
return &mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy{
1400+
Strategy: &mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_{
1401+
Passthrough: &mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough{},
1402+
},
1403+
}
13371404
}
13381405
return nil
13391406
}
@@ -1349,11 +1416,14 @@ func (p *PostgresNewColumnAdditionStrategy) FromDto(
13491416
p.AutoMap = &PostgresAutoMapStrategy{}
13501417
case *mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob_:
13511418
p.HaltJob = &PostgresHaltJobStrategy{}
1419+
case *mgmtv1alpha1.PostgresSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_:
1420+
p.Passthrough = &PostgresPassthroughStrategy{}
13521421
}
13531422
}
13541423

13551424
type PostgresHaltJobStrategy struct{}
13561425
type PostgresAutoMapStrategy struct{}
1426+
type PostgresPassthroughStrategy struct{}
13571427

13581428
type PostgresColumnRemovalStrategy struct {
13591429
HaltJob *PostgresHaltJobColumnRemovalStrategy `json:"haltJob,omitempty"`
@@ -1504,7 +1574,6 @@ func FromDtoPostgresSourceSchemaOptions(
15041574

15051575
func (s *MysqlSourceOptions) ToDto() *mgmtv1alpha1.MysqlSourceConnectionOptions {
15061576
dto := &mgmtv1alpha1.MysqlSourceConnectionOptions{
1507-
HaltOnNewColumnAddition: s.HaltOnNewColumnAddition,
15081577
SubsetByForeignKeyConstraints: s.SubsetByForeignKeyConstraints,
15091578
ConnectionId: s.ConnectionId,
15101579
}
@@ -1536,7 +1605,6 @@ func (s *MysqlSourceOptions) ToDto() *mgmtv1alpha1.MysqlSourceConnectionOptions
15361605
HaltJob: &mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy_HaltJob{},
15371606
},
15381607
}
1539-
dto.HaltOnNewColumnAddition = true //nolint:staticcheck
15401608
}
15411609

15421610
return dto
@@ -1557,6 +1625,13 @@ func (s *MysqlNewColumnAdditionStrategy) ToDto() *mgmtv1alpha1.MysqlSourceConnec
15571625
},
15581626
}
15591627
}
1628+
if s.Passthrough != nil {
1629+
return &mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy{
1630+
Strategy: &mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_{
1631+
Passthrough: &mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough{},
1632+
},
1633+
}
1634+
}
15601635
return nil
15611636
}
15621637

@@ -1569,20 +1644,20 @@ func (s *MysqlNewColumnAdditionStrategy) FromDto(
15691644
s.HaltJob = &MysqlHaltJobNewColumnAdditionStrategy{}
15701645
case *mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy_AutoMap_:
15711646
s.AutoMap = &MysqlAutoMapNewColumnAdditionStrategy{}
1647+
case *mgmtv1alpha1.MysqlSourceConnectionOptions_NewColumnAdditionStrategy_Passthrough_:
1648+
s.Passthrough = &MysqlPassthroughNewColumnAdditionStrategy{}
15721649
}
15731650
}
15741651
}
15751652

15761653
func (s *MysqlSourceOptions) FromDto(dto *mgmtv1alpha1.MysqlSourceConnectionOptions) {
1577-
s.HaltOnNewColumnAddition = dto.HaltOnNewColumnAddition //nolint:staticcheck
15781654
if dto.GetNewColumnAdditionStrategy().GetStrategy() != nil {
15791655
s.NewColumnAdditionStrategy = &MysqlNewColumnAdditionStrategy{}
15801656
s.NewColumnAdditionStrategy.FromDto(dto.GetNewColumnAdditionStrategy())
15811657
} else if dto.HaltOnNewColumnAddition { //nolint:staticcheck
15821658
s.NewColumnAdditionStrategy = &MysqlNewColumnAdditionStrategy{
15831659
HaltJob: &MysqlHaltJobNewColumnAdditionStrategy{},
15841660
}
1585-
s.HaltOnNewColumnAddition = true
15861661
}
15871662
s.SubsetByForeignKeyConstraints = dto.SubsetByForeignKeyConstraints
15881663
s.Schemas = FromDtoMysqlSourceSchemaOptions(dto.Schemas)

0 commit comments

Comments
 (0)