Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions server/controller/cloud/kubernetes_gather/kubernetes_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kubernetes_gather

import (
"errors"
"regexp"
"strings"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/db/metadb"
metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model"
"github.com/deepflowio/deepflow/server/controller/genesis"
cmodel "github.com/deepflowio/deepflow/server/controller/model"
"github.com/deepflowio/deepflow/server/controller/statsd"
"github.com/deepflowio/deepflow/server/libs/logger"
)
Expand Down Expand Up @@ -63,6 +65,7 @@ type KubernetesGather struct {
podGroupLcuuids mapset.Set
podNetworkLcuuidCIDRs networkLcuuidCIDRs
nodeNetworkLcuuidCIDRs networkLcuuidCIDRs
vinterfaceData []cmodel.GenesisVinterface
podIPToLcuuid map[string]string
nodeIPToLcuuid map[string]string
namespaceToLcuuid map[string]string
Expand Down Expand Up @@ -96,6 +99,10 @@ func NewKubernetesGather(db *metadb.DB, domain *metadbmodel.Domain, subDomain *m
var err error

domainConfigJson, err = simplejson.NewJson([]byte(domain.Config))
if err != nil {
log.Error(err, logger.NewORGPrefix(db.ORGID))
return nil
}
portNameRegex := domainConfigJson.Get("node_port_name_regex").MustString()
if portNameRegex == "" {
portNameRegex = common.DEFAULT_PORT_NAME_REGEX
Expand Down Expand Up @@ -202,6 +209,7 @@ func NewKubernetesGather(db *metadb.DB, domain *metadbmodel.Domain, subDomain *m
customTagLenMax: cfg.CustomTagLenMax,
isSubDomain: isSubDomain,
podGroupLcuuids: mapset.NewSet(),
vinterfaceData: []cmodel.GenesisVinterface{},
nodeNetworkLcuuidCIDRs: networkLcuuidCIDRs{},
podNetworkLcuuidCIDRs: networkLcuuidCIDRs{},
podIPToLcuuid: map[string]string{},
Expand Down Expand Up @@ -277,7 +285,7 @@ func (k *KubernetesGather) pgSpecGenerateConnections(nsName, pgName, pgLcuuid st
if !ok {
continue
}
cmName := ref.Get("Name").MustString()
cmName := ref.Get("name").MustString()
cmLcuuid, ok := k.configMapToLcuuid[[2]string{nsName, cmName}]
if !ok {
log.Infof("pod group (%s) imported env config map (%s) not found", pgName, cmName, logger.NewORGPrefix(k.orgID))
Expand Down Expand Up @@ -328,6 +336,7 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
// 任务循环的是同一个实例,所以这里要对关联关系进行初始化
k.azLcuuid = ""
k.k8sEntries = nil
k.vinterfaceData = []cmodel.GenesisVinterface{}
k.podNetworkLcuuidCIDRs = networkLcuuidCIDRs{}
k.nodeNetworkLcuuidCIDRs = networkLcuuidCIDRs{}
k.podGroupLcuuids = mapset.NewSet()
Expand All @@ -345,6 +354,39 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
k.nsServiceNameToService = map[string]map[string]map[string]int{}
k.cloudStatsd = statsd.NewCloudStatsd()

if genesis.GenesisService == nil {
errMsg := "genesis service is nil"
log.Warning(errMsg, logger.NewORGPrefix(k.orgID))
return model.KubernetesGatherResource{
ErrorState: common.RESOURCE_STATE_CODE_EXIT,
ErrorMessage: errMsg,
}, errors.New(errMsg)
}

k8sEntries, err := k.getKubernetesEntries()
if err != nil {
log.Warning(err.Error(), logger.NewORGPrefix(k.orgID))
return model.KubernetesGatherResource{
ErrorState: common.RESOURCE_STATE_CODE_WARNING,
ErrorMessage: err.Error(),
}, err
}
k.k8sEntries = k8sEntries

gsData, err := genesis.GenesisService.GetGenesisSyncResponse(k.orgID)
if err != nil {
return model.KubernetesGatherResource{
ErrorState: common.RESOURCE_STATE_CODE_EXIT,
ErrorMessage: err.Error(),
}, err
}
for _, v := range gsData.Vinterfaces {
if v.KubernetesClusterID != k.ClusterID {
continue
}
k.vinterfaceData = append(k.vinterfaceData, v)
}

region, err := k.getRegion()
if err != nil {
return model.KubernetesGatherResource{}, err
Expand All @@ -368,15 +410,6 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
}, err
}

k.k8sEntries, err = k.getKubernetesEntries()
if err != nil {
log.Warning(err.Error(), logger.NewORGPrefix(k.orgID))
return model.KubernetesGatherResource{
ErrorState: common.RESOURCE_STATE_CODE_WARNING,
ErrorMessage: err.Error(),
}, err
}

podCluster, err := k.getPodCluster()
if err != nil {
return model.KubernetesGatherResource{}, err
Expand Down
33 changes: 4 additions & 29 deletions server/controller/cloud/kubernetes_gather/vinterface_and_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kubernetes_gather

import (
"errors"
"regexp"
"sort"
"strings"
Expand All @@ -28,7 +27,6 @@ import (
cloudcommon "github.com/deepflowio/deepflow/server/controller/cloud/common"
"github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/genesis"
"github.com/deepflowio/deepflow/server/libs/logger"
"github.com/mikioh/ipaddr"
)
Expand Down Expand Up @@ -58,21 +56,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod
return
}

// 获取vinterface API返回中host ip与其上所有node ip的对应关系
if genesis.GenesisService == nil {
err = errors.New("genesis service is nil")
return
}
genesisData, err := genesis.GenesisService.GetGenesisSyncResponse(k.orgID)
if err != nil {
log.Error(err.Error(), logger.NewORGPrefix(k.orgID))
return
}
vData := genesisData.Vinterfaces
for _, vItem := range vData {
if vItem.KubernetesClusterID != k.ClusterID {
continue
}
for _, vItem := range k.vinterfaceData {
deviceType := vItem.DeviceType
if deviceType == "docker-host" || deviceType == "kvm-host" {
hostIP := vItem.HostIP
Expand All @@ -96,10 +80,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod
}
}
// 生成device_uuid或uuid和pod lcuuid的对应关系
for _, vItem := range vData {
if vItem.KubernetesClusterID != k.ClusterID {
continue
}
for _, vItem := range k.vinterfaceData {
if vItem.DeviceType != "docker-container" {
continue
}
Expand Down Expand Up @@ -135,10 +116,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod
}

// 处理POD IP,生成port,ip,cidrs信息
for _, vItem := range vData {
if vItem.KubernetesClusterID != k.ClusterID {
continue
}
for _, vItem := range k.vinterfaceData {
if vItem.DeviceType != "docker-container" {
continue
}
Expand Down Expand Up @@ -402,10 +380,7 @@ func (k *KubernetesGather) getVInterfacesAndIPs() (nodeSubnets, podSubnets []mod

// 处理nodeIP,生成port,ip,cidrs信息
nodeVinterfaceLcuuids := mapset.NewSet()
for _, vItem := range vData {
if vItem.KubernetesClusterID != k.ClusterID {
continue
}
for _, vItem := range k.vinterfaceData {
deviceType := vItem.DeviceType
if deviceType != "docker-host" && deviceType != "kvm-host" {
continue
Expand Down
6 changes: 5 additions & 1 deletion server/controller/cloud/kubernetes_gather_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,14 @@ func (k *KubernetesGatherTask) run(rSignal *queue.OverwriteQueue) {
kResource, err := k.kubernetesGather.GetKubernetesGatherData()
// 这里因为任务内部没有对成功的状态赋值状态码,在这里统一处理了
if err != nil {
kResource.ErrorMessage = fmt.Sprintf("%s %s", time.Now().Format(common.GO_BIRTHDAY), err.Error())
if kResource.ErrorState == common.RESOURCE_STATE_CODE_EXIT {
log.Infof("kubernetes gather (%s) assemble failed: %s", k.kubernetesGather.Name, err.Error(), logger.NewORGPrefix(k.orgID))
return
}
if kResource.ErrorState == 0 {
kResource.ErrorState = common.RESOURCE_STATE_CODE_EXCEPTION
}
kResource.ErrorMessage = fmt.Sprintf("%s %s", time.Now().Format(common.GO_BIRTHDAY), err.Error())
} else {
kResource.ErrorState = common.RESOURCE_STATE_CODE_SUCCESS
}
Expand Down
11 changes: 6 additions & 5 deletions server/controller/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,12 @@ const (
)

const (
RESOURCE_STATE_CODE_SUCCESS = 1
RESOURCE_STATE_CODE_DELETING = 2
RESOURCE_STATE_CODE_EXCEPTION = 3
RESOURCE_STATE_CODE_WARNING = 4
RESOURCE_STATE_CODE_NO_LICENSE = 5
RESOURCE_STATE_CODE_SUCCESS = 1 + iota
RESOURCE_STATE_CODE_DELETING
RESOURCE_STATE_CODE_EXCEPTION
RESOURCE_STATE_CODE_WARNING
RESOURCE_STATE_CODE_NO_LICENSE
RESOURCE_STATE_CODE_EXIT
)

const (
Expand Down
3 changes: 3 additions & 0 deletions server/controller/genesis/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (

type GenesisSync interface {
Start()
GetVtapUpdatedVersion(key string) (uint64, bool)
GetGenesisSyncData(orgID int) GenesisSyncDataResponse
GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, error)
}

type GenesisSyncType interface {
GetInfo() string
GetLcuuid() string
GetVtapID() uint32
}
Expand Down Expand Up @@ -82,6 +84,7 @@ type VIFRPCMessage struct {
MessageType int
TeamID uint32
VtapID uint32
Version uint64
Peer string
K8SClusterID string
Key string
Expand Down
3 changes: 2 additions & 1 deletion server/controller/genesis/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package config
type GenesisConfig struct {
AgingTime float64 `default:"86400" yaml:"aging_time"`
VinterfaceAgingTime float64 `default:"300" yaml:"vinterface_aging_time"`
AgentHeartBeat float64 `default:"60" yaml:"agent_heart_beat"`
AgentHeartBeat float64 `default:"10" yaml:"agent_heart_beat"`
HostIPs []string `yaml:"host_ips"`
LocalIPRanges []string `yaml:"local_ip_ranges"`
ExcludeIPRanges []string `yaml:"exclude_ip_ranges"`
QueueLengths int `default:"60" yaml:"queue_length"`
DataPersistenceInterval int `default:"60" yaml:"data_persistence_interval"`
MultiNSMode bool `default:"false" yaml:"multi_ns_mode"`
SingleVPCMode bool `default:"false" yaml:"single_vpc_mode"`
LogDetailEnabled bool `default:"false" yaml:"log_detail_enabled"`
Database string `default:"mysql" yaml:"database"`
DefaultVPCName string `default:"default-public-vpc" yaml:"default_vpc_name"`
IgnoreNICRegex string `default:"^(kube-ipvs)" yaml:"ignore_nic_regex"`
Expand Down
26 changes: 25 additions & 1 deletion server/controller/genesis/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,15 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
_, enabled := g.workloadResourceEnabledCache.Get(fmt.Sprintf("%d-%s", orgID, groupShortLcuuid))

platformData := request.GetPlatformData()
if version == localVersion || platformData == nil {
if version == localVersion {
// It is necessary to verify whether there are available data for use,
// If there is no data, needs to be re-reported.
if v, ok := g.gsync.GetVtapUpdatedVersion(vtap); !ok || v != version {
g.vtapToVersion.Store(vtap, uint64(0))
log.Infof("genesis sync re-reporting, not data for keep alive, reset local version, from ip %s vtap_id %v", remote, vtapID, logger.NewORGPrefix(orgID))
return &agent.GenesisSyncResponse{}, nil
}

// If the worload-v is modified to be enabled during the period of continuous heartbeat,
// it will trigger the re-reporting of the full data.
if _, ok := g.workloadResourceChangeEnabledCache.Get(vtap); ok {
Expand All @@ -278,6 +286,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
VtapID: vtapID,
ORGID: orgID,
TeamID: uint32(teamID),
Version: version,
MessageType: common.TYPE_RENEW,
Message: request,
StorageRefresh: refresh,
Expand All @@ -287,6 +296,20 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
return &agent.GenesisSyncResponse{Version: &localVersion}, nil
}

if platformData == nil {
log.Debugf("genesis sync received version %v platform is nil from ip %s vtap_id %v, local version %v", version, remote, vtapID, localVersion, logger.NewORGPrefix(orgID))
return &agent.GenesisSyncResponse{}, nil
}

// 当采集器为容器类型时(cluster id 非空)
// - 采集器未注册(vtapID==0),即使没有 Interfaces 也需要处理 vinterface 来让采集器能够注册
// - 采集器已经注册(vtapID!=0),采集器重启会出现 Interfaces 为空的情况
// 为了避免 vinterface 异常增删,丢弃当前消息
if k8sClusterID != "" && len(platformData.Interfaces) == 0 && vtapID != 0 {
log.Infof("genesis sync received version %v message with empty interfaces from ip %s vtap_id %v, local version %v", version, remote, vtapID, localVersion, logger.NewORGPrefix(orgID))
return &agent.GenesisSyncResponse{Version: &version}, nil
}

log.Infof("genesis sync received version %v -> %v from ip %s vtap_id %v", localVersion, version, remote, vtapID, logger.NewORGPrefix(orgID))
g.genesisSyncQueue.Put(
common.VIFRPCMessage{
Expand All @@ -295,6 +318,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
VtapID: vtapID,
ORGID: orgID,
TeamID: uint32(teamID),
Version: version,
K8SClusterID: k8sClusterID,
MessageType: common.TYPE_UPDATE,
Message: request,
Expand Down
2 changes: 1 addition & 1 deletion server/controller/genesis/store/kubernetes/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (k *KubernetesStorage) generateCache() {
} else {
endpoint = net.JoinHostPort(domain.ControllerIP, strconv.Itoa(k.listenNodePort))
}
cacheMap[k.formatKey(db.ORGID, domain.ClusterID)] = common.ClusterDest{
cacheMap[k.formatKey(db.ORGID, subDomain.ClusterID)] = common.ClusterDest{
Endpoint: endpoint,
DomainLcuuid: domain.Lcuuid,
SubDomainLcuuid: subDomain.Lcuuid,
Expand Down
Loading
Loading