Skip to content

Commit

Permalink
Refactor protocol writer
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Jan 12, 2019
1 parent 4c89150 commit 61f3bb1
Show file tree
Hide file tree
Showing 52 changed files with 1,460 additions and 843 deletions.
62 changes: 31 additions & 31 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -981,110 +981,110 @@ public function doPublish
->appendUint8(206)
;

$s = 14;
$size = 14;

if (isset($headers['content-type'])) {
$flags |= 32768;
$contentType = $headers['content-type'];
$s += 1 + \strlen($contentType);
$size += 1 + \strlen($contentType);
unset($headers['content-type']);
}

if (isset($headers['content-encoding'])) {
$flags |= 16384;
$contentEncoding = $headers['content-encoding'];
$s += 1 + \strlen($contentEncoding);
$size += 1 + \strlen($contentEncoding);
unset($headers['content-encoding']);
}

if (isset($headers['delivery-mode'])) {
$flags |= 4096;
$deliveryMode = (int) $headers['delivery-mode'];
$s += 1;
$size += 1;
unset($headers['delivery-mode']);
}

if (isset($headers['priority'])) {
$flags |= 2048;
$priority = (int) $headers['priority'];
$s += 1;
$size += 1;
unset($headers['priority']);
}

if (isset($headers['correlation-id'])) {
$flags |= 1024;
$correlationId = $headers['correlation-id'];
$s += 1 + \strlen($correlationId);
$size += 1 + \strlen($correlationId);
unset($headers['correlation-id']);
}

if (isset($headers['reply-to'])) {
$flags |= 512;
$replyTo = $headers['reply-to'];
$s += 1 + \strlen($replyTo);
$size += 1 + \strlen($replyTo);
unset($headers['reply-to']);
}

if (isset($headers['expiration'])) {
$flags |= 256;
$expiration = $headers['expiration'];
$s += 1 + \strlen($expiration);
$size += 1 + \strlen($expiration);
unset($headers['expiration']);
}

if (isset($headers['message-id'])) {
$flags |= 128;
$messageId = $headers['message-id'];
$s += 1 + \strlen($messageId);
$size += 1 + \strlen($messageId);
unset($headers['message-id']);
}

if (isset($headers['timestamp'])) {
$flags |= 64;
$timestamp = $headers['timestamp'];
$s += 8;
$size += 8;
unset($headers['timestamp']);
}

if (isset($headers['type'])) {
$flags |= 32;
$type = $headers['type'];
$s += 1 + \strlen($type);
$size += 1 + \strlen($type);
unset($headers['type']);
}

if (isset($headers['user-id'])) {
$flags |= 16;
$userId = $headers['user-id'];
$s += 1 + \strlen($userId);
$size += 1 + \strlen($userId);
unset($headers['user-id']);
}

if (isset($headers['app-id'])) {
$flags |= 8;
$appId = $headers['app-id'];
$s += 1 + \strlen($appId);
$size += 1 + \strlen($appId);
unset($headers['app-id']);
}

if (isset($headers['cluster-id'])) {
$flags |= 4;
$clusterId = $headers['cluster-id'];
$s += 1 + \strlen($clusterId);
$size += 1 + \strlen($clusterId);
unset($headers['cluster-id']);
}

if (!empty($headers)) {
$flags |= 8192;
$headersBuffer = new Buffer;
$headersBuffer->appendTable($headers);
$s += $headersBuffer->size();
$size += $headersBuffer->size();
}

$buffer
->appendUint8(2)
->appendUint16($this->id)
->appendUint32($s)
->appendUint32($size)
->appendUint16(60)
->appendUint16(0)
->appendUint64(\strlen($body))
Expand Down Expand Up @@ -1165,6 +1165,16 @@ public function doPublish

return $this->connection->write($buffer);
}

/**
* @param string $frame
*
* @return Promise<Protocol\AbstractFrame>
*/
private function await(string $frame): Promise
{
return $this->connection->await($this->id, $frame);
}

/**
* @return void
Expand All @@ -1179,30 +1189,20 @@ private function startConsuming(): void

asyncCall(function () {
while ($this->state === self::STATE_OPEN) {
/** @var Protocol\BasicDeliverFrame $deliver */
$deliver = yield $this->await(Protocol\BasicDeliverFrame::class);
/** @var Protocol\BasicDeliverFrame $frame */
$frame = yield $this->await(Protocol\BasicDeliverFrame::class);

if (!isset($this->callbacks[$deliver->consumerTag])) {
if (!isset($this->callbacks[$frame->consumerTag])) {
continue;
}

$message = yield $this->consumeMessage($deliver, $deliver->consumerTag);
$message = yield $this->consumeMessage($frame, $frame->consumerTag);

asyncCall($this->callbacks[$deliver->consumerTag], $message, $this);
asyncCall($this->callbacks[$frame->consumerTag], $message, $this);
}
});
}

/**
* @param string $frame
*
* @return Promise<Protocol\AbstractFrame>
*/
private function await(string $frame): Promise
{
return $this->connection->await($this->id, $frame);
}

/**
* @param Protocol\MessageFrame $frame
* @param string $consumerTag
Expand Down
47 changes: 33 additions & 14 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

use function Amp\asyncCall, Amp\call, Amp\Socket\connect;
use Amp\Deferred;
use Amp\Emitter;
use Amp\Iterator;
use Amp\Loop;
use Amp\Promise;
use Amp\Socket\ClientConnectContext;
use Amp\Socket\Socket;
use PHPinnacle\Ridge\Protocol\ContentHeaderFrame;
use PHPinnacle\Ridge\Protocol\MethodFrame;

final class Connection
{
Expand All @@ -27,14 +31,9 @@ final class Connection
private $uri;

/**
* @var ProtocolWriter
* @var Parser
*/
private $writer;

/**
* @var ProtocolReader
*/
private $reader;
private $parser;

/**
* @var Socket
Expand Down Expand Up @@ -62,8 +61,7 @@ final class Connection
public function __construct(string $uri)
{
$this->uri = $uri;
$this->writer = new ProtocolWriter;
$this->reader = new ProtocolReader;
$this->parser = new Parser;
}

/**
Expand All @@ -88,7 +86,28 @@ public function write(Buffer $payload): Promise
*/
public function send(Protocol\AbstractFrame $frame): Promise
{
return $this->write($this->writer->buffer($frame));
if ($frame instanceof MethodFrame && $frame->payload !== null) {
// payload already supplied
} elseif ($frame instanceof MethodFrame || $frame instanceof ContentHeaderFrame) {
$buffer = $frame->pack();

$frame->size = $buffer->size();
$frame->payload = $buffer;
} elseif ($frame instanceof Protocol\ContentBodyFrame) {
// body frame's payload is already loaded
} elseif ($frame instanceof Protocol\HeartbeatFrame) {
// heartbeat frame is empty
} else {
throw Exception\ProtocolException::unknownFrameClass($frame);
}

return $this->write((new Buffer)
->appendUint8($frame->type)
->appendUint16($frame->channel)
->appendUint32($frame->size)
->append($frame->payload)
->appendUint8(206)
);
}

/**
Expand Down Expand Up @@ -222,15 +241,15 @@ public function close(): void
*/
private function consume(string $chunk): void
{
$this->reader->append($chunk);
$this->parser->append($chunk);

while ($frame = $this->reader->frame()) {
while ($frame = $this->parser->parse()) {
$class = \get_class($frame);
$defers = $this->await[$frame->channel][$class] ?? [];

foreach ($defers as $i => $defer) {
$defer->resolve($frame);

unset($this->await[$frame->channel][$class][$i]);
}
}
Expand Down
Loading

0 comments on commit 61f3bb1

Please sign in to comment.