Skip to content

Commit

Permalink
Add better config, implement logging
Browse files Browse the repository at this point in the history
  • Loading branch information
pawalt committed Jul 6, 2019
1 parent 1962821 commit f323c22
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 75 deletions.
15 changes: 10 additions & 5 deletions cmd/caplance/cmd/client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cmd

import (
"log"
"net"

"github.com/pwpon500/caplance/internal/client"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -17,26 +17,31 @@ var clientCmd = &cobra.Command{
Short: "Start caplance in client mode",
Long: `Mark this host as a backend, allowing packets to be forwarded to it from the load balancer.`,
Run: func(cmd *cobra.Command, args []string) {
log.Infoln("Reading in config file")
readConfig()
vip := net.ParseIP(conf.VIP)
if vip == nil {
log.Fatal("Could not parse vip: " + conf.VIP)
log.Fatalln("Could not parse vip: " + conf.VIP)
}
dataIP := net.ParseIP(conf.Client.DataIP)
if dataIP == nil {
log.Fatal("Could not parse data ip: " + conf.Client.DataIP)
log.Fatalln("Could not parse data ip: " + conf.Client.DataIP)
}
c := client.NewClient(vip, dataIP)
if conf.Client.Name == "" {
log.Fatalln("Please provide a client name")
}
c := client.NewClient(conf.Client.Name, vip, dataIP, conf.ReadTimeout, conf.WriteTimeout, conf.HealthRate, conf.Sockaddr)
connectIP := net.ParseIP(conf.Client.ConnectIP)
if connectIP == nil {
connectIP = net.ParseIP(conf.Server.MngIP)
if connectIP == nil {
log.Fatalf("Could not parse Client.ConnectIP (%v) or Server.MngIP (%v)\n", conf.Client.ConnectIP, conf.Server.MngIP)
}
}
log.Infoln("Starting client")
err := c.Start(connectIP)
if err != nil {
log.Fatal("Failed to start with error: " + err.Error())
log.Fatalln("Failed to start with error: " + err.Error())
}
},
}
8 changes: 4 additions & 4 deletions cmd/caplance/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type config struct {
Client struct {
ConnectIP string
DataIP string
Name string
}
Server struct {
MngIP string
Expand All @@ -21,10 +22,9 @@ type config struct {
VIP string
Test bool

HealthRate int
RegisterTimeout int
ReadTimeout int
WriteTimeout int
HealthRate int
ReadTimeout int
WriteTimeout int

Sockaddr string
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/caplance/cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package cmd

import (
"log"
"net"
"strconv"

"github.com/pwpon500/caplance/internal/balancer"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -18,6 +18,7 @@ var serverCmd = &cobra.Command{
Short: "Start caplance in server mode",
Long: `Mark this host as the load balancer, forwarding packets to a set of backends.`,
Run: func(cmd *cobra.Command, args []string) {
log.Infoln("Reading in config file")
readConfig()
vip := net.ParseIP(conf.VIP)
if vip == nil {
Expand All @@ -30,10 +31,11 @@ var serverCmd = &cobra.Command{
if conf.Server.BackendCapacity <= 0 {
log.Fatal("Backend capacity " + strconv.Itoa(conf.Server.BackendCapacity) + " must be postive.")
}
b, err := balancer.New(vip, mngIP, conf.Server.BackendCapacity)
b, err := balancer.New(vip, mngIP, conf.Server.BackendCapacity, conf.ReadTimeout, conf.WriteTimeout)
if err != nil {
log.Fatal("Error when creating balancer: " + err.Error())
}
log.Infoln("Starting load balancer")
b.Start()
},
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/kkdai/maglev v0.0.0-20190512112251-2d79cc08016e
github.com/kr/pty v1.1.4 // indirect
github.com/mdlayher/raw v0.0.0-20190419142535-64193704e472 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.3.2
github.com/stamblerre/gocode v0.0.0-20190327203809-810592086997 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ github.com/kkdai/maglev v0.0.0-20170625132216-9074cd53582b h1:e8NwpJMRdbVdN+w4/D
github.com/kkdai/maglev v0.0.0-20170625132216-9074cd53582b/go.mod h1:+19zs6rDZlKbJTe83xieLtHpvaLS7jtFQPH3JK0KoKs=
github.com/kkdai/maglev v0.0.0-20190512112251-2d79cc08016e h1:qU5KjWxBHf+AXk8lAnWbzl+9TWg+o2KZYuvwOYWsPpU=
github.com/kkdai/maglev v0.0.0-20190512112251-2d79cc08016e/go.mod h1:+19zs6rDZlKbJTe83xieLtHpvaLS7jtFQPH3JK0KoKs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -55,6 +56,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
Expand All @@ -69,6 +72,7 @@ github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stamblerre/gocode v0.0.0-20190327203809-810592086997 h1:LF81AGV63kJoxjmSgQPT8FARAMHeY46CYQ4TNoVDWHM=
github.com/stamblerre/gocode v0.0.0-20190327203809-810592086997/go.mod h1:EM2T8YDoTCvGXbEpFHxarbpv7VE26QD1++Cb1Pbh7Gs=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
Expand Down Expand Up @@ -107,6 +111,7 @@ golang.org/x/sys v0.0.0-20190309122539-980fc434d28e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 h1:rM0ROo5vb9AdYJi1110yjWGMej9ITfKddS89P3Fkhug=
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190516014833-cab07311ab81 h1:5Q88vZAfC0WB8T1GHRLttQaZdCNeQHM40n41gMUeFlI=
Expand Down
38 changes: 19 additions & 19 deletions internal/balancer/backends/manager.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package backends

import (
"log"
"math/rand"
"net"
"regexp"
"strconv"
"strings"

"github.com/pwpon500/caplance/pkg/util"
)
log "github.com/sirupsen/logrus"

const (
REGISTER_TIMEOUT = 10
READ_TIMEOUT = 30
WRITE_TIMEOUT = 5
"github.com/pwpon500/caplance/pkg/util"
)

type managedBackend struct {
Expand All @@ -30,10 +25,12 @@ type Manager struct {
listenPort int // port to listen on
handler *Handler // handler for backends
managedBackends map[string]*managedBackend // map of backend name to its communicator
readTimeout int
writeTimeout int
}

// NewManager instantiates a new instance of the Manager object
func NewManager(ip net.IP, port, capacity int) (*Manager, error) {
func NewManager(ip net.IP, port, capacity, readTimeout, writeTimeout int) (*Manager, error) {
handler, err := NewHandler(capacity)

if err != nil {
Expand All @@ -44,7 +41,9 @@ func NewManager(ip net.IP, port, capacity int) (*Manager, error) {
listenIP: ip,
listenPort: port,
handler: handler,
managedBackends: make(map[string]*managedBackend)}, nil
managedBackends: make(map[string]*managedBackend),
readTimeout: readTimeout,
writeTimeout: writeTimeout}, nil
}

// Listen listens for new connections, registering them if needed
Expand All @@ -53,12 +52,12 @@ func (m *Manager) Listen() {
var err error
m.listener, err = net.Listen("tcp", m.listenIP.String()+":"+strconv.Itoa(m.listenPort))
if err != nil {
log.Panic(err)
log.Panicln(err)
}
for {
conn, err := m.listener.Accept()
if err != nil {
log.Println(err)
log.Debugln(err)
}
go m.attemptRegister(conn)
}
Expand All @@ -77,11 +76,11 @@ func (m *Manager) GetBackends() []*Backend {
// registration message should be in the following format:
// REGISTER <desired_name> <ip>
func (m *Manager) attemptRegister(conn net.Conn) {
comm := util.NewTCPCommunicator(conn, READ_TIMEOUT, WRITE_TIMEOUT)
comm := util.NewTCPCommunicator(conn, m.readTimeout, m.writeTimeout)

response, err := comm.ReadLine()
if err != nil {
log.Println(err)
log.Debugln(err)
conn.Close()
return
}
Expand All @@ -97,7 +96,7 @@ func (m *Manager) attemptRegister(conn net.Conn) {

cleaner, err := regexp.Compile("[^a-zA-Z0-9-_.]+")
if err != nil {
log.Panic(err)
log.Panicln(err)
}
cleanedName := cleaner.ReplaceAllString(tokens[1], "")

Expand All @@ -110,7 +109,7 @@ func (m *Manager) attemptRegister(conn net.Conn) {

err = m.handler.Add(cleanedName, ip)
if err != nil {
log.Println(err)
log.Infoln(err)
conn.Close()
return
}
Expand All @@ -124,15 +123,15 @@ func (m *Manager) attemptRegister(conn net.Conn) {
comm.WriteLine("INVALID error while trying to read sanity check")
conn.Close()
m.handler.Remove(cleanedName)
log.Println("Error while trying to sanity check: " + err.Error())
log.Infoln("Error while trying to sanity check: " + err.Error())
}

sanityTokens := strings.Split(sanityResponse, " ")
if len(sanityTokens) < 2 || sanityTokens[0] != "SANE" || sanityTokens[1] != randString {
comm.WriteLine("INVALID bad sanity check url")
conn.Close()
m.handler.Remove(cleanedName)
log.Println("Client udp sanity check failed")
log.Infoln("Client udp sanity check failed")
return
}

Expand All @@ -155,10 +154,10 @@ func (m *Manager) monitor(name string) {
message, err := comm.ReadLine()
if err != nil {
if errChk, ok := err.(net.Error); ok && errChk.Timeout() {
log.Println(err)
log.Warnln(err)
m.deregisterClient(name, "health check timeout ran out")
} else {
log.Println(err)
log.Warnln(err)
m.deregisterClient(name, "error reading from tcp connection: "+err.Error())
}
return
Expand Down Expand Up @@ -218,4 +217,5 @@ func (m *Manager) deregisterClient(name, reason string) {
comm := m.managedBackends[name].comm
comm.WriteLine("DEREGISTERED " + name + " " + reason)
comm.Close()
log.Infoln("Deregistered " + name)
}
22 changes: 13 additions & 9 deletions internal/balancer/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package balancer

import (
"errors"
"log"
"net"
"os"
"os/signal"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/AkihiroSuda/go-netfilter-queue"
"github.com/coreos/go-iptables/iptables"
"github.com/pwpon500/caplance/internal/balancer/backends"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
)

Expand All @@ -26,11 +26,13 @@ type Balancer struct {
testFlag bool // flag to check if we're in test mode
mux sync.Mutex // lock to ensure we don't start and stop at the same time
nfq *netfilter.NFQueue // queue to grab packets from the iptables nfqueue
readTimeout int
writeTimeout int
}

// New creates new Balancer. Throws error if capacity is not prime
func New(startVIP, toConnect net.IP, capacity int) (*Balancer, error) {
manager, err := backends.NewManager(toConnect, 1338, capacity)
func New(startVIP, toConnect net.IP, capacity, readTimeout, writeTimeout int) (*Balancer, error) {
manager, err := backends.NewManager(toConnect, 1338, capacity, readTimeout, writeTimeout)
if err != nil {
return nil, err
}
Expand All @@ -41,12 +43,14 @@ func New(startVIP, toConnect net.IP, capacity int) (*Balancer, error) {
connectIP: toConnect,
packets: make(chan []byte, 100),
stopChan: make(chan os.Signal, 5),
testFlag: false}, nil
testFlag: false,
readTimeout: readTimeout,
writeTimeout: writeTimeout}, nil
}

// NewTest creates new Balancer with the testing flag on
func NewTest(startVIP, toConnect net.IP, capacity int) (*Balancer, error) {
back, err := New(startVIP, toConnect, capacity)
func NewTest(startVIP, toConnect net.IP, capacity, readTimeout, writeTimeout int) (*Balancer, error) {
back, err := New(startVIP, toConnect, capacity, readTimeout, writeTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,21 +103,21 @@ func (b *Balancer) Start() error {

ipt, err := iptables.New()
if err != nil {
log.Println(err)
log.Errorln(err)
}
ipt.Delete("filter", "INPUT", "-j", "NFQUEUE", "--queue-num", "0", "-d", b.vip.String(), "-p", "tcp")
ipt.Delete("filter", "INPUT", "-j", "NFQUEUE", "--queue-num", "0", "-d", b.vip.String(), "-p", "udp")

if graceful && !b.testFlag {
log.Println("Exiting")
log.Infoln("Exiting")
os.Exit(0)
}

b.mux.Unlock()
}()
sig := <-b.stopChan
graceful = true
log.Printf("caught sig: %+v \n", sig)
log.Warnf("caught sig: %+v \n", sig)
b.stopChan <- sig
}()

Expand Down
Loading

0 comments on commit f323c22

Please sign in to comment.