Skip to content

Commit

Permalink
OneToOne variant
Browse files Browse the repository at this point in the history
  • Loading branch information
eleventy7 committed Jan 24, 2024
1 parent ca68faf commit 8d6f0e7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
import org.agrona.DirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.ShutdownSignalBarrier;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReceiveAgent implements Agent
{
private final ShutdownSignalBarrier barrier;
private final ManyToOneRingBuffer ringBuffer;
private final OneToOneRingBuffer ringBuffer;
private final int sendCount;
private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);
private final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();
private final SampleSimpleDecoder sampleSimpleDecoder = new SampleSimpleDecoder();

public ReceiveAgent(final ManyToOneRingBuffer ringBuffer, final ShutdownSignalBarrier barrier, final int sendCount)
public ReceiveAgent(final OneToOneRingBuffer ringBuffer, final ShutdownSignalBarrier barrier, final int sendCount)
{
this.ringBuffer = ringBuffer;
this.barrier = barrier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
import com.aeroncookbook.sbe.SampleSimpleEncoder;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;

public class SendAgent1 implements Agent
{
private final int sendCount;
private final ManyToOneRingBuffer ringBuffer;
private final OneToOneRingBuffer ringBuffer;
private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
private final SampleSimpleEncoder sampleSimpleEncoder = new SampleSimpleEncoder();
private int currentCountItem = 1;

public SendAgent1(final ManyToOneRingBuffer ringBuffer, final int sendCount)
public SendAgent1(final OneToOneRingBuffer ringBuffer, final int sendCount)
{
this.ringBuffer = ringBuffer;
this.sendCount = sendCount;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ShutdownSignalBarrier;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,27 +38,22 @@ public static void main(final String[] args)
final int bufferLength = 16384 + RingBufferDescriptor.TRAILER_LENGTH;
final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(bufferLength));
final IdleStrategy idleStrategySend1 = new BusySpinIdleStrategy();
final IdleStrategy idleStrategySend2 = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
final ManyToOneRingBuffer ringBuffer = new ManyToOneRingBuffer(unsafeBuffer);
final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(unsafeBuffer);

//construct the agents
final SendAgent1 sendAgent1 = new SendAgent1(ringBuffer, sendCount);
final SendAgent2 sendAgent2 = new SendAgent2(ringBuffer, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(ringBuffer, barrier, sendCount);

//construct agent runners
final AgentRunner sendAgentRunner1 = new AgentRunner(idleStrategySend1,
Throwable::printStackTrace, null, sendAgent1);
final AgentRunner sendAgentRunner2 = new AgentRunner(idleStrategySend2,
Throwable::printStackTrace, null, sendAgent2);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
Throwable::printStackTrace, null, receiveAgent);
LOGGER.info("starting");
//start the runners
AgentRunner.startOnThread(sendAgentRunner1);
AgentRunner.startOnThread(sendAgentRunner2);
AgentRunner.startOnThread(receiveAgentRunner);

//wait for the final item to be received before closing
Expand All @@ -67,6 +62,5 @@ public static void main(final String[] args)
//close the resources
receiveAgentRunner.close();
sendAgentRunner1.close();
sendAgentRunner2.close();
}
}

0 comments on commit 8d6f0e7

Please sign in to comment.