Skip to content
This repository has been archived by the owner on Mar 17, 2021. It is now read-only.

Commit

Permalink
account for build containers
Browse files Browse the repository at this point in the history
fixes concourse/concourse#5901

Currently there's some odd behaviour for one-off builds -- they appear as
`<team name>///<build name>/<step name>`, but this will be handled in a later
issue.

While working on this, I noticed some blank entries in the output and realized
these corresponded to image check and image pull containers -- these are for
finding and fetching the `image_resource` on a task. I've written a separate
issue to add accounting for these as well.

There is clearly some conceptual duplication between the
`insertResourceWorkloads` and `insertBuildWorkloads` functions -- I can imagine
adding more such functions for the other container types I mentioned above, so
it's probably worth formalizing this as an interface. Instinctively it feels
a Strategy pattern, and I'd like to have a good name for the way these
strategies get aggregated as well. My other thought was to make multiple
Accountant implementations with some means of composing them -- like a monoid
structure on Accountants. Maybe this would look like a `CompositeAccountant`.

There is also some not-so-pretty duplication between the big test cases, and
despite my best efforts at isolation, they are already reaching a 10-second
running time. I assume most of that time is spent waiting for asynchronous
processes in the setup -- I wonder if there is a good way to profile each test
case.

Signed-off-by: Jamie Klassen <[email protected]>
  • Loading branch information
Jamie Klassen committed Jul 22, 2020
1 parent 3a6aa0b commit 4802d1f
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 23 deletions.
130 changes: 117 additions & 13 deletions accounts/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,37 @@ func (da *DBAccountant) Account(containers []Container) ([]Sample, error) {
handles = append(handles, container.Handle)
}

dbConn, err := sql.Open("postgres", da.postgresConfig.ConnectionString())
conn, err := sql.Open("postgres", da.postgresConfig.ConnectionString())
if err != nil {
return nil, err
}
defer dbConn.Close()
defer conn.Close()

workloads := map[string][]Workload{}
err = insertResourceWorkloads(workloads, conn, handles)
if err != nil {
return nil, err
}
err = insertBuildWorkloads(workloads, conn, handles)
if err != nil {
return nil, err
}

samples := []Sample{}
for _, c := range containers {
samples = append(
samples,
Sample{Container: c, Workloads: workloads[c.Handle]},
)
}
return samples, nil
}

func insertResourceWorkloads(
workloads map[string][]Workload,
conn *sql.DB,
handles []string,
) error {
rows, err := sq.StatementBuilder.
PlaceholderFormat(sq.Dollar).
Select("c.handle", "r.name", "p.name", "t.name").
Expand All @@ -38,27 +63,29 @@ func (da *DBAccountant) Account(containers []Container) ([]Sample, error) {
Join("pipelines p on r.pipeline_id = p.id").
Join("teams t on p.team_id = t.id").
Where(sq.Eq{"c.handle": handles}).
RunWith(dbConn).
RunWith(conn).
Query()
if err != nil {
return nil, err
return err
}

workloadMap := map[string][]Workload{}

defer db.Close(rows)
for rows.Next() {
resource := ResourceWorkload{}
var handle string
rows.Scan(&handle, &resource.resourceName, &resource.pipelineName, &resource.teamName)
workloadMap[handle] = append(workloadMap[handle], resource)
err = rows.Scan(
&handle,
&resource.resourceName,
&resource.pipelineName,
&resource.teamName,
)
if err != nil {
return err
}
workloads[handle] = append(workloads[handle], &resource)
}

samples := []Sample{}
for _, c := range containers {
samples = append(samples, Sample{Container: c, Workloads: workloadMap[c.Handle]})
}
return samples, nil
return nil
}

type ResourceWorkload struct {
Expand All @@ -70,3 +97,80 @@ type ResourceWorkload struct {
func (rw ResourceWorkload) ToString() string {
return fmt.Sprintf("%s/%s/%s", rw.teamName, rw.pipelineName, rw.resourceName)
}

func insertBuildWorkloads(
workloads map[string][]Workload,
conn *sql.DB,
handles []string,
) error {
rows, err := sq.StatementBuilder.
PlaceholderFormat(sq.Dollar).
Select(
"c.handle",
"t.name",
"c.meta_type",
"c.meta_step_name",
"c.meta_attempt",
"c.meta_working_directory",
"c.meta_process_user",
"c.meta_pipeline_id",
"c.meta_job_id",
"c.meta_build_id",
"c.meta_pipeline_name",
"c.meta_job_name",
"c.meta_build_name",
).
From("containers c").
Join("teams t ON c.team_id = t.id").
Where(sq.And{
sq.Eq{"c.handle": handles},
sq.NotEq{"c.meta_type": db.ContainerTypeCheck},
}).
RunWith(conn).
Query()
if err != nil {
return err
}

defer db.Close(rows)
for rows.Next() {
var metadata db.ContainerMetadata
var handle, teamName string
columns := append(
[]interface{}{&handle, &teamName},
metadata.ScanTargets()...,
)
err = rows.Scan(columns...)
if err != nil {
return err
}
workloads[handle] = append(workloads[handle], &BuildWorkload{
teamName: teamName,
pipelineName: metadata.PipelineName,
jobName: metadata.JobName,
buildName: metadata.BuildName,
stepName: metadata.StepName,
})
}

return nil
}

type BuildWorkload struct {
teamName string
pipelineName string
jobName string
buildName string
stepName string
}

func (bw BuildWorkload) ToString() string {
return fmt.Sprintf(
"%s/%s/%s/%s/%s",
bw.teamName,
bw.pipelineName,
bw.jobName,
bw.buildName,
bw.stepName,
)
}
141 changes: 131 additions & 10 deletions accounts/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (

"code.cloudfoundry.org/garden"
"code.cloudfoundry.org/garden/gardenfakes"
"code.cloudfoundry.org/lager/lagerctx"
"code.cloudfoundry.org/lager/lagertest"
"github.com/concourse/baggageclaim"
"github.com/concourse/baggageclaim/baggageclaimfakes"
"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/builds"
"github.com/concourse/concourse/atc/compression"
"github.com/concourse/concourse/atc/creds/credsfakes"
"github.com/concourse/concourse/atc/db"
Expand All @@ -24,13 +26,16 @@ import (
"github.com/concourse/concourse/atc/lidar"
"github.com/concourse/concourse/atc/metric"
"github.com/concourse/concourse/atc/resource"
"github.com/concourse/concourse/atc/scheduler"
"github.com/concourse/concourse/atc/scheduler/algorithm"
"github.com/concourse/concourse/atc/worker"
"github.com/concourse/concourse/atc/worker/gclient"
"github.com/concourse/concourse/atc/worker/gclient/gclientfakes"
"github.com/concourse/flag"
"github.com/concourse/ctop/accounts"
"github.com/concourse/flag"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
gocache "github.com/patrickmn/go-cache"
)

var _ = Describe("DBAccountant", func() {
Expand Down Expand Up @@ -113,8 +118,15 @@ var _ = Describe("DBAccountant", func() {
dropTestDB()
})

registerWorker := func(w atc.Worker) {
workerFactory.SaveWorker(w, 10*time.Second)
registerWorker := func() {
workerFactory.SaveWorker(atc.Worker{
Platform: "linux",
Version: "0.0.0-dev",
Name: "worker",
ResourceTypes: []atc.WorkerResourceType{{
Type: "git",
}},
}, 10*time.Second)
}

createResources := func(rs atc.ResourceConfigs) {
Expand Down Expand Up @@ -239,13 +251,7 @@ var _ = Describe("DBAccountant", func() {
It("accounts for resource check containers", func() {
atc.EnableGlobalResources = true
// register a worker with "git" resource type
registerWorker(atc.Worker{
Version: "0.0.0-dev",
Name: "worker",
ResourceTypes: []atc.WorkerResourceType{{
Type: "git",
}},
})
registerWorker()
resources := atc.ResourceConfigs{
{
Name: "r",
Expand Down Expand Up @@ -282,4 +288,119 @@ var _ = Describe("DBAccountant", func() {
}
Expect(workloadStrings).To(ContainElements("main/p/r", "main/p/s"))
})

createJob := func(jobConfig atc.JobConfig) db.Job {
pipeline, _, err := team.SavePipeline(
"p",
atc.Config{
Jobs: atc.JobConfigs{
jobConfig,
},
},
0,
false,
)
Expect(err).NotTo(HaveOccurred())
job, _, err := pipeline.Job(jobConfig.Name)
Expect(err).NotTo(HaveOccurred())
return job
}

It("accounts for job build containers", func() {
// register a worker with "git" resource type
registerWorker()
job := createJob(atc.JobConfig{
Name: "some-job",
PlanSequence: []atc.Step{
{Config: &atc.TaskStep{
Name: "task",
Config: &atc.TaskConfig{
Platform: "linux",
Run: atc.TaskRunConfig{
Path: "foo",
},
},
}},
},
})
job.CreateBuild()
alg := algorithm.New(
db.NewVersionsDB(
dbConn,
100,
gocache.New(10*time.Second, 10*time.Second),
),
)

scheduler.NewRunner(
lagertest.NewTestLogger("scheduler"),
db.NewJobFactory(dbConn, lockFactory),
&scheduler.Scheduler{
Algorithm: alg,
BuildStarter: scheduler.NewBuildStarter(
builds.NewPlanner(
atc.NewPlanFactory(time.Now().Unix()),
),
alg),
},
32,
).Run(context.TODO())

fakeGClient := new(gclientfakes.FakeClient)
fakeGClientContainer := new(gclientfakes.FakeContainer)
fakeGClientContainer.RunStub = func(ctx context.Context, ps garden.ProcessSpec, pi garden.ProcessIO) (garden.Process, error) {
fakeProcess := new(gardenfakes.FakeProcess)
fakeProcess.WaitStub = func() (int, error) {
io.WriteString(pi.Stdout, "[]")
return 0, nil
}
return fakeProcess, nil
}
fakeGClient.CreateReturns(fakeGClientContainer, nil)
fakeBaggageclaimClient := new(baggageclaimfakes.FakeClient)
fakeBaggageclaimVolume := new(baggageclaimfakes.FakeVolume)
fakeBaggageclaimVolume.PathReturns("/path/to/fake/volume")
fakeBaggageclaimClient.LookupVolumeReturns(fakeBaggageclaimVolume, true, nil)

dbBuildFactory := db.NewBuildFactory(
dbConn,
lockFactory,
5*time.Minute,
120*time.Hour,
)
Eventually(dbBuildFactory.GetAllStartedBuilds).ShouldNot(BeEmpty())
builds.NewTracker(
dbBuildFactory,
testEngine(fakeGClient, fakeBaggageclaimClient),
).Run(
lagerctx.NewContext(
context.TODO(),
lagertest.NewTestLogger("build-tracker"),
),
)

// TODO wait for build to complete?

accountant := accounts.NewDBAccountant(flag.PostgresConfig{
Host: dbHost(),
Port: 5432,
User: "postgres",
Password: "password",
Database: testDBName(),
SSLMode: "disable",
})
Eventually(team.Containers).ShouldNot(BeEmpty())
containers := []accounts.Container{}
dbContainers, _ := team.Containers()
for _, container := range dbContainers {
containers = append(containers, accounts.Container{Handle: container.Handle()})
}
samples, err := accountant.Account(containers)
Expect(err).NotTo(HaveOccurred())
workloadStrings := []string{}
for _, workload := range samples[0].Workloads {
workloadStrings = append(workloadStrings, workload.ToString())
}
Expect(workloadStrings).To(ConsistOf("main/p/some-job/1/task"))
})
})
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version: '3'
services:
db:
image: postgres:11-alpine
ports: ["5432:5432"]
environment:
POSTGRES_PASSWORD: password
tests:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/onsi/ginkgo v1.13.0
github.com/onsi/gomega v1.10.1
github.com/papertrail/remote_syslog2 v0.0.0-20190614180052-09062fc2b02a // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
)

0 comments on commit 4802d1f

Please sign in to comment.