diff --git a/main.go b/main.go
index 2aae4c0d4..606157f57 100644
--- a/main.go
+++ b/main.go
@@ -125,8 +125,13 @@ func main() {
 	var cableHealthchecker healthchecker.Interface
 
 	if len(submSpec.GlobalCidr) == 0 && submSpec.HealthCheckEnabled {
-		cableHealthchecker, err = healthchecker.New(&watcher.Config{RestConfig: cfg}, submSpec.Namespace,
-			submSpec.ClusterID, submSpec.HealthCheckInterval, submSpec.HealthCheckMaxPacketLossCount)
+		cableHealthchecker, err = healthchecker.New(&healthchecker.Config{
+			WatcherConfig:      &watcher.Config{RestConfig: cfg},
+			EndpointNamespace:  submSpec.Namespace,
+			ClusterID:          submSpec.ClusterID,
+			PingInterval:       submSpec.HealthCheckInterval,
+			MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount,
+		})
 		if err != nil {
 			klog.Errorf("Error creating healthChecker: %v", err)
 		}
diff --git a/pkg/cableengine/healthchecker/fake/pinger.go b/pkg/cableengine/healthchecker/fake/pinger.go
new file mode 100644
index 000000000..68a814172
--- /dev/null
+++ b/pkg/cableengine/healthchecker/fake/pinger.go
@@ -0,0 +1,62 @@
+package fake
+
+import (
+	"sync/atomic"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
+)
+
+type Pinger struct {
+	ip          string
+	latencyInfo atomic.Value
+	start       chan struct{}
+	stop        chan struct{}
+}
+
+func NewPinger(ip string) *Pinger {
+	return &Pinger{
+		ip:    ip,
+		start: make(chan struct{}),
+		stop:  make(chan struct{}),
+	}
+}
+
+func (p *Pinger) Start() {
+	defer GinkgoRecover()
+	Expect(p.start).ToNot(BeClosed())
+	close(p.start)
+}
+
+func (p *Pinger) Stop() {
+	defer GinkgoRecover()
+	Expect(p.stop).ToNot(BeClosed())
+	close(p.stop)
+}
+
+func (p *Pinger) GetLatencyInfo() *healthchecker.LatencyInfo {
+	o := p.latencyInfo.Load()
+	if o != nil {
+		info := o.(healthchecker.LatencyInfo)
+		return &info
+	}
+
+	return nil
+}
+
+func (p *Pinger) SetLatencyInfo(info *healthchecker.LatencyInfo) {
+	p.latencyInfo.Store(*info)
+}
+
+func (p *Pinger) GetIP() string {
+	return p.ip
+}
+
+func (p *Pinger) AwaitStart() {
+	Eventually(p.start).Should(BeClosed(), "Start was not called")
+}
+
+func (p *Pinger) AwaitStop() {
+	Eventually(p.stop).Should(BeClosed(), "Stop was not called")
+}
diff --git a/pkg/cableengine/healthchecker/healthchecker.go b/pkg/cableengine/healthchecker/healthchecker.go
index 404619ea4..c848263cc 100644
--- a/pkg/cableengine/healthchecker/healthchecker.go
+++ b/pkg/cableengine/healthchecker/healthchecker.go
@@ -1,7 +1,6 @@
 package healthchecker
 
 import (
-	"strconv"
 	"sync"
 	"time"
 
@@ -23,21 +22,27 @@ type Interface interface {
 	GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo
 }
 
+type Config struct {
+	WatcherConfig      *watcher.Config
+	EndpointNamespace  string
+	ClusterID          string
+	PingInterval       uint
+	MaxPacketLossCount uint
+	NewPinger          func(string, time.Duration, uint) PingerInterface
+}
+
 type controller struct {
-	endpointWatcher    watcher.Interface
-	pingers            sync.Map
-	clusterID          string
-	pingInterval       uint
-	maxPacketLossCount uint
+	endpointWatcher watcher.Interface
+	pingers         sync.Map
+	config          *Config
 }
 
-func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterval, maxPacketLossCount uint) (Interface, error) {
+func New(config *Config) (Interface, error) {
 	controller := &controller{
-		clusterID:          clusterID,
-		pingInterval:       pingInterval,
-		maxPacketLossCount: maxPacketLossCount,
+		config: config,
 	}
-	config.ResourceConfigs = []watcher.ResourceConfig{
+
+	config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{
 		{
 			Name:         "HealthChecker Endpoint Controller",
 			ResourceType: &submarinerv1.Endpoint{},
@@ -46,41 +51,23 @@ func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterv
 				OnUpdateFunc: controller.endpointCreatedorUpdated,
 				OnDeleteFunc: controller.endpointDeleted,
 			},
-			SourceNamespace: endpointNameSpace,
+			SourceNamespace: config.EndpointNamespace,
 		},
 	}
 
-	endpointWatcher, err := watcher.New(config)
+	var err error
+	controller.endpointWatcher, err = watcher.New(config.WatcherConfig)
 	if err != nil {
 		return nil, err
 	}
 
-	controller.endpointWatcher = endpointWatcher
-
 	return controller, nil
 }
 
 func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo {
 	if obj, found := h.pingers.Load(endpoint.CableName); found {
-		pinger := obj.(*pingerInfo)
-
-		lastTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.lastRtt, 10) + "ns")
-		minTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.minRtt, 10) + "ns")
-		averageTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.mean, 10) + "ns")
-		maxTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.maxRtt, 10) + "ns")
-		stdDevTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.stdDev, 10) + "ns")
-
-		return &LatencyInfo{
-			ConnectionError: pinger.failureMsg,
-			Spec: &submarinerv1.LatencyRTTSpec{
-				Last:    lastTime.String(),
-				Min:     minTime.String(),
-				Average: averageTime.String(),
-				Max:     maxTime.String(),
-				StdDev:  stdDevTime.String(),
-			},
-		}
+		return obj.(PingerInterface).GetLatencyInfo()
 	}
 
 	return nil
@@ -91,13 +78,16 @@ func (h *controller) Start(stopCh <-chan struct{}) error {
 		return err
 	}
 
+	klog.Infof("CableEngine HealthChecker started with PingInterval: %v, MaxPacketLossCount: %v", h.config.PingInterval,
+		h.config.MaxPacketLossCount)
+
 	return nil
 }
 
 func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool {
 	klog.V(log.TRACE).Infof("Endpoint created: %#v", obj)
 	endpointCreated := obj.(*submarinerv1.Endpoint)
-	if endpointCreated.Spec.ClusterID == h.clusterID {
+	if endpointCreated.Spec.ClusterID == h.config.ClusterID {
 		return false
 	}
 
@@ -108,33 +98,38 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool {
 	}
 
 	if obj, found := h.pingers.Load(endpointCreated.Spec.CableName); found {
-		pinger := obj.(*pingerInfo)
-		if pinger.healthCheckIP == endpointCreated.Spec.HealthCheckIP {
+		pinger := obj.(PingerInterface)
+		if pinger.GetIP() == endpointCreated.Spec.HealthCheckIP {
 			return false
 		}
 
 		klog.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpointCreated.Name)
-		pinger.stop()
+		pinger.Stop()
 		h.pingers.Delete(endpointCreated.Spec.CableName)
 	}
 
-	klog.V(log.TRACE).Infof("Starting Pinger for CableName: %q, with HealthCheckIP: %q",
-		endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP)
-
-	pingInterval := DefaultPingInterval
-	if h.pingInterval != 0 {
-		pingInterval = time.Second * time.Duration(h.pingInterval)
+	pingInterval := defaultPingInterval
+	if h.config.PingInterval != 0 {
+		pingInterval = time.Second * time.Duration(h.config.PingInterval)
 	}
 
-	maxPacketLossCount := DefaultMaxPacketLossCount
+	maxPacketLossCount := defaultMaxPacketLossCount
+
+	if h.config.MaxPacketLossCount != 0 {
+		maxPacketLossCount = h.config.MaxPacketLossCount
+	}
 
-	if h.maxPacketLossCount != 0 {
-		maxPacketLossCount = h.maxPacketLossCount
+	newPingerFunc := h.config.NewPinger
+	if newPingerFunc == nil {
+		newPingerFunc = newPinger
 	}
 
-	pinger := newPinger(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount)
+	pinger := newPingerFunc(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount)
 	h.pingers.Store(endpointCreated.Spec.CableName, pinger)
-	pinger.start()
+	pinger.Start()
+
+	klog.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q",
+		endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP)
 
 	return false
 }
@@ -146,8 +141,8 @@ func (h *controller) endpointDeleted(obj runtime.Object) bool {
 	}
 
 	if obj, found := h.pingers.Load(endpointDeleted.Spec.CableName); found {
-		pinger := obj.(*pingerInfo)
-		pinger.stop()
+		pinger := obj.(PingerInterface)
+		pinger.Stop()
 		h.pingers.Delete(endpointDeleted.Spec.CableName)
 	}
diff --git a/pkg/cableengine/healthchecker/pinger.go b/pkg/cableengine/healthchecker/pinger.go
index 30196244a..d0b45f175 100644
--- a/pkg/cableengine/healthchecker/pinger.go
+++ b/pkg/cableengine/healthchecker/pinger.go
@@ -2,25 +2,34 @@ package healthchecker
 
 import (
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/go-ping/ping"
+	submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
 	"k8s.io/klog"
 )
 
 var waitTime = 15 * time.Second
 
-var DefaultMaxPacketLossCount uint = 5
+var defaultMaxPacketLossCount uint = 5
 
 // The RTT will be stored and will be used to calculate the statistics until
 // the size is reached. Once the size is reached the array will be reset and
 // the last elements will be added to the array for statistics.
 var size uint64 = 1000
 
-var DefaultPingInterval = 1 * time.Second
+var defaultPingInterval = 1 * time.Second
+
+type PingerInterface interface {
+	Start()
+	Stop()
+	GetLatencyInfo() *LatencyInfo
+	GetIP() string
+}
 
 type pingerInfo struct {
-	healthCheckIP      string
+	ip                 string
 	pingInterval       time.Duration
 	maxPacketLossCount uint
 	statistics         statistics
@@ -28,9 +37,9 @@ type pingerInfo struct {
 	stopCh             chan struct{}
 }
 
-func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCount uint) *pingerInfo {
+func newPinger(ip string, pingInterval time.Duration, maxPacketLossCount uint) PingerInterface {
 	return &pingerInfo{
-		healthCheckIP:      healthCheckIP,
+		ip:                 ip,
 		pingInterval:       pingInterval,
 		maxPacketLossCount: maxPacketLossCount,
 		statistics: statistics{
@@ -41,7 +50,7 @@ func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCo
 	}
 }
 
-func (p *pingerInfo) start() {
+func (p *pingerInfo) Start() {
 	go func() {
 		for {
 			select {
@@ -52,17 +61,16 @@ func (p *pingerInfo) start() {
 			}
 		}
 	}()
-	klog.Infof("CableEngine HealthChecker started pinger for IP %q", p.healthCheckIP)
 }
 
-func (p *pingerInfo) stop() {
+func (p *pingerInfo) Stop() {
 	close(p.stopCh)
 }
 
 func (p *pingerInfo) sendPing() {
-	pinger, err := ping.NewPinger(p.healthCheckIP)
+	pinger, err := ping.NewPinger(p.ip)
 	if err != nil {
-		klog.Errorf("Error creating pinger for IP %q: %v", p.healthCheckIP, err)
+		klog.Errorf("Error creating pinger for IP %q: %v", p.ip, err)
 		return
 	}
 
@@ -73,7 +81,7 @@ func (p *pingerInfo) sendPing() {
 	pinger.OnSend = func(packet *ping.Packet) {
 		// Pinger will mark a connection as an error if the packet loss reaches the threshold
 		if pinger.PacketsSent-pinger.PacketsRecv > int(p.maxPacketLossCount) {
-			p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.healthCheckIP)
+			p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.ip)
 			pinger.PacketsSent = 0
 			pinger.PacketsRecv = 0
 		}
@@ -86,6 +94,29 @@ func (p *pingerInfo) sendPing() {
 
 	err = pinger.Run()
 	if err != nil {
-		klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.healthCheckIP, err)
+		klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.ip, err)
+	}
+}
+
+func (p *pingerInfo) GetIP() string {
+	return p.ip
+}
+
+func (p *pingerInfo) GetLatencyInfo() *LatencyInfo {
+	lastTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.lastRtt, 10) + "ns")
+	minTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.minRtt, 10) + "ns")
+	averageTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.mean, 10) + "ns")
+	maxTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.maxRtt, 10) + "ns")
+	stdDevTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.stdDev, 10) + "ns")
+
+	return &LatencyInfo{
+		ConnectionError: p.failureMsg,
+		Spec: &submarinerv1.LatencyRTTSpec{
+			Last:    lastTime.String(),
+			Min:     minTime.String(),
+			Average: averageTime.String(),
+			Max:     maxTime.String(),
+			StdDev:  stdDevTime.String(),
+		},
 	}
 }
diff --git a/pkg/cableengine/syncer/syncer.go b/pkg/cableengine/syncer/syncer.go
index 6cc9337c6..05539e558 100644
--- a/pkg/cableengine/syncer/syncer.go
+++ b/pkg/cableengine/syncer/syncer.go
@@ -218,6 +218,9 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway {
 				connection.Status = v1.ConnectionError
 				connection.StatusMessage = latencyInfo.ConnectionError
 			}
+		} else if connection.Status == v1.ConnectionError && latencyInfo.ConnectionError == "" {
+			connection.Status = v1.Connected
+			connection.StatusMessage = ""
 		}
 	}
 }
diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go
index d54ba008f..3261e7f2e 100644
--- a/pkg/cableengine/syncer/syncer_test.go
+++ b/pkg/cableengine/syncer/syncer_test.go
@@ -1,13 +1,16 @@
 package syncer_test
 
 import (
-	"fmt"
+	"reflect"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"github.com/onsi/gomega/format"
+	gomegaTypes "github.com/onsi/gomega/types"
 	"github.com/pkg/errors"
 	. "github.com/submariner-io/admiral/pkg/gomega"
 	"github.com/submariner-io/admiral/pkg/syncer/test"
@@ -15,14 +18,17 @@ import (
 	submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
 	fakeEngine "github.com/submariner-io/submariner/pkg/cableengine/fake"
 	"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
+	"github.com/submariner-io/submariner/pkg/cableengine/healthchecker/fake"
 	"github.com/submariner-io/submariner/pkg/cableengine/syncer"
 	fakeClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned/fake"
 	fakeClientsetv1 "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1/fake"
 	submarinerInformers "github.com/submariner-io/submariner/pkg/client/informers/externalversions"
 	"github.com/submariner-io/submariner/pkg/types"
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"github.com/submariner-io/submariner/pkg/util"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/dynamic"
 	fakeClient "k8s.io/client-go/dynamic/fake"
 	kubeScheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/cache"
@@ -33,9 +39,9 @@ const (
 	namespace = "submariner"
 )
 
-var _ = Describe("", func() {
+func init() {
 	klog.InitFlags(nil)
-})
+}
 
 var _ = BeforeSuite(func() {
 	syncer.GatewayUpdateInterval = 200 * time.Millisecond
@@ -46,6 +52,7 @@ var _ = Describe("", func() {
 	Context("Gateway syncing", testGatewaySyncing)
 	Context("Stale Gateway cleanup", testStaleGatewayCleanup)
 	Context("Gateway sync errors", testGatewaySyncErrors)
+	Context("Gateway latency info", testGatewayLatencyInfo)
 })
 
 func testGatewaySyncing() {
@@ -133,7 +140,7 @@ func testStaleGatewayCleanup() {
 	BeforeEach(func() {
 		t = newTestDriver()
 		staleGateway = &submarinerv1.Gateway{
-			ObjectMeta: v1.ObjectMeta{
+			ObjectMeta: metav1.ObjectMeta{
 				Name: "raiders",
 			},
 			Status: submarinerv1.GatewayStatus{
@@ -285,11 +292,101 @@ func testGatewaySyncErrors() {
 	})
 }
 
+func testGatewayLatencyInfo() {
+	var t *testDriver
+
+	BeforeEach(func() {
+		t = newTestDriver()
+	})
+
+	JustBeforeEach(func() {
+		t.run()
+	})
+
+	AfterEach(func() {
+		t.stop()
+	})
+
+	When("the health checker provides latency info", func() {
+		It("should correctly update the Gateway Status information", func() {
+			t.awaitGatewayUpdated(t.expectedGateway)
+
+			endpointSpec := &submarinerv1.EndpointSpec{
+				ClusterID:     "north",
+				CableName:     "submariner-cable-north-192-68-1-20",
+				PrivateIP:     "192-68-1-20",
+				HealthCheckIP: t.pinger.GetIP(),
+			}
+
+			endpointName, err := util.GetEndpointCRDNameFromParams(endpointSpec.ClusterID, endpointSpec.CableName)
+			Expect(err).To(Succeed())
+
+			test.CreateResource(t.endpoints, &submarinerv1.Endpoint{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: endpointName,
+				},
+				Spec: *endpointSpec,
+			})
+
+			t.engine.Lock()
+
+			t.expectedGateway.Status.HAStatus = submarinerv1.HAStatusActive
+			t.engine.HAStatus = t.expectedGateway.Status.HAStatus
+
+			t.expectedGateway.Status.Connections = []submarinerv1.Connection{
+				{
+					Status:   submarinerv1.Connected,
+					Endpoint: *endpointSpec,
+				},
+			}
+
+			t.engine.Connections = []submarinerv1.Connection{t.expectedGateway.Status.Connections[0]}
+
+			t.expectedGateway.Status.Connections[0].LatencyRTT = &submarinerv1.LatencyRTTSpec{
+				Last:    "93ms",
+				Min:     "90ms",
+				Average: "95ms",
+				Max:     "100ms",
+				StdDev:  "94ms",
+			}
+
+			t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{
+				Spec: t.expectedGateway.Status.Connections[0].LatencyRTT,
+			})
+
+			t.engine.Unlock()
+
+			t.awaitGatewayUpdated(t.expectedGateway)
+
+			t.expectedGateway.Status.Connections[0].Status = submarinerv1.ConnectionError
+			t.expectedGateway.Status.Connections[0].StatusMessage = "Ping failed"
+
+			t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{
+				ConnectionError: t.expectedGateway.Status.Connections[0].StatusMessage,
+				Spec:            t.expectedGateway.Status.Connections[0].LatencyRTT,
+			})
+
+			t.awaitGatewayUpdated(t.expectedGateway)
+
+			t.expectedGateway.Status.Connections[0].Status = submarinerv1.Connected
+			t.expectedGateway.Status.Connections[0].StatusMessage = ""
+
+			t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{
+				Spec: t.expectedGateway.Status.Connections[0].LatencyRTT,
+			})
+
+			t.awaitGatewayUpdated(t.expectedGateway)
+		})
+	})
+}
+
 type testDriver struct {
 	engine               *fakeEngine.Engine
 	gateways             *fakeClientsetv1.FailingGateways
 	syncer               *syncer.GatewaySyncer
 	healthChecker        healthchecker.Interface
+	pinger               *fake.Pinger
+	endpoints            dynamic.ResourceInterface
 	expectedGateway      *submarinerv1.Gateway
 	expectedDeletedAfter *submarinerv1.Gateway
 	gatewayUpdated       chan *submarinerv1.Gateway
@@ -321,7 +418,7 @@ func newTestDriver() *testDriver {
 	}}
 
 	t.expectedGateway = &submarinerv1.Gateway{
-		ObjectMeta: v1.ObjectMeta{
+		ObjectMeta: metav1.ObjectMeta{
 			Name: t.engine.LocalEndPoint.Spec.Hostname,
 		},
 		Status: submarinerv1.GatewayStatus{
@@ -352,11 +449,26 @@ func (t *testDriver) run() {
 	client := fakeClientset.NewSimpleClientset()
 
 	t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace)
-	t.healthChecker, _ = healthchecker.New(&watcher.Config{
-		RestMapper: restMapper,
-		Client:     dynamicClient,
-		Scheme:     scheme,
-	}, namespace, "west", 1, 15)
+
+	t.pinger = fake.NewPinger("10.130.2.2")
+
+	t.healthChecker, _ = healthchecker.New(&healthchecker.Config{
+		WatcherConfig: &watcher.Config{
+			RestMapper: restMapper,
+			Client:     dynamicClient,
+			Scheme:     scheme,
+		},
+		EndpointNamespace: namespace,
+		ClusterID:         t.engine.LocalEndPoint.Spec.ClusterID,
+		NewPinger: func(ip string, i time.Duration, m uint) healthchecker.PingerInterface {
+			defer GinkgoRecover()
+			Expect(ip).To(Equal(t.pinger.GetIP()))
+			return t.pinger
+		},
+	})
+
+	t.endpoints = dynamicClient.Resource(*test.GetGroupVersionResourceFor(restMapper, &submarinerv1.Endpoint{})).Namespace(namespace)
+
 	t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, t.healthChecker)
 
 	informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0)
@@ -378,6 +490,8 @@ func (t *testDriver) run() {
 	Expect(cache.WaitForCacheSync(t.stopInformer, informer.HasSynced)).To(BeTrue())
 
 	t.syncer.Run(t.stopSyncer)
+
+	Expect(t.healthChecker.Start(t.stopSyncer)).To(Succeed())
 }
 
 func (t *testDriver) stop() {
@@ -392,7 +506,7 @@ func (t *testDriver) stop() {
 }
 
 func (t *testDriver) awaitGatewayUpdated(expected *submarinerv1.Gateway) {
-	t.awaitGateway(t.gatewayUpdated, fmt.Sprintf("Gateway was not received - %#v", expected), expected)
+	t.awaitGateway(t.gatewayUpdated, expected)
 }
 
 func (t *testDriver) awaitNoGatewayUpdated() {
@@ -400,36 +514,66 @@ func (t *testDriver) awaitNoGatewayUpdated() {
 }
 
 func (t *testDriver) awaitGatewayDeleted(expected *submarinerv1.Gateway) {
-	t.awaitGateway(t.gatewayDeleted, fmt.Sprintf("Gateway was not deleted - %#v", expected), expected)
+	t.awaitGateway(t.gatewayDeleted, expected)
 }
 
 func (t *testDriver) awaitNoGatewayDeleted() {
 	Consistently(t.gatewayDeleted, syncer.GatewayUpdateInterval+50).ShouldNot(Receive(), "Gateway was unexpectedly deleted")
 }
 
-func (t *testDriver) awaitGateway(gatewayChan chan *submarinerv1.Gateway, msg string, expected *submarinerv1.Gateway) {
-	actual, err := func() (*submarinerv1.Gateway, error) {
+func (t *testDriver) awaitGateway(gatewayChan chan *submarinerv1.Gateway, expected *submarinerv1.Gateway) {
+	var last *submarinerv1.Gateway
+
+	Eventually(func() *submarinerv1.Gateway {
 		select {
 		case gw := <-gatewayChan:
-			return gw, nil
-		case <-time.After(5 * time.Second):
-			return nil, fmt.Errorf(msg)
+			last = gw
+			return gw
+		default:
+			return last
 		}
-	}()
+	}, 5).Should(equalGateway(expected))
+}
+
+func TestSyncer(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Cable engine syncer Suite")
+}
+
+type equalGatewayMatcher struct {
+	expected *submarinerv1.Gateway
+}
+
+func equalGateway(expected *submarinerv1.Gateway) gomegaTypes.GomegaMatcher {
+	return &equalGatewayMatcher{expected}
+}
 
-	Expect(err).To(Succeed())
-	Expect(actual.Name).To(Equal(expected.Name))
+func (m *equalGatewayMatcher) Match(x interface{}) (bool, error) {
+	actual := x.(*submarinerv1.Gateway)
+	if actual == nil {
+		return false, nil
+	}
+
+	if actual.Name != m.expected.Name {
+		return false, nil
+	}
+
+	if m.expected.Status.StatusFailure != "" {
+		if !strings.Contains(actual.Status.StatusFailure, m.expected.Status.StatusFailure) {
+			return false, nil
+		}
 
-	if expected.Status.StatusFailure != "" {
-		Expect(actual.Status.StatusFailure).To(ContainSubstring(expected.Status.StatusFailure))
 		actual = actual.DeepCopy()
-		actual.Status.StatusFailure = expected.Status.StatusFailure
+		actual.Status.StatusFailure = m.expected.Status.StatusFailure
 	}
 
-	Expect(actual.Status).To(Equal(expected.Status))
+	return reflect.DeepEqual(actual.Status, m.expected.Status), nil
 }
 
-func TestSyncer(t *testing.T) {
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "Cable engine syncer Suite")
+func (m *equalGatewayMatcher) FailureMessage(actual interface{}) string {
+	return format.Message(actual, "to equal", m.expected)
+}
+
+func (m *equalGatewayMatcher) NegatedFailureMessage(actual interface{}) (message string) {
+	return format.Message(actual, "not to equal", m.expected)
 }
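
For reference, a condensed sketch of how a caller wires up the refactored health checker after this change; it restates what the main.go and syncer_test.go hunks above do, outside of diff syntax. The helper name, the admiral watcher import path, and the literal interval values are illustrative stand-ins rather than part of the change:

package main

import (
	"github.com/submariner-io/admiral/pkg/watcher"
	"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
)

// buildHealthChecker is a hypothetical helper showing the new Config-based
// constructor: all parameters now travel in a single struct instead of the
// former positional arguments.
func buildHealthChecker(cfg *watcher.Config, namespace, clusterID string, stopCh <-chan struct{}) (healthchecker.Interface, error) {
	hc, err := healthchecker.New(&healthchecker.Config{
		WatcherConfig:      cfg,
		EndpointNamespace:  namespace,
		ClusterID:          clusterID,
		PingInterval:       1,  // seconds; 0 falls back to the package default
		MaxPacketLossCount: 15, // 0 falls back to the package default
		// NewPinger is optional; tests inject a fake pinger here, as the
		// syncer_test.go hunk does with fake.NewPinger.
	})
	if err != nil {
		return nil, err
	}

	// Start begins watching Endpoints and launches a pinger per remote endpoint.
	return hc, hc.Start(stopCh)
}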