Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions internal/provider/adc/adc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type adcClient struct {

updater status.Updater
statusUpdateMap map[types.NamespacedNameKind][]string

syncCh chan struct{}
}

type Task struct {
Expand All @@ -116,6 +118,7 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) {
store: NewStore(),
executor: &DefaultADCExecutor{},
updater: updater,
syncCh: make(chan struct{}, 1),
}, nil
}

Expand Down Expand Up @@ -220,6 +223,7 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext,
// which only needs to be saved in cache
// and triggered by a timer for synchronization
if d.BackendMode == BackendModeAPISIXStandalone || d.BackendMode == BackendModeAPISIX {
d.syncNotify()
return nil
}

Expand Down Expand Up @@ -289,6 +293,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error {
Name: obj.GetName(),
configs: configs,
})
} else {
d.syncNotify()
}
return nil
case BackendModeAPI7EE:
Expand All @@ -306,26 +312,34 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error {

func (d *adcClient) Start(ctx context.Context) error {
initalSyncDelay := d.InitSyncDelay
time.AfterFunc(initalSyncDelay, func() {
if err := d.Sync(ctx); err != nil {
log.Error(err)
return
}
})
if initalSyncDelay > 0 {
time.AfterFunc(initalSyncDelay, func() {
if err := d.Sync(ctx); err != nil {
log.Error(err)
return
}
})
}

if d.SyncPeriod < 1 {
return nil
}
ticker := time.NewTicker(d.SyncPeriod)
defer ticker.Stop()
for {
synced := false
select {
case <-d.syncCh:
synced = true
case <-ticker.C:
synced = true
case <-ctx.Done():
return nil
}
if synced {
if err := d.Sync(ctx); err != nil {
log.Error(err)
}
case <-ctx.Done():
return nil
}
}
}
Expand Down Expand Up @@ -506,6 +520,13 @@ func (d *adcClient) sync(ctx context.Context, task Task) error {
return nil
}

func (d *adcClient) syncNotify() {
select {
case d.syncCh <- struct{}{}:
default:
}
}

func prepareSyncFile(resources any) (string, func(), error) {
data, err := json.Marshal(resources)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/provider/adc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (d *adcClient) updateConfigForGatewayProxy(tctx *provider.TranslateContext,
for _, ref := range referrers {
d.configs[ref] = *config
}

d.syncNotify()
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions test/conformance/apisix/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func TestMain(m *testing.M) {
ControllerName: s.GetControllerName(),
Namespace: namespace,
StatusAddress: address,
InitSyncDelay: 1 * time.Minute,
InitSyncDelay: 20 * time.Minute,
ProviderType: framework.ProviderType,
ProviderSyncPeriod: 10 * time.Millisecond,
ProviderSyncPeriod: 1 * time.Hour,
})

adminEndpoint := fmt.Sprintf("http://%s.%s:9180", svc.Name, namespace)
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/scaffold/apisix_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *APISIXDeployer) DeployIngress() {
s.Framework.DeployIngress(framework.IngressDeployOpts{
ControllerName: s.opts.ControllerName,
ProviderType: framework.ProviderType,
ProviderSyncPeriod: 200 * time.Millisecond,
ProviderSyncPeriod: 1 * time.Hour,
Namespace: s.namespace,
Replicas: 1,
})
Expand All @@ -270,7 +270,7 @@ func (s *APISIXDeployer) ScaleIngress(replicas int) {
s.Framework.DeployIngress(framework.IngressDeployOpts{
ControllerName: s.opts.ControllerName,
ProviderType: framework.ProviderType,
ProviderSyncPeriod: 200 * time.Millisecond,
ProviderSyncPeriod: 1 * time.Hour,
Namespace: s.namespace,
Replicas: replicas,
})
Expand Down
Loading