Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bcs-services/bcs-bkcmdb-synchronizer/cmd/synchronizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func init() {
blog.Warnf("parse custom resource types from env failed: %s, using default", err)
}

// Parse and set sql log level from environment variable
if err := common.SetSqlLogLevelFromEnv(BkcmdbSynchronizerOption); err != nil {
blog.Warnf("parse sql log level from env failed: %s, using default", err)
}

// Parse and set clean local cache from environment variable
if err := common.SetCleanLocalCacheFromEnv(BkcmdbSynchronizerOption); err != nil {
blog.Warnf("parse clean local cache from env failed: %s, using default", err)
}

if err := common.DecryptCMOption(BkcmdbSynchronizerOption); err != nil {
blog.Fatalf("load config failed, err: %s", err.Error())
}
Expand Down
244 changes: 122 additions & 122 deletions bcs-services/bcs-bkcmdb-synchronizer/internal/pkg/client/cmdb/cmdb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,7 @@ func Test_cmdbClient_GetHostsByBiz(t *testing.T) {
}

func Test_gorm_container(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2499,7 +2499,7 @@ func Test_gorm_container(t *testing.T) {
}

func Test_gorm_pod(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2564,7 +2564,7 @@ func Test_gorm_pod(t *testing.T) {
}

func Test_gorm_node(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2642,7 +2642,7 @@ func Test_gorm_node(t *testing.T) {
}

func Test_gorm_deployment(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2711,7 +2711,7 @@ func Test_gorm_deployment(t *testing.T) {
}

func Test_gorm_cluster(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2778,7 +2778,7 @@ func Test_gorm_cluster(t *testing.T) {

// nolint:golint
func Test_cmdbClient_GetBcsCluster_withDB(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down Expand Up @@ -2817,7 +2817,7 @@ func Test_cmdbClient_GetBcsCluster_withDB(t *testing.T) {
}

func Test_gorm_namespace(t *testing.T) {
sq := mySqlite.New("./test1.db")
sq := mySqlite.New("./test1.db", 2)
if sq == nil {
t.Fatal("failed to open database")
}
Expand Down
46 changes: 46 additions & 0 deletions bcs-services/bcs-bkcmdb-synchronizer/internal/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"hash/fnv"
"os"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -135,6 +136,36 @@ func SetCustomResourceTypesFromEnv(opt *option.BkcmdbSynchronizerOption) error {
return nil
}

// SetSqlLogLevelFromEnv parses synchronizer_sqlLogLevel environment variable
// and sets it to the option.
func SetSqlLogLevelFromEnv(opt *option.BkcmdbSynchronizerOption) error {
envValue := os.Getenv("synchronizer_sqlLogLevel")
if envValue == "" {
return nil
}
logLevel, err := strconv.Atoi(envValue)
if err != nil {
return fmt.Errorf("parse sql log level failed: %w", err)
}
opt.Synchronizer.SqlLogLevel = logLevel
return nil
}

// SetCleanLocalCacheFromEnv parses synchronizer_cleanLocalCache environment variable
// and sets it to the option.
func SetCleanLocalCacheFromEnv(opt *option.BkcmdbSynchronizerOption) error {
envValue := os.Getenv("synchronizer_cleanLocalCache")
if envValue == "" {
return nil
}
cleanLocalCache, err := strconv.ParseBool(envValue)
if err != nil {
return fmt.Errorf("parse clean local cache failed: %w", err)
}
opt.Synchronizer.CleanLocalCache = cleanLocalCache
return nil
}

// IsKindInSlice checks if a kind exists in the whitelist
func IsKindInSlice(kind string, whitelist []string) bool {
for _, s := range whitelist {
Expand All @@ -144,3 +175,18 @@ func IsKindInSlice(kind string, whitelist []string) bool {
}
return false
}

// Deref dereferences a pointer and returns its value. If the pointer is nil, it returns the zero value of the type.
// This is a generic tool to prevent panics when dereferencing nil pointers.
func Deref[T any](p *T) T {
if p == nil {
var v T
return v
}
return *p
}

// Ptr returns a pointer to the given value.
func Ptr[T any](v T) *T {
return &v
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *BcsBkcmdbSynchronizerHandler) HandleMsg(

path := "/data/bcs/bcs-bkcmdb-synchronizer/db/" + clusterId + ".db"

db := sqlite.Open(path)
db := sqlite.Open(path, b.Syncer.BkcmdbSynchronizerOption.Synchronizer.SqlLogLevel)
if db == nil {
blog.Errorf("open db failed, path: %s", path)
return
Expand Down Expand Up @@ -197,7 +197,7 @@ func (b *BcsBkcmdbSynchronizerHandler) HandleMsg(

if v, ok := header["resourceType"]; ok {
var errH error
blog.Infof("resourceType: %v", v)
blog.Infof("resourceType: %v, clusterUid: %s, bkBizID: %d", v, bkCluster.Uid, bkCluster.BizID)
switch v.(string) {
case "Pod":
m := podMsg.M
Expand Down Expand Up @@ -292,8 +292,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleCluster(
}

// 打印白名单和黑名单信息
blog.Infof("whiteList: %v, len: %d", whiteList, len(whiteList))
blog.Infof("blackList: %v, len: %d", blackList, len(blackList))
blog.Infof("whiteList: %v, len: %d; blackList: %v, len: %d", whiteList, len(whiteList), blackList, len(blackList))

// 遍历所有集群
for _, cluster := range clusters {
Expand Down Expand Up @@ -415,7 +414,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePods(podMsg *msgBuffer, bkCluster *
// blog.Errorf("handlePod: Unable to unmarshal")
// return fmt.Errorf("handlePod: Unable to unmarshal")
// }
blog.Infof("podMsg: %d", len(podMsg.M))
blog.Infof("podMsg: %d, clusterUid: %s, bkBizID: %d", len(podMsg.M), bkCluster.Uid, bkCluster.BizID)
if time.Since(podMsg.T) < 10*time.Second {
// blog.Infof("podMsg.T: %s, %s", podMsg.T, time.Now().Sub(podMsg.T))
if len(podMsg.M) < 100 {
Expand Down Expand Up @@ -817,7 +816,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePodsDelete(
}

// 打印日志,显示将要处理的Pod名称
blog.Infof("handlePodsDelete podNames: %v", nsPod)
blog.Infof("handlePodsDelete podNames: %v, clusterUid: %s, bkBizID: %d", nsPod, bkCluster.Uid, bkCluster.BizID)

// 创建一个切片,用于存储要删除的BkPod的ID
bkPodIDs := make([]int64, 0)
Expand Down Expand Up @@ -1403,7 +1402,8 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePodCreate(pod *corev1.Pod, bkCluste
},
}, nil)

blog.Infof("podToAdd: %s+%s+%s", bkCluster.Uid, &pod.Namespace, &pod.Name)
blog.Infof("podToAdd, clusterUid: %s, bizID: %d, clusterID: %d, namespaceID: %d, namespace: %s, workloadKind: %s, workloadName: %s, workloadID: %d, podName: %s, podIP: %s",
bkCluster.Uid, bkNamespace.BizID, bkCluster.ID, bkNamespace.ID, pod.Namespace, workloadKind, workloadName, workloadID, pod.Name, pod.Status.PodIP)

return nil
}
Expand All @@ -1418,7 +1418,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePodsCreate(podsCreate map[string]*c
for _, v := range podsCreate {
podNames = append(podNames, v.Name)
}
blog.Infof("handlePodsCreate podNames: %v", podNames)
blog.Infof("handlePodsCreate podNames: %v, clusterUid: %s, bkBizID: %d", podNames, bkCluster.Uid, bkCluster.BizID)

lcReq := cmp.ListClusterReq{
ClusterID: bkCluster.Uid,
Expand Down Expand Up @@ -1823,7 +1823,8 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePodsCreate(podsCreate map[string]*c
},
},
}, db)
blog.Infof("podToAdd: %s+%s+%s", bkCluster.Uid, pod.Namespace, pod.Name)
blog.Infof("podToAdd, clusterUid: %s, bizID: %d, clusterID: %d, namespaceID: %d, namespace: %s, workloadKind: %s, workloadName: %s, workloadID: %d, podName: %s, podIP: %s",
bkCluster.Uid, bkNamespace.BizID, bkCluster.ID, bkNamespace.ID, pod.Namespace, workloadKind, workloadName, workloadID, pod.Name, pod.Status.PodIP)
}

return nil
Expand All @@ -1833,7 +1834,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handlePodsCreate(podsCreate map[string]*c
func (b *BcsBkcmdbSynchronizerHandler) handleDeployment(
msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
// 记录接收到的消息头信息
blog.Infof("handleDeployment Message: %v", msg.Headers)
blog.Infof("handleDeployment Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)

// 尝试获取消息头信息
msgHeader, err := getMsgHeader(&msg.Headers)
Expand Down Expand Up @@ -2102,7 +2103,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleDeploymentCreate(

func (b *BcsBkcmdbSynchronizerHandler) handleStatefulSet(
msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
blog.Infof("handleStatefulSet Message: %v", msg.Headers)
blog.Infof("handleStatefulSet Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleStatefulSet unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -2330,7 +2331,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleStatefulSetCreate(

func (b *BcsBkcmdbSynchronizerHandler) handleDaemonSet(
msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
blog.Infof("handleDaemonSet Message: %v", msg.Headers)
blog.Infof("handleDaemonSet Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleDaemonSet unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -2521,7 +2522,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleDaemonSetCreate(

func (b *BcsBkcmdbSynchronizerHandler) handleGameDeployment(msg amqp.Delivery,
bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
blog.Infof("handleGameDeployment Message: %v", msg.Headers)
blog.Infof("handleGameDeployment Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleGameDeployment unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -2719,7 +2720,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleGameDeploymentCreate(
// handle GameStateful Set
func (b *BcsBkcmdbSynchronizerHandler) handleGameStatefulSet(
msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
blog.Infof("handleGameStatefulSet Message: %v", msg.Headers)
blog.Infof("handleGameStatefulSet Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleGameStatefulSet unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -2916,7 +2917,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleGameStatefulSetCreate(

func (b *BcsBkcmdbSynchronizerHandler) handleNamespace(
msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster, db *gorm.DB) error {
blog.Infof("handleNamespace Message: %v", msg.Headers)
blog.Infof("handleNamespace Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleNamespace unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -3151,7 +3152,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleEvent(msg amqp.Delivery, bkCluster

// Node handle
func (b *BcsBkcmdbSynchronizerHandler) handleNode(msg amqp.Delivery, bkCluster *bkcmdbkube.Cluster) error { // nolint
blog.Infof("handleNode Message: %v", msg.Headers)
blog.Infof("handleNode Message: %v, clusterUid: %s, bkBizID: %d", msg.Headers, bkCluster.Uid, bkCluster.BizID)
msgHeader, err := getMsgHeader(&msg.Headers)
if err != nil {
blog.Errorf("handleNode unable to get headers, err: %s", err.Error())
Expand Down Expand Up @@ -3202,7 +3203,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleNodes(
// return fmt.Errorf("handleNode: Unable to unmarshal")
// }

blog.Infof("nodeMsg: %d", len(nodeMsg.M))
blog.Infof("nodeMsg: %d, clusterUid: %s, bkBizID: %d", len(nodeMsg.M), bkCluster.Uid, bkCluster.BizID)
if time.Since(nodeMsg.T) < 10*time.Second {
// blog.Infof("podMsg.T: %s, %s", podMsg.T, time.Now().Sub(podMsg.T))
if len(nodeMsg.M) < 100 {
Expand Down Expand Up @@ -3583,7 +3584,7 @@ func (b *BcsBkcmdbSynchronizerHandler) handleCustomResource(
clusterID := msgHeader.ClusterId
crKinds, ok := b.Syncer.BkcmdbSynchronizerOption.Synchronizer.CustomResourceTypes[clusterID]
if !ok || len(crKinds) == 0 {
blog.Infof("cluster %s not configured for custom resource sync, skip", clusterID)
blog.Infof("cluster %s not configured for custom resource sync, bkBizID: %d, skip", clusterID, bkCluster.BizID)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type SynchronizerConfig struct {
// Only clusters configured in this map will have custom resources synced
// Example: {"cluster-id-1": ["BkApp", "CronJob"], "cluster-id-2": ["BkApp"]}
CustomResourceTypes map[string][]string `json:"customResourceTypes"`
// SqlLogLevel define sql log level
SqlLogLevel int `json:"sqlLogLevel"`
// CleanLocalCache define whether to clean local cache
CleanLocalCache bool `json:"cleanLocalCache"`
}

// CustomResourceType defines parsed custom resource type configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)

// SQLite 结构体包含一个 GORM 数据库实例和数据库文件路径
Expand All @@ -27,9 +28,9 @@ type SQLite struct {

// New 是 SQLite 的构造函数,它接受一个数据库文件路径作为参数
// 并尝试打开数据库连接,如果成功则返回一个初始化的 SQLite 实例
func New(path string) *SQLite {
db := Open(path) // 尝试打开数据库连接
if db == nil { // 如果打开失败,返回 nil
func New(path string, logLevel int) *SQLite {
db := Open(path, logLevel) // 尝试打开数据库连接
if db == nil { // 如果打开失败,返回 nil
return nil
}
return &SQLite{ // 返回初始化的 SQLite 实例
Expand All @@ -40,9 +41,14 @@ func New(path string) *SQLite {

// Open 函数尝试使用给定的路径打开一个 SQLite 数据库连接
// 如果成功,它返回一个 GORM 数据库实例;如果失败,它会记录错误并返回 nil
func Open(path string) *gorm.DB {
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{}) // 尝试打开数据库连接
if err != nil { // 如果发生错误
func Open(path string, logLevel int) *gorm.DB {
blog.Infof("sqlite.Open called with path: %s, logLevel: %d", path, logLevel)
config := &gorm.Config{
Logger: logger.Default.LogMode(logger.LogLevel(logLevel)),
}

db, err := gorm.Open(sqlite.Open(path), config) // 尝试打开数据库连接
if err != nil { // 如果发生错误
blog.Errorf("Failed to open the SQLite database: %v", err) // 记录错误
return nil // 返回 nil
}
Expand Down
Loading
Loading