Skip to content

Commit

Permalink
thread bind
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuangcp committed Sep 24, 2024
1 parent 206c88a commit 20d421d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* @author Kuangcp
* 2024-09-24 16:08
*/
@Slf4j
@Getter
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {

private final String id;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.github.kuangcp.queue.disruptor.first;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
* @author Kuangcp
* 2024-09-24 16:28
*/
@Slf4j
public class OrderEventSlowHandler extends OrderEventHandler {

public OrderEventSlowHandler() {
}

public OrderEventSlowHandler(String id) {
super(id);
}

@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.info("{} event: {}, sequence: {}, endOfBatch: {}", getId(), event, sequence, endOfBatch);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void onEvent(OrderEvent event) {
log.info("{} event: {}", getId(), event);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,34 @@ public void testConsumerChain() throws Exception {

Thread.currentThread().join(3000);
}


/**
* 多生产者 多消费者
*/
@Test
public void testProductConsumerSlow() throws Exception {
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
1024 * 1024,
Executors.defaultThreadFactory(),
// 这里的枚举修改为多生产者
ProducerType.MULTI,
new YieldingWaitStrategy()
);
// 有多少个消费者就会创建多少线程去执行消费,此时会创建三个线程
disruptor
.handleEventsWithWorkerPool(new OrderEventSlowHandler("a"), new OrderEventSlowHandler("b"))
.then(new OrderEventSlowHandler("c"));
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 创建一个线程池,模拟多个生产者
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
}

Thread.currentThread().join();
}
}

0 comments on commit 20d421d

Please sign in to comment.