Merge pull request #31 from gianarb/feature/cooldown-period

Cooldown period
This commit is contained in:
Gianluca Arbezzano 2017-08-08 21:22:59 +02:00 committed by GitHub
commit 21d74312cc
8 changed files with 68 additions and 17 deletions

View File

@ -19,7 +19,7 @@ type scaleRequest struct {
Direction bool `json:"direction"` Direction bool `json:"direction"`
} }
func Handle(scalers autoscaler.Autoscalers) func(w http.ResponseWriter, r *http.Request) { func Handle(scalers *autoscaler.Autoscalers) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
var err error var err error
requestDump, err := httputil.DumpRequest(r, true) requestDump, err := httputil.DumpRequest(r, true)
@ -61,7 +61,7 @@ func Handle(scalers autoscaler.Autoscalers) func(w http.ResponseWriter, r *http.
return return
} }
s, ok := scalers[fmt.Sprintf("%s/%s", autoscalerName, serviceName)] s, ok := (*scalers)[fmt.Sprintf("%s/%s", autoscalerName, serviceName)]
if !ok { if !ok {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"path": r.URL.RawPath, "path": r.URL.RawPath,

View File

@ -6,10 +6,10 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func GetRouter(core core.Core, eventChannel chan *logrus.Entry) *mux.Router { func GetRouter(core *core.Core, eventChannel chan *logrus.Entry) *mux.Router {
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/handle/{autoscaler_name}/{service_name}", Handle(core.Autoscalers)).Methods("POST") r.HandleFunc("/handle/{autoscaler_name}/{service_name}", Handle(&core.Autoscalers)).Methods("POST")
r.HandleFunc("/handle/{autoscaler_name}/{service_name}/{direction}", Handle(core.Autoscalers)).Methods("POST") r.HandleFunc("/handle/{autoscaler_name}/{service_name}/{direction}", Handle(&core.Autoscalers)).Methods("POST")
r.HandleFunc("/autoscaler", AutoscalerList(core.Autoscalers)).Methods("GET") r.HandleFunc("/autoscaler", AutoscalerList(core.Autoscalers)).Methods("GET")
r.HandleFunc("/health", Health()).Methods("GET") r.HandleFunc("/health", Health()).Methods("GET")
r.HandleFunc("/events", Events(eventChannel)).Methods("GET") r.HandleFunc("/events", Events(eventChannel)).Methods("GET")

View File

@ -1,6 +1,15 @@
package autoscaler package autoscaler
import "github.com/Sirupsen/logrus" import (
"context"
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
)
type Provider interface { type Provider interface {
Scale(string, int, bool) error Scale(string, int, bool) error
@ -14,26 +23,64 @@ type Autoscaler struct {
serviceId string serviceId string
targetUp int targetUp int
targetDown int targetDown int
CoolDown int
} }
func NewAutoscaler(p Provider, serviceId string, targetUp int, targetDown int) Autoscaler { func NewAutoscaler(p Provider, serviceId string, targetUp int, targetDown int, coolDown int) Autoscaler {
a := Autoscaler{ a := Autoscaler{
provider: p, provider: p,
serviceId: serviceId, serviceId: serviceId,
targetUp: targetUp, targetUp: targetUp,
targetDown: targetDown, targetDown: targetDown,
CoolDown: coolDown,
} }
return a return a
} }
func canScale(serviceId string, coolDown time.Duration) (retval bool, err error) {
retval = false
err = nil
ctx := context.Background()
dockerClient, err := client.NewEnvClient()
if err != nil {
logrus.WithField("error", err).Debug("Problem communication with Docker")
return
}
services, err := dockerClient.ServiceList(ctx, types.ServiceListOptions{})
if err != nil {
logrus.WithField("error", err).Debug("Bad comunication with Docker.")
return
}
for _, service := range services {
if service.Spec.Name != serviceId {
continue
}
// now < updatedAt + coolDown ??
if time.Now().Before(service.Meta.UpdatedAt.Add(coolDown)) {
err = errors.New(fmt.Sprintf("Cooldown period for %f seconds", service.Meta.UpdatedAt.Add(coolDown).Sub(time.Now()).Seconds()))
break
} else {
retval = true
break
}
}
return
}
func (a *Autoscaler) ScaleUp() error { func (a *Autoscaler) ScaleUp() error {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"service": a.serviceId, "service": a.serviceId,
"direction": true, "direction": true,
}).Infof("Received a new request to scale up %s with %d task.", a.serviceId, a.targetUp) }).Infof("Received a new request to scale up %s with %d task.", a.serviceId, a.targetUp)
err := a.provider.Scale(a.serviceId, a.targetUp, true) if ok, err := canScale(a.serviceId, time.Duration(a.CoolDown)*time.Second); ok == false {
logrus.Warn("Cannot scale up during coolDown period!")
return err
}
err := a.provider.Scale(a.serviceId, a.targetUp, true)
if err != nil { if err != nil {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"service": a.serviceId, "service": a.serviceId,
@ -56,6 +103,10 @@ func (a *Autoscaler) ScaleDown() error {
"direction": false, "direction": false,
}).Infof("Received a new request to scale down %s with %d task.", a.serviceId, a.targetDown) }).Infof("Received a new request to scale down %s with %d task.", a.serviceId, a.targetDown)
if ok, err := canScale(a.serviceId, time.Duration(a.CoolDown)*time.Second); ok == false {
logrus.Warn("Cannot scale down during coolDown period!")
return err
}
err := a.provider.Scale(a.serviceId, a.targetDown, false) err := a.provider.Scale(a.serviceId, a.targetDown, false)
if err != nil { if err != nil {

View File

@ -6,7 +6,7 @@ import (
func TestFakeProvider(t *testing.T) { func TestFakeProvider(t *testing.T) {
var p Provider var p Provider
a := NewAutoscaler(p, "fgaerge", 3, 4) a := NewAutoscaler(p, "fgaerge", 3, 4, 1)
if a.provider != p { if a.provider != p {
t.Fail() t.Fail()
} }

View File

@ -99,7 +99,7 @@ func (c *DaemonCmd) Run(args []string) int {
}() }()
// Add routing // Add routing
router := api.GetRouter(coreEngine, c.EventChannel) router := api.GetRouter(&coreEngine, c.EventChannel)
logrus.Infof("API Server run on port %s", port) logrus.Infof("API Server run on port %s", port)
http.ListenAndServe(port, router) http.ListenAndServe(port, router)
return 0 return 0

View File

@ -64,8 +64,9 @@ func getAutoscalerByService(p autoscaler.Provider, an swarm.Annotations) (autosc
} }
up := convertStringLabelToInt("orbiter.up", an.Labels) up := convertStringLabelToInt("orbiter.up", an.Labels)
down := convertStringLabelToInt("orbiter.down", an.Labels) down := convertStringLabelToInt("orbiter.down", an.Labels)
as := autoscaler.NewAutoscaler(p, an.Name, up, down) cool := convertStringLabelToInt("orbiter.cooldown", an.Labels)
logrus.Debugf("Registering /handle/autoswarm/%s to orbiter. (UP %d, DOWN %d)", an.Name, up, down) as := autoscaler.NewAutoscaler(p, an.Name, up, down, cool)
logrus.Infof("Registering /handle/autoswarm/%s to orbiter. (UP %d, DOWN %d, COOL %d)", an.Name, up, down, cool)
return as, nil return as, nil
} }

View File

@ -5,10 +5,9 @@ import (
) )
type PolicyConf struct { type PolicyConf struct {
// Number of tasks to start during a scale up Up int `yaml:"up"` // Number of tasks to start during a scale up
Up int `yaml:"up"` Down int `yaml:"down"` // Number of tasks to start during a scale down
// Number of tasks to start during a scale down CoolDown int `yaml:"cooldown"` // Number of milliseconds to sleep avoidin too quick scale
Down int `yaml:"down"`
} }
type AutoscalerConf struct { type AutoscalerConf struct {

View File

@ -19,7 +19,7 @@ func NewCoreByConfig(c map[string]AutoscalerConf, core *Core) error {
return err return err
} }
for serviceName, policy := range scaler.Policies { for serviceName, policy := range scaler.Policies {
scalers[fmt.Sprintf("%s/%s", scalerName, serviceName)] = autoscaler.NewAutoscaler(p, serviceName, policy.Up, policy.Down) scalers[fmt.Sprintf("%s/%s", scalerName, serviceName)] = autoscaler.NewAutoscaler(p, serviceName, policy.Up, policy.Down, policy.CoolDown)
} }
} }
core.Autoscalers = scalers core.Autoscalers = scalers