Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 78 additions & 27 deletions ci-runner/helper/DockerHelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
Expand All @@ -38,18 +51,6 @@ import (
"github.com/devtron-labs/common-lib/utils/dockerOperations"
"github.com/devtron-labs/common-lib/utils/retryFunc"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
)

const (
Expand All @@ -74,15 +75,23 @@ type DockerHelper interface {
GetDockerAuthConfigForPrivateRegistries(workflowRequest *CommonWorkflowRequest) *bean.DockerAuthConfig
}

// BuildxK8sClientFactory creates a BuildxK8sInterface from deployment names.
// Abstracted for testability — production code uses newBuildxK8sClient as the default.
type BuildxK8sClientFactory func(deploymentNames []string) (BuildxK8sInterface, error)

type DockerHelperImpl struct {
DockerCommandEnv []string
cmdExecutor CommandExecutor
k8sClientFactory BuildxK8sClientFactory
}

func NewDockerHelperImpl(cmdExecutor CommandExecutor) *DockerHelperImpl {
return &DockerHelperImpl{
DockerCommandEnv: os.Environ(),
cmdExecutor: cmdExecutor,
k8sClientFactory: func(names []string) (BuildxK8sInterface, error) {
return newBuildxK8sClient(names)
},
}
}

Expand Down Expand Up @@ -300,9 +309,27 @@ func (impl *DockerHelperImpl) DockerLogin(ciContext cicxt.CiContext, dockerCrede
return performDockerLogin()
}

// waitForBuilderPods waits until all buildx k8s driver pods reach Running state,
// or returns an error if the deadline is exceeded. Extracted for testability.
func waitForBuilderPods(ctx context.Context, k8sClient BuildxK8sInterface, duration time.Duration) error {
log.Println(util.DEVTRON, fmt.Sprintf("waiting for builder pods to be ready (timeout: %v)", duration))
initDone := make(chan bool, 1)
initCtx, initCancel := context.WithTimeout(ctx, duration)
defer initCancel()
go k8sClient.WaitUntilBuilderPodLive(initCtx, initDone)
select {
case <-initDone:
log.Println(util.DEVTRON, "builder pods are ready for build")
return nil
case <-initCtx.Done():
return fmt.Errorf("builder pods did not reach Running state within %v", duration)
}
}

func (impl *DockerHelperImpl) executeDockerReBuild(ciContext cicxt.CiContext, k8sClient BuildxK8sInterface,
useBuildxK8sDriver bool, dockerBuild string, deploymentNames []string,
dockerBuildStageMetadata bean2.DockerBuildStageMetadata, reBuildLogs []any) error {
dockerBuildStageMetadata bean2.DockerBuildStageMetadata, reBuildLogs []any,
builderPodWaitDuration time.Duration) error {
if !useBuildxK8sDriver {
return nil
}
Expand All @@ -311,7 +338,7 @@ func (impl *DockerHelperImpl) executeDockerReBuild(ciContext cicxt.CiContext, k8
log.Println(util.DEVTRON, fmt.Sprintf(" error in RestartBuilders : %s", k8sErr.Error()))
return k8sErr
}
k8sClient, err := newBuildxK8sClient(deploymentNames)
k8sClient, err := impl.k8sClientFactory(deploymentNames)
if err != nil {
log.Println(util.DEVTRON, " error in creating buildxK8sClient , err : ", err.Error())
return err
Expand All @@ -324,18 +351,15 @@ func (impl *DockerHelperImpl) executeDockerReBuild(ciContext cicxt.CiContext, k8
rebuildImageStage := func() error {
// wait for the builder pod to be up again
startTime := time.Now()
util.LogInfo("Waiting for builder pod to be ready,", "timeout: 2 minutes")
done := make(chan bool)
ctx, cancel := context.WithCancel(ciContext)
util.LogInfo("Waiting for builder pod to be ready,", fmt.Sprintf("timeout: %v", builderPodWaitDuration))
done := make(chan bool, 1) // buffered to prevent goroutine leak on timeout
ctx, cancel := context.WithTimeout(ciContext, builderPodWaitDuration)
defer cancel()
go k8sClient.WaitUntilBuilderPodLive(ctx, done)
select {
case <-done:
// builder pod is up again, continue with the build
cancel()
case <-time.After(2 * time.Minute):
// timeout after 2 minutes
cancel()
case <-ctx.Done():
return BuilderPodDeletedError
}
util.LogInfo("DONE -->", time.Since(startTime).Seconds())
Expand Down Expand Up @@ -403,6 +427,10 @@ func (impl *DockerHelperImpl) BuildArtifact(ciRequest *CommonWorkflowRequest) (s
if err != nil {
log.Println("Error while parsing environment variables", err)
}
builderPodWaitDuration := 2 * time.Minute // backward-compat default
if ciRequest.BuildxBuilderPodWaitDurationSecs > 0 {
builderPodWaitDuration = time.Duration(ciRequest.BuildxBuilderPodWaitDurationSecs) * time.Second
}
if ciRequest.DockerImageTag == "" {
ciRequest.DockerImageTag = "latest"
}
Expand Down Expand Up @@ -453,12 +481,12 @@ func (impl *DockerHelperImpl) BuildArtifact(ciRequest *CommonWorkflowRequest) (s
}
useBuildxK8sDriver, eligibleK8sDriverNodes = dockerBuildConfig.CheckForBuildXK8sDriver()
if useBuildxK8sDriver {
deploymentNames, err = impl.createBuildxBuilderWithK8sDriver(ciContext, ciRequest.PropagateLabelsInBuildxPod, ciRequest.DockerConnection, dockerBuildConfig.BuildxDriverImage, eligibleK8sDriverNodes, ciRequest.PipelineId, ciRequest.WorkflowId)
deploymentNames, err = impl.createBuildxBuilderWithK8sDriver(ciContext, ciRequest.PropagateLabelsInBuildxPod, ciRequest.DockerConnection, dockerBuildConfig.BuildxDriverImage, eligibleK8sDriverNodes, ciRequest.PipelineId, ciRequest.WorkflowId, builderPodWaitDuration)
if err != nil {
log.Println(util.DEVTRON, " error in creating buildxDriver , err : ", err.Error())
return err
}
k8sClient, err = newBuildxK8sClient(deploymentNames)
k8sClient, err = impl.k8sClientFactory(deploymentNames)
if err != nil {
log.Println(util.DEVTRON, " error in creating buildxK8sClient , err : ", err.Error())
return err
Expand All @@ -469,6 +497,11 @@ func (impl *DockerHelperImpl) BuildArtifact(ciRequest *CommonWorkflowRequest) (s
log.Println(util.DEVTRON, " error in registering builder pods ", " err: ", err)
return err
}
// Wait for builder pods to reach Running state before starting the build.
// Prevents false-positive BuilderPodDeletedError from pod startup latency.
if err = waitForBuilderPods(ciContext, k8sClient, builderPodWaitDuration); err != nil {
return err
}
} else {
err = impl.createBuildxBuilderForMultiArchBuild(ciContext, ciRequest.DockerConnection, dockerBuildConfig.BuildxDriverImage)
if err != nil {
Expand Down Expand Up @@ -534,7 +567,7 @@ func (impl *DockerHelperImpl) BuildArtifact(ciRequest *CommonWorkflowRequest) (s
reBuildLogs = []any{fmt.Sprintf("Starting re docker build (Attempt %d) : ", attempt), dockerBuild}
}
return impl.executeDockerReBuild(ciContext, k8sClient, useBuildxK8sDriver, dockerBuild,
deploymentNames, dockerBuildStageMetadata, reBuildLogs)
deploymentNames, dockerBuildStageMetadata, reBuildLogs, builderPodWaitDuration)
}
err = retryFunc.RetryWithOutLogging(callback, retryFunc.IsRetryableError, maxRetry, 1*time.Second)
if err != nil {
Expand Down Expand Up @@ -1097,14 +1130,14 @@ func (impl *DockerHelperImpl) createBuildxBuilderForMultiArchBuild(ciContext cic
return nil
}

func (impl *DockerHelperImpl) createBuildxBuilderWithK8sDriver(ciContext cicxt.CiContext, propagateLabelsInBuildxPod bool, dockerConnection, buildxDriverImage string, builderNodes []map[string]string, ciPipelineId, ciWorkflowId int) ([]string, error) {
func (impl *DockerHelperImpl) createBuildxBuilderWithK8sDriver(ciContext cicxt.CiContext, propagateLabelsInBuildxPod bool, dockerConnection, buildxDriverImage string, builderNodes []map[string]string, ciPipelineId, ciWorkflowId int, timeout time.Duration) ([]string, error) {
deploymentNames := make([]string, 0)
if len(builderNodes) == 0 {
return deploymentNames, errors.New("atleast one node is expected for builder with kubernetes driver")
}
for i := 0; i < len(builderNodes); i++ {
nodeOpts := builderNodes[i]
builderCmd, deploymentName, err := getBuildxK8sDriverCmd(propagateLabelsInBuildxPod, dockerConnection, buildxDriverImage, nodeOpts, ciPipelineId, ciWorkflowId)
builderCmd, deploymentName, err := getBuildxK8sDriverCmd(propagateLabelsInBuildxPod, dockerConnection, buildxDriverImage, nodeOpts, ciPipelineId, ciWorkflowId, timeout)
if err != nil {
return deploymentNames, err
}
Expand Down Expand Up @@ -1181,7 +1214,7 @@ func (impl *DockerHelperImpl) runCmd(cmd string) (error, *bytes.Buffer) {
return err, errBuf
}

func getBuildxK8sDriverCmd(propagateLabelsInBuildxPod bool, dockerConnection, buildxDriverImage string, driverOpts map[string]string, ciPipelineId, ciWorkflowId int) (string, string, error) {
func getBuildxK8sDriverCmd(propagateLabelsInBuildxPod bool, dockerConnection, buildxDriverImage string, driverOpts map[string]string, ciPipelineId, ciWorkflowId int, timeout time.Duration) (string, string, error) {
buildxCreate := "docker buildx create --buildkitd-flags '--allow-insecure-entitlement network.host --allow-insecure-entitlement security.insecure' --name=%s --driver=kubernetes --node=%s --bootstrap "
nodeName := driverOpts["node"]
if nodeName == "" {
Expand All @@ -1203,6 +1236,9 @@ func getBuildxK8sDriverCmd(propagateLabelsInBuildxPod bool, dockerConnection, bu
}

driverOpts["driverOptions"] = getBuildXDriverOptionsWithImage(buildxDriverImage, driverOpts["driverOptions"])
if timeout > 0 {
driverOpts["driverOptions"] = getBuildXDriverOptionsWithTimeout(timeout, driverOpts["driverOptions"])
}
if len(driverOpts["driverOptions"]) > 0 {
buildxCreate += " '--driver-opt=%s' "
buildxCreate = fmt.Sprintf(buildxCreate, driverOpts["driverOptions"])
Expand All @@ -1227,6 +1263,21 @@ func getBuildXDriverOptionsWithImage(buildxDriverImage, driverOptions string) st
return driverOptions
}

func getBuildXDriverOptionsWithTimeout(timeout time.Duration, driverOptions string) string {
if strings.HasPrefix(driverOptions, "timeout=") ||
strings.Contains(driverOptions, ",timeout=") {
// if timeout is already present in driver options then do not override it, just return the existing options
return driverOptions
}
timeoutOption := fmt.Sprintf("\"timeout=%s\"", timeout.String())
if len(driverOptions) > 0 {
driverOptions += fmt.Sprintf(",%s", timeoutOption)
} else {
driverOptions = timeoutOption
}
return driverOptions
}

func getBuildXDriverOptionsWithLabelsAndAnnotations(driverOptions string) (string, error) {
// not passing annotation as of now because --driver-opt=annotations is not supported by buildx if contains quotes
labels := make(map[string]string)
Expand Down
4 changes: 2 additions & 2 deletions ci-runner/helper/DockerHelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCreateBuildXK8sDriver(t *testing.T) {
eligibleK8sNodes := dockerBuildConfig.GetEligibleK8sDriverNodes()
impl := getDockerHelperImpl()
ciContext := cicxt.BuildCiContext(context.Background(), true)
_, err := impl.createBuildxBuilderWithK8sDriver(ciContext, false, "", "", eligibleK8sNodes, 1, 1)
_, err := impl.createBuildxBuilderWithK8sDriver(ciContext, false, "", "", eligibleK8sNodes, 1, 1, 0)
t.Cleanup(func() {
buildxDelete := fmt.Sprintf("docker buildx rm %s", BUILDX_K8S_DRIVER_NAME)
builderRemoveCmd := exec.Command("/bin/sh", "-c", buildxDelete)
Expand All @@ -64,7 +64,7 @@ func TestCleanBuildxK8sDriver(t *testing.T) {
eligibleK8sNodes := dockerBuildConfig.GetEligibleK8sDriverNodes()
impl := getDockerHelperImpl()
ciContext := cicxt.BuildCiContext(context.Background(), true)
_, err := impl.createBuildxBuilderWithK8sDriver(ciContext, false, "", "", eligibleK8sNodes, 1, 1)
_, err := impl.createBuildxBuilderWithK8sDriver(ciContext, false, "", "", eligibleK8sNodes, 1, 1, 0)
if err != nil {
fmt.Println(err.Error())
t.Fail()
Expand Down
71 changes: 36 additions & 35 deletions ci-runner/helper/EventHelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
bean2 "github.com/devtron-labs/common-lib/imageScan/bean"
"github.com/devtron-labs/common-lib/utils/remoteConnection/bean"
"log"
"net/http"
"strings"
Expand All @@ -31,7 +29,9 @@ import (
"github.com/devtron-labs/ci-runner/pubsub"
"github.com/devtron-labs/ci-runner/util"
blobStorage "github.com/devtron-labs/common-lib/blob-storage"
bean2 "github.com/devtron-labs/common-lib/imageScan/bean"
pubSub "github.com/devtron-labs/common-lib/pubsub-lib"
"github.com/devtron-labs/common-lib/utils/remoteConnection/bean"
"github.com/go-resty/resty/v2"
)

Expand Down Expand Up @@ -170,39 +170,40 @@ type CommonWorkflowRequest struct {
EnableSecretMasking bool `json:"enableSecretMasking"`
PropagateLabelsInBuildxPod bool `json:"propagateLabelsInBuildxPod"`
// Data from CD Workflow service
WorkflowRunnerId int `json:"workflowRunnerId"`
CdPipelineId int `json:"cdPipelineId"`
StageYaml string `json:"stageYaml"`
ArtifactLocation string `json:"artifactLocation"`
CiArtifactDTO CiArtifactDTO `json:"ciArtifactDTO"`
CdImage string `json:"cdImage"`
StageType string `json:"stageType"`
CdCacheLocation string `json:"cdCacheLocation"`
CdCacheRegion string `json:"cdCacheRegion"`
WorkflowPrefixForLog string `json:"workflowPrefixForLog"`
DeploymentTriggeredBy string `json:"deploymentTriggeredBy,omitempty"`
DeploymentTriggerTime time.Time `json:"deploymentTriggerTime,omitempty"`
DeploymentReleaseCounter int `json:"deploymentReleaseCounter,omitempty"`
PrePostDeploySteps []*StepObject `json:"prePostDeploySteps"`
TaskYaml *TaskYaml `json:"-"`
IsVirtualExecution bool `json:"isVirtualExecution"`
CiArtifactLastFetch time.Time `json:"ciArtifactLastFetch"`
CiPipelineType string `json:"CiPipelineType"`
RegistryDestinationImageMap map[string][]string `json:"registryDestinationImageMap"`
RegistryCredentialMap map[string]RegistryCredentials `json:"registryCredentialMap"`
PluginArtifactStage string `json:"pluginArtifactStage"`
PushImageBeforePostCI bool `json:"pushImageBeforePostCI"`
IntermediateDockerRegistryUrl string `json:"-"` // this URL will be used for all operations and can be mutated
BuildxCacheModeMin bool `json:"buildxCacheModeMin"`
AsyncBuildxCacheExport bool `json:"asyncBuildxCacheExport"`
BuildxInterruptionMaxRetry int `json:"buildxInterruptionMaxRetry"`
UseDockerApiToGetDigest bool `json:"useDockerApiToGetDigest"`
HostUrl string `json:"hostUrl"`
ImageScanningSteps []*ImageScanningSteps `json:"imageScanningSteps,omitempty"`
ExecuteImageScanningVia bean2.ScanExecutionMedium `json:"executeImageScanningVia,omitempty"`
AwsInspectorConfig string `json:"awsInspectorConfig,omitempty"`
PartSize int64 `json:"partSize,omitempty"`
ConcurrencyMultiplier int `json:"concurrencyMultiplier,omitempty"`
WorkflowRunnerId int `json:"workflowRunnerId"`
CdPipelineId int `json:"cdPipelineId"`
StageYaml string `json:"stageYaml"`
ArtifactLocation string `json:"artifactLocation"`
CiArtifactDTO CiArtifactDTO `json:"ciArtifactDTO"`
CdImage string `json:"cdImage"`
StageType string `json:"stageType"`
CdCacheLocation string `json:"cdCacheLocation"`
CdCacheRegion string `json:"cdCacheRegion"`
WorkflowPrefixForLog string `json:"workflowPrefixForLog"`
DeploymentTriggeredBy string `json:"deploymentTriggeredBy,omitempty"`
DeploymentTriggerTime time.Time `json:"deploymentTriggerTime,omitempty"`
DeploymentReleaseCounter int `json:"deploymentReleaseCounter,omitempty"`
PrePostDeploySteps []*StepObject `json:"prePostDeploySteps"`
TaskYaml *TaskYaml `json:"-"`
IsVirtualExecution bool `json:"isVirtualExecution"`
CiArtifactLastFetch time.Time `json:"ciArtifactLastFetch"`
CiPipelineType string `json:"CiPipelineType"`
RegistryDestinationImageMap map[string][]string `json:"registryDestinationImageMap"`
RegistryCredentialMap map[string]RegistryCredentials `json:"registryCredentialMap"`
PluginArtifactStage string `json:"pluginArtifactStage"`
PushImageBeforePostCI bool `json:"pushImageBeforePostCI"`
IntermediateDockerRegistryUrl string `json:"-"` // this URL will be used for all operations and can be mutated
BuildxCacheModeMin bool `json:"buildxCacheModeMin"`
AsyncBuildxCacheExport bool `json:"asyncBuildxCacheExport"`
BuildxInterruptionMaxRetry int `json:"buildxInterruptionMaxRetry"`
BuildxBuilderPodWaitDurationSecs int `json:"buildxBuilderPodWaitDurationSecs"`
UseDockerApiToGetDigest bool `json:"useDockerApiToGetDigest"`
HostUrl string `json:"hostUrl"`
ImageScanningSteps []*ImageScanningSteps `json:"imageScanningSteps,omitempty"`
ExecuteImageScanningVia bean2.ScanExecutionMedium `json:"executeImageScanningVia,omitempty"`
AwsInspectorConfig string `json:"awsInspectorConfig,omitempty"`
PartSize int64 `json:"partSize,omitempty"`
ConcurrencyMultiplier int `json:"concurrencyMultiplier,omitempty"`
}

func (c *CommonWorkflowRequest) IsPreCdStage() bool {
Expand Down
Loading
Loading