Skip to content

Commit

Permalink
switch from tkn client to k8s pod log retrieval; fix panic, add debug…
Browse files Browse the repository at this point in the history
…, to e2e_gcs_test

rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero committed Feb 20, 2024
1 parent 2592566 commit b340071
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 49 deletions.
133 changes: 95 additions & 38 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
package dynamic

import (
"bytes"
"context"
"fmt"
"io"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/fatih/color"
"github.com/jonboulle/clockwork"
"github.com/tektoncd/cli/pkg/cli"
tknlog "github.com/tektoncd/cli/pkg/log"
tknopts "github.com/tektoncd/cli/pkg/options"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/log"
"github.com/tektoncd/results/pkg/api/server/v1alpha2/record"
Expand Down Expand Up @@ -353,62 +358,114 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
return nil
}

func getPodLogs(ctx context.Context, client kubernetes.Interface, ns, pod, container string) ([]byte, error) {
podLogOpts := corev1.PodLogOptions{
Container: container,
}
req := client.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts)
podLogs, err := req.Stream(ctx)
if err != nil {
return nil, err
}
defer podLogs.Close()

if err != nil {
msg := fmt.Sprintf("error getting logs for pod %s container %s: %s", pod, container, err.Error())
msgBytes := []byte(msg)
return msgBytes, nil
}
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
return buf.Bytes(), err
}

func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, logName string) error {
logger := logging.FromContext(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logsClient, err := r.resultsClient.UpdateLog(ctx)
if err != nil {
return fmt.Errorf("failed to create UpdateLog client: %w", err)
}

writer := logs.NewBufferedWriter(logsClient, logName, logs.DefaultBufferSize)

inMemWriteBufferStdout := bytes.NewBuffer(make([]byte, 0))

tknParams := &cli.TektonParams{}
tknParams.SetNamespace(o.GetNamespace())
// KLUGE: tkn reader.Read() will raise an error if a step in the TaskRun failed and there is no
// Err writer in the Stream object. This will result in some "error" messages being written to
// the log.

reader, err := tknlog.NewReader(logType, &tknopts.LogOptions{
AllSteps: true,
Params: tknParams,
PipelineRunName: o.GetName(),
TaskrunName: o.GetName(),
Stream: &cli.Stream{
Out: writer,
Err: writer,
},
})
k8sClient, err := tknParams.KubeClient()
if err != nil {
return fmt.Errorf("failed to create tkn reader: %w", err)
return err
}

labelType := pipeline.PipelineRunLabelKey
if logType == tknlog.LogTypeTask {
labelType = pipeline.TaskRunLabelKey
}
lo := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", labelType, o.GetName()),
}
logChan, errChan, err := reader.Read()
var pods *corev1.PodList
pods, err = k8sClient.CoreV1().Pods(o.GetNamespace()).List(ctx, lo)
if err != nil {
return fmt.Errorf("error reading from tkn reader: %w", err)
return err
}

errChanRepeater := make(chan error)
go func(echan <-chan error, o metav1.Object) {
writeErr := <-echan
errChanRepeater <- writeErr

_, err := writer.Flush()
if err != nil {
logger.Error(err)
}
if err = logsClient.CloseSend(); err != nil {
logger.Error(err)
for _, pod := range pods.Items {
containers := pod.Spec.InitContainers[:]
containers = append(containers, pod.Spec.Containers...)
for _, container := range containers {
ba, podLogsErr := getPodLogs(ctx, k8sClient, o.GetNamespace(), pod.Name, container.Name)
if podLogsErr != nil {
return podLogsErr
}
hdr := fmt.Sprintf("*** Logs for pod %s container %s ***\n", pod.Name, container.Name)
inMemWriteBufferStdout.Write([]byte(hdr))
inMemWriteBufferStdout.Write(ba)
}
}(errChan, o)
}

// errChanRepeater receives stderr from the TaskRun containers.
// This will be forwarded as combined output (stdout and stderr)
bufStdout := inMemWriteBufferStdout.Bytes()
cntStdout, writeStdOutErr := writer.Write(bufStdout)
if writeStdOutErr != nil {
logger.Warnw("streamLogs in mem bufStdout write err",
zap.String("error", writeStdOutErr.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
return writeStdOutErr
}
if cntStdout != len(bufStdout) {
logger.Warnw("streamLogs bufStdout write len inconsistent",
zap.Int("in", len(bufStdout)),
zap.Int("out", cntStdout),
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)

}

flushCount, flushErr := writer.Flush()
logger.Warnw("flush ret count",
zap.String("name", o.GetName()),
zap.Int("flushCount", flushCount))
if flushErr != nil {
logger.Warnw("flush ret err",
zap.String("error", flushErr.Error()))
logger.Error(flushErr)
return flushErr
}
if closeErr := logsClient.CloseSend(); closeErr != nil {
logger.Warnw("CloseSend ret err",
zap.String("name", o.GetName()),
zap.String("error", closeErr.Error()))
logger.Error(closeErr)
return closeErr
}

tknlog.NewWriter(logType, true).Write(&cli.Stream{
Out: writer,
Err: writer,
}, logChan, errChanRepeater)
logger.Debugw("Exiting streamLogs",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)

return nil
}
93 changes: 82 additions & 11 deletions test/e2e/e2e_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package e2e

import (
"bytes"
"context"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"testing"

resultsv1alpha2 "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/genproto/googleapis/api/httpbody"

"strings"
"time"
Expand Down Expand Up @@ -75,6 +80,10 @@ func TestGCSLog(t *testing.T) {
}
return false, nil
}); err != nil {
t.Log("dumping watcher logs")
podLogs(t, "tekton-pipelines", "watcher")
t.Log("dumping api logs")
podLogs(t, "tekton-pipelines", "api")
t.Fatalf("Error waiting for PipelineRun creation: %v", err)
}
})
Expand All @@ -93,17 +102,79 @@ func TestGCSLog(t *testing.T) {
if logName == "" {
t.Skip("log name not found")
}
logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName})
if err != nil {
t.Errorf("Error getting Log Client: %v", err)
}
log, err := logClient.Recv()
if err != nil {
t.Errorf("Error getting Log: %v", err)
}
want := "[hello : hello] hello world!"
if !strings.Contains(string(log.Data), want) {
t.Errorf("Log Data inconsistent got: %s, doesn't have: %s", string(log.Data), want)
if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (done bool, err error) {
logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName})
if err != nil {
t.Logf("Error getting Log Client: %v", err)
return false, nil
}
var log *httpbody.HttpBody
var cerr error
log, cerr = logClient.Recv()
if cerr != nil {
t.Logf("Error getting Log for %s: %v", logName, cerr)
return false, nil
}
want := "hello world!"
if log == nil {
t.Logf("Nil return from logClient.Recv()")
return false, nil
}
if !strings.Contains(string(log.Data), want) {
t.Logf("Log Data inconsistent for %s got: %s, doesn't have: %s", logName, string(log.Data), want)
return false, nil
}
return true, nil

}); err != nil {
t.Log("dumping watcher logs")
podLogs(t, "tekton-pipelines", "watcher")
t.Log("dumping api logs")
podLogs(t, "tekton-pipelines", "api")
t.Fatalf("Error waiting for check log: %v", err)
}
})
}

func podLogs(t *testing.T, ns string, name string) {
t.Logf("getting pod logs for the pattern %s", name)
clientset := kubernetes.NewForConfigOrDie(clientConfig(t))
ctx := context.Background()
list, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
t.Errorf("pod list error %s", err)
}
for _, pod := range list.Items {
if strings.Contains(pod.Name, name) {
t.Logf("found pod %s matcher pattern %s", pod.Name, name)
for _, c := range pod.Spec.Containers {
containerLogs(t, ctx, ns, pod.Name, c.Name)
}
break
}
}
}

func containerLogs(t *testing.T, ctx context.Context, ns, podName, containerName string) {
podLogOpts := corev1.PodLogOptions{}
podLogOpts.Container = containerName
t.Logf("print container %s from pod %s:", containerName, podName)
clientset := kubernetes.NewForConfigOrDie(clientConfig(t))
req := clientset.CoreV1().Pods(ns).GetLogs(podName, &podLogOpts)
logs, err := req.Stream(ctx)
if err != nil {
t.Errorf("error streaming pod logs %s", err.Error())
return
}
defer logs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, logs)
if err != nil {
t.Errorf("error copying pod logs %s", err.Error())
return
}
str := buf.String()
t.Logf("%s", str)

}

0 comments on commit b340071

Please sign in to comment.