Skip to content

Commit

Permalink
Merge pull request #62 from clue-labs/inject-buffer
Browse files Browse the repository at this point in the history
The Buffer can now be injected into the Stream (or be used standalone)
  • Loading branch information
clue authored Jan 24, 2017
2 parents 12859f7 + 23d51f0 commit 78ee694
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
18 changes: 12 additions & 6 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Stream extends EventEmitter implements DuplexStreamInterface
protected $loop;
protected $buffer;

public function __construct($stream, LoopInterface $loop)
public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
{
$this->stream = $stream;
if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") {
Expand All @@ -52,16 +52,21 @@ public function __construct($stream, LoopInterface $loop)
stream_set_read_buffer($this->stream, 0);
}

if ($buffer === null) {
$buffer = new Buffer($stream, $loop);
}

$this->loop = $loop;
$this->buffer = new Buffer($this->stream, $this->loop);
$this->buffer = $buffer;

$that = $this;

$this->buffer->on('error', function ($error) use ($that) {
$that->emit('error', array($error, $that));
$that->close();
});

$this->buffer->on('close', array($this, 'close'));

$this->buffer->on('drain', function () use ($that) {
$that->emit('drain', array($that));
});
Expand Down Expand Up @@ -114,7 +119,7 @@ public function close()
$this->emit('end', array($this));
$this->emit('close', array($this));
$this->loop->removeStream($this->stream);
$this->buffer->removeAllListeners();
$this->buffer->close();
$this->removeAllListeners();

$this->handleClose();
Expand All @@ -131,8 +136,6 @@ public function end($data = null)
$this->readable = false;
$this->writable = false;

$this->buffer->on('close', array($this, 'close'));

$this->buffer->end($data);
}

Expand Down Expand Up @@ -182,6 +185,9 @@ public function handleClose()
}
}

/**
* @return WritableStreamInterface|Buffer
*/
public function getBuffer()
{
return $this->buffer;
Expand Down
15 changes: 15 additions & 0 deletions tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ public function testConstructorThrowsExceptionOnInvalidStream()
$conn = new Stream('breakme', $loop);
}

/**
* @covers React\Stream\Stream::__construct
*/
public function testConstructorAcceptsBuffer()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$buffer = $this->getMock('React\Stream\WritableStreamInterface');

$conn = new Stream($stream, $loop, $buffer);

$this->assertSame($buffer, $conn->getBuffer());
}

/**
* @covers React\Stream\Stream::__construct
* @covers React\Stream\Stream::handleData
Expand Down

0 comments on commit 78ee694

Please sign in to comment.