diff --git a/internal/generator/generator_test.go b/internal/generator/generator_test.go index ba2c068e..7c4ca74d 100644 --- a/internal/generator/generator_test.go +++ b/internal/generator/generator_test.go @@ -1,221 +1,107 @@ package generator import ( - "bufio" - "encoding/json" - "fmt" "io" "log" - "net/http" - "os" - "strings" - "sync/atomic" "testing" + "testing/synctest" "time" docker "github.com/fsouza/go-dockerclient" - dockertest "github.com/fsouza/go-dockerclient/testing" "github.com/nginx-proxy/docker-gen/internal/config" "github.com/nginx-proxy/docker-gen/internal/context" - "github.com/nginx-proxy/docker-gen/internal/dockerclient" "github.com/stretchr/testify/assert" ) -func TestGenerateFromEvents(t *testing.T) { +func newStartEvent() *docker.APIEvents { + return &docker.APIEvents{Type: "container", Action: "start"} +} + +// TestNewDebounceChannel deterministically verifies debounce timing via testing/synctest's fake clock (replaces the flaky TestGenerateFromEvents, #238). +func TestNewDebounceChannel(t *testing.T) { + orig := log.Writer() log.SetOutput(io.Discard) - containerID := "8dfafdbc3a40" - var counter atomic.Int32 - - eventsResponse := ` -{"Type":"container","Action":"start","Actor": {"ID":"8dfafdbc3a40"},"Time":1374067924} -{"Type":"container","Action":"stop","Actor": {"ID":"8dfafdbc3a40"},"Time":1374067966} -{"Type":"container","Action":"start","Actor": {"ID":"8dfafdbc3a40"},"Time":1374067970}` - infoResponse := `{"Containers":1,"Images":1,"Debug":false,"NFd":11,"NGoroutines":21,"MemoryLimit":true,"SwapLimit":false}` - versionResponse := `{"Version":"19.03.12","Os":"Linux","KernelVersion":"4.19.76-linuxkit","GoVersion":"go1.13.14","GitCommit":"48a66213fe","Arch":"amd64","ApiVersion":"1.40"}` - - server, _ := dockertest.NewServer("127.0.0.1:0", nil, nil) - server.CustomHandler("/events", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - rsc := bufio.NewScanner(strings.NewReader(eventsResponse)) - for rsc.Scan() { - w.Write([]byte(rsc.Text())) - w.(http.Flusher).Flush() + t.Cleanup(func() { log.SetOutput(orig) }) + + t.Run("passes events through when Min is zero", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + input := make(chan *docker.APIEvents, 1) + out := newDebounceChannel(input, &config.Wait{Min: 0, Max: 0}) + + ev := newStartEvent() + input <- ev + synctest.Wait() + + select { + case got := <-out: + assert.Same(t, ev, got) + default: + t.Fatal("expected the event to pass straight through") + } + }) + }) + + t.Run("coalesces a burst and fires Min after the last event", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + input := make(chan *docker.APIEvents) + out := newDebounceChannel(input, &config.Wait{Min: 200 * time.Millisecond, Max: time.Second}) + + start := time.Now() + var fires []time.Duration + done := make(chan struct{}) + go func() { + for range out { + fires = append(fires, time.Since(start)) + } + close(done) + }() + + input <- newStartEvent() // t=0 time.Sleep(150 * time.Millisecond) - } - time.Sleep(500 * time.Millisecond) - })) - server.CustomHandler("/info", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(infoResponse)) - w.(http.Flusher).Flush() - })) - server.CustomHandler("/version", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(versionResponse)) - w.(http.Flusher).Flush() - })) - server.CustomHandler("/containers/json", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - result := []docker.APIContainers{ - { - ID: containerID, - Image: "base:latest", - Command: "/bin/sh", - Created: time.Now().Unix(), - Status: "running", - Ports: []docker.APIPort{}, - Names: []string{"/docker-gen-test"}, - }, - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(result) - })) - server.CustomHandler(fmt.Sprintf("/containers/%s/json", containerID), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - counter := counter.Add(1) - container := docker.Container{ - Name: "docker-gen-test", - ID: containerID, - Created: time.Now(), - Path: "/bin/sh", - Args: []string{}, - Config: &docker.Config{ - Hostname: "docker-gen", - AttachStdout: true, - AttachStderr: true, - Env: []string{fmt.Sprintf("COUNTER=%d", counter)}, - Cmd: []string{"/bin/sh"}, - Image: "base:latest", - }, - HostConfig: &docker.HostConfig{ - NetworkMode: "container:d246e2c9e3d465d96359c942e91de493f6d51a01ba33900d865180d64c34ee91", - }, - State: docker.State{ - Running: true, - Pid: 400, - ExitCode: 0, - StartedAt: time.Now(), - Health: docker.Health{ - Status: "healthy", - FailingStreak: 5, - Log: []docker.HealthCheck{}, - }, - }, - Image: "0ff407d5a7d9ed36acdf3e75de8cc127afecc9af234d05486be2981cdc01a38d", - NetworkSettings: &docker.NetworkSettings{ - IPAddress: "10.0.0.10", - IPPrefixLen: 24, - Gateway: "10.0.0.1", - Bridge: "docker0", - PortMapping: map[string]docker.PortMapping{}, - Ports: map[docker.Port][]docker.PortBinding{}, - }, - ResolvConfPath: "/etc/resolv.conf", - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(container) - })) - - serverURL := fmt.Sprintf("tcp://%s", strings.TrimRight(strings.TrimPrefix(server.URL(), "http://"), "/")) - client, err := dockerclient.NewDockerClient(serverURL, false, "", "", "") - if err != nil { - t.Errorf("Failed to create client: %s", err) - } - client.SkipServerVersionCheck = true + input <- newStartEvent() // t=150ms (gap 150ms < Min) + time.Sleep(150 * time.Millisecond) + input <- newStartEvent() // t=300ms (gap 150ms < Min) + time.Sleep(time.Second) // advance the fake clock so the pending timer fires + synctest.Wait() - tmplFile, err := os.CreateTemp(os.TempDir(), "docker-gen-tmpl") - if err != nil { - t.Errorf("Failed to create temp file: %v\n", err) - } - defer func() { - tmplFile.Close() - os.Remove(tmplFile.Name()) - }() - err = os.WriteFile(tmplFile.Name(), []byte("{{range $key, $value := .}}{{$value.ID}}.{{$value.Env.COUNTER}}{{end}}"), 0644) - if err != nil { - t.Errorf("Failed to write to temp file: %v\n", err) - } + close(input) + <-done - var destFiles []*os.File - for i := 0; i < 4; i++ { - destFile, err := os.CreateTemp(os.TempDir(), "docker-gen-out") - if err != nil { - t.Errorf("Failed to create temp file: %v\n", err) - } - destFiles = append(destFiles, destFile) - } - defer func() { - for _, destFile := range destFiles { - destFile.Close() - os.Remove(destFile.Name()) - } - }() - - apiVersion, err := client.Version() - if err != nil { - t.Errorf("Failed to retrieve docker server version info: %v\n", err) - } - context.SetDockerEnv(apiVersion) // prevents a panic - - generator := &generator{ - Client: client, - Endpoint: serverURL, - Configs: config.ConfigFile{ - Config: []config.Config{ - { - Template: tmplFile.Name(), - Dest: destFiles[0].Name(), - Watch: false, - }, - { - Template: tmplFile.Name(), - Dest: destFiles[1].Name(), - Watch: true, - Wait: &config.Wait{Min: 0, Max: 0}, - }, - { - Template: tmplFile.Name(), - Dest: destFiles[2].Name(), - Watch: true, - Wait: &config.Wait{Min: 200 * time.Millisecond, Max: 250 * time.Millisecond}, - }, - { - Template: tmplFile.Name(), - Dest: destFiles[3].Name(), - Watch: true, - Wait: &config.Wait{Min: 250 * time.Millisecond, Max: 1 * time.Second}, - }, - }, - }, - retry: false, - } + // One coalesced event, fired Min (200ms) after the last event (t=300ms). + assert.Equal(t, []time.Duration{500 * time.Millisecond}, fires) + }) + }) + + t.Run("Max caps the wait when events keep arriving", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + input := make(chan *docker.APIEvents) + out := newDebounceChannel(input, &config.Wait{Min: 200 * time.Millisecond, Max: 250 * time.Millisecond}) + + start := time.Now() + var fires []time.Duration + done := make(chan struct{}) + go func() { + for range out { + fires = append(fires, time.Since(start)) + } + close(done) + }() + + input <- newStartEvent() // t=0: minTimer->200ms, maxTimer->250ms + time.Sleep(150 * time.Millisecond) + input <- newStartEvent() // t=150ms: minTimer reset->350ms, maxTimer still 250ms + time.Sleep(150 * time.Millisecond) // maxTimer fires at 250ms -> first output + input <- newStartEvent() // t=300ms: new burst, minTimer->500ms + time.Sleep(time.Second) // advance the fake clock so the pending timer fires (500ms) + synctest.Wait() - generator.generateFromEvents() - generator.wg.Wait() - - var ( - value []byte - expected string - ) - - // The counter is incremented in each output file in the following sequence: - // - // init 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms - // ├──────╫──────┼──────┼──────╫──────┼──────┼──────╫──────┼──────┼──────┼──────┼──────┤ - // File0 ├─ 1 ║ ║ ║ - // File1 ├─ 2 ╟─ 5 ╟─ 6 ╟─ 8 - // File2 ├─ 3 ╟───── max (250ms) ──║───────────> 7 ╟─────── min (200ms) ─────> 9 - // File3 └─ 4 ╟──────────────────> ╟──────────────────> ╟─────────── min (250ms) ────────> 10 - // ┌───╨───┐ ┌───╨──┐ ┌───╨───┐ - // │ start │ │ stop │ │ start │ - // └───────┘ └──────┘ └───────┘ - - expectedCounters := []int{1, 8, 9, 10} - - for i, counter := range expectedCounters { - value, _ = os.ReadFile(destFiles[i].Name()) - expected = fmt.Sprintf("%s.%d", containerID, counter) - if string(value) != expected { - t.Errorf("expected: %s. got: %s", expected, value) - } - } + close(input) + <-done + + // First output capped by Max at 250ms; second is Min after the t=300ms event. + assert.Equal(t, []time.Duration{250 * time.Millisecond, 500 * time.Millisecond}, fires) + }) + }) } func TestSortNetworks(t *testing.T) {