Add DBaaS engine Kafka (#633)
* add dbaas engine kafka

* comments on exp. struct

* run gofmt

* update topic & dbconn struct fields

* update naming, kafkasettings -> settings
dwilsondo authored Aug 25, 2023
1 parent 12ce6ef commit 4052b67
Showing 2 changed files with 596 additions and 7 deletions.
177 changes: 170 additions & 7 deletions databases.go
@@ -32,6 +32,8 @@ const (
databaseOptionsPath = databaseBasePath + "/options"
databaseUpgradeMajorVersionPath = databaseBasePath + "/%s/upgrade"
databasePromoteReplicaToPrimaryPath = databaseReplicaPath + "/promote"
databaseTopicPath = databaseBasePath + "/%s/topics/%s"
databaseTopicsPath = databaseBasePath + "/%s/topics"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
@@ -146,6 +148,11 @@ type DatabasesService interface {
UpdateMySQLConfig(context.Context, string, *MySQLConfig) (*Response, error)
ListOptions(todo context.Context) (*DatabaseOptions, *Response, error)
UpgradeMajorVersion(context.Context, string, *UpgradeVersionRequest) (*Response, error)
ListTopics(context.Context, string, *ListOptions) ([]DatabaseTopic, *Response, error)
CreateTopic(context.Context, string, *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error)
GetTopic(context.Context, string, string) (*DatabaseTopic, *Response, error)
DeleteTopic(context.Context, string, string) (*Response, error)
UpdateTopic(context.Context, string, string, *DatabaseUpdateTopicRequest) (*Response, error)
}
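
The new interface methods follow the same per-cluster pattern as the existing ones. A minimal consumer-side sketch, assuming the package is imported as github.com/digitalocean/godo and that a client and cluster ID already exist, could list topics like this:

import (
	"context"
	"log"

	"github.com/digitalocean/godo"
)

func listKafkaTopics(ctx context.Context, client *godo.Client, clusterID string) error {
	// Request the first page of topics for the given Kafka cluster.
	topics, _, err := client.Databases.ListTopics(ctx, clusterID, &godo.ListOptions{Page: 1, PerPage: 25})
	if err != nil {
		return err
	}
	for _, t := range topics {
		log.Printf("topic %q state=%s", t.Name, t.State)
	}
	return nil
}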

// DatabasesServiceOp handles communication with the Databases related methods
@@ -188,21 +195,38 @@ type DatabaseCA struct {

// DatabaseConnection represents a database connection
type DatabaseConnection struct {
Protocol string `json:"protocol"`
URI string `json:"uri,omitempty"`
Database string `json:"database,omitempty"`
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
SSL bool `json:"ssl,omitempty"`
ApplicationPorts map[string]uint32 `json:"application_ports,omitempty"`
}

// DatabaseUser represents a user in the database
type DatabaseUser struct {
Name string `json:"name,omitempty"`
Role string `json:"role,omitempty"`
Password string `json:"password,omitempty"`
AccessCert string `json:"access_cert,omitempty"`
AccessKey string `json:"access_key,omitempty"`
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// KafkaACL contains Kafka-specific user access control information
type KafkaACL struct {
ID string `json:"id,omitempty"`
Permission string `json:"permission,omitempty"`
Topic string `json:"topic,omitempty"`
}

// DatabaseUserSettings contains Kafka-specific user settings
type DatabaseUserSettings struct {
ACL []*KafkaACL `json:"acl,omitempty"`
}
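
For Kafka clusters, per-user access control is expressed through these ACL entries. A hedged sketch of a user-create request carrying a topic ACL, routed through the existing CreateUser method; the user name, topic name, and the "produce" permission value are illustrative, and the ACL ID is assigned by the API so it is omitted on create:

func createKafkaProducerUser(ctx context.Context, client *godo.Client, clusterID string) (*godo.DatabaseUser, error) {
	req := &godo.DatabaseCreateUserRequest{
		Name: "kafka-producer", // illustrative user name
		Settings: &godo.DatabaseUserSettings{
			ACL: []*godo.KafkaACL{
				// Grant produce access to a single topic.
				{Permission: "produce", Topic: "events"},
			},
		},
	}
	user, _, err := client.Databases.CreateUser(ctx, clusterID, req)
	return user, err
}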

// DatabaseMySQLUserSettings contains MySQL-specific user settings
@@ -271,6 +295,56 @@ type DatabaseDB struct {
Name string `json:"name"`
}

// DatabaseTopic represents a Kafka topic
type DatabaseTopic struct {
Name string `json:"name"`
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
State string `json:"state,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
}

// TopicConfig represents all configurable options for a Kafka topic
type TopicConfig struct {
CleanupPolicy string `json:"cleanup_policy,omitempty"`
CompressionType string `json:"compression_type,omitempty"`
DeleteRetentionMS *uint64 `json:"delete_retention_ms,omitempty"`
FileDeleteDelayMS *uint64 `json:"file_delete_delay_ms,omitempty"`
FlushMessages *uint64 `json:"flush_messages,omitempty"`
FlushMS *uint64 `json:"flush_ms,omitempty"`
IndexIntervalBytes *uint64 `json:"index_interval_bytes,omitempty"`
MaxCompactionLagMS *uint64 `json:"max_compaction_lag_ms,omitempty"`
MaxMessageBytes *uint64 `json:"max_message_bytes,omitempty"`
MessageDownConversionEnable *bool `json:"message_down_conversion_enable,omitempty"`
MessageFormatVersion string `json:"message_format_version,omitempty"`
MessageTimestampDifferenceMaxMS *uint64 `json:"message_timestamp_difference_max_ms,omitempty"`
MessageTimestampType string `json:"message_timestamp_type,omitempty"`
MinCleanableDirtyRatio *float32 `json:"min_cleanable_dirty_ratio,omitempty"`
MinCompactionLagMS *uint64 `json:"min_compaction_lag_ms,omitempty"`
MinInsyncReplicas *uint32 `json:"min_insync_replicas,omitempty"`
Preallocate *bool `json:"preallocate,omitempty"`
RetentionBytes *int64 `json:"retention_bytes,omitempty"`
RetentionMS *int64 `json:"retention_ms,omitempty"`
SegmentBytes *uint64 `json:"segment_bytes,omitempty"`
SegmentIndexBytes *uint64 `json:"segment_index_bytes,omitempty"`
SegmentJitterMS *uint64 `json:"segment_jitter_ms,omitempty"`
SegmentMS *uint64 `json:"segment_ms,omitempty"`
UncleanLeaderElectionEnable *bool `json:"unclean_leader_election_enable,omitempty"`
}

// DatabaseCreateTopicRequest is used to create a new topic within a kafka cluster
type DatabaseCreateTopicRequest struct {
Name string `json:"name"`
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
}

// DatabaseUpdateTopicRequest is used to update an existing topic within a kafka cluster
type DatabaseUpdateTopicRequest struct {
Topic *DatabaseTopic `json:"topic"` // note: `name` field in Topic unused on update
}
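
Because most TopicConfig fields are pointers (so unset values can be told apart from zero values), building a create request usually involves a small address-of helper. A sketch, with a local generic ptr helper that is not part of this package and illustrative partition, replication, and retention numbers:

// ptr returns a pointer to its argument; it is a local helper for this
// example only and not part of the package.
func ptr[T any](v T) *T { return &v }

func newEventsTopicRequest() *godo.DatabaseCreateTopicRequest {
	return &godo.DatabaseCreateTopicRequest{
		Name:              "events", // illustrative topic name
		PartitionCount:    ptr(uint32(3)),
		ReplicationFactor: ptr(uint32(2)),
		Config: &godo.TopicConfig{
			CleanupPolicy: "compact",              // Kafka cleanup.policy value
			RetentionMS:   ptr(int64(86_400_000)), // keep messages for 24h
		},
	}
}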

// DatabaseReplica represents a read-only replica of a particular database
type DatabaseReplica struct {
ID string `json:"id"`
@@ -316,11 +390,13 @@ type DatabaseUpdatePoolRequest struct {
type DatabaseCreateUserRequest struct {
Name string `json:"name"`
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// DatabaseResetUserAuthRequest is used to reset a user's DB auth
type DatabaseResetUserAuthRequest struct {
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// DatabaseCreateDBRequest is used to create a new engine-specific database within the cluster
@@ -551,12 +627,21 @@ type databaseOptionsRoot struct {
Options *DatabaseOptions `json:"options"`
}

type databaseTopicRoot struct {
Topic *DatabaseTopic `json:"topic"`
}

type databaseTopicsRoot struct {
Topics []DatabaseTopic `json:"topics"`
}

// DatabaseOptions represents the available database engines
type DatabaseOptions struct {
MongoDBOptions DatabaseEngineOptions `json:"mongodb"`
MySQLOptions DatabaseEngineOptions `json:"mysql"`
PostgresSQLOptions DatabaseEngineOptions `json:"pg"`
RedisOptions DatabaseEngineOptions `json:"redis"`
KafkaOptions DatabaseEngineOptions `json:"kafka"`
}
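
With kafka registered alongside the other engines, its available regions, versions, and node layouts can be discovered through the existing ListOptions call. A rough sketch, assuming DatabaseEngineOptions exposes the same Regions/Versions/Layouts fields for kafka as it does for the other engines:

func kafkaVersions(ctx context.Context, client *godo.Client) ([]string, error) {
	opts, _, err := client.Databases.ListOptions(ctx)
	if err != nil {
		return nil, err
	}
	// KafkaOptions is assumed to carry the usual Versions slice.
	return opts.KafkaOptions.Versions, nil
}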

// DatabaseEngineOptions represents the configuration options that are available for a given database engine
@@ -1257,3 +1342,81 @@ func (svc *DatabasesServiceOp) UpgradeMajorVersion(ctx context.Context, database

return resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListTopics(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicsPath, databaseID)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
}
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicsRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topics, resp, nil
}

// GetTopic returns a single kafka topic by name
func (svc *DatabasesServiceOp) GetTopic(ctx context.Context, databaseID, name string) (*DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topic, resp, nil
}

// CreateTopic will create a new kafka topic
func (svc *DatabasesServiceOp) CreateTopic(ctx context.Context, databaseID string, createTopic *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicsPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createTopic)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topic, resp, nil
}

// UpdateTopic updates a single kafka topic
func (svc *DatabasesServiceOp) UpdateTopic(ctx context.Context, databaseID string, name string, updateTopic *DatabaseUpdateTopicRequest) (*Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateTopic)
if err != nil {
return nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return resp, err
}
return resp, nil
}

// DeleteTopic will delete an existing kafka topic
func (svc *DatabasesServiceOp) DeleteTopic(ctx context.Context, databaseID, name string) (*Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}
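
Putting the new endpoints together, a full topic lifecycle against a Kafka cluster might look like the following sketch; the topic name and partition count are illustrative:

func topicLifecycle(ctx context.Context, client *godo.Client, clusterID string) error {
	// Create the topic, leaving everything but the name to server-side defaults.
	topic, _, err := client.Databases.CreateTopic(ctx, clusterID, &godo.DatabaseCreateTopicRequest{Name: "events"})
	if err != nil {
		return err
	}

	// Read it back by name.
	topic, _, err = client.Databases.GetTopic(ctx, clusterID, topic.Name)
	if err != nil {
		return err
	}

	// Update it; the name inside the payload is ignored on update.
	partitions := uint32(6)
	_, err = client.Databases.UpdateTopic(ctx, clusterID, topic.Name, &godo.DatabaseUpdateTopicRequest{
		Topic: &godo.DatabaseTopic{PartitionCount: &partitions},
	})
	if err != nil {
		return err
	}

	// Remove it again.
	_, err = client.Databases.DeleteTopic(ctx, clusterID, topic.Name)
	return err
}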