-
Notifications
You must be signed in to change notification settings - Fork 11
/
peer-finder.go
392 lines (337 loc) · 11.8 KB
/
peer-finder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// A small utility program to lookup hostnames of endpoints in a service.
package main
import (
"context"
"flag"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
"github.com/kballard/go-shellquote"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
)
const (
pollPeriod = 1 * time.Second
)
const (
defaultTimeout = 10 * time.Second
)
type AddressType string
const (
AddressTypeDNS AddressType = "DNS"
// Uses spec.podIP as address for db pods.
AddressTypeIP AddressType = "IP"
// Uses first IPv4 address from spec.podIP, spec.podIPs fields as address for db pods.
AddressTypeIPv4 AddressType = "IPv4"
// Uses first IPv6 address from spec.podIP, spec.podIPs fields as address for db pods.
AddressTypeIPv6 AddressType = "IPv6"
)
var (
kc kubernetes.Interface
controller *Controller
log = klogr.New().WithName("peer-finder") // nolint:staticcheck
)
var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
kubeconfigPath = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
hostsFilePath = flag.String("hosts-file", "/etc/hosts", "Path to hosts file.")
onChange = flag.String("on-change", "", "Script to run on change, must accept a new line separated list of peers via stdin.")
onStart = flag.String("on-start", "", "Script to run on start, must accept a new line separated list of peers via stdin.")
addrType = flag.String("address-type", string(AddressTypeDNS), "Address type used to communicate with peers. Possible values: DNS, IP, IPv4, IPv6.")
svc = flag.String("service", "", "Governing service responsible for the DNS records of the domain this pod is in.")
namespace = flag.String("ns", "", "The namespace this pod is running in. If unspecified, the POD_NAMESPACE env var is used.")
domain = flag.String("domain", "", "The Cluster Domain which is used by the Cluster, if not set tries to determine it from /etc/resolv.conf file.")
selector = flag.String("selector", "", "The selector is used to select the pods whose ip will use to form peers.")
)
func lookupDNS(svcName string) (sets.Set[string], error) {
endpoints := sets.New[string]()
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
_, srvRecords, err := net.DefaultResolver.LookupSRV(ctx, "", "", svcName)
if err != nil {
return endpoints, fmt.Errorf("DNS lookup failed for service %s: %w", svcName, err)
}
for _, srvRecord := range srvRecords {
// The SRV records ends in a "." for the root domain
// Trim the trailing dot
ep := strings.TrimSuffix(srvRecord.Target, ".")
endpoints.Insert(ep)
}
if endpoints.Len() == 0 {
return endpoints, fmt.Errorf("no endpoints found for service %s", svcName)
}
return endpoints, nil
}
func lookupHostIPs(hostName string) (sets.Set[string], error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
ips := sets.New[string]()
hostIPs, err := net.DefaultResolver.LookupIP(ctx, "ip", hostName)
if err != nil {
return nil, fmt.Errorf("IP lookup failed for host %s: %w", hostName, err)
}
for _, hostIP := range hostIPs {
ips.Insert(hostIP.String())
}
if ips.Len() == 0 {
return nil, fmt.Errorf("no valid IP addresses found for host %s", hostName)
}
return ips, nil
}
func shellOut(script string, peers, hostIPs sets.Set[string], fqHostname string) error {
// add extra newline at the end to ensure end of line for bash read command
sendStdin := strings.Join(sets.List(peers), "\n") + "\n"
fields, err := shellquote.Split(script)
if err != nil {
return err
}
if len(fields) == 0 {
return fmt.Errorf("missing command: %s", script)
}
log.Info("exec", "command", fields[0], "stdin", sendStdin)
cmd := exec.Command(fields[0], fields[1:]...)
cmd.Stdin = strings.NewReader(sendStdin)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
info, err := retrieveHostInfo(fqHostname, hostIPs, peers)
if err != nil {
return err
}
envs := sets.NewString(os.Environ()...)
envs.Insert("HOST_ADDRESS=" + info.HostAddr) // FQDN, IPv4, IPv6
envs.Insert("HOST_ADDRESS_TYPE=" + string(info.HostAddrType)) // DNS, IPv4, IPv6
// WARNING: Potentially overwrites the POD_IP from container env before passing to script in case of IPv4 or IPv6 in a dual stack cluster
envs.Insert("POD_IP=" + info.PodIP) // used for whitelist
envs.Insert("POD_IP_TYPE=" + string(info.PodIPType)) // IPv4, IPv6
cmd.Env = envs.List()
err = cmd.Run()
if err != nil {
return fmt.Errorf("execution failed of script=%s. reason:%v", script, err)
}
return nil
}
type HostInfo struct {
// FQDN, IPv4, IPv6
HostAddr string
// DNS, IPv4, IPv6
HostAddrType AddressType
// used for allowlist
// WARNING: Potentially overwrites the POD_IP from container env before passing to script in case of IPv4 or IPv6 in a dual stack cluster
PodIP string
// IPv4 or IPv6
PodIPType AddressType
}
func retrieveHostInfo(fqHostname string, hostIPs, peers sets.Set[string]) (*HostInfo, error) {
var info HostInfo
var err error
switch AddressType(*addrType) {
case AddressTypeDNS:
info.HostAddr = fqHostname
info.HostAddrType = AddressTypeDNS
info.PodIP = os.Getenv("POD_IP") // set using Downward API
info.PodIPType, err = IPType(info.PodIP)
if err != nil {
return nil, err
}
case AddressTypeIP:
hostAddrs := sets.List(peers.Intersection(hostIPs))
if len(hostAddrs) == 0 {
return nil, fmt.Errorf("none of the hostIPs %q found in peers %q", strings.Join(sets.List(hostIPs), ","), strings.Join(sets.List(peers), ","))
}
info.HostAddr = hostAddrs[0]
info.HostAddrType, err = IPType(info.HostAddr)
if err != nil {
return nil, err
}
info.PodIP = info.HostAddr
info.PodIPType = info.HostAddrType
case AddressTypeIPv4:
hostAddrs := sets.List(peers.Intersection(hostIPs))
if len(hostAddrs) == 0 {
return nil, fmt.Errorf("none of the hostIPs %q found in peers %q", strings.Join(sets.List(hostIPs), ","), strings.Join(sets.List(peers), ","))
}
info.HostAddr = hostAddrs[0]
info.HostAddrType = AddressTypeIPv4
info.PodIP = info.HostAddr
info.PodIPType = info.HostAddrType
case AddressTypeIPv6:
hostAddrs := sets.List(peers.Intersection(hostIPs))
if len(hostAddrs) == 0 {
return nil, fmt.Errorf("none of the hostIPs %q found in peers %q", strings.Join(sets.List(hostIPs), ","), strings.Join(sets.List(peers), ","))
}
info.HostAddr = hostAddrs[0]
info.HostAddrType = AddressTypeIPv6
info.PodIP = info.HostAddr
info.PodIPType = info.HostAddrType
}
return &info, nil
}
func IPType(s string) (AddressType, error) {
ip := net.ParseIP(s)
if ip == nil {
return "", fmt.Errorf("%s is not a valid IP address", s)
}
if strings.ContainsRune(s, ':') {
return AddressTypeIPv6, nil
}
return AddressTypeIPv4, nil
}
func forwardSigterm() <-chan struct{} {
shutdownHandler := make(chan os.Signal, 1)
ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, syscall.SIGTERM)
go func() {
<-shutdownHandler
pgid, err := syscall.Getpgid(os.Getpid())
if err != nil {
log.Error(err, "failed to retrieve pgid for process", "pid", os.Getpid())
} else {
log.Info("sending SIGTERM", "pgid", pgid)
err = syscall.Kill(-pgid, syscall.SIGTERM)
if err != nil {
log.Error(err, "failed to send SIGTERM", "pgid", pgid)
}
}
cancel()
fmt.Println("waiting for all child process to complete for SIGTERM")
<-shutdownHandler
}()
return ctx.Done()
}
func main() {
klog.InitFlags(nil)
_ = flag.Set("v", "3")
flag.Parse()
stopCh := forwardSigterm()
// TODO: Exit if there's no on-change?
if err := run(stopCh); err != nil {
log.Error(err, "peer finder exiting.")
}
klog.Flush()
log.Info("Block until Kubernetes sends the signal SIGKILL .")
select {}
}
func run(stopCh <-chan struct{}) error {
var domainName string
ns := *namespace
if ns == "" {
ns = os.Getenv("POD_NAMESPACE")
}
hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("failed to get hostname: %s", err)
}
// If domain is not provided, try to get it from resolv.conf
if *domain == "" {
resolvConfBytes, err := os.ReadFile("/etc/resolv.conf")
if err != nil {
return fmt.Errorf("unable to read /etc/resolv.conf")
}
resolvConf := string(resolvConfBytes)
var re *regexp.Regexp
if ns == "" {
// Looking for a domain that looks like with *.svc.**
re, err = regexp.Compile(`\A(.*\n)*search\s{1,}(.*\s{1,})*(?P<goal>[a-zA-Z0-9-]{1,63}.svc.([a-zA-Z0-9-]{1,63}\.)*[a-zA-Z0-9]{2,63})`)
} else {
// Looking for a domain that looks like svc.**
re, err = regexp.Compile(`\A(.*\n)*search\s{1,}(.*\s{1,})*(?P<goal>svc.([a-zA-Z0-9-]{1,63}\.)*[a-zA-Z0-9]{2,63})`)
}
if err != nil {
return fmt.Errorf("failed to create regular expression: %v", err)
}
groupNames := re.SubexpNames()
result := re.FindStringSubmatch(resolvConf)
for k, v := range result {
if groupNames[k] == "goal" {
if ns == "" {
// Domain is complete if ns is empty
domainName = v
} else {
// Need to convert svc.** into ns.svc.**
domainName = ns + "." + v
}
break
}
}
log.Info("determined", "domain", domainName)
} else {
domainName = strings.Join([]string{ns, "svc", *domain}, ".")
}
if (*selector == "" && *svc == "") || domainName == "" || (*onChange == "" && *onStart == "") {
return fmt.Errorf("incomplete arguments, require -on-change and/or -on-start, -service and -ns or an environment variable named POD_NAMESPACE")
}
if *selector != "" {
config, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfigPath)
if err != nil {
return fmt.Errorf("could not get Kubernetes config: %s", err)
}
kc, err = kubernetes.NewForConfig(config)
if err != nil {
return err
}
RunHostAliasSyncer(kc, ns, *selector, *addrType, stopCh)
}
myName := strings.Join([]string{hostname, *svc, domainName}, ".")
hostIPs, err := lookupHostIPs(hostname)
if err != nil {
return fmt.Errorf("failed to get IP addresses from host %v", err)
}
script := *onStart
if script == "" {
script = *onChange
log.Info(fmt.Sprintf("no on-start supplied, on-change %q will be applied on start.", script))
}
for peers := sets.New[string](); script != ""; time.Sleep(pollPeriod) {
var newPeers sets.Set[string]
if *selector != "" {
newPeers, err = controller.listPodsIP()
if err != nil {
return err
}
if newPeers.Equal(peers) || !newPeers.HasAny(hostIPs.UnsortedList()...) {
log.Info("have not found myself in list yet.", "hostname", myName, "hosts in list", strings.Join(sets.List(newPeers), ", "))
continue
}
} else {
newPeers, err = lookupDNS(*svc)
if err != nil {
log.Info(err.Error())
continue
}
if newPeers.Equal(peers) || !newPeers.Has(myName) {
log.Info("have not found myself in list yet.", "hostname", myName, "hosts in list", strings.Join(sets.List(newPeers), ", "))
continue
}
}
log.Info("peer list updated", "was", sets.List(peers), "now", sets.List(newPeers))
// add extra newline at the end to ensure end of line for bash read command
err = shellOut(script, newPeers, hostIPs, myName)
if err != nil {
return err
}
peers = newPeers
script = *onChange
}
return nil
}