Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
Refactoring (#19)
Browse files Browse the repository at this point in the history
* One more iteration of refactoring

* One more try to update ggr
  • Loading branch information
kirillbilchenko authored May 26, 2020
1 parent 61ae32c commit 4de405d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 85 deletions.
102 changes: 37 additions & 65 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package main

import (
"encoding/json"
"fmt"
. "github.com/aerokube/ggr/config"
"io/ioutil"
"math/rand"
"net/http"
"sort"
"strings"
"sync"
"time"

. "github.com/aerokube/ggr/config"
)

type set interface {
Expand All @@ -28,11 +26,10 @@ func newSet(data ...string) *setImpl {
}

type capacity struct {
Key *Host
queued int
pending int
used int
total int
Queued *int `json:"queued,omitempty" binding:"required"`
Pending *int `json:"pending,omitempty" binding:"required"`
Used *int `json:"used,omitempty" binding:"required"`
Total *int `json:"total,omitempty" binding:"required"`
}

type setImpl struct {
Expand Down Expand Up @@ -113,16 +110,12 @@ func choose(hosts Hosts) (*Host, int) {
return nil, -1
}

func findFirstNodeByQueue(currentHost *Host, hosts *Hosts, mutex *sync.RWMutex) (host *Host) {
func findFirstNodeByQueue(h *Host, hosts *Hosts, mutex *sync.RWMutex) (host *Host) {
if len(*hosts) < 1 {
return currentHost
return h
}
hostMap := map[string]*Host{}
for v := range *hosts {
hostMap[fmt.Sprintf("%s%d%s", (*hosts)[v].Name, (*hosts)[v].Port, (*hosts)[v].Region)] = &(*hosts)[v]
}
var capacities []capacity
mutex.Lock()
resultMap := make(map[*Host]*capacity)
defer mutex.Unlock()
var netClient = &http.Client{
Timeout: time.Second * 10,
Expand All @@ -132,62 +125,41 @@ func findFirstNodeByQueue(currentHost *Host, hosts *Hosts, mutex *sync.RWMutex)
if err != nil {
continue
}

responseMap := make(map[string]interface{})
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return currentHost
}

err = json.Unmarshal(body, &responseMap)
if err != nil {
return currentHost
}
var cap capacity
if queued, ok := responseMap["queued"]; ok {
cap.queued = int(queued.(float64))
} else {
var capa capacity
err = json.NewDecoder(rsp.Body).Decode(&capa)
if err != nil && i < len(*hosts) {
continue
}

if pending, ok := responseMap["pending"]; ok {
cap.pending = int(pending.(float64))
} else {
if capa.Pending == nil || capa.Queued == nil || capa.Total == nil || capa.Used == nil {
continue
}

if used, ok := responseMap["used"]; ok {
cap.used = int(used.(float64))
} else {
continue
}

if total, ok := responseMap["total"]; ok {
cap.total = int(total.(float64))
} else {
continue
}

cap.Key = &(*hosts)[i]
capacities = append(capacities, cap)
resultMap[&(*hosts)[i]] = &capa
}
if len(capacities) < 1 {
return currentHost
netClient.CloseIdleConnections()
if len(resultMap) < 1 {
return h
}
var target = mostFreeHost(capacities)
if v, ok := hostMap[fmt.Sprintf("%s%d%s", target.Name, target.Port, target.Region)]; ok {
if &v == &currentHost {
return currentHost
}
return v
targetHost := mostFreeHost(resultMap)
if targetHost == h {
return h
}

return currentHost
return targetHost
}

func mostFreeHost(values []capacity) *Host {
sort.Slice(values, func(i, j int) bool {
return values[i].queued < values[j].queued
})
return values[0].Key
func mostFreeHost(target map[*Host]*capacity) *Host {
var queued int
var targetHost *Host
i := len(target)
for k, v := range target {
if i == len(target) {
queued = *v.Queued
targetHost = k
}
if *v.Queued < queued {
queued = *v.Queued
targetHost = k
}
i--
}
return targetHost
}
43 changes: 26 additions & 17 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func init() {
verbose = true
}

func newIntPointer(i int) *int {
return &i
}

func TestEmptyListOfHosts(t *testing.T) {
host, index := choose(Hosts{})
AssertThat(t, host, Is{(*Host)(nil)})
Expand Down Expand Up @@ -264,17 +268,21 @@ func TestChoosingAllHosts(t *testing.T) {
}

func TestFindMostFreeHostCapacity(t *testing.T) {
cap := []capacity{{Key: &Host{Name: "MaxLoad", Count: 1}, queued: 10, pending: 0, used: 0, total: 1}, {Key: &Host{Name: "MidLoad", Count: 1}, queued: 5, pending: 0, used: 0, total: 1}, {Key: &Host{Name: "Free", Count: 1}, queued: 0, pending: 0, used: 0, total: 1}}
targetHost := mostFreeHost(cap)
AssertThat(t, targetHost.Name, EqualTo{"Free"})
var capacity = map[*Host]*capacity{
&Host{Name: "MaxLoad", Count: 1}: {Queued: newIntPointer(10), Pending: newIntPointer(0), Used: newIntPointer(0), Total: newIntPointer(1)},
&Host{Name: "MidLoad", Count: 1}: {Queued: newIntPointer(5), Pending: newIntPointer(0), Used: newIntPointer(0), Total: newIntPointer(1)},
&Host{Name: "Free", Count: 1}: {Queued: newIntPointer(0), Pending: newIntPointer(0), Used: newIntPointer(0), Total: newIntPointer(1)},
}
targetHost := mostFreeHost(capacity)
AssertThat(t, targetHost, EqualTo{V: &Host{Name: "Free", Count: 1}})
}

func TestHostCapacity(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"queued\":%s, \"pending\":%d, \"used\":%d, \"total\":%d }", strconv.Itoa(rand.Intn(20)), 0, 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Queued\":%s, \"Pending\":%d, \"Used\":%d, \"Total\":%d }", strconv.Itoa(rand.Intn(20)), 0, 0, 0)))
}
}))

Expand All @@ -284,7 +292,7 @@ func TestHostCapacity(t *testing.T) {
hosts := Hosts{Host{Name: ip.Hostname(), Port: port, Count: 1}, Host{Name: "mid", Count: 1}, Host{Name: "last", Count: 1}}
defer testServer.Close()
target := findFirstNodeByQueue(&hosts[0], &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{&hosts[0]})
AssertThat(t, target, EqualTo{V: &hosts[0]})
}

func TestErrorResponseHostCapacity(t *testing.T) {
Expand All @@ -300,63 +308,64 @@ func TestErrorResponseHostCapacity(t *testing.T) {

hosts := Hosts{Host{Name: ip.Hostname(), Port: port, Count: 1}, Host{Name: "mid", Count: 1}, Host{Name: "last", Count: 1}}
defer testServer.Close()
AssertThat(t, findFirstNodeByQueue(&hosts[0], &hosts, &sync.RWMutex{}), EqualTo{&hosts[0]})
target := findFirstNodeByQueue(&hosts[0], &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{V: &hosts[0]})
}

func TestEmptyHostListCapacity(t *testing.T) {
currentHost := Host{Name: "", Port: 0, Count: 1}
hosts := Hosts{}
AssertThat(t, findFirstNodeByQueue(&currentHost, &hosts, &sync.RWMutex{}), EqualTo{&currentHost})
hosts := Hosts{currentHost}
target := findFirstNodeByQueue(&currentHost, &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{V: &currentHost})
}

func TestWrongHostResponse(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"queued\":%s, \"pending\":%d, \"used\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Queued\":%s, \"Pending\":%d, \"Used\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0)))
}
}))

ip, _ := url.Parse(testServer.URL)
port, _ := strconv.Atoi(ip.Port())
currentHost := Host{Name: "", Port: 0, Count: 1}
hosts := Hosts{Host{Name: ip.Hostname(), Port: port, Count: 1}, Host{Name: "mid", Count: 1}, Host{Name: "last", Count: 1}}
defer testServer.Close()
target := findFirstNodeByQueue(&currentHost, &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{&currentHost})
target := findFirstNodeByQueue(&hosts[0], &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{V: &hosts[0]})
}

func TestPartialHostResponse(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"queued\":%s, \"pending\":%d, \"used\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Queued\":%d, \"Pending\":%d, \"Used\":%d}", 2, 0, 0)))
}
}))

testServer2 := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"queued\":%s, \"pending\":%d, \"used\":%d, \"total\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Queued\":%d, \"Pending\":%d, \"Used\":%d, \"Total\":%d}", 3, 0, 0, 0)))
}
}))

testServer3 := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"queued\":%s, \"used\":%d, \"total\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Queued\":%s, \"Used\":%d, \"Total\":%d}", strconv.Itoa(rand.Intn(20)), 0, 0)))
}
}))

testServer4 := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
switch req.RequestURI {
case "/status":
res.WriteHeader(200)
res.Write([]byte(fmt.Sprintf("{\"pending\":%d, \"used\":%d, \"total\":%d}", 0, 0, 0)))
res.Write([]byte(fmt.Sprintf("{\"Pending\":%d, \"Used\":%d, \"Total\":%d}", 0, 0, 0)))
}
}))

Expand All @@ -383,5 +392,5 @@ func TestPartialHostResponse(t *testing.T) {
defer testServer3.Close()
defer testServer4.Close()
target := findFirstNodeByQueue(&currentHost, &hosts, &sync.RWMutex{})
AssertThat(t, target, EqualTo{&hosts[1]})
AssertThat(t, *target, EqualTo{V: hosts[1]})
}
4 changes: 2 additions & 2 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,9 @@ loop:
}
if r.Header["X-Selenoid-No-Wait"] == nil && uniformDistribution {
confLock.RLock()
hosts, _, _ := browsers.find(browser, version, platform, newSet(), newSet())
hostsList, _, _ := browsers.find(browser, version, platform, newSet(), newSet())
confLock.RUnlock()
h = findFirstNodeByQueue(h, &hosts, &confLock)
h = findFirstNodeByQueue(h, &hostsList, &confLock)
}
if h == nil {
break loop
Expand Down
2 changes: 1 addition & 1 deletion proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ func TestStartSessionUniformDistribution(t *testing.T) {
testStartSession(t, mux, browsersProvider, "browser", "1.0")
testStartSessionCustomCaps(t, mux, browsersProvider, `{"desiredCapabilities":{"deviceName":"someDevice", "version":"2.0"}}`)
testStartSessionCustomCaps(t, mux, browsersProvider, `{"desiredCapabilities":{"deviceName":"someDevice", "version":"2.0"}}`)
uniformDistribution = false
}

func TestStartSessionWithLocationHeader(t *testing.T) {
Expand Down Expand Up @@ -1737,7 +1738,6 @@ func TestPanicGuestQuotaMissingUsersFileAuthPresent(t *testing.T) {
}

func TestCreateSessionChangeRegionOnFailure(t *testing.T) {
uniformDistribution = false
var selectedRegions []string

srv1 := httptest.NewServer(recordingMux("a", &selectedRegions))
Expand Down

0 comments on commit 4de405d

Please sign in to comment.