Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only add resources from cond ready and status true #410

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 77 additions & 65 deletions pkg/controller/clusterstate/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ package cache
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"sync"
"time"

"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource {
return r.Add(sc.availableResources)
}


func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
Expand Down Expand Up @@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource {

// Save the cluster state.
func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource,
availableHistogram *api.ResourceHistogram) error {
availableHistogram *api.ResourceHistogram) error {
klog.V(12).Infof("Saving Cluster State")

sc.Mutex.Lock()
Expand All @@ -261,75 +261,87 @@ func (sc *ClusterStateCache) updateState() error {
idleMin := api.EmptyResource()
idleMax := api.EmptyResource()

firstNode := true
firstNode := true

var err error = nil
for _, value := range cluster.Nodes {
// Do not use any Unschedulable nodes in calculations
if value.Unschedulable == true {
klog.V(6).Infof("[updateState] %s is marked as unschedulable node Total: %v, Used: %v, and Idle: %v will not be included in cluster state calculation.",
value.Name, value.Allocatable, value.Used, value.Idle)
continue
}

total = total.Add(value.Allocatable)
used = used.Add(value.Used)
idle = idle.Add(value.Idle)

// Collect Min and Max for histogram
if firstNode {
idleMin.MilliCPU = idle.MilliCPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU

idleMax.MilliCPU = idle.MilliCPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
firstNode = false
} else {
if value.Idle.MilliCPU < idleMin.MilliCPU {
idleMin.MilliCPU = value.Idle.MilliCPU
} else if value.Idle.MilliCPU > idleMax.MilliCPU {
idleMax.MilliCPU = value.Idle.MilliCPU
}
for _, cond := range value.Node.Status.Conditions {
if cond.Type == v1.NodeReady {
if cond.Status == "True" {
// Do not use any Unschedulable nodes in calculations
if value.Unschedulable == true {
klog.V(6).Infof("[updateState] %s is marked as unschedulable node Total: %v, Used: %v, and Idle: %v will not be included in cluster state calculation.",
value.Name, value.Allocatable, value.Used, value.Idle)
continue
}

total = total.Add(value.Allocatable)
used = used.Add(value.Used)
idle = idle.Add(value.Idle)

// Collect Min and Max for histogram
if firstNode {
idleMin.MilliCPU = idle.MilliCPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU

idleMax.MilliCPU = idle.MilliCPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
firstNode = false
} else {
if value.Idle.MilliCPU < idleMin.MilliCPU {
idleMin.MilliCPU = value.Idle.MilliCPU
} else if value.Idle.MilliCPU > idleMax.MilliCPU {
idleMax.MilliCPU = value.Idle.MilliCPU
}

if value.Idle.Memory < idleMin.Memory {
idleMin.Memory = value.Idle.Memory
} else if value.Idle.Memory > idleMax.Memory {
idleMax.Memory = value.Idle.Memory
}

if value.Idle.GPU < idleMin.GPU {
idleMin.GPU = value.Idle.GPU
} else if value.Idle.GPU > idleMax.GPU {
idleMax.GPU = value.Idle.GPU
}
}
}

if value.Idle.Memory < idleMin.Memory {
idleMin.Memory = value.Idle.Memory
} else if value.Idle.Memory > idleMax.Memory{
idleMax.Memory = value.Idle.Memory
}
// Create available histograms
newIdleHistogram := api.NewResourceHistogram(idleMin, idleMax)
for _, value := range cluster.Nodes {
newIdleHistogram.Observer(value.Idle)
}

klog.V(8).Infof("Total capacity %+v, used %+v, free space %+v", total, used, idle)
if klog.V(12).Enabled() {
// CPU histogram
metricCPU := &dto.Metric{}
(*newIdleHistogram.MilliCPU).Write(metricCPU)
klog.V(12).Infof("[updateState] CPU histogram:\n%s", proto.MarshalTextString(metricCPU))

// Memory histogram
metricMem := &dto.Metric{}
(*newIdleHistogram.Memory).Write(metricMem)
klog.V(12).Infof("[updateState] Memory histogram:\n%s", proto.MarshalTextString(metricMem))

// GPU histogram
metricGPU := &dto.Metric{}
(*newIdleHistogram.GPU).Write(metricGPU)
klog.V(12).Infof("[updateState] GPU histogram:\n%s", proto.MarshalTextString(metricGPU))
}

err = sc.saveState(idle, total, newIdleHistogram)

if value.Idle.GPU < idleMin.GPU {
idleMin.GPU = value.Idle.GPU
} else if value.Idle.GPU > idleMax.GPU {
idleMax.GPU = value.Idle.GPU
}
}
}

// Create available histograms
newIdleHistogram := api.NewResourceHistogram(idleMin, idleMax)
for _, value := range cluster.Nodes {
newIdleHistogram.Observer(value.Idle)
}
}

klog.V(8).Infof("Total capacity %+v, used %+v, free space %+v", total, used, idle)
if klog.V(12).Enabled() {
// CPU histogram
metricCPU := &dto.Metric{}
(*newIdleHistogram.MilliCPU).Write(metricCPU)
klog.V(12).Infof("[updateState] CPU histogram:\n%s", proto.MarshalTextString(metricCPU))

// Memory histogram
metricMem := &dto.Metric{}
(*newIdleHistogram.Memory).Write(metricMem)
klog.V(12).Infof("[updateState] Memory histogram:\n%s", proto.MarshalTextString(metricMem))

// GPU histogram
metricGPU := &dto.Metric{}
(*newIdleHistogram.GPU).Write(metricGPU)
klog.V(12).Infof("[updateState] GPU histogram:\n%s", proto.MarshalTextString(metricGPU))
}

err := sc.saveState(idle, total, newIdleHistogram)
return err
}

Expand Down
37 changes: 34 additions & 3 deletions pkg/controller/clusterstate/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"reflect"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -76,7 +76,7 @@ func cacheEqual(l, r *ClusterStateCache) bool {
jobsEqual(l.Jobs, r.Jobs)
}

func buildNode(name string, alloc v1.ResourceList) *v1.Node {
func buildNode(name string, alloc v1.ResourceList, nodecondtype string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(name),
Expand All @@ -85,6 +85,9 @@ func buildNode(name string, alloc v1.ResourceList) *v1.Node {
Status: v1.NodeStatus{
Capacity: alloc,
Allocatable: alloc,
Conditions: []v1.NodeCondition{
{Type: v1.NodeConditionType(nodecondtype)},
},
},
}
}
Expand Down Expand Up @@ -155,7 +158,7 @@ func TestAddPod(t *testing.T) {
j1.AddTaskInfo(pi1)
j1.AddTaskInfo(pi2)

node1 := buildNode("n1", buildResourceList("2000m", "10G"))
node1 := buildNode("n1", buildResourceList("2000m", "10G"), string(v1.NodeReady))
ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

Expand Down Expand Up @@ -199,3 +202,31 @@ func TestAddPod(t *testing.T) {
}
}

func TestNodeStatus(t *testing.T) {

var memory float64 = 0.0
var millicpu float64 = 0.0
var gpu int64 = 0.0

node1 := buildNode("n1", buildResourceList("2000m", "10G"), string(v1.NodeDiskPressure))

cache := &ClusterStateCache{
Nodes: make(map[string]*api.NodeInfo),
}

cache.addNode(node1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you add an "unllocatable node" to this test, it will be "LGTM" :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Node with status v1.NodeDiskPressure is unallocatable according to MCAD, do we need another test and if yes what do we think should be the node condition?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to both questions. I would try to get close to 100% coverage for this test case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v1. NodeDiskPressure is already part of this PR


cache.updateState()

if cache.availableResources != nil {
memory = cache.availableResources.Memory
millicpu = cache.availableResources.MilliCPU
gpu = cache.availableResources.GPU
}

if cache.availableResources != nil {
t.Errorf("expected all values to be 0, \n got memory: %v, millicpus: %v, gpu:%v \n",
memory, millicpu, gpu)
}

}