Skip to content

Commit

Permalink
chore: rename field
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Mar 22, 2024
1 parent c1c66d4 commit 44df033
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
14 changes: 7 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Kafka<Exe: Executor> {
pub manager: Arc<ConnectionManager<Exe>>,
pub operation_retry_options: OperationRetryOptions,
pub executor: Arc<Exe>,
pub cluster_meta: Arc<Cluster>,
pub cluster: Arc<Cluster>,
supported_versions: Arc<DashMap<i16, VersionRange>>,
}

Expand Down Expand Up @@ -122,20 +122,20 @@ impl<Exe: Executor> Kafka<Exe> {
manager,
operation_retry_options,
executor,
cluster_meta: Arc::new(Cluster::default()),
cluster: Arc::new(Cluster::default()),
supported_versions: Arc::new(supported_versions),
})
}

pub fn topic_id(&self, topic_name: &TopicName) -> Uuid {
match self.cluster_meta.topic_id(topic_name) {
match self.cluster.topic_id(topic_name) {
Some(topic_id) => topic_id,
None => Uuid::nil(),
}
}

pub fn partitions(&self, topic: &TopicName) -> Result<PartitionRef> {
self.cluster_meta.partitions(topic)
self.cluster.partitions(topic)
}

pub fn version_range(&self, key: ApiKey) -> Option<VersionRange> {
Expand Down Expand Up @@ -284,7 +284,7 @@ impl<Exe: Executor> Kafka<Exe> {
let request = RequestKind::MetadataRequest(request);
let response = self.manager.invoke(&self.manager.url, request).await?;
if let ResponseKind::MetadataResponse(metadata) = response {
self.cluster_meta.update_metadata(metadata)
self.cluster.update_metadata(metadata)
} else {
Err(Error::Connection(ConnectionError::UnexpectedResponse(
format!("{response:?}"),
Expand All @@ -293,8 +293,8 @@ impl<Exe: Executor> Kafka<Exe> {
}

pub async fn update_full_metadata(&self) -> Result<()> {
let mut topics = Vec::with_capacity(self.cluster_meta.topics.len());
for topic in self.cluster_meta.topics.iter() {
let mut topics = Vec::with_capacity(self.cluster.topics.len());
for topic in self.cluster.topics.iter() {
topics.push(topic.key().clone());
}
self.update_metadata(topics).await
Expand Down
12 changes: 6 additions & 6 deletions src/consumer/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<Exe: Executor> Fetcher<Exe> {
completed_fetches_tx: mpsc::UnboundedSender<CompletedFetch>,
) -> Self {
let sessions = DashMap::new();
for node in client.cluster_meta.nodes.iter() {
for node in client.cluster.nodes.iter() {
sessions.insert(node.id, FetchSession::new(node.id));
}

Expand Down Expand Up @@ -100,7 +100,7 @@ impl<Exe: Executor> Fetcher<Exe> {
version = 12;
}
let metadata = fetch_request_data.metadata;
if let Some(node) = self.client.cluster_meta.nodes.get(&node) {
if let Some(node) = self.client.cluster.nodes.get(&node) {
let fetch_request =
self.fetch_builder(&mut fetch_request_data, version).await?;
trace!("Send fetch request: {:?}", fetch_request);
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<Exe: Executor> Fetcher<Exe> {
match rx.await {
Ok(partitions) => {
for tp in partitions {
let current_leader = self.client.cluster_meta.current_leader(&tp);
let current_leader = self.client.cluster.current_leader(&tp);
self.event_tx.unbounded_send(
CoordinatorEvent::MaybeValidatePositionForCurrentLeader {
partition: tp,
Expand Down Expand Up @@ -428,7 +428,7 @@ impl<Exe: Executor> Fetcher<Exe> {
let position = FetchPosition {
offset: offset_data.offset - 1,
offset_epoch: None,
current_leader: self.client.cluster_meta.current_leader(&partition),
current_leader: self.client.cluster.current_leader(&partition),
};
// TODO: metadata update last seen epoch if newer
self.event_tx
Expand Down Expand Up @@ -603,8 +603,8 @@ impl<Exe: Executor> Fetcher<Exe> {
offset_reset_timestamps: &mut HashMap<TopicPartition, i64>,
) -> Result<HashMap<Node, ListOffsetsRequest>> {
let mut node_request = HashMap::new();
for node_entry in self.client.cluster_meta.nodes.iter() {
if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
for node_entry in self.client.cluster.nodes.iter() {
if let Ok(node_topology) = self.client.cluster.drain_node(node_entry.value().id) {
let partitions = node_topology.value();

let mut topics = HashMap::new();
Expand Down
8 changes: 4 additions & 4 deletions src/coordinator/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
};
let mut tp_state = TopicPartitionState::new(*partition);
tp_state.position.current_leader =
self.client.cluster_meta.current_leader(&tp);
self.client.cluster.current_leader(&tp);
self.subscriptions.assignments.insert(tp, tp_state);
}
}
Expand Down Expand Up @@ -837,7 +837,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
match self.group_meta.protocol_name {
Some(ref protocol) => {
let assignor = self.look_up_assignor(&protocol.to_string())?;
let cluster = self.client.cluster_meta.clone();
let cluster = self.client.cluster.clone();
request.assignments =
serialize_assignments(assignor.assign(cluster, &self.group_subscription)?)?;
}
Expand Down Expand Up @@ -866,7 +866,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
if version <= 7 {
let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
for assign in self.subscriptions.topics.iter() {
let partitions = self.client.cluster_meta.partitions(assign)?;
let partitions = self.client.cluster.partitions(assign)?;

let mut topic = OffsetFetchRequestTopic::default();
topic.name = assign.clone();
Expand All @@ -878,7 +878,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
} else {
let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
for assign in self.subscriptions.topics.iter() {
let partitions = self.client.cluster_meta.partitions(assign)?;
let partitions = self.client.cluster.partitions(assign)?;

let mut topic = OffsetFetchRequestTopics::default();
topic.name = assign.clone();
Expand Down
8 changes: 4 additions & 4 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ impl<Exe: Executor> Producer<Exe> {
encode_options: &RecordEncodeOptions,
) -> Result<Vec<(Node, FlushResult)>> {
let mut result = Vec::new();
for node_entry in self.client.cluster_meta.nodes.iter() {
if let Ok(node) = self.client.cluster_meta.drain_node(node_entry.value().id) {
for node_entry in self.client.cluster.nodes.iter() {
if let Ok(node) = self.client.cluster.drain_node(node_entry.value().id) {
let partitions = node.value();
if partitions.is_empty() {
continue;
Expand Down Expand Up @@ -448,7 +448,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
pub async fn new(client: Arc<Kafka<Exe>>, topic: TopicName) -> Result<Arc<TopicProducer<Exe>>> {
client.update_metadata(vec![topic.clone()]).await?;

let partitions = client.cluster_meta.partitions(&topic)?;
let partitions = client.cluster.partitions(&topic)?;
let partitions = partitions.value();
let num_partitions = partitions.len();
let batches = DashMap::with_capacity_and_hasher(num_partitions, FxBuildHasher::default());
Expand Down Expand Up @@ -477,7 +477,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
&self.topic,
record.key(),
record.value(),
&self.client.cluster_meta,
&self.client.cluster,
)?;
}
return match self.batches.get_mut(&partition) {
Expand Down

0 comments on commit 44df033

Please sign in to comment.