Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubewatch/pkg/informer/cluster/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ClientAdvisor interface {

func (impl *InformerImpl) GetClient(clientType bean.ClientType, clusterInfo *repository.Cluster) (ClientAdvisor, error) {
if !impl.IsMultiClusterMode(clientType) && !clusterInfo.IsDefault() {
impl.logger.Debugw("informer is not supported for cluster, skipping...", "clientType", clientType, "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
impl.logger.Warnw("informer is not supported for cluster, skipping...", "clientType", clientType, "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
return NewUnimplementedAdvisor(), nil
}
return impl.GetClientAdvisor(clientType)
Expand Down
4 changes: 2 additions & 2 deletions kubewatch/pkg/informer/cluster/argoCD/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func NewInformerImpl(logger *zap.SugaredLogger,

func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluster) error {
if !impl.appConfig.GetAcdConfig().ACDInformer || impl.appConfig.GetExternalConfig().External {
impl.logger.Debugw("argo cd informer is not enabled for cluster, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
impl.logger.Warnw("argo cd informer is not enabled for cluster, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
return nil
}
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start argo cd informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
impl.logger.Infow("time taken to start argo cd informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
}()
stopper, ok := impl.argoCdInformerStopper[clusterInfo.Id]
if ok && stopper.HasInformer() {
Expand Down
4 changes: 2 additions & 2 deletions kubewatch/pkg/informer/cluster/argoWf/cd/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func NewInformerImpl(logger *zap.SugaredLogger,

func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluster) error {
if !impl.appConfig.GetCdConfig().CdInformer {
impl.logger.Debugw("cd argo workflow informer is not enabled, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
impl.logger.Warnw("cd argo workflow informer is not enabled, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
return nil
}
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start cd argo workflow informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
impl.logger.Infow("time taken to start cd argo workflow informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
}()
restConfig := impl.k8sUtil.GetK8sConfigForCluster(clusterInfo)
cdWfInformer := impl.informerClient.GetSharedInformerClient(resourceBean.CdWorkflowResourceType)
Expand Down
4 changes: 2 additions & 2 deletions kubewatch/pkg/informer/cluster/argoWf/ci/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func NewInformerImpl(logger *zap.SugaredLogger,

func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluster) error {
if !impl.appConfig.GetCiConfig().CiInformer {
impl.logger.Debugw("ci argo workflow informer is not enabled, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
impl.logger.Warnw("ci argo workflow informer is not enabled, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
return nil
}
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start ci argo workflow informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
impl.logger.Infow("time taken to start ci argo workflow informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
}()
restConfig := impl.k8sUtil.GetK8sConfigForCluster(clusterInfo)
ciWfInformer := impl.informerClient.GetSharedInformerClient(resourceBean.CiWorkflowResourceType)
Expand Down
4 changes: 2 additions & 2 deletions kubewatch/pkg/informer/cluster/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (impl *InformerImpl) stopInformersForCluster(clusterId int) error {
func (impl *InformerImpl) startInformerForCluster(clusterInfo *repository.Cluster) error {
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start informers for cluster", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
impl.logger.Infow("time taken to start informers for cluster", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
}()
if len(clusterInfo.ErrorInConnecting) > 0 {
impl.logger.Debugw("cluster is not reachable", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
Expand Down Expand Up @@ -205,7 +205,7 @@ func (impl *InformerImpl) syncMultiClusterInformer(clusterId int) error {
func (impl *InformerImpl) reloadInformerForCluster(clusterId int) error {
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to reload informer for cluster", "clusterId", clusterId, "time", time.Since(startTime))
impl.logger.Infow("time taken to reload informer for cluster", "clusterId", clusterId, "time", time.Since(startTime))
}()

clusterInfo, err := impl.clusterRepository.FindById(clusterId)
Expand Down
2 changes: 1 addition & 1 deletion kubewatch/pkg/informer/cluster/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewInformerImpl(logger *zap.SugaredLogger,
func (impl *InformerImpl) StartDevtronClusterWatcher() error {
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start default cluster informer", "time", time.Since(startTime))
impl.logger.Infow("time taken to start default cluster informer", "time", time.Since(startTime))
}()
clusterInfo, err := impl.clusterRepository.FindByName(commonBean.DEFAULT_CLUSTER)
if err != nil && !errors.Is(err, pg.ErrNoRows) {
Expand Down
32 changes: 26 additions & 6 deletions kubewatch/pkg/informer/cluster/systemExec/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (impl *InformerImpl) inferFailedReason(eventType string, pod *coreV1.Pod) (
ctrs = append(pod.Status.InitContainerStatuses, ctrs...)

for _, ctr := range ctrs {
// In our case, we are not using init/wait containers. So ideally, there should be only main container.

// Virtual Kubelet environment will not set the terminate on waiting container
// https://github.com/argoproj/argo-workflows/issues/3879
Expand All @@ -203,10 +204,10 @@ func (impl *InformerImpl) inferFailedReason(eventType string, pod *coreV1.Pod) (
if eventType == bean.DeleteEvent {
// we should mark this case as an error
if ctr.Name == common.MainContainerName {
return v1alpha1.NodeFailed, getFailedReasonFromPodConditions(pod.Status.Conditions)
return v1alpha1.NodeFailed, impl.getFailedReasonFromPodConditions(pod.Status.Conditions)
}
}
impl.logger.Warnf("Pod %s phase was Failed but %s did not have terminated state", pod.Name, ctr.Name)
impl.logger.Warnw("Pod phase was Failed but container did not have terminated state", "podName", pod.Name, "containerName", ctr.Name, "status", pod.Status)
continue
}

Expand Down Expand Up @@ -248,15 +249,34 @@ func (impl *InformerImpl) inferFailedReason(eventType string, pod *coreV1.Pod) (
// it was marking it successful, doing this as it will be skipped at upper level, and delete event will handle it.
// ticket - you can find debug logs/details here - https://github.com/devtron-labs/sprint-tasks/issues/2092
impl.logger.Infow("Pod phase was Failed but no container had terminated state, marking it as failed now", "podName", pod.Name, "status", pod.Status)

// Here we're intentionally returning empty string as message,
// as we don't want this event to be intercepted as actual update event.
// This case will be handled by the delete event.
return v1alpha1.NodeFailed, ""
}

func getFailedReasonFromPodConditions(conditions []coreV1.PodCondition) string {
func (impl *InformerImpl) getFailedReasonFromPodConditions(conditions []coreV1.PodCondition) string {
if len(conditions) == 0 {
return "failed"
// This should never happen.
impl.logger.Warnw("Pod phase was Failed but no conditions found")
// If we get here, that means the pod is deleted after the container is started.
// The only possible reason is NodeNoLongerExists.
return bean.NodeNoLongerExists
}

return conditions[0].Message
// Sort the conditions by last transition time, newest first.
// This is to ensure that we get the latest reason for the failure.
sort.Slice(conditions, func(i, j int) bool {
return conditions[i].LastTransitionTime.Time.After(conditions[j].LastTransitionTime.Time)
})
// If the latest condition has a message, use that.
if conditions[0].Message != "" {
return conditions[0].Message
}
// If we get here, that means the pod is deleted after the container is started.
// The only possible reason is NodeNoLongerExists.
impl.logger.Warnw("Pod phase was Failed but the lastest condition has no message", "conditions", conditions)
return bean.NodeNoLongerExists
}

// foundAnyUpdateInPodStatus return true if any of the pod status fields have changed or if the pod is new
Expand Down
30 changes: 14 additions & 16 deletions kubewatch/pkg/informer/cluster/systemExec/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"log"
"time"
)

Expand Down Expand Up @@ -63,12 +62,12 @@ func NewInformerImpl(logger *zap.SugaredLogger,

func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluster) error {
if impl.appConfig.GetExternalConfig().External {
impl.logger.Debugw("argo workflow setup is not done for cluster, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
impl.logger.Warnw("argo workflow setup is not done for cluster, skipping...", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName, "appConfig", impl.appConfig)
return nil
}
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start system executor informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
impl.logger.Infow("time taken to start system executor informer", "clusterId", clusterInfo.Id, "time", time.Since(startTime))
}()
impl.logger.Infow("starting system executor informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
restConfig := impl.k8sUtil.GetK8sConfigForCluster(clusterInfo)
Expand All @@ -87,7 +86,7 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
if oldPodObj != nil {
logArgs = append(logArgs, "oldPodStatusObj", oldPodObj.Status)
}
impl.logger.Debugw("no significant pod updates are detected so skipping the pod update event", logArgs...)
impl.logger.Warnw("no significant pod updates are detected so skipping the pod update event", logArgs...)
return
}

Expand All @@ -101,17 +100,15 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
nodeStatus := impl.assessNodeStatus(bean.UpdateEvent, newPodObj)
workflowStatus := getWorkflowStatus(newPodObj, nodeStatus, workflowType)
if workflowStatus.Message == "" && workflowStatus.Phase == v1alpha1.WorkflowFailed {
impl.logger.Debugw("skipping the failed workflow update event as message is empty", "workflow", workflowStatus)
impl.logger.Warnw("skipping the failed workflow update event as message is empty", "workflow", workflowStatus, "podObjStatus", newPodObj.Status)
return
}
if val, ok := podLabels[informerBean.DevtronOwnerInstanceLabelKey]; ok {
workflowStatus.DevtronOwnerInstance = val
} else {
impl.logger.Warnw("devtron administrator instance label is not found in the pod. not a devtron workflow", "podLabels", podLabels)
middleware.IncNonAdministrativeEvents(clusterLabels, middleware.RESOURCE_K8S_JOB)
// return statement is skipped intentionally for backward compatibility
// TODO Asutosh: remove this return statement in future
// return
impl.logger.Warnw("devtron administrator instance label is not found in the pod. not a devtron workflow", "podLabels", podLabels)
return
}
wfJson, err := json.Marshal(workflowStatus)
if err != nil {
Expand All @@ -120,7 +117,8 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
}
impl.logger.Debugw("sending system executor workflow update event", "workflow", string(wfJson))
if impl.pubSubClient == nil {
log.Println("don't publish")
// This should never happen.
impl.logger.Errorw("pubsub client is nil, skipping the publish")
return
}
topic, err := argoWf.GetNatsTopicForWorkflow(workflowType)
Expand All @@ -133,7 +131,7 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
impl.logger.Errorw("error while publishing request", "topic", topic, "wfJson", wfJson, "err", err)
return
}
impl.logger.Debugw("system executor workflow update sent", "topic", topic, "workflowType", workflowType)
impl.logger.Infow("system executor workflow update sent", "topic", topic, "workflowType", workflowType)
}
}
// deleteFunc is called when an existing pod is deleted
Expand All @@ -148,6 +146,7 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
nodeStatus, reTriggerRequired := impl.checkIfPodDeletedAndUpdateMessage(podObj.Name, podObj.Namespace, nodeStatus, restConfig)
if !reTriggerRequired {
// not sending this deleted event if it's not a re-trigger case
impl.logger.Warnw("not sending delete event as it's not a re-trigger case", "podName", podObj.Name, "podObjStatus", podObj.Status, "deletionTimestamp", podObj.DeletionTimestamp)
return
}
workflowStatus := getWorkflowStatus(podObj, nodeStatus, workflowType)
Expand All @@ -156,9 +155,7 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
} else {
impl.logger.Warnw("devtron administrator instance label is not found in the pod. not a devtron workflow", "podLabels", podLabels)
middleware.IncNonAdministrativeEvents(clusterLabels, middleware.RESOURCE_K8S_JOB)
// return statement is skipped intentionally for backward compatibility
// TODO Asutosh: remove this return statement in future
// return
return
}
wfJson, err := json.Marshal(workflowStatus)
if err != nil {
Expand All @@ -167,7 +164,8 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
}
impl.logger.Debugw("sending system executor cd workflow delete event", "workflow", string(wfJson))
if impl.pubSubClient == nil {
log.Println("don't publish")
// This should never happen.
impl.logger.Errorw("pubsub client is nil, skipping the publish")
return
}
topic, err := argoWf.GetNatsTopicForWorkflow(workflowType)
Expand All @@ -180,7 +178,7 @@ func (impl *InformerImpl) StartInformerForCluster(clusterInfo *repository.Cluste
impl.logger.Errorw("error while publishing request", "topic", topic, "wfJson", wfJson, "err", err)
return
}
impl.logger.Debugw("workflow update sent", "topic", topic, "workflowType", workflowType)
impl.logger.Infow("workflow update sent", "topic", topic, "workflowType", workflowType)
}
podInformerFactory := impl.informerClient.GetPodInformerFactory()
eventHandler := resourceBean.NewEventHandlers[coreV1.Pod]().
Expand Down
4 changes: 2 additions & 2 deletions kubewatch/pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewRunnerImpl(logger *zap.SugaredLogger,
func (impl *RunnerImpl) Start() error {
startTime := time.Now()
defer func() {
impl.logger.Debugw("time taken to start informer", "time", time.Since(startTime))
impl.logger.Infow("time taken to start informer", "time", time.Since(startTime))
}()
if impl.appConfig.IsDBAvailable() {
err := impl.clusterInformer.StartDevtronClusterWatcher()
Expand All @@ -76,7 +76,7 @@ func (impl *RunnerImpl) Start() error {
// Stop calls StopAll on the runner's cluster informer and logs the elapsed
// wall-clock time once that call has returned.
func (impl *RunnerImpl) Stop() {
// Capture the start time so the deferred closure can report the duration.
startTime := time.Now()
defer func() {
// NOTE(review): the message says "start default cluster informer" although
// this is the Stop path — looks copy-pasted from the start-up code; confirm
// and reword (e.g. "time taken to stop informers").
// NOTE(review): the Debugw/Infow pair with identical arguments is presumably
// the before/after of a diff hunk rendered without +/- markers, not two
// intentional log calls — verify against the real file.
impl.logger.Debugw("time taken to start default cluster informer", "time", time.Since(startTime))
impl.logger.Infow("time taken to start default cluster informer", "time", time.Since(startTime))
}()
impl.clusterInformer.StopAll()
}
Loading