diff --git a/databases.go b/databases.go index a0240735..fe0bac78 100644 --- a/databases.go +++ b/databases.go @@ -32,6 +32,8 @@ const ( databaseOptionsPath = databaseBasePath + "/options" databaseUpgradeMajorVersionPath = databaseBasePath + "/%s/upgrade" databasePromoteReplicaToPrimaryPath = databaseReplicaPath + "/promote" + databaseTopicPath = databaseBasePath + "/%s/topics/%s" + databaseTopicsPath = databaseBasePath + "/%s/topics" ) // SQL Mode constants allow for MySQL-specific SQL flavor configuration. @@ -146,6 +148,11 @@ type DatabasesService interface { UpdateMySQLConfig(context.Context, string, *MySQLConfig) (*Response, error) ListOptions(todo context.Context) (*DatabaseOptions, *Response, error) UpgradeMajorVersion(context.Context, string, *UpgradeVersionRequest) (*Response, error) + ListTopics(context.Context, string, *ListOptions) ([]DatabaseTopic, *Response, error) + CreateTopic(context.Context, string, *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error) + GetTopic(context.Context, string, string) (*DatabaseTopic, *Response, error) + DeleteTopic(context.Context, string, string) (*Response, error) + UpdateTopic(context.Context, string, string, *DatabaseUpdateTopicRequest) (*Response, error) } // DatabasesServiceOp handles communication with the Databases related methods @@ -188,13 +195,15 @@ type DatabaseCA struct { // DatabaseConnection represents a database connection type DatabaseConnection struct { - URI string `json:"uri,omitempty"` - Database string `json:"database,omitempty"` - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - SSL bool `json:"ssl,omitempty"` + Protocol string `json:"protocol"` + URI string `json:"uri,omitempty"` + Database string `json:"database,omitempty"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` 
+ SSL bool `json:"ssl,omitempty"` + ApplicationPorts map[string]uint32 `json:"application_ports,omitempty"` } // DatabaseUser represents a user in the database @@ -202,7 +211,22 @@ type DatabaseUser struct { Name string `json:"name,omitempty"` Role string `json:"role,omitempty"` Password string `json:"password,omitempty"` + AccessCert string `json:"access_cert,omitempty"` + AccessKey string `json:"access_key,omitempty"` MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"` + Settings *DatabaseUserSettings `json:"settings,omitempty"` +} + +// KafkaACL contains Kafka specific user access control information +type KafkaACL struct { + ID string `json:"id,omitempty"` + Permission string `json:"permission,omitempty"` + Topic string `json:"topic,omitempty"` +} + +// DatabaseUserSettings contains Kafka-specific user settings +type DatabaseUserSettings struct { + ACL []*KafkaACL `json:"acl,omitempty"` } // DatabaseMySQLUserSettings contains MySQL-specific user settings @@ -271,6 +295,56 @@ type DatabaseDB struct { Name string `json:"name"` } +// DatabaseTopic represents a Kafka topic +type DatabaseTopic struct { + Name string `json:"name"` + PartitionCount *uint32 `json:"partition_count,omitempty"` + ReplicationFactor *uint32 `json:"replication_factor,omitempty"` + State string `json:"state,omitempty"` + Config *TopicConfig `json:"config,omitempty"` +} + +// TopicConfig represents all configurable options for a Kafka topic +type TopicConfig struct { + CleanupPolicy string `json:"cleanup_policy,omitempty"` + CompressionType string `json:"compression_type,omitempty"` + DeleteRetentionMS *uint64 `json:"delete_retention_ms,omitempty"` + FileDeleteDelayMS *uint64 `json:"file_delete_delay_ms,omitempty"` + FlushMessages *uint64 `json:"flush_messages,omitempty"` + FlushMS *uint64 `json:"flush_ms,omitempty"` + IndexIntervalBytes *uint64 `json:"index_interval_bytes,omitempty"` + MaxCompactionLagMS *uint64 `json:"max_compaction_lag_ms,omitempty"` + MaxMessageBytes 
*uint64 `json:"max_message_bytes,omitempty"` + MessageDownConversionEnable *bool `json:"message_down_conversion_enable,omitempty"` + MessageFormatVersion string `json:"message_format_version,omitempty"` + MessageTimestampDifferenceMaxMS *uint64 `json:"message_timestamp_difference_max_ms,omitempty"` + MessageTimestampType string `json:"message_timestamp_type,omitempty"` + MinCleanableDirtyRatio *float32 `json:"min_cleanable_dirty_ratio,omitempty"` + MinCompactionLagMS *uint64 `json:"min_compaction_lag_ms,omitempty"` + MinInsyncReplicas *uint32 `json:"min_insync_replicas,omitempty"` + Preallocate *bool `json:"preallocate,omitempty"` + RetentionBytes *int64 `json:"retention_bytes,omitempty"` + RetentionMS *int64 `json:"retention_ms,omitempty"` + SegmentBytes *uint64 `json:"segment_bytes,omitempty"` + SegmentIndexBytes *uint64 `json:"segment_index_bytes,omitempty"` + SegmentJitterMS *uint64 `json:"segment_jitter_ms,omitempty"` + SegmentMS *uint64 `json:"segment_ms,omitempty"` + UncleanLeaderElectionEnable *bool `json:"unclean_leader_election_enable,omitempty"` +} + +// DatabaseCreateTopicRequest is used to create a new topic within a kafka cluster +type DatabaseCreateTopicRequest struct { + Name string `json:"name"` + PartitionCount *uint32 `json:"partition_count,omitempty"` + ReplicationFactor *uint32 `json:"replication_factor,omitempty"` + Config *TopicConfig `json:"config,omitempty"` +} + +// DatabaseUpdateTopicRequest is used to update an existing topic within a kafka cluster 
+type DatabaseUpdateTopicRequest struct { + Topic *DatabaseTopic `json:"topic"` // note: `name` field in Topic unused on update +} + // DatabaseReplica represents a read-only replica of a particular database type DatabaseReplica struct { ID string `json:"id"` @@ -316,11 +390,13 @@ type DatabaseUpdatePoolRequest struct { type DatabaseCreateUserRequest struct { Name string `json:"name"` MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"` + Settings *DatabaseUserSettings `json:"settings,omitempty"` } // DatabaseResetUserAuthRequest is used to reset a users DB auth type DatabaseResetUserAuthRequest struct { MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"` + Settings *DatabaseUserSettings `json:"settings,omitempty"` } // DatabaseCreateDBRequest is used to create a new engine-specific database within the cluster @@ -551,12 +627,21 @@ type databaseOptionsRoot struct { Options *DatabaseOptions `json:"options"` } +type databaseTopicRoot struct { + Topic *DatabaseTopic `json:"topic"` +} + +type databaseTopicsRoot struct { + Topics []DatabaseTopic `json:"topics"` +} + // DatabaseOptions represents the available database engines type DatabaseOptions struct { MongoDBOptions DatabaseEngineOptions `json:"mongodb"` MySQLOptions DatabaseEngineOptions `json:"mysql"` PostgresSQLOptions DatabaseEngineOptions `json:"pg"` RedisOptions DatabaseEngineOptions `json:"redis"` + KafkaOptions DatabaseEngineOptions `json:"kafka"` } // DatabaseEngineOptions represents the configuration options that are available for a given database engine @@ -1257,3 +1342,81 @@ func (svc *DatabasesServiceOp) UpgradeMajorVersion(ctx context.Context, database return resp, nil } + +// ListTopics returns all topics for a given kafka cluster +func (svc *DatabasesServiceOp) ListTopics(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseTopic, *Response, error) { + path := fmt.Sprintf(databaseTopicsPath, databaseID) + path, err := addOptions(path, 
opts) + if err != nil { + return nil, nil, err + } + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + root := new(databaseTopicsRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Topics, resp, nil +} + +// GetTopic returns a single kafka topic by name +func (svc *DatabasesServiceOp) GetTopic(ctx context.Context, databaseID, name string) (*DatabaseTopic, *Response, error) { + path := fmt.Sprintf(databaseTopicPath, databaseID, name) + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + root := new(databaseTopicRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Topic, resp, nil +} + +// CreateTopic will create a new kafka topic +func (svc *DatabasesServiceOp) CreateTopic(ctx context.Context, databaseID string, createTopic *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error) { + path := fmt.Sprintf(databaseTopicsPath, databaseID) + req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createTopic) + if err != nil { + return nil, nil, err + } + root := new(databaseTopicRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Topic, resp, nil +} + +// UpdateTopic updates a single kafka topic +func (svc *DatabasesServiceOp) UpdateTopic(ctx context.Context, databaseID string, name string, updateTopic *DatabaseUpdateTopicRequest) (*Response, error) { + path := fmt.Sprintf(databaseTopicPath, databaseID, name) + req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateTopic) + if err != nil { + return nil, err + } + root := new(databaseTopicRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return resp, err + } + return resp, nil +} + +// DeleteTopic will delete an existing kafka topic +func (svc *DatabasesServiceOp) DeleteTopic(ctx 
context.Context, databaseID, name string) (*Response, error) { + path := fmt.Sprintf(databaseTopicPath, databaseID, name) + req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil) + if err != nil { + return nil, err + } + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} diff --git a/databases_test.go b/databases_test.go index 9809c05b..43a2e0ff 100644 --- a/databases_test.go +++ b/databases_test.go @@ -701,6 +701,39 @@ func TestDatabases_ResetUserAuth(t *testing.T) { require.Equal(t, want, got) } +func TestDatabases_ResetUserAuthKafka(t *testing.T) { + setup() + defer teardown() + dbID := "deadbeef-dead-4aa5-beef-deadbeef347d" + path := fmt.Sprintf("/v2/databases/%s/users/user/reset_auth", dbID) + + body := `{ + "user": { + "name": "name", + "role": "foo", + "password": "otherpass" + } + }` + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + fmt.Fprint(w, body) + }) + + want := &DatabaseUser{ + Name: "name", + Role: "foo", + Password: "otherpass", + } + + got, _, err := client.Databases.ResetUserAuth(ctx, dbID, "user", &DatabaseResetUserAuthRequest{ + Settings: &DatabaseUserSettings{}, + }) + + require.NoError(t, err) + require.Equal(t, want, got) +} + func TestDatabases_ListDBs(t *testing.T) { setup() defer teardown() @@ -1605,6 +1638,24 @@ func TestDatabases_GetDatabaseOptions(t *testing.T) { } ] }, + "kafka": { + "regions": [ + "ams3", + "tor1" + ], + "versions": [ + "3.3" + ], + "layouts": [ + { + "num_nodes": 3, + "sizes": [ + "gd-2vcpu-8gb", + "gd-4vcpu-16gb" + ] + } + ] + }, "redis": { "regions": [ "ams3", @@ -1645,18 +1696,22 @@ func TestDatabases_GetDatabaseOptions(t *testing.T) { require.NotNil(t, options.PostgresSQLOptions) require.NotNil(t, options.RedisOptions) require.NotNil(t, options.MySQLOptions) + require.NotNil(t, options.KafkaOptions) require.Greater(t, len(options.MongoDBOptions.Regions), 0) require.Greater(t, 
len(options.PostgresSQLOptions.Regions), 0) require.Greater(t, len(options.RedisOptions.Regions), 0) require.Greater(t, len(options.MySQLOptions.Regions), 0) + require.Greater(t, len(options.KafkaOptions.Regions), 0) require.Greater(t, len(options.MongoDBOptions.Versions), 0) require.Greater(t, len(options.PostgresSQLOptions.Versions), 0) require.Greater(t, len(options.RedisOptions.Versions), 0) require.Greater(t, len(options.MySQLOptions.Versions), 0) + require.Greater(t, len(options.KafkaOptions.Versions), 0) require.Greater(t, len(options.MongoDBOptions.Layouts), 0) require.Greater(t, len(options.PostgresSQLOptions.Layouts), 0) require.Greater(t, len(options.RedisOptions.Layouts), 0) require.Greater(t, len(options.MySQLOptions.Layouts), 0) + require.Greater(t, len(options.KafkaOptions.Layouts), 0) } func TestDatabases_CreateDatabaseUserWithMySQLSettings(t *testing.T) { @@ -1769,6 +1824,157 @@ func TestDatabases_GetDatabaseUserWithMySQLSettings(t *testing.T) { require.Equal(t, expectedUser, user) } +func TestDatabases_CreateDatabaseUserWithKafkaSettings(t *testing.T) { + setup() + defer teardown() + + dbID := "deadbeef-dead-4aa5-beef-deadbeef347d" + path := fmt.Sprintf("/v2/databases/%s/users", dbID) + + writeTopicACL := []*KafkaACL{ + { + ID: "1", + Topic: "bar", + Permission: "write", + }, + } + + acljson, err := json.Marshal(writeTopicACL) + if err != nil { + t.Fatal(err) + } + + responseJSON := []byte(fmt.Sprintf(`{ + "user": { + "name": "foo", + "settings": { + "acl": %s + } + } + }`, string(acljson))) + + expectedUser := &DatabaseUser{ + Name: "foo", + Settings: &DatabaseUserSettings{ + ACL: writeTopicACL, + }, + } + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + w.WriteHeader(http.StatusOK) + w.Write(responseJSON) + }) + + user, _, err := client.Databases.CreateUser(ctx, dbID, &DatabaseCreateUserRequest{ + Name: expectedUser.Name, + Settings: &DatabaseUserSettings{ACL: expectedUser.Settings.ACL}, 
+ }) + require.NoError(t, err) + require.Equal(t, expectedUser, user) +} + +func TestDatabases_ListDatabaseUsersWithKafkaSettings(t *testing.T) { + setup() + defer teardown() + + dbID := "deadbeef-dead-4aa5-beef-deadbeef347d" + + path := fmt.Sprintf("/v2/databases/%s/users", dbID) + + writeTopicACL := []*KafkaACL{ + { + ID: "1", + Topic: "bar", + Permission: "write", + }, + } + + acljson, err := json.Marshal(writeTopicACL) + if err != nil { + t.Fatal(err) + } + + responseJSON := []byte(fmt.Sprintf(`{ + "users": [ + { + "name": "foo", + "settings": { + "acl": %s + } + } + ] + }`, string(acljson))) + + expectedUsers := []DatabaseUser{ + { + Name: "foo", + Settings: &DatabaseUserSettings{ + ACL: writeTopicACL, + }, + }, + } + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + w.WriteHeader(http.StatusOK) + w.Write(responseJSON) + }) + + users, _, err := client.Databases.ListUsers(ctx, dbID, &ListOptions{}) + require.NoError(t, err) + require.Equal(t, expectedUsers, users) +} + +func TestDatabases_GetDatabaseUserWithKafkaSettings(t *testing.T) { + setup() + defer teardown() + + dbID := "deadbeef-dead-4aa5-beef-deadbeef347d" + userID := "d290a0a0-27da-42bd-a4b2-bcecf43b8832" + + path := fmt.Sprintf("/v2/databases/%s/users/%s", dbID, userID) + + writeTopicACL := []*KafkaACL{ + { + ID: "1", + Topic: "bar", + Permission: "write", + }, + } + + acljson, err := json.Marshal(writeTopicACL) + if err != nil { + t.Fatal(err) + } + + responseJSON := []byte(fmt.Sprintf(`{ + "user": { + "name": "foo", + "settings": { + "acl": %s + } + } + }`, string(acljson))) + + expectedUser := &DatabaseUser{ + Name: "foo", + Settings: &DatabaseUserSettings{ + ACL: writeTopicACL, + }, + } + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + w.WriteHeader(http.StatusOK) + w.Write(responseJSON) + }) + + user, _, err := client.Databases.GetUser(ctx, dbID, userID) + require.NoError(t, err) + 
require.Equal(t, expectedUser, user) +} + func TestDatabases_GetConfigPostgres(t *testing.T) { setup() defer teardown() @@ -2103,3 +2309,223 @@ func TestDatabases_UpgradeMajorVersion(t *testing.T) { _, err := client.Databases.UpgradeMajorVersion(ctx, dbID, upgradeVersionReq) require.NoError(t, err) } + +func TestDatabases_CreateTopic(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + numPartitions = uint32(3) + replicationFactor = uint32(2) + retentionMS = int64(1000 * 60) + ) + + want := &DatabaseTopic{ + Name: "events", + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + } + + body := `{ + "topic": { + "name": "events", + "partition_count": 3, + "replication_factor": 2, + "config": { + "retention_ms": 60000 + } + } + }` + + path := fmt.Sprintf("/v2/databases/%s/topics", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + fmt.Fprint(w, body) + }) + + topic, _, err := client.Databases.CreateTopic(ctx, dbID, &DatabaseCreateTopicRequest{ + Name: "events", + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + }) + + require.NoError(t, err) + require.Equal(t, want, topic) +} + +func TestDatabases_UpdateTopic(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + topicName = "events" + numPartitions = uint32(3) + replicationFactor = uint32(2) + retentionMS = int64(1000 * 60) + ) + + body := `{ + "topic": { + "name": "events", + "partition_count": 3, + "replication_factor": 2, + "config": { + "retention_ms": 60000 + } + } + }` + + path := fmt.Sprintf("/v2/databases/%s/topics/%s", dbID, topicName) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPut) + fmt.Fprint(w, body) + }) + + _, err := 
client.Databases.UpdateTopic(ctx, dbID, topicName, &DatabaseUpdateTopicRequest{ + Topic: &DatabaseTopic{ + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + }, + }) + + require.NoError(t, err) +} + +func TestDatabases_DeleteTopic(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + topicName = "events" + ) + + path := fmt.Sprintf("/v2/databases/%s/topics/%s", dbID, topicName) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodDelete) + }) + + _, err := client.Databases.DeleteTopic(ctx, dbID, topicName) + require.NoError(t, err) +} + +func TestDatabases_GetTopic(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + topicName = "events" + numPartitions = uint32(3) + replicationFactor = uint32(2) + retentionMS = int64(1000 * 60) + ) + + want := &DatabaseTopic{ + Name: "events", + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + } + + body := `{ + "topic": { + "name": "events", + "partition_count": 3, + "replication_factor": 2, + "config": { + "retention_ms": 60000 + } + } + }` + + path := fmt.Sprintf("/v2/databases/%s/topics/%s", dbID, topicName) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.GetTopic(ctx, dbID, topicName) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestDatabases_ListTopics(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + numPartitions = uint32(3) + replicationFactor = uint32(2) + retentionMS = int64(1000 * 60) + ) + + want := []DatabaseTopic{ + { + Name: "events", + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, 
+ Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + }, + { + Name: "events_ii", + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, + }, + }, + } + + body := `{ + "topics": [ + { + "name": "events", + "partition_count": 3, + "replication_factor": 2, + "config": { + "retention_ms": 60000 + } + }, + { + "name": "events_ii", + "partition_count": 3, + "replication_factor": 2, + "config": { + "retention_ms": 60000 + } + } + ] + }` + + path := fmt.Sprintf("/v2/databases/%s/topics", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.ListTopics(ctx, dbID, &ListOptions{}) + require.NoError(t, err) + require.Equal(t, want, got) +}