Skip to content

Commit

Permalink
Added read timeout for command that has it, possible fix of #14
Browse files Browse the repository at this point in the history
  • Loading branch information
xobotyi committed Oct 28, 2019
1 parent 9a6f596 commit 25ab8c1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 34 deletions.
9 changes: 5 additions & 4 deletions src/BeansClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,22 @@ function useTube(string $tubeName): self {
/**
* @param \xobotyi\beansclient\Interfaces\CommandInterface $command
*
* @param int|null $readTimeout Amount of seconds to wait the response
*
* @return mixed
* @throws \xobotyi\beansclient\Exception\ClientException
* @throws \xobotyi\beansclient\Exception\CommandException
*/
public
function dispatchCommand(CommandInterface $command) {
function dispatchCommand(CommandInterface $command, int $readTimeout = null) {
if (!$this->connection->isActive()) {
throw new ClientException('Unable to dispatch command, connection is not active');
}

$commandString = (string)$command;
$this->connection->write($commandString . self::CRLF);

$responseHeaders = $this->connection->readLine();
$responseHeaders = $this->connection->readLine($readTimeout);

if (!$responseHeaders) {
throw new CommandException(sprintf('Got nothing in response to `%s`', $commandString));
Expand Down Expand Up @@ -247,7 +249,6 @@ function getDefaultPriority() {
*/
public
function setDefaultPriority($defaultPriority): self {

if (!is_numeric($defaultPriority)) {
throw new ClientException(sprintf('Default priority has to be a number, got %s', gettype($defaultPriority)));
}
Expand Down Expand Up @@ -484,7 +485,7 @@ function release(int $jobId, $priority = null, ?int $delay = null): ?string {
*/
public
function reserve(?int $timeout = null): ?Job {
$result = $this->dispatchCommand(new Command\ReserveCommand($timeout, $this->serializer ?: null));
$result = $this->dispatchCommand(new Command\ReserveCommand($timeout, $this->serializer ?: null), $timeout);

if (!$result) {
return null;
Expand Down
13 changes: 8 additions & 5 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,28 @@ function isPersistent(): bool {
/**
* Reads up to $bytes bytes from the socket
*
* @param int $bytes Amount of bytes to read
* @param int $bytes Amount of bytes to read
* @param int|null $timeout Amount of seconds to wait the response
*
* @return string
* @throws \xobotyi\beansclient\Exception\SocketException
*/
public
function read(int $bytes): string {
return $this->socket->read($bytes);
function read(int $bytes, int $timeout = null): string {
return $this->socket->read($bytes, $timeout);
}

/**
* Reads up to newline from socket
*
* @param int|null $timeout Amount of seconds to wait the response
*
* @return string
* @throws \xobotyi\beansclient\Exception\SocketException
*/
public
function readLine(): string {
return $this->socket->readLine();
function readLine(int $timeout = null): string {
return $this->socket->readLine($timeout);
}

/**
Expand Down
9 changes: 6 additions & 3 deletions src/Interfaces/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ function isActive(): bool;
function isPersistent(): bool;

/**
* @param int $length
* @param int $length
* @param int|null $timeout
*
* @return string
*/
public
function read(int $length): string;
function read(int $length, int $timeout = null): string;

/**
* @param int|null $timeout
*
* @return string
*/
public
function readLine(): string;
function readLine(int $timeout = null): string;

/**
* @param string $str
Expand Down
51 changes: 29 additions & 22 deletions src/Socket/SocketBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ function close() {
/**
* Reads up to $bytes bytes from the socket
*
* @param int $bytes Amount of bytes to read
* @param int $bytes Amount of bytes to read
* @param int|null $timeout Amount of seconds to wait the response
*
* @return string
* @throws \xobotyi\beansclient\Exception\SocketException
*/
public
function read(int $bytes): string {
function read(int $bytes, int $timeout = null): string {
$this->checkClosed();
error_clear_last();

Expand All @@ -110,6 +111,7 @@ function read(int $bytes): string {

$emptyConsecutiveReads = 0;

$timeout !== null && stream_set_timeout($this->socket, $timeout);
while ($bytesReadTotal < $bytes) {
$read = fread($this->socket, $bytes - $bytesReadTotal);

Expand All @@ -129,10 +131,35 @@ function read(int $bytes): string {
$result .= $read;
$bytesReadTotal += $bytesRead;
}
$timeout !== null && stream_set_timeout($this->socket, static::READ_TIMEOUT);

return $result;
}

/**
* Reads up to newline from socket
*
* @param int|null $timeout Amount of seconds to wait the response
*
* @return string
* @throws \xobotyi\beansclient\Exception\SocketException
*/
public
function readLine(int $timeout = null): string {
$this->checkClosed();
error_clear_last();

$timeout !== null && stream_set_timeout($this->socket, $timeout);
$result = fgets($this->socket, 8192);
$timeout !== null && stream_set_timeout($this->socket, static::READ_TIMEOUT);

if ($result === false) {
$this->throwLastError();
}

return rtrim($result);
}

/**
* @return $this
* @throws \xobotyi\beansclient\Exception\SocketException
Expand All @@ -158,26 +185,6 @@ function throwLastError() {
throw new SocketException('Unknown error');
}

/**
* Reads up to newline from socket
*
* @return string
* @throws \xobotyi\beansclient\Exception\SocketException
*/
public
function readLine(): string {
$this->checkClosed();
error_clear_last();

$result = fgets($this->socket, 8192);

if ($result === false) {
$this->throwLastError();
}

return rtrim($result);
}

/**
* Writes data to the socket
*
Expand Down

0 comments on commit 25ab8c1

Please sign in to comment.