diff --git a/crates/arroyo-connectors/src/confluent/mod.rs b/crates/arroyo-connectors/src/confluent/mod.rs index c694b9ef7..4db2ebf4b 100644 --- a/crates/arroyo-connectors/src/confluent/mod.rs +++ b/crates/arroyo-connectors/src/confluent/mod.rs @@ -84,6 +84,7 @@ impl From for KafkaConfig { password: c.secret, }, schema_registry_enum: Some(c.schema_registry.into()), + connection_properties: vec![], } } } diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 4db46c48a..2f2668ad6 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -105,6 +105,7 @@ impl KafkaConnector { authentication: auth, bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?), schema_registry_enum: schema_registry, + connection_properties: vec![], }) } @@ -926,6 +927,14 @@ pub fn client_configs( } }; + for prop in connection.connection_properties.iter() { + if let Some((k, v)) = prop.split_once('=') { + client_configs.insert(k.to_string(), v.to_string()); + } else { + bail!("invalid connection property: {}", prop); + } + } + if let Some(table) = table { client_configs.extend( table diff --git a/crates/arroyo-connectors/src/kafka/profile.json b/crates/arroyo-connectors/src/kafka/profile.json index 6fe762dee..463472710 100644 --- a/crates/arroyo-connectors/src/kafka/profile.json +++ b/crates/arroyo-connectors/src/kafka/profile.json @@ -106,6 +106,16 @@ "sensitive": ["apiSecret"] } ] + }, + "connectionProperties": { + "type": "array", + "title": "Connection Properties", + "description": "Key-value pairs of rdkafka configuration options", + "examples": ["client.id=arroyo"], + "items": { + "type": "string", + "title": "property" + } } }, "required": ["bootstrapServers", "authentication"]