@@ -26,10 +26,12 @@ import (
2626 "time"
2727
2828 mysqlDriver "github.com/go-sql-driver/mysql"
29- "github.com/ory/dockertest/v3"
30- "github.com/ory/dockertest/v3/docker"
29+ "github.com/moby/moby/api/types/container"
3130 "github.com/stretchr/testify/assert"
3231 "github.com/stretchr/testify/require"
32+ "github.com/testcontainers/testcontainers-go"
33+ "github.com/testcontainers/testcontainers-go/network"
34+ "github.com/testcontainers/testcontainers-go/wait"
3335
3436 _ "github.com/redpanda-data/benthos/v4/public/components/pure"
3537 "github.com/redpanda-data/benthos/v4/public/service"
@@ -45,66 +47,68 @@ func TestIntegrationDorisStreamLoadOutput(t *testing.T) {
4547 }
4648
4749 ctx := t .Context ()
48- pool , err := dockertest .NewPool ("" )
49- require .NoError (t , err )
50- if err := pool .Client .Ping (); err != nil {
51- t .Skipf ("Skipping Doris Docker integration test because Docker is unavailable: %v" , err )
52- }
53- pool .MaxWait = 5 * time .Minute
5450
55- network , feIP , beIP := createDorisIntegrationNetwork (t , pool )
56- t .Cleanup (func () {
57- assert .NoError (t , pool .RemoveNetwork (network ))
58- })
51+ dockerNet , err := network .New (ctx )
52+ require .NoError (t , err )
53+ t .Cleanup (func () { _ = dockerNet .Remove (context .Background ()) })
54+
55+ // Set a stable hostname so the FE can reference itself in FE_SERVERS using
56+ // Docker's internal DNS rather than a pre-allocated IP.
57+ fe , err := testcontainers .Run (ctx , "apache/doris:fe-" + dorisIntegrationVersion ,
58+ testcontainers .WithConfigModifier (func (c * container.Config ) {
59+ c .Hostname = "doris-fe"
60+ }),
61+ network .WithNetwork ([]string {"doris-fe" }, dockerNet ),
62+ testcontainers .WithExposedPorts ("8030/tcp" , "9010/tcp" , "9030/tcp" ),
63+ testcontainers .WithEnv (map [string ]string {
64+ "FE_SERVERS" : "fe1:doris-fe:9010" ,
65+ "FE_ID" : "1" ,
66+ }),
67+ testcontainers .WithWaitStrategy (
68+ wait .ForListeningPort ("9030/tcp" ).WithStartupTimeout (5 * time .Minute ),
69+ ),
70+ )
71+ testcontainers .CleanupContainer (t , fe )
72+ require .NoError (t , err )
5973
60- feName := fmt .Sprintf ("doris-it-fe-%d" , time .Now ().UnixNano ())
61- fe , err := pool .RunWithOptions (& dockertest.RunOptions {
62- Name : feName ,
63- Hostname : feName ,
64- Repository : "apache/doris" ,
65- Tag : "fe-" + dorisIntegrationVersion ,
66- Networks : []* dockertest.Network {network },
67- ExposedPorts : []string {"8030/tcp" , "9010/tcp" , "9030/tcp" },
68- Env : []string {
69- "FE_SERVERS=fe1:" + feIP + ":9010" ,
70- "FE_ID=1" ,
71- },
72- }, noRestartAutoRemove )
74+ // Get the FE's internal Docker network IP so the BE can connect to it.
75+ feIP := dorisContainerNetworkIP (t , ctx , fe , dockerNet .Name )
76+
77+ // BE_ADDR is intentionally omitted: the BE image auto-detects its own
78+ // Docker network IP, which is exactly what stream-load redirects will use.
79+ be , err := testcontainers .Run (ctx , "apache/doris:be-" + dorisIntegrationVersion ,
80+ network .WithNetwork ([]string {"doris-be" }, dockerNet ),
81+ testcontainers .WithExposedPorts ("8040/tcp" , "9050/tcp" ),
82+ testcontainers .WithEnv (map [string ]string {
83+ "FE_SERVERS" : "fe1:" + feIP + ":9010" ,
84+ }),
85+ testcontainers .WithWaitStrategy (
86+ wait .ForListeningPort ("9050/tcp" ).WithStartupTimeout (5 * time .Minute ),
87+ ),
88+ )
89+ testcontainers .CleanupContainer (t , be )
7390 require .NoError (t , err )
74- t .Cleanup (func () {
75- assert .NoError (t , pool .Purge (fe ))
76- })
77- require .Equal (t , feIP , fe .GetIPInNetwork (network ))
78-
79- beName := fmt .Sprintf ("doris-it-be-%d" , time .Now ().UnixNano ())
80- be , err := pool .RunWithOptions (& dockertest.RunOptions {
81- Name : beName ,
82- Hostname : beName ,
83- Repository : "apache/doris" ,
84- Tag : "be-" + dorisIntegrationVersion ,
85- Networks : []* dockertest.Network {network },
86- ExposedPorts : []string {"8040/tcp" , "9050/tcp" },
87- Env : []string {
88- "FE_SERVERS=fe1:" + feIP + ":9010" ,
89- "BE_ADDR=" + beIP + ":9050" ,
90- },
91- }, noRestartAutoRemove )
91+
92+ // Inspect the BE's Docker-internal IP: used for ALTER SYSTEM ADD BACKEND
93+ // and as the redirect target for stream-load requests (routable from the
94+ // host on Linux via Docker bridge networking).
95+ beIP := dorisContainerNetworkIP (t , ctx , be , dockerNet .Name )
96+
97+ queryPort , err := fe .MappedPort (ctx , "9030/tcp" )
9298 require .NoError (t , err )
93- t .Cleanup (func () {
94- assert .NoError (t , pool .Purge (be ))
95- })
96- require .Equal (t , beIP , be .GetIPInNetwork (network ))
9799
98- queryPort := fe .GetPort ("9030/tcp" )
99- db := openDorisIntegrationDB (t , pool , queryPort )
100+ db := openDorisIntegrationDB (t , queryPort .Port ())
100101 t .Cleanup (func () {
101102 assert .NoError (t , db .Close ())
102103 })
103104
104105 t .Log ("Given a Doris FE and BE started from the official Docker images" )
105- waitForDorisBackend (t , pool , db , beIP )
106+ waitForDorisBackend (t , db , beIP )
106107 createDorisStreamLoadTable (t , db )
107108
109+ feHTTPPort , err := fe .MappedPort (ctx , "8030/tcp" )
110+ require .NoError (t , err )
111+
108112 streamBuilder := service .NewStreamBuilder ()
109113 require .NoError (t , streamBuilder .AddOutputYAML (fmt .Sprintf (`
110114doris_stream_load:
@@ -119,7 +123,7 @@ doris_stream_load:
119123 columns: [id, name, created_at]
120124 batching:
121125 count: 2
122- ` , fe . GetPort ( "8030/tcp" ), queryPort )))
126+ ` , feHTTPPort . Port ( ), queryPort . Port () )))
123127
124128 sendBatch , err := streamBuilder .AddBatchProducerFunc ()
125129 require .NoError (t , err )
@@ -155,9 +159,18 @@ doris_stream_load:
155159 }, time .Minute , time .Second )
156160}
157161
158- func noRestartAutoRemove (config * docker.HostConfig ) {
159- config .AutoRemove = true
160- config .RestartPolicy = docker.RestartPolicy {Name : "no" }
162+ // dorisContainerNetworkIP returns the container's IP address on the named
163+ // Docker network by inspecting Docker's container state directly. ContainerIP
164+ // only covers the default bridge; user-defined network IPs are in Networks map.
165+ func dorisContainerNetworkIP (t * testing.T , ctx context.Context , ctr * testcontainers.DockerContainer , networkName string ) string {
166+ t .Helper ()
167+ inspect , err := ctr .Inspect (ctx )
168+ require .NoError (t , err )
169+ netInfo , ok := inspect .NetworkSettings .Networks [networkName ]
170+ require .True (t , ok , "container not found on network %q" , networkName )
171+ ip := netInfo .IPAddress .String ()
172+ require .NotEmpty (t , ip , "container has no IP on network %q" , networkName )
173+ return ip
161174}
162175
163176func ignoreContextCanceled (err error ) error {
@@ -167,30 +180,7 @@ func ignoreContextCanceled(err error) error {
167180 return err
168181}
169182
170- func createDorisIntegrationNetwork (t * testing.T , pool * dockertest.Pool ) (* dockertest.Network , string , string ) {
171- t .Helper ()
172-
173- var lastErr error
174- for i := range 128 {
175- thirdOctet := 50 + int ((time .Now ().UnixNano ()+ int64 (i ))% 150 )
176- subnet := fmt .Sprintf ("168.%d.0.0/24" , thirdOctet )
177- network , err := pool .CreateNetwork (fmt .Sprintf ("doris-it-%d-%d" , time .Now ().UnixNano (), i ), func (config * docker.CreateNetworkOptions ) {
178- config .Driver = "bridge"
179- config .IPAM = & docker.IPAMOptions {
180- Driver : "default" ,
181- Config : []docker.IPAMConfig {{Subnet : subnet }},
182- }
183- })
184- if err == nil {
185- return network , fmt .Sprintf ("168.%d.0.2" , thirdOctet ), fmt .Sprintf ("168.%d.0.3" , thirdOctet )
186- }
187- lastErr = err
188- }
189- require .NoError (t , lastErr )
190- return nil , "" , ""
191- }
192-
193- func openDorisIntegrationDB (t * testing.T , pool * dockertest.Pool , queryPort string ) * sql.DB {
183+ func openDorisIntegrationDB (t * testing.T , queryPort string ) * sql.DB {
194184 t .Helper ()
195185
196186 cfg := mysqlDriver .NewConfig ()
@@ -204,19 +194,23 @@ func openDorisIntegrationDB(t *testing.T, pool *dockertest.Pool, queryPort strin
204194
205195 db , err := sql .Open ("mysql" , cfg .FormatDSN ())
206196 require .NoError (t , err )
207- require .NoError (t , pool .Retry (db .Ping ))
197+ require .Eventually (t , func () bool {
198+ return db .Ping () == nil
199+ }, 5 * time .Minute , time .Second , "Doris FE MySQL port never became ready" )
208200 return db
209201}
210202
211- func waitForDorisBackend (t * testing.T , pool * dockertest. Pool , db * sql.DB , beIP string ) {
203+ func waitForDorisBackend (t * testing.T , db * sql.DB , beIP string ) {
212204 t .Helper ()
213205
214- require .NoError (t , pool .Retry (func () error {
215- if _ , err := db .Exec (fmt .Sprintf (`ALTER SYSTEM ADD BACKEND "%s:9050"` , beIP )); err != nil && ! strings .Contains (strings .ToLower (err .Error ()), "already" ) {
216- return fmt .Errorf ("adding doris backend: %w" , err )
206+ require .Eventually (t , func () bool {
207+ if _ , err := db .Exec (fmt .Sprintf (`ALTER SYSTEM ADD BACKEND "%s:9050"` , beIP )); err != nil {
208+ if ! strings .Contains (strings .ToLower (err .Error ()), "already" ) {
209+ return false
210+ }
217211 }
218- return waitForAliveDorisBackend (db )
219- }) )
212+ return waitForAliveDorisBackend (db ) == nil
213+ }, 5 * time . Minute , time . Second , "Doris BE never became alive" )
220214}
221215
222216func waitForAliveDorisBackend (db * sql.DB ) error {
0 commit comments