From 5955acbd2f2818716989e60c77f8ebb762092892 Mon Sep 17 00:00:00 2001 From: kangxiang Date: Mon, 1 Jun 2026 18:47:44 +0800 Subject: [PATCH] fix: genesis sync --- .../kubernetes_gather/kubernetes_gather.go | 53 +++++-- .../kubernetes_gather/vinterface_and_ip.go | 33 +--- .../cloud/kubernetes_gather_task.go | 6 +- server/controller/common/const.go | 11 +- server/controller/genesis/common/type.go | 3 + server/controller/genesis/config/config.go | 3 +- server/controller/genesis/grpc/server.go | 26 +++- .../genesis/store/kubernetes/store.go | 2 +- .../genesis/store/sync/mysql/datatype.go | 73 ++++++--- .../genesis/store/sync/mysql/run.go | 128 +++++++++++---- .../genesis/store/sync/mysql/store.go | 146 +++++++++++------- .../genesis/store/sync/redis/run.go | 4 + server/controller/genesis/updater/sync.go | 10 +- server/controller/model/model.go | 41 +++++ server/server.yaml | 5 +- 15 files changed, 379 insertions(+), 165 deletions(-) diff --git a/server/controller/cloud/kubernetes_gather/kubernetes_gather.go b/server/controller/cloud/kubernetes_gather/kubernetes_gather.go index 3c2269e6132..4968e92a731 100644 --- a/server/controller/cloud/kubernetes_gather/kubernetes_gather.go +++ b/server/controller/cloud/kubernetes_gather/kubernetes_gather.go @@ -17,6 +17,7 @@ package kubernetes_gather import ( + "errors" "regexp" "strings" "time" @@ -33,6 +34,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/db/metadb" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/genesis" + cmodel "github.com/deepflowio/deepflow/server/controller/model" "github.com/deepflowio/deepflow/server/controller/statsd" "github.com/deepflowio/deepflow/server/libs/logger" ) @@ -63,6 +65,7 @@ type KubernetesGather struct { podGroupLcuuids mapset.Set podNetworkLcuuidCIDRs networkLcuuidCIDRs nodeNetworkLcuuidCIDRs networkLcuuidCIDRs + vinterfaceData []cmodel.GenesisVinterface podIPToLcuuid map[string]string nodeIPToLcuuid map[string]string namespaceToLcuuid map[string]string @@ -96,6 +99,10 @@ func NewKubernetesGather(db *metadb.DB, domain *metadbmodel.Domain, subDomain *m var err error domainConfigJson, err = simplejson.NewJson([]byte(domain.Config)) + if err != nil { + log.Error(err, logger.NewORGPrefix(db.ORGID)) + return nil + } portNameRegex := domainConfigJson.Get("node_port_name_regex").MustString() if portNameRegex == "" { portNameRegex = common.DEFAULT_PORT_NAME_REGEX @@ -202,6 +209,7 @@ func NewKubernetesGather(db *metadb.DB, domain *metadbmodel.Domain, subDomain *m customTagLenMax: cfg.CustomTagLenMax, isSubDomain: isSubDomain, podGroupLcuuids: mapset.NewSet(), + vinterfaceData: []cmodel.GenesisVinterface{}, nodeNetworkLcuuidCIDRs: networkLcuuidCIDRs{}, podNetworkLcuuidCIDRs: networkLcuuidCIDRs{}, podIPToLcuuid: map[string]string{}, @@ -277,7 +285,7 @@ func (k *KubernetesGather) pgSpecGenerateConnections(nsName, pgName, pgLcuuid st if !ok { continue } - cmName := ref.Get("Name").MustString() + cmName := ref.Get("name").MustString() cmLcuuid, ok := k.configMapToLcuuid[[2]string{nsName, cmName}] if !ok { log.Infof("pod group (%s) imported env config map (%s) not found", pgName, cmName, logger.NewORGPrefix(k.orgID)) @@ -328,6 +336,7 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso // 任务循环的是同一个实例,所以这里要对关联关系进行初始化 k.azLcuuid = "" k.k8sEntries = nil + k.vinterfaceData = []cmodel.GenesisVinterface{} k.podNetworkLcuuidCIDRs = networkLcuuidCIDRs{} k.nodeNetworkLcuuidCIDRs = networkLcuuidCIDRs{} k.podGroupLcuuids = mapset.NewSet() @@ -345,6 +354,39 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso k.nsServiceNameToService = map[string]map[string]map[string]int{} k.cloudStatsd = statsd.NewCloudStatsd() + if genesis.GenesisService == nil { + errMsg := "genesis service is nil" + log.Warning(errMsg, logger.NewORGPrefix(k.orgID)) + return model.KubernetesGatherResource{ + ErrorState: common.RESOURCE_STATE_CODE_EXIT, + ErrorMessage: errMsg, + }, errors.New(errMsg) + } + + k8sEntries, err := k.getKubernetesEntries() + if err != nil { + log.Warning(err.Error(), logger.NewORGPrefix(k.orgID)) + return model.KubernetesGatherResource{ + ErrorState: common.RESOURCE_STATE_CODE_WARNING, + ErrorMessage: err.Error(), + }, err + } + k.k8sEntries = k8sEntries + + gsData, err := genesis.GenesisService.GetGenesisSyncResponse(k.orgID) + if err != nil { + return model.KubernetesGatherResource{ + ErrorState: common.RESOURCE_STATE_CODE_EXIT, + ErrorMessage: err.Error(), + }, err + } + for _, v := range gsData.Vinterfaces { + if v.KubernetesClusterID != k.ClusterID { + continue + } + k.vinterfaceData = append(k.vinterfaceData, v) + } + region, err := k.getRegion() if err != nil { return model.KubernetesGatherResource{}, err @@ -368,15 +410,6 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso }, err } - k.k8sEntries, err = k.getKubernetesEntries() - if err != nil { - log.Warning(err.Error(), logger.NewORGPrefix(k.orgID)) - return model.KubernetesGatherResource{ - ErrorState: common.RESOURCE_STATE_CODE_WARNING, - ErrorMessage: err.Error(), - }, err - } - podCluster, err := k.getPodCluster() if err != nil { return model.KubernetesGatherResource{}, err diff --git a/server/controller/cloud/kubernetes_gather/vinterface_and_ip.go b/server/controller/cloud/kubernetes_gather/vinterface_and_ip.go index 552f8a75967..375618181d7 100644 --- a/server/controller/cloud/kubernetes_gather/vinterface_and_ip.go +++ b/server/controller/cloud/kubernetes_gather/vinterface_and_ip.go @@ -17,7 +17,6 @@ package kubernetes_gather import ( - "errors" "regexp" "sort" "strings" @@ -28,7 +27,6 @@ import ( cloudcommon "github.com/deepflowio/deepflow/server/controller/cloud/common" "github.com/deepflowio/deepflow/server/controller/cloud/model" "github.com/deepflowio/deepflow/server/controller/common" - "github.com/deepflowio/deepflow/server/controller/genesis" "github.com/deepflowio/deepflow/server/libs/logger" "github.com/mikioh/ipaddr" ) @@ -58,21 +56,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod return } - // 获取vinterface API返回中host ip与其上所有node ip的对应关系 - if genesis.GenesisService == nil { - err = errors.New("genesis service is nil") - return - } - genesisData, err := genesis.GenesisService.GetGenesisSyncResponse(k.orgID) - if err != nil { - log.Error(err.Error(), logger.NewORGPrefix(k.orgID)) - return - } - vData := genesisData.Vinterfaces - for _, vItem := range vData { - if vItem.KubernetesClusterID != k.ClusterID { - continue - } + for _, vItem := range k.vinterfaceData { deviceType := vItem.DeviceType if deviceType == "docker-host" || deviceType == "kvm-host" { hostIP := vItem.HostIP @@ -96,10 +80,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod } } // 生成device_uuid或uuid和pod lcuuid的对应关系 - for _, vItem := range vData { - if vItem.KubernetesClusterID != k.ClusterID { - continue - } + for _, vItem := range k.vinterfaceData { if vItem.DeviceType != "docker-container" { continue } @@ -135,10 +116,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod } // 处理POD IP,生成port,ip,cidrs信息 - for _, vItem := range vData { - if vItem.KubernetesClusterID != k.ClusterID { - continue - } + for _, vItem := range k.vinterfaceData { if vItem.DeviceType != "docker-container" { continue } @@ -402,10 +380,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod // 处理nodeIP,生成port,ip,cidrs信息 nodeVinterfaceLcuuids := mapset.NewSet() - for _, vItem := range vData { - if vItem.KubernetesClusterID != k.ClusterID { - continue - } + for _, vItem := range k.vinterfaceData { deviceType := vItem.DeviceType if deviceType != "docker-host" && deviceType != "kvm-host" { continue diff --git a/server/controller/cloud/kubernetes_gather_task.go b/server/controller/cloud/kubernetes_gather_task.go index e2c3f8bc9b0..159a2e41193 100644 --- a/server/controller/cloud/kubernetes_gather_task.go +++ b/server/controller/cloud/kubernetes_gather_task.go @@ -122,10 +122,14 @@ func (k *KubernetesGatherTask) run(rSignal *queue.OverwriteQueue) { kResource, err := k.kubernetesGather.GetKubernetesGatherData() // 这里因为任务内部没有对成功的状态赋值状态码,在这里统一处理了 if err != nil { - kResource.ErrorMessage = fmt.Sprintf("%s %s", time.Now().Format(common.GO_BIRTHDAY), err.Error()) + if kResource.ErrorState == common.RESOURCE_STATE_CODE_EXIT { + log.Infof("kubernetes gather (%s) assemble failed: %s", k.kubernetesGather.Name, err.Error(), logger.NewORGPrefix(k.orgID)) + return + } if kResource.ErrorState == 0 { kResource.ErrorState = common.RESOURCE_STATE_CODE_EXCEPTION } + kResource.ErrorMessage = fmt.Sprintf("%s %s", time.Now().Format(common.GO_BIRTHDAY), err.Error()) } else { kResource.ErrorState = common.RESOURCE_STATE_CODE_SUCCESS } diff --git a/server/controller/common/const.go b/server/controller/common/const.go index a3d96ae2b09..2f69f02cf43 100644 --- a/server/controller/common/const.go +++ b/server/controller/common/const.go @@ -622,11 +622,12 @@ const ( ) const ( - RESOURCE_STATE_CODE_SUCCESS = 1 - RESOURCE_STATE_CODE_DELETING = 2 - RESOURCE_STATE_CODE_EXCEPTION = 3 - RESOURCE_STATE_CODE_WARNING = 4 - RESOURCE_STATE_CODE_NO_LICENSE = 5 + RESOURCE_STATE_CODE_SUCCESS = 1 + iota + RESOURCE_STATE_CODE_DELETING + RESOURCE_STATE_CODE_EXCEPTION + RESOURCE_STATE_CODE_WARNING + RESOURCE_STATE_CODE_NO_LICENSE + RESOURCE_STATE_CODE_EXIT ) const ( diff --git a/server/controller/genesis/common/type.go b/server/controller/genesis/common/type.go index 84ba86863e0..053fe742eb6 100644 --- a/server/controller/genesis/common/type.go +++ b/server/controller/genesis/common/type.go @@ -25,11 +25,13 @@ import ( type GenesisSync interface { Start() + GetVtapUpdatedVersion(key string) (uint64, bool) GetGenesisSyncData(orgID int) GenesisSyncDataResponse GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, error) } type GenesisSyncType interface { + GetInfo() string GetLcuuid() string GetVtapID() uint32 } @@ -82,6 +84,7 @@ type VIFRPCMessage struct { MessageType int TeamID uint32 VtapID uint32 + Version uint64 Peer string K8SClusterID string Key string diff --git a/server/controller/genesis/config/config.go b/server/controller/genesis/config/config.go index 654abfb1e3b..4ab38ae833c 100644 --- a/server/controller/genesis/config/config.go +++ b/server/controller/genesis/config/config.go @@ -19,7 +19,7 @@ package config type GenesisConfig struct { AgingTime float64 `default:"86400" yaml:"aging_time"` VinterfaceAgingTime float64 `default:"300" yaml:"vinterface_aging_time"` - AgentHeartBeat float64 `default:"60" yaml:"agent_heart_beat"` + AgentHeartBeat float64 `default:"10" yaml:"agent_heart_beat"` HostIPs []string `yaml:"host_ips"` LocalIPRanges []string `yaml:"local_ip_ranges"` ExcludeIPRanges []string `yaml:"exclude_ip_ranges"` @@ -27,6 +27,7 @@ type GenesisConfig struct { DataPersistenceInterval int `default:"60" yaml:"data_persistence_interval"` MultiNSMode bool `default:"false" yaml:"multi_ns_mode"` SingleVPCMode bool `default:"false" yaml:"single_vpc_mode"` + LogDetailEnabled bool `default:"false" yaml:"log_detail_enabled"` Database string `default:"mysql" yaml:"database"` DefaultVPCName string `default:"default-public-vpc" yaml:"default_vpc_name"` IgnoreNICRegex string `default:"^(kube-ipvs)" yaml:"ignore_nic_regex"` diff --git a/server/controller/genesis/grpc/server.go b/server/controller/genesis/grpc/server.go index 008ddc168cc..28ba7c04932 100644 --- a/server/controller/genesis/grpc/server.go +++ b/server/controller/genesis/grpc/server.go @@ -260,7 +260,15 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen _, enabled := g.workloadResourceEnabledCache.Get(fmt.Sprintf("%d-%s", orgID, groupShortLcuuid)) platformData := request.GetPlatformData() - if version == localVersion || platformData == nil { + if version == localVersion { + // It is necessary to verify whether there are available data for use, + // If there is no data, needs to be re-reported. + if v, ok := g.gsync.GetVtapUpdatedVersion(vtap); !ok || v != version { + g.vtapToVersion.Store(vtap, uint64(0)) + log.Infof("genesis sync re-reporting, not data for keep alive, reset local version, from ip %s vtap_id %v", remote, vtapID, logger.NewORGPrefix(orgID)) + return &agent.GenesisSyncResponse{}, nil + } + // If the worload-v is modified to be enabled during the period of continuous heartbeat, // it will trigger the re-reporting of the full data. if _, ok := g.workloadResourceChangeEnabledCache.Get(vtap); ok { @@ -278,6 +286,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen VtapID: vtapID, ORGID: orgID, TeamID: uint32(teamID), + Version: version, MessageType: common.TYPE_RENEW, Message: request, StorageRefresh: refresh, @@ -287,6 +296,20 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen return &agent.GenesisSyncResponse{Version: &localVersion}, nil } + if platformData == nil { + log.Debugf("genesis sync received version %v platform is nil from ip %s vtap_id %v, local version %v", version, remote, vtapID, localVersion, logger.NewORGPrefix(orgID)) + return &agent.GenesisSyncResponse{}, nil + } + + // 当采集器为容器类型时(cluster id 非空) + // - 采集器未注册(vtapID==0),即使没有 Interfaces 也需要处理 vinterface 来让采集器能够注册 + // - 采集器已经注册(vtapID!=0),采集器重启会出现 Interfaces 为空的情况 + // 为了避免 vinterface 异常增删,丢弃当前消息 + if k8sClusterID != "" && len(platformData.Interfaces) == 0 && vtapID != 0 { + log.Infof("genesis sync received version %v message with empty interfaces from ip %s vtap_id %v, local version %v", version, remote, vtapID, localVersion, logger.NewORGPrefix(orgID)) + return &agent.GenesisSyncResponse{Version: &version}, nil + } + log.Infof("genesis sync received version %v -> %v from ip %s vtap_id %v", localVersion, version, remote, vtapID, logger.NewORGPrefix(orgID)) g.genesisSyncQueue.Put( common.VIFRPCMessage{ @@ -295,6 +318,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen VtapID: vtapID, ORGID: orgID, TeamID: uint32(teamID), + Version: version, K8SClusterID: k8sClusterID, MessageType: common.TYPE_UPDATE, Message: request, diff --git a/server/controller/genesis/store/kubernetes/store.go b/server/controller/genesis/store/kubernetes/store.go index df0a2645e86..db088e6bd9f 100644 --- a/server/controller/genesis/store/kubernetes/store.go +++ b/server/controller/genesis/store/kubernetes/store.go @@ -244,7 +244,7 @@ func (k *KubernetesStorage) generateCache() { } else { endpoint = net.JoinHostPort(domain.ControllerIP, strconv.Itoa(k.listenNodePort)) } - cacheMap[k.formatKey(db.ORGID, domain.ClusterID)] = common.ClusterDest{ + cacheMap[k.formatKey(db.ORGID, subDomain.ClusterID)] = common.ClusterDest{ Endpoint: endpoint, DomainLcuuid: domain.Lcuuid, SubDomainLcuuid: subDomain.Lcuuid, diff --git a/server/controller/genesis/store/sync/mysql/datatype.go b/server/controller/genesis/store/sync/mysql/datatype.go index 03cfd6186bd..26436eb9492 100644 --- a/server/controller/genesis/store/sync/mysql/datatype.go +++ b/server/controller/genesis/store/sync/mysql/datatype.go @@ -25,8 +25,9 @@ import ( "github.com/patrickmn/go-cache" "github.com/deepflowio/deepflow/server/controller/db/metadb" - metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" + mmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/genesis/common" + cfg "github.com/deepflowio/deepflow/server/controller/genesis/config" "github.com/deepflowio/deepflow/server/controller/model" "github.com/deepflowio/deepflow/server/libs/logger" ) @@ -46,6 +47,7 @@ type GenesisSyncDataOperation struct { type GenesisSyncTypeOperation[T common.GenesisSyncType] struct { nodeIP string + config cfg.GenesisConfig store *cache.Cache } @@ -58,7 +60,7 @@ func (gs *GenesisSyncTypeOperation[T]) Fetch() map[int][]T { for key, item := range gs.store.Items() { orgID, err := strconv.Atoi(strings.Split(key, "-")[0]) if err != nil { - log.Error(err.Error()) + log.Errorf("parse org ID failed: %s", err.Error()) continue } result[orgID] = append(result[orgID], item.Object.([]T)...) @@ -80,6 +82,8 @@ func (gs *GenesisSyncTypeOperation[T]) Update(orgID int, vtapID uint32, vtapKey return } + log.Infof("update %T vtap (%s) entries: %d", items, vtapKey, len(items), logger.NewORGPrefix(orgID)) + db, err := metadb.GetDB(orgID) if err != nil { log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) @@ -87,7 +91,7 @@ func (gs *GenesisSyncTypeOperation[T]) Update(orgID int, vtapID uint32, vtapKey } key := gs.formatKey(orgID, vtapID, vtapKey) - if vtapID != 0 { + if gs.config.LogDetailEnabled && vtapID != 0 { newData := map[string]T{} for _, item := range items { newData[item.GetLcuuid()] = item @@ -107,7 +111,7 @@ func (gs *GenesisSyncTypeOperation[T]) Update(orgID int, vtapID uint32, vtapKey if ok || data.GetVtapID() == 0 { continue } - log.Infof("sync (%s) add (%#+v)", vtapKey, data, logger.NewORGPrefix(orgID)) + log.Infof("sync (%s) add (%s)", key, data.GetInfo(), logger.NewORGPrefix(orgID)) } // delete @@ -116,7 +120,7 @@ func (gs *GenesisSyncTypeOperation[T]) Update(orgID int, vtapID uint32, vtapKey if ok || data.GetVtapID() == 0 { continue } - log.Infof("sync (%s) delete (%#+v)", vtapKey, data, logger.NewORGPrefix(orgID)) + log.Infof("sync (%s) delete (%s)", key, data.GetInfo(), logger.NewORGPrefix(orgID)) } } @@ -141,7 +145,7 @@ func (gs *GenesisSyncTypeOperation[T]) Update(orgID int, vtapID uint32, vtapKey func (gs *GenesisSyncTypeOperation[T]) Load() { orgIDs, err := metadb.GetORGIDs() if err != nil { - log.Error("get org ids failed") + log.Errorf("get org ids failed: %s", err.Error()) return } for _, orgID := range orgIDs { @@ -157,6 +161,17 @@ func (gs *GenesisSyncTypeOperation[T]) Load() { continue } + var vtaps []mmodel.VTap + err = db.Find(&vtaps).Error + if err != nil { + log.Warning("get vtaps failed: %s", err.Error(), logger.NewORGPrefix(db.ORGID)) + continue + } + vtapIDs := map[int]mmodel.VTap{} + for _, vtap := range vtaps { + vtapIDs[vtap.ID] = vtap + } + activeVtapIDs := []uint32{} for _, storage := range storages { var items []T @@ -165,17 +180,23 @@ func (gs *GenesisSyncTypeOperation[T]) Load() { log.Errorf("get vtap (%d) data failed:%s", storage.VtapID, err.Error(), logger.NewORGPrefix(orgID)) continue } - var vtap metadbmodel.VTap - err = db.Where("id = ?", storage.VtapID).First(&vtap).Error - if err != nil { - log.Warningf("get vtap (%d) failed:%s", storage.VtapID, err.Error(), logger.NewORGPrefix(orgID)) + vtap, ok := vtapIDs[int(storage.VtapID)] + if !ok { + log.Debugf("vtap (%d) not found", storage.VtapID, logger.NewORGPrefix(db.ORGID)) continue } if len(items) == 0 { continue } - gs.store.SetDefault(gs.formatKey(orgID, storage.VtapID, vtap.CtrlIP+"-"+vtap.CtrlMac), items) + key := gs.formatKey(orgID, storage.VtapID, vtap.CtrlIP+"-"+vtap.CtrlMac) + if gs.config.LogDetailEnabled { + for _, item := range items { + log.Infof("genesis load %T vtap (%s) data (%s)", item, key, item.GetInfo(), logger.NewORGPrefix(db.ORGID)) + } + } + gs.store.SetDefault(key, items) activeVtapIDs = append(activeVtapIDs, storage.VtapID) + log.Infof("genesis load %T vtap (%s) entries: %d", items, key, len(items), logger.NewORGPrefix(db.ORGID)) } var inactive T err = db.Where("node_ip = ?", gs.nodeIP).Where("vtap_id NOT IN (?)", activeVtapIDs).Delete(&inactive).Error @@ -189,9 +210,10 @@ func (gs *GenesisSyncTypeOperation[T]) SetOnEvicted(f func(k string, v interface gs.store.OnEvicted(f) } -func NewHostPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisHost] { +func NewHostPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisHost] { return &GenesisSyncTypeOperation[model.GenesisHost]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -199,9 +221,10 @@ func NewHostPlatformDataOperation(nodeIP string, expired, interval int) *Genesis } } -func NewVMPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisVM] { +func NewVMPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisVM] { return &GenesisSyncTypeOperation[model.GenesisVM]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -209,9 +232,10 @@ func NewVMPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSy } } -func NewVIPPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisVIP] { +func NewVIPPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisVIP] { return &GenesisSyncTypeOperation[model.GenesisVIP]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -219,9 +243,10 @@ func NewVIPPlatformDataOperation(nodeIP string, expired, interval int) *GenesisS } } -func NewVpcPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisVPC] { +func NewVpcPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisVPC] { return &GenesisSyncTypeOperation[model.GenesisVPC]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -229,9 +254,10 @@ func NewVpcPlatformDataOperation(nodeIP string, expired, interval int) *GenesisS } } -func NewPortPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisPort] { +func NewPortPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisPort] { return &GenesisSyncTypeOperation[model.GenesisPort]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -239,9 +265,10 @@ func NewPortPlatformDataOperation(nodeIP string, expired, interval int) *Genesis } } -func NewNetworkPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisNetwork] { +func NewNetworkPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisNetwork] { return &GenesisSyncTypeOperation[model.GenesisNetwork]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -249,9 +276,10 @@ func NewNetworkPlatformDataOperation(nodeIP string, expired, interval int) *Gene } } -func NewVinterfacePlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisVinterface] { +func NewVinterfacePlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisVinterface] { return &GenesisSyncTypeOperation[model.GenesisVinterface]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -259,9 +287,10 @@ func NewVinterfacePlatformDataOperation(nodeIP string, expired, interval int) *G } } -func NewIPLastSeenPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisIP] { +func NewIPLastSeenPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisIP] { return &GenesisSyncTypeOperation[model.GenesisIP]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -269,9 +298,10 @@ func NewIPLastSeenPlatformDataOperation(nodeIP string, expired, interval int) *G } } -func NewLldpInfoPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisLldp] { +func NewLldpInfoPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisLldp] { return &GenesisSyncTypeOperation[model.GenesisLldp]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, @@ -279,9 +309,10 @@ func NewLldpInfoPlatformDataOperation(nodeIP string, expired, interval int) *Gen } } -func NewProcessPlatformDataOperation(nodeIP string, expired, interval int) *GenesisSyncTypeOperation[model.GenesisProcess] { +func NewProcessPlatformDataOperation(nodeIP string, expired, interval int, config cfg.GenesisConfig) *GenesisSyncTypeOperation[model.GenesisProcess] { return &GenesisSyncTypeOperation[model.GenesisProcess]{ nodeIP: nodeIP, + config: config, store: cache.New( time.Duration(expired)*time.Second, time.Duration(interval)*time.Second, diff --git a/server/controller/genesis/store/sync/mysql/run.go b/server/controller/genesis/store/sync/mysql/run.go index ce1c1916d9d..3c7d1792b55 100644 --- a/server/controller/genesis/store/sync/mysql/run.go +++ b/server/controller/genesis/store/sync/mysql/run.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "os" + "sync" "sync/atomic" "time" @@ -42,12 +43,13 @@ import ( var log = logger.MustGetLogger("genesis.store.sync.mysql") type GenesisSync struct { - isMaster bool - data atomic.Value - ctx context.Context - cancel context.CancelFunc - queue queue.QueueReader - config *config.ControllerConfig + isMaster bool + data atomic.Value + ctx context.Context + cancel context.CancelFunc + queue queue.QueueReader + config *config.ControllerConfig + vtapUpdatedVersion sync.Map } func NewGenesisSync(ctx context.Context, isMaster bool, queue queue.QueueReader, config *config.ControllerConfig) *GenesisSync { @@ -55,12 +57,13 @@ func NewGenesisSync(ctx context.Context, isMaster bool, queue queue.QueueReader, data.Store(common.GenesisSyncData{}) ctx, cancel := context.WithCancel(ctx) return &GenesisSync{ - isMaster: isMaster, - ctx: ctx, - cancel: cancel, - data: data, - queue: queue, - config: config, + isMaster: isMaster, + ctx: ctx, + cancel: cancel, + data: data, + queue: queue, + config: config, + vtapUpdatedVersion: sync.Map{}, } } @@ -70,11 +73,19 @@ func (g *GenesisSync) receiveGenesisSyncData(sChan chan common.GenesisSyncData) case s := <-sChan: g.data.Store(s) case <-g.ctx.Done(): - break + return } } } +func (g *GenesisSync) GetVtapUpdatedVersion(key string) (uint64, bool) { + version, ok := g.vtapUpdatedVersion.Load(key) + if !ok { + return 0, false + } + return version.(uint64), true +} + func (g *GenesisSync) GetGenesisSyncData(orgID int) common.GenesisSyncDataResponse { syncData := g.data.Load().(common.GenesisSyncData) return common.GenesisSyncDataResponse{ @@ -94,6 +105,7 @@ func (g *GenesisSync) GetGenesisSyncData(orgID int) common.GenesisSyncDataRespon func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataResponse, error) { retGenesisSyncData := common.GenesisSyncDataResponse{} + startTime := time.Now() db, err := metadb.GetDB(orgID) if err != nil { log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) @@ -101,12 +113,19 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR } var controllers []mmodel.Controller + err = db.Where("state <> ?", ccommon.CONTROLLER_STATE_EXCEPTION).Find(&controllers).Error + if err != nil { + log.Error("get controllers from db failed", logger.NewORGPrefix(orgID)) + return common.GenesisSyncDataResponse{}, err + } + var azControllerConns []mmodel.AZControllerConnection + err = db.Find(&azControllerConns).Error + if err != nil { + log.Error("get az controller connections from db failed", logger.NewORGPrefix(orgID)) + return common.GenesisSyncDataResponse{}, err + } var currentRegion string - - db.Where("state <> ?", ccommon.CONTROLLER_STATE_EXCEPTION).Find(&controllers) - db.Find(&azControllerConns) - controllerIPToRegion := make(map[string]string) for _, conn := range azControllerConns { if os.Getenv(ccommon.NODE_IP_KEY) == conn.ControllerIP { @@ -115,6 +134,17 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR controllerIPToRegion[conn.ControllerIP] = conn.Region } + var storages []model.GenesisStorage + err = db.Find(&storages).Error + if err != nil { + log.Error("get storages from db failed", logger.NewORGPrefix(orgID)) + return common.GenesisSyncDataResponse{}, err + } + nodeIPToVtapIDs := map[string][]uint32{} + for _, storage := range storages { + nodeIPToVtapIDs[storage.NodeIP] = append(nodeIPToVtapIDs[storage.NodeIP], storage.VtapID) + } + syncIPLcuuidSet := map[string]bool{} syncVIPLcuuidSet := map[string]bool{} syncHostLcuuidSet := map[string]bool{} @@ -125,18 +155,37 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR syncVPCLcuuidSet := map[string]bool{} syncVinterfaceLcuuidSet := map[string]bool{} syncProcessLcuuidSet := map[string]bool{} + + requestControllers := []string{} for _, controller := range controllers { // skip other region controller if region, ok := controllerIPToRegion[controller.IP]; !ok || region != currentRegion { continue } + requestControllers = append(requestControllers, controller.NodeName+":"+controller.IP) + // get effective vtap ids in current controller - var storages []model.GenesisStorage - db.Where("node_ip = ?", controller.IP).Find(&storages) + vtapIDs := nodeIPToVtapIDs[controller.IP] vtapIDMap := map[uint32]int{0: 0} - for _, storage := range storages { - vtapIDMap[storage.VtapID] = 0 + for _, vtapID := range vtapIDs { + vtapIDMap[vtapID] = 0 + } + + // Optimistic Lock + var nodeStorages []model.GenesisStorage + err = db.Where("node_ip = ?", controller.IP).Find(&nodeStorages).Error + if err != nil { + log.Errorf("get node (%s) storages from db failed", controller.IP, logger.NewORGPrefix(orgID)) + return common.GenesisSyncDataResponse{}, err + } + if len(nodeStorages) != len(vtapIDs) { + return common.GenesisSyncDataResponse{}, fmt.Errorf("node (%s) vinterface storages have changed during the acquisition process, please try again.", controller.IP) + } + for _, storage := range nodeStorages { + if _, ok := vtapIDMap[storage.VtapID]; !ok { + return common.GenesisSyncDataResponse{}, fmt.Errorf("node (%s) vinterface storages have changed during the acquisition process, please try again.", controller.IP) + } } // use pod ip communication in internal region @@ -338,19 +387,36 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR continue } sVinterfaceLcuuid := v.GetLcuuid() + sVinterfaceIP := v.GetIps() + sVinterfaceMac := v.GetMac() + sVinterfaceVtapId := v.GetVtapId() if _, ok := syncVinterfaceLcuuidSet[sVinterfaceLcuuid]; ok { + if g.config.GenesisCfg.LogDetailEnabled { + log.Infof("lcuuid (%s) duplicate, vtap (%d) vinterface (%s-%s), from node (%s)", + sVinterfaceLcuuid, sVinterfaceVtapId, sVinterfaceIP, sVinterfaceMac, controller.NodeName, + logger.NewORGPrefix(orgID)) + } continue } + + if g.config.GenesisCfg.LogDetailEnabled { + if v.GetKubernetesClusterId() != "" { + log.Infof("cluster (%s) vtap (%d) vinterface (%s-%s), device type (%s) from (%s)", + v.GetKubernetesClusterId(), sVinterfaceVtapId, sVinterfaceIP, sVinterfaceMac, v.GetDeviceType(), controller.NodeName, + logger.NewORGPrefix(orgID)) + } + } + syncVinterfaceLcuuidSet[sVinterfaceLcuuid] = false vLastSeenStr := v.GetLastSeen() vpLastSeen, _ := time.ParseInLocation(ccommon.GO_BIRTHDAY, vLastSeenStr, time.Local) retGenesisSyncData.Vinterfaces = append(retGenesisSyncData.Vinterfaces, model.GenesisVinterface{ - VtapID: v.GetVtapId(), + VtapID: sVinterfaceVtapId, Lcuuid: sVinterfaceLcuuid, NetnsID: v.GetNetnsId(), Name: v.GetName(), - IPs: v.GetIps(), - Mac: v.GetMac(), + IPs: sVinterfaceIP, + Mac: sVinterfaceMac, TapName: v.GetTapName(), TapMac: v.GetTapMac(), DeviceLcuuid: v.GetDeviceLcuuid(), @@ -393,6 +459,11 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR }) } } + + if g.config.GenesisCfg.LogDetailEnabled { + log.Infof("sync start (%v)", startTime, logger.NewORGPrefix(orgID)) + log.Infof("request controllers (%v)", requestControllers, logger.NewORGPrefix(orgID)) + } return retGenesisSyncData, nil } @@ -405,9 +476,9 @@ func (g *GenesisSync) Start() { vStorage := NewSyncStorage(g.ctx, g.config.GenesisCfg, sDataChan) vStorage.Start() - genesisSyncDataByVtap := map[string]common.GenesisSyncDataResponse{} vUpdater := updater.NewGenesisSyncRpcUpdater(g.config.GenesisCfg) for { + genesisSyncDataByVtap := map[string]common.GenesisSyncDataResponse{} genesisSyncData := common.GenesisSyncDataResponse{} info := g.queue.Get().(common.VIFRPCMessage) if info.MessageType == common.TYPE_EXIT { @@ -417,12 +488,12 @@ func (g *GenesisSync) Start() { log.Debugf("sync received (%s) vtap_id (%v) type (%v) workload resource enabled (%t) received (%s)", info.Peer, info.VtapID, info.MessageType, info.WorkloadResourceEnabled, info.Message, logger.NewORGPrefix(info.ORGID)) - vtap := fmt.Sprintf("%d%d", info.ORGID, info.VtapID) + vtap := fmt.Sprintf("%d-%d", info.ORGID, info.VtapID) if info.MessageType == common.TYPE_RENEW { if info.VtapID != 0 { - peerInfo, ok := genesisSyncDataByVtap[vtap] + data, ok := genesisSyncDataByVtap[vtap] if ok { - vStorage.Renew(info.ORGID, info.VtapID, info.Key, info.StorageRefresh, info.WorkloadResourceEnabled, peerInfo) + vStorage.Renew(info.ORGID, info.VtapID, info.Key, info.StorageRefresh, info.WorkloadResourceEnabled, data) } } } else if info.MessageType == common.TYPE_UPDATE { @@ -444,6 +515,7 @@ func (g *GenesisSync) Start() { if info.VtapID != 0 { genesisSyncDataByVtap[vtap] = genesisSyncData + g.vtapUpdatedVersion.Store(vtap, info.Version) } vStorage.Update(info.ORGID, info.VtapID, info.Key, genesisSyncData) } diff --git a/server/controller/genesis/store/sync/mysql/store.go b/server/controller/genesis/store/sync/mysql/store.go index d3741d26a29..56c441ccc54 100644 --- a/server/controller/genesis/store/sync/mysql/store.go +++ b/server/controller/genesis/store/sync/mysql/store.go @@ -74,13 +74,15 @@ func (s *SyncStorage) Renew(orgID int, vtapID uint32, vtapKey string, refresh, w } db, err := metadb.GetDB(orgID) if err != nil { - log.Errorf("get metadb session failed", logger.NewORGPrefix(orgID)) + log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) return } - err = db.Model(&model.GenesisStorage{}).Where("vtap_id = ? AND node_ip <> ?", vtapID, s.nodeIP).Update("node_ip", s.nodeIP).Error - if err != nil { - log.Warningf("vtap id (%d) refresh storage to node (%s) failed: %s", vtapID, s.nodeIP, err, logger.NewORGPrefix(orgID)) + tx := db.Model(&model.GenesisStorage{}).Where("vtap_id = ?", vtapID).Update("node_ip", s.nodeIP) + if tx.Error != nil || tx.RowsAffected == 0 { + log.Warningf("update storage vtap=%d to node (%s) failed: %s", vtapID, s.nodeIP, tx.Error, logger.NewORGPrefix(orgID)) + return } + log.Infof("update storage vtap=%d, set node=%s", vtapID, s.nodeIP, logger.NewORGPrefix(orgID)) } func (s *SyncStorage) Update(orgID int, vtapID uint32, vtapKey string, items common.GenesisSyncDataResponse) { @@ -92,8 +94,8 @@ func (s *SyncStorage) Update(orgID int, vtapID uint32, vtapKey string, items com s.data.Ports.Update(orgID, vtapID, vtapKey, items.Ports) s.data.Networks.Update(orgID, vtapID, vtapKey, items.Networks) s.data.IPlastseens.Update(orgID, vtapID, vtapKey, items.IPLastSeens) - s.data.Vinterfaces.Update(orgID, vtapID, vtapKey, items.Vinterfaces) s.data.Processes.Update(orgID, vtapID, vtapKey, items.Processes) + s.data.Vinterfaces.Update(orgID, vtapID, vtapKey, items.Vinterfaces) // push immediately after update s.fetch() @@ -114,9 +116,10 @@ func (s *SyncStorage) Update(orgID int, vtapID uint32, vtapKey string, items com NodeIP: s.nodeIP, }).Error if err != nil { - log.Errorf("update storage (vtap_id:%d/node_ip:%s) failed: %s", vtapID, s.nodeIP, err.Error(), logger.NewORGPrefix(orgID)) + log.Errorf("update storage create vtap=%d and node=%s failed: %s", vtapID, s.nodeIP, err.Error(), logger.NewORGPrefix(orgID)) return } + log.Infof("update storage vtap=%d and node=%s", vtapID, s.nodeIP, logger.NewORGPrefix(orgID)) } func (s *SyncStorage) fetch() { @@ -137,36 +140,38 @@ func (s *SyncStorage) fetch() { func (s *SyncStorage) loadFromDatabase() { expired := int(s.cfg.AgingTime) interval := int(s.cfg.DataPersistenceInterval) - s.data.VIPs = NewVIPPlatformDataOperation(s.nodeIP, expired, interval) + s.data.VIPs = NewVIPPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.VIPs.Load() - s.data.VMs = NewVMPlatformDataOperation(s.nodeIP, expired, interval) + s.data.VMs = NewVMPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.VMs.Load() - s.data.VPCs = NewVpcPlatformDataOperation(s.nodeIP, expired, interval) + s.data.VPCs = NewVpcPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.VPCs.Load() - s.data.Hosts = NewHostPlatformDataOperation(s.nodeIP, expired, interval) + s.data.Hosts = NewHostPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.Hosts.Load() - s.data.Ports = NewPortPlatformDataOperation(s.nodeIP, expired, interval) + s.data.Ports = NewPortPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.Ports.Load() - s.data.Lldps = NewLldpInfoPlatformDataOperation(s.nodeIP, expired, interval) + s.data.Lldps = NewLldpInfoPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.Lldps.Load() - s.data.IPlastseens = NewIPLastSeenPlatformDataOperation(s.nodeIP, expired, interval) + s.data.IPlastseens = NewIPLastSeenPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.IPlastseens.Load() - s.data.Networks = NewNetworkPlatformDataOperation(s.nodeIP, expired, interval) + s.data.Networks = NewNetworkPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.Networks.Load() - s.data.Vinterfaces = NewVinterfacePlatformDataOperation(s.nodeIP, expired, interval) + s.data.Vinterfaces = NewVinterfacePlatformDataOperation(s.nodeIP, int(s.cfg.VinterfaceAgingTime), interval, s.cfg) s.data.Vinterfaces.Load() - s.data.Processes = NewProcessPlatformDataOperation(s.nodeIP, expired, interval) + s.data.Processes = NewProcessPlatformDataOperation(s.nodeIP, expired, interval, s.cfg) s.data.Processes.Load() + log.Info("genesis load from db complete") + s.fetch() } @@ -174,52 +179,73 @@ func (s *SyncStorage) refreshDatabase() { ticker := time.NewTicker(time.Duration(s.cfg.AgingTime) * time.Second) defer ticker.Stop() - for range ticker.C { - // clean genesis storage invalid data - orgIDs, err := metadb.GetORGIDs() - if err != nil { - log.Error("get org ids failed") + for { + select { + case <-s.sCtx.Done(): return - } - for _, orgID := range orgIDs { - db, err := metadb.GetDB(orgID) + case <-ticker.C: + // clean genesis storage invalid data + orgIDs, err := metadb.GetORGIDs() if err != nil { - log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) + log.Errorf("get org ids failed: %s", err.Error()) continue } - vTaps := []metadbmodel.VTap{} - vTapIDs := map[int]bool{} - storages := []model.GenesisStorage{} - invalidStorages := []model.GenesisStorage{} - db.Select("id").Find(&vTaps) - db.Where("node_ip = ?", s.nodeIP).Find(&storages) - for _, v := range vTaps { - vTapIDs[v.ID] = false - } - for _, s := range storages { - if _, ok := vTapIDs[int(s.VtapID)]; !ok { - invalidStorages = append(invalidStorages, s) - } - } - if len(invalidStorages) > 0 { - err := db.Delete(&invalidStorages).Error + for _, orgID := range orgIDs { + db, err := metadb.GetDB(orgID) if err != nil { - log.Errorf("node (%s) clean genesis storage invalid data failed: %s", s.nodeIP, err, logger.NewORGPrefix(orgID)) - } else { - log.Infof("node (%s) clean genesis storage invalid data success", s.nodeIP, logger.NewORGPrefix(orgID)) + log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) + continue + } + vTaps := []metadbmodel.VTap{} + vTapIDs := map[int]bool{} + storages := []model.GenesisStorage{} + invalidStorages := []model.GenesisStorage{} + db.Select("id").Find(&vTaps) + db.Where("node_ip = ?", s.nodeIP).Find(&storages) + for _, v := range vTaps { + vTapIDs[v.ID] = false + } + for _, s := range storages { + if _, ok := vTapIDs[int(s.VtapID)]; !ok { + invalidStorages = append(invalidStorages, s) + } + } + if len(invalidStorages) > 0 { + err := db.Delete(&invalidStorages).Error + if err != nil { + log.Errorf("node (%s) clean genesis storage invalid data failed: %s", s.nodeIP, err.Error(), logger.NewORGPrefix(orgID)) + } else { + log.Infof("node (%s) clean genesis storage invalid data success", s.nodeIP, logger.NewORGPrefix(orgID)) + } } } } } } -func (s *SyncStorage) onEvicted(k string, v interface{}) { +func parseEvictedOrgID(k string) (int, error) { + orgIDAndRest := strings.SplitN(k, "-", 2) + if len(orgIDAndRest) != 2 { + return 0, strconv.ErrSyntax + } + return strconv.Atoi(orgIDAndRest[0]) +} + +func handleEvicted[T common.GenesisSyncType](s *SyncStorage, k string, v interface{}) { s.fetch() - keys := strings.Split(k, "-") - orgID, err := strconv.Atoi(keys[0]) + items, ok := v.([]T) + if !ok { + log.Errorf("unexpected evicted data type for key (%s): %T", k, v) + return + } + if len(items) == 0 { + return + } + + orgID, err := parseEvictedOrgID(k) if err != nil { - log.Error(err.Error()) + log.Errorf("parse evicted org ID failed: %s", err.Error()) return } db, err := metadb.GetDB(orgID) @@ -227,25 +253,25 @@ func (s *SyncStorage) onEvicted(k string, v interface{}) { log.Errorf("get metadb session failed: %s", err.Error(), logger.NewORGPrefix(orgID)) return } - err = db.Delete(&v).Error + err = db.Delete(&items).Error if err != nil { - log.Errorf("delete vtap (%s) stale data (%#v) failed: %s", k, v, err.Error(), logger.NewORGPrefix(orgID)) + log.Errorf("delete vtap (%s) stale data (%#v) failed: %s", k, items, err.Error(), logger.NewORGPrefix(orgID)) } } func (s *SyncStorage) run() { s.loadFromDatabase() - s.data.VMs.SetOnEvicted(s.onEvicted) - s.data.VIPs.SetOnEvicted(s.onEvicted) - s.data.VPCs.SetOnEvicted(s.onEvicted) - s.data.Hosts.SetOnEvicted(s.onEvicted) - s.data.Lldps.SetOnEvicted(s.onEvicted) - s.data.Ports.SetOnEvicted(s.onEvicted) - s.data.Networks.SetOnEvicted(s.onEvicted) - s.data.Processes.SetOnEvicted(s.onEvicted) - s.data.Vinterfaces.SetOnEvicted(s.onEvicted) - s.data.IPlastseens.SetOnEvicted(s.onEvicted) + s.data.VMs.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisVM](s, k, v) }) + s.data.VIPs.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisVIP](s, k, v) }) + s.data.VPCs.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisVPC](s, k, v) }) + s.data.Hosts.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisHost](s, k, v) }) + s.data.Lldps.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisLldp](s, k, v) }) + s.data.Ports.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisPort](s, k, v) }) + s.data.Networks.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisNetwork](s, k, v) }) + s.data.Processes.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisProcess](s, k, v) }) + s.data.Vinterfaces.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisVinterface](s, k, v) }) + s.data.IPlastseens.SetOnEvicted(func(k string, v interface{}) { handleEvicted[model.GenesisIP](s, k, v) }) } func (s *SyncStorage) Start() { diff --git a/server/controller/genesis/store/sync/redis/run.go b/server/controller/genesis/store/sync/redis/run.go index 7448ed3582c..89f97dd6b29 100644 --- a/server/controller/genesis/store/sync/redis/run.go +++ b/server/controller/genesis/store/sync/redis/run.go @@ -30,6 +30,10 @@ func NewGenesisSync(ctx context.Context, isMaster bool, queue queue.QueueReader, return nil } +func (g *GenesisSync) GetVtapUpdatedVersion(key string) (uint64, bool) { + return 0, false +} + func (g *GenesisSync) GetGenesisSyncData(orgID int) common.GenesisSyncDataResponse { return common.GenesisSyncDataResponse{} } diff --git a/server/controller/genesis/updater/sync.go b/server/controller/genesis/updater/sync.go index 211d97a2cc9..c315e37f631 100644 --- a/server/controller/genesis/updater/sync.go +++ b/server/controller/genesis/updater/sync.go @@ -141,12 +141,6 @@ func NewGenesisSyncRpcUpdater(cfg config.GenesisConfig) *GenesisSyncRpcUpdater { func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(orgID int, teamID, vtapID uint32, peer, deviceType string, message *agent.GenesisSyncRequest) []model.GenesisVinterface { platformData := message.GetPlatformData() k8sClusterID := message.GetKubernetesClusterId() - // 当采集器为容器类型时(cluster id 非空) - // - 采集器未注册(vtapID==0),即使没有 Interfaces 也需要处理 vinterface 来让采集器能够注册 - // - 采集器已经注册(vtapID!=0),采集器重启会出现 Interfaces 为空的情况,为了避免 vinterface 异常增删,不解析当前消息 - if k8sClusterID != "" && len(platformData.Interfaces) == 0 && vtapID != 0 { - return []model.GenesisVinterface{} - } isContainer := deviceType == common.DEVICE_TYPE_DOCKER_HOST epoch := time.Now() @@ -243,7 +237,9 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(orgID int, teamID, vtapID ui var hasNetMask bool var validIPs []string for _, addr := range iface.Ip { - hasNetMask = strings.Contains(addr, `/`) + if strings.Contains(addr, `/`) { + hasNetMask = true + } var netIP netaddr.IP if hasNetMask { ipPrefix, err := netaddr.ParseIPPrefix(addr) diff --git a/server/controller/model/model.go b/server/controller/model/model.go index 644de0b941c..3b9d353950e 100644 --- a/server/controller/model/model.go +++ b/server/controller/model/model.go @@ -17,6 +17,7 @@ package model import ( + "fmt" "time" "github.com/deepflowio/deepflow/server/agent_config" @@ -558,6 +559,10 @@ func (g GenesisHost) GetVtapID() uint32 { return g.VtapID } +func (g GenesisHost) GetInfo() string { + return g.Hostname +} + type GenesisIP struct { Masklen uint32 `gorm:"column:masklen;type:int;default:null;default:0" json:"MASKLEN"` VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` @@ -580,6 +585,10 @@ func (g GenesisIP) GetVtapID() uint32 { return g.VtapID } +func (g GenesisIP) GetInfo() string { + return g.IP +} + type GenesisVIP struct { VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` IP string `gorm:"column:ip;type:char(64);default:null" json:"IP"` @@ -599,6 +608,10 @@ func (g GenesisVIP) GetVtapID() uint32 { return g.VtapID } +func (g GenesisVIP) GetInfo() string { + return g.IP +} + type GenesisLldp struct { VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` Lcuuid string `gorm:"primaryKey;column:lcuuid;type:char(64)" json:"LCUUID"` @@ -624,6 +637,10 @@ func (g GenesisLldp) GetVtapID() uint32 { return g.VtapID } +func (g GenesisLldp) GetInfo() string { + return fmt.Sprintf("system_name: %s, management_address: %s, vinterface_description: %s", g.SystemName, g.ManagementAddress, g.VinterfaceDescription) +} + type GenesisNetwork struct { SegmentationID uint32 `gorm:"column:segmentation_id;type:int;default:null" json:"SEGMENTATION_ID"` NetType uint32 `gorm:"column:net_type;type:int;default:null" json:"NET_TYPE"` @@ -647,6 +664,10 @@ func (g GenesisNetwork) GetVtapID() uint32 { return g.VtapID } +func (g GenesisNetwork) GetInfo() string { + return fmt.Sprintf("name: %s, segmentation_id: %d, net_type: %d", g.Name, g.SegmentationID, g.NetType) +} + type GenesisPort struct { Type uint32 `gorm:"column:type;type:int;default:null" json:"TYPE"` DeviceType uint32 `gorm:"column:device_type;type:int;default:null" json:"DEVICETYPE"` @@ -671,6 +692,10 @@ func (g GenesisPort) GetVtapID() uint32 { return g.VtapID } +func (g GenesisPort) GetInfo() string { + return fmt.Sprintf("type: %d, device_type: %d, mac: %s", g.Type, g.DeviceType, g.Mac) +} + type GenesisVinterface struct { TeamID uint32 `gorm:"column:team_id;type:int;default:1" json:"TEAM_ID"` NetnsID uint32 `gorm:"column:netns_id;type:int unsigned;default:0" json:"NETNS_ID"` @@ -703,6 +728,10 @@ func (g GenesisVinterface) GetVtapID() uint32 { return g.VtapID } +func (g GenesisVinterface) GetInfo() string { + return fmt.Sprintf("name: %s, ips: %s, mac:%s ", g.Name, g.IPs, g.Mac) +} + type GenesisVM struct { State uint32 `gorm:"column:state;type:int;default:null" json:"STATE"` VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` @@ -727,6 +756,10 @@ func (g GenesisVM) GetVtapID() uint32 { return g.VtapID } +func (g GenesisVM) GetInfo() string { + return g.Name +} + type GenesisVPC struct { VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` Lcuuid string `gorm:"primaryKey;column:lcuuid;type:char(64)" json:"LCUUID"` @@ -746,6 +779,10 @@ func (g GenesisVPC) GetVtapID() uint32 { return g.VtapID } +func (g GenesisVPC) GetInfo() string { + return g.Name +} + type GenesisProcess struct { NetnsID uint32 `gorm:"column:netns_id;type:int unsigned;default:0" json:"NETNS_ID"` VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` @@ -773,6 +810,10 @@ func (g GenesisProcess) GetVtapID() uint32 { return g.VtapID } +func (g GenesisProcess) GetInfo() string { + return fmt.Sprintf("pid:%d, name:%s", g.PID, g.Name) +} + type GenesisStorage struct { VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` NodeIP string `gorm:"column:node_ip;type:char(48)" json:"NODE_IP"` diff --git a/server/server.yaml b/server/server.yaml index cf1c8a3362b..8b222beeced 100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -418,7 +418,10 @@ controller: # 数据持久化检测间隔,单位:秒 data_persistence_interval: 60 # 采集器消息心跳时长,单位:秒 - agent_heart_beat: 60 + agent_heart_beat: 10 + + # 日志详情开关 + log_detail_enabled: false # 采集器同步KVM时,配置的采集器IP所上报的内容会被解析 host_ips: