Skip to content

Commit

Permalink
fix(err): consumer hangs (#26288)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Nov 19, 2024
1 parent 53b600d commit e92b381
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 22 deletions.
35 changes: 27 additions & 8 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions rust/cymbal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@ version = "0.1.0"
edition = "2021"

[dependencies]
rdkafka = { workspace = true }
# We use an older version of rdkafka for cymbal specifically because the newer version
# has a bug which causes consumers to hang when offset auto-storing is disabled and the
# program is producing and consuming at the same time. Cymbal is the only rust service
# that does this, and 0.35 is significantly lower-performance than 0.36, so rather than
# rolling back all other rust services (particularly capture and propdefs), we just have
# cymbal use the older version by overriding the workspace setting. The tracking issue
# for this is: https://github.com/fede1024/rust-rdkafka/issues/638
rdkafka = { version = "0.35.0", features = ["cmake-build", "ssl", "tracing"] }
# We don't rely on common-kafka due to the bug mentioned above
# common-kafka = { path = "../common/kafka" }
# This does force us to take a direct dependency on join_all
futures = { workspace = true }
tokio = { workspace = true }
envconfig = { workspace = true }
tracing = { workspace = true }
Expand All @@ -14,7 +25,6 @@ axum = { workspace = true }
metrics = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
common-kafka = { path = "../common/kafka" }
common-types = { path = "../common/types" }
common-dns = { path = "../common/dns" }
thiserror = { workspace = true }
Expand Down
5 changes: 1 addition & 4 deletions rust/cymbal/src/app_context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use aws_config::{BehaviorVersion, Region};
use common_kafka::{
kafka_consumer::SingleTopicConsumer, kafka_producer::create_kafka_producer,
kafka_producer::KafkaContext,
};
use health::{HealthHandle, HealthRegistry};
use rdkafka::producer::FutureProducer;
use sqlx::{postgres::PgPoolOptions, PgPool};
Expand All @@ -14,6 +10,7 @@ use crate::{
config::Config,
error::UnhandledError,
frames::resolver::Resolver,
hack::kafka::{create_kafka_producer, KafkaContext, SingleTopicConsumer},
symbol_store::{
caching::{Caching, SymbolSetCache},
concurrency,
Expand Down
33 changes: 33 additions & 0 deletions rust/cymbal/src/bin/generate_test_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::sync::Arc;

use common_types::ClickHouseEvent;
use cymbal::{
get_props,
hack::kafka::{create_kafka_producer, send_iter_to_kafka, KafkaConfig},
};
use envconfig::Envconfig;
use health::HealthRegistry;

const EXCEPTION_DATA: &str = include_str!("../../tests/static/raw_ch_exception_list.json");

#[tokio::main]
async fn main() {
let config = KafkaConfig::init_from_env().unwrap();
let health = Arc::new(HealthRegistry::new("test"));
let handle = health
.register("rdkafka".to_string(), std::time::Duration::from_secs(30))
.await;
let producer = create_kafka_producer(&config, handle).await.unwrap();

let exception: ClickHouseEvent = serde_json::from_str(EXCEPTION_DATA).unwrap();
let exceptions = (0..10000).map(|_| exception.clone()).collect::<Vec<_>>();
get_props(&exception).unwrap();

loop {
println!("Sending {} exception kafka", exceptions.len());
send_iter_to_kafka(&producer, "exception_symbolification_events", &exceptions)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
7 changes: 7 additions & 0 deletions rust/cymbal/src/bin/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export KAFKA_CONSUMER_GROUP="cymbal"
export KAFKA_CONSUMER_TOPIC="exception_symbolification_events"
export OBJECT_STORAGE_BUCKET="posthog"
export OBJECT_STORAGE_ACCESS_KEY_ID="object_storage_root_user"
export OBJECT_STORAGE_SECRET_ACCESS_KEY="object_storage_root_password"

cargo run --bin cymbal
8 changes: 2 additions & 6 deletions rust/cymbal/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use common_kafka::config::{ConsumerConfig, KafkaConfig};
use envconfig::Envconfig;

use crate::hack::kafka::{ConsumerConfig, KafkaConfig};

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(from = "BIND_HOST", default = "::")]
Expand Down Expand Up @@ -70,11 +71,6 @@ pub struct Config {
pub frame_cache_ttl_seconds: u64,
}

pub enum AwsRegion {
USEast1,
USWest1,
}

impl Config {
pub fn init_with_defaults() -> Result<Self, envconfig::Error> {
ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events");
Expand Down
Loading

0 comments on commit e92b381

Please sign in to comment.