Skip to content

Commit

Permalink
feat: move runner scaling to provisioner
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 27, 2024
1 parent 59f3dbc commit 1edf244
Show file tree
Hide file tree
Showing 71 changed files with 2,031 additions and 1,144 deletions.
94 changes: 50 additions & 44 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/leases/dbleaser"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
Expand Down Expand Up @@ -120,7 +119,6 @@ func (c *Config) OpenDBAndInstrument() (*sql.DB, error) {
func Start(
ctx context.Context,
config Config,
runnerScaling scaling.RunnerScaling,
cm *cf.Manager[configuration.Configuration],
sm *cf.Manager[configuration.Secrets],
conn *sql.DB,
Expand All @@ -146,7 +144,7 @@ func Start(
logger.Infof("Web console available at: %s", config.Bind)
}

svc, err := New(ctx, conn, cm, sm, config, devel, runnerScaling)
svc, err := New(ctx, conn, cm, sm, config, devel)
if err != nil {
return err
}
Expand All @@ -170,9 +168,6 @@ func Start(
rpc.PProf(),
)
})
g.Go(func() error {
return runnerScaling.Start(ctx, *config.Bind, svc.dbleaser)
})

go svc.dal.PollDeployments(ctx)

Expand Down Expand Up @@ -220,7 +215,6 @@ type Service struct {

increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex
runnerScaling scaling.RunnerScaling

clientLock sync.Mutex
}
Expand All @@ -232,7 +226,6 @@ func New(
sm *cf.Manager[configuration.Secrets],
config Config,
devel bool,
runnerScaling scaling.RunnerScaling,
) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
Expand Down Expand Up @@ -264,7 +257,6 @@ func New(
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
config: config,
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

Expand Down Expand Up @@ -506,15 +498,27 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U

logger := s.getDeploymentLogger(ctx, deploymentKey)
logger.Debugf("Update deployment for: %s", deploymentKey)

err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
if req.Msg.MinReplicas != nil {
err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}
}
if req.Msg.Endpoint != nil {
err = s.dal.SetDeploymentEndpoint(ctx, deploymentKey, *req.Msg.Endpoint)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment endpoint: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment endpoint: %w", err)
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}

return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
Expand Down Expand Up @@ -1658,38 +1662,40 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
// And we set its replicas to zero
// It may seem a bit odd to do this here but this is where we are actually updating the routing table
// Which is what makes as a deployment 'live' from a clients POV
optURI, err := s.runnerScaling.GetEndpointForDeployment(ctx, v.Module, v.Key.String())
if err != nil {
if v.Schema.Runtime == nil {
deploymentLogger.Debugf("Deployment %s has no runtime metadata", v.Key.String())
continue
}
targetEndpoint, ok := v.Endpoint.Get()
if !ok {
deploymentLogger.Debugf("Failed to get updated endpoint for deployment %s", v.Key.String())
continue
} else if uri, ok := optURI.Get(); ok {
// Check if this is a new route
targetEndpoint := uri.String()
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
// Check if this is a new route
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}

orderedModules := maps.Values(modulesByName)
Expand Down
17 changes: 17 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,22 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return nil
}

// SetDeploymentEndpoint sets the deployment endpoint
func (d *DAL) SetDeploymentEndpoint(ctx context.Context, key model.DeploymentKey, endpoint string) (err error) {
// Start the transaction
tx, err := d.Begin(ctx)
if err != nil {
return libdal.TranslatePGError(err)
}
defer tx.CommitOrRollback(ctx, &err)

err = tx.db.SetDeploymentEndpoint(ctx, key, optional.Some(endpoint))
if err != nil {
return libdal.TranslatePGError(err)
}
return nil
}

var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active")

// ReplaceDeployment replaces an old deployment of a module with a new deployment.
Expand Down Expand Up @@ -520,6 +536,7 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment,
Replicas: optional.Some(int(in.Replicas)),
Schema: in.Deployment.Schema,
CreatedAt: in.Deployment.CreatedAt,
Endpoint: in.Deployment.Endpoint,
}
}), nil
}
Expand Down
1 change: 1 addition & 0 deletions backend/controller/dal/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ FROM runners r
ORDER BY r.key;

-- name: GetActiveDeployments :many
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas, d.endpoint
FROM deployments d
JOIN modules m ON d.module_id = m.id
LEFT JOIN runners r ON d.id = r.deployment_id
Expand Down Expand Up @@ -138,6 +138,12 @@ SET min_replicas = $2, last_activated_at = CASE WHEN min_replicas = 0 THEN (NOW(
WHERE key = sqlc.arg('key')::deployment_key
RETURNING 1;

-- name: SetDeploymentEndpoint :exec
UPDATE deployments
SET endpoint = $2
WHERE key = sqlc.arg('key')::deployment_key
RETURNING 1;

-- name: GetExistingDeploymentForModule :one
SELECT *
FROM deployments d
Expand Down
32 changes: 26 additions & 6 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/dal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Deployment struct {
Schema *schema.Module
CreatedAt time.Time
Labels model.Labels
Endpoint optional.Option[string]
}

func (d Deployment) String() string { return d.Key.String() }
Expand Down
Loading

0 comments on commit 1edf244

Please sign in to comment.