diff --git a/application/controllers/Index.php b/application/controllers/Index.php index 1b61c01..cc4f469 100644 --- a/application/controllers/Index.php +++ b/application/controllers/Index.php @@ -60,7 +60,7 @@ public function indexAction() { //第二各参数是生成日志文件名 //第三个参数$level分为:EMERG,ALERT,CRIT,ERR,WARN,NOTIC,INFO,DEBUG,SQL logs('zas'); - //$user=new HbModel('hb_users');//直接实例化给表名就行了,其他跟操作thinkphp一样 + //$user=new ZysModel('hb_users');//直接实例化给表名就行了,其他跟操作thinkphp一样 //$result = $user->where($where)->select(); //echo $user->getlastsql(); //print_r($result); @@ -86,7 +86,7 @@ public function dbtestAction() { } public function testAction() { $where=array('id' =>353); - $user=new HbModel('hb_goods'); + $user=new ZysModel('hb_goods'); $result = $user->where($where)->select(); print_r($result); exit; @@ -274,7 +274,7 @@ private function create_unique() { public function swoolehttpAction(){ Yaf_Dispatcher::getInstance()->autoRender(FALSE); $where=array('id' =>37936); - $user=new HbModel('hb_users');//直接实例化给表名就行了,其他跟操作thinkphp一样 + $user=new ZysModel('hb_users');//直接实例化给表名就行了,其他跟操作thinkphp一样 $result = $user->where($where)->select(); //echo $user->getlastsql(); // echo json_encode( $result); @@ -305,5 +305,12 @@ public function rpcAction(){ $sd->close(); exit; +} +public function hproseAction(){ + //hprose调用 + Yaf_Dispatcher::getInstance()->autoRender(FALSE); + echo hprose::getInstance()->getdata(); + exit; + } } diff --git a/application/library/hprose.php b/application/library/hprose.php new file mode 100644 index 0000000..c746f40 --- /dev/null +++ b/application/library/hprose.php @@ -0,0 +1,21 @@ +hprose->toArray(); + $client = new Client("tcp://" . $hprose_config['ServerIp'] . ":" . $hprose_config['port'],false); + return $client->zys("zys"); + } + public static function getInstance() { + if (!(self::$instance instanceof hprose)) { + self::$instance = new hprose; + } + return self::$instance; + } +} + diff --git a/application/models/Hb.php b/application/models/Zys.php similarity index 72% rename from application/models/Hb.php rename to application/models/Zys.php index fa76ae5..12a5ea9 100644 --- a/application/models/Hb.php +++ b/application/models/Zys.php @@ -1,6 +1,6 @@ * + * * +\**********************************************************/ +class BytesIO { + protected $buffer; + protected $length; + protected $pos = 0; + protected $mark = -1; + public function __construct($string = '') { + $this->buffer = $string; + $this->length = strlen($string); + } + public function close() { + $this->buffer = ''; + $this->pos = 0; + $this->mark = -1; + $this->length = 0; + } + public function length() { + return $this->length; + } + public function getc() { + if ($this->pos < $this->length) { + return $this->buffer[$this->pos++]; + } + return ''; + } + public function read($n) { + $s = substr($this->buffer, $this->pos, $n); + $this->skip($n); + return $s; + } + public function readfull() { + $s = substr($this->buffer, $this->pos); + $this->pos = $this->length; + return $s; + } + public function readuntil($tag) { + $pos = strpos($this->buffer, $tag, $this->pos); + if ($pos !== false) { + $s = substr($this->buffer, $this->pos, $pos - $this->pos); + $this->pos = $pos + strlen($tag); + } + else { + $s = substr($this->buffer, $this->pos); + $this->pos = $this->length; + } + return $s; + } + public function readString($n) { + $pos = $this->pos; + $buffer = $this->buffer; + for ($i = 0; $i < $n; ++$i) { + switch (ord($buffer[$pos]) >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: { + // 0xxx xxxx + ++$pos; + break; + } + case 12: + case 13: { + // 110x xxxx 10xx xxxx + $pos += 2; + break; + } + case 14: { + // 1110 xxxx 10xx xxxx 10xx xxxx + $pos += 3; + break; + } + case 15: { + // 1111 0xxx 10xx xxxx 10xx xxxx 10xx xxxx + $pos += 4; + ++$i; + if ($i >= $n) { + throw new Exception('bad utf-8 encoding'); + } + break; + } + default: { + throw new Exception('bad utf-8 encoding'); + } + } + } + return $this->read($pos - $this->pos); + } + public function mark() { + $this->mark = $this->pos; + } + public function unmark() { + $this->mark = -1; + } + public function reset() { + if ($this->mark != -1) { + $this->pos = $this->mark; + } + } + public function skip($n) { + $this->pos += $n; + } + public function eof() { + return ($this->pos >= $this->length); + } + public function write($str, $n = -1) { + if ($n == -1) { + $this->buffer .= $str; + $n = strlen($str); + } + else { + $this->buffer .= substr($str, 0, $n); + } + $this->length += $n; + } + public function load($filename) { + $str = file_get_contents($filename); + if ($str === false) return false; + $this->buffer = $str; + $this->pos = 0; + $this->mark = -1; + $this->length = strlen($str); + return true; + } + public function save($filename) { + return file_put_contents($filename, $this->buffer); + } + public function toString() { + return $this->buffer; + } + public function __toString() { + return $this->buffer; + } +} diff --git a/hprose/lib/Client.php b/hprose/lib/Client.php new file mode 100644 index 0000000..b00335f --- /dev/null +++ b/hprose/lib/Client.php @@ -0,0 +1,107 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Socket_Client.php'; +require_once __DIR__ . '/HalfDuplexTransporter.php'; +require_once __DIR__ . '/FullDuplexTransporter.php'; +class Client extends Socket_Client { + private $hdtrans; + private $fdtrans; + public $fullDuplex = false; + public $readBuffer = 8192; + public $writeBuffer = 8192; + public $maxPoolSize = 10; + public $noDelay = true; + public $keepAlive = true; + public $options = null; + public function __construct($uris = null, $async = true) { + parent::__construct($uris, $async); + $this->hdtrans = new HalfDuplexTransporter($this, $async); + $this->fdtrans = new FullDuplexTransporter($this, $async); + } + public function __destruct() { + try { + $this->loop(); + } + catch (\Exception $e) { + } + } + public function isFullDuplex() { + return $this->fullDuplex; + } + public function setFullDuplex($fullDuplex) { + $this->fullDuplex = $fullDuplex; + } + public function getReadBuffer() { + return $this->readBuffer; + } + public function setReadBuffer($size) { + $this->readBuffer = $size; + } + public function getWriteBuffer() { + return $this->writeBuffer; + } + public function setWriteBuffer($size) { + $this->writeBuffer = $size; + } + public function getMaxPoolSize() { + return $this->maxPoolSize; + } + public function setMaxPoolSize($maxPoolSize) { + if ($maxPoolSize < 1) throw new Exception("maxPoolSize must be great than 0"); + $this->maxPoolSize = $maxPoolSize; + } + public function setNoDelay($value) { + $this->noDelay = $value; + } + public function isNoDelay() { + return $this->noDelay; + } + public function setKeepAlive($value) { + $this->keepAlive = $value; + } + public function isKeepAlive() { + return $this->keepAlive; + } + protected function sendAndReceive($request, stdClass $context) { + if ($this->fullDuplex) { + return $this->fdtrans->sendAndReceive($request, $context); + } + return $this->hdtrans->sendAndReceive($request, $context); + } + public function getOptions() { + return $this->options; + } + public function setOptions(array $options) { + $this->options = $options; + } + public function set($key, $value) { + $this->options[$key] = $value; + return $this; + } + public function loop() { + if ($this->fullDuplex) { + $this->fdtrans->loop(); + } + else { + $this->hdtrans->loop(); + } + } +} diff --git a/hprose/lib/FakeReaderRefer.php b/hprose/lib/FakeReaderRefer.php new file mode 100644 index 0000000..9f41615 --- /dev/null +++ b/hprose/lib/FakeReaderRefer.php @@ -0,0 +1,30 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/ReaderRefer.php'; +class FakeReaderRefer implements ReaderRefer { + public function set($val) {} + public function read($index) { + throw new Exception("Unexpected serialize tag '" . + Tags::TagRef . + "' in stream"); + } + public function reset() {} +} diff --git a/hprose/lib/FakeWriterRefer.php b/hprose/lib/FakeWriterRefer.php new file mode 100644 index 0000000..8165fb7 --- /dev/null +++ b/hprose/lib/FakeWriterRefer.php @@ -0,0 +1,26 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/WriterRefer.php'; +class FakeWriterRefer implements WriterRefer { + public function set($val) {} + public function write(BytesIO $stream, $val) { return false; } + public function reset() {} +} diff --git a/hprose/lib/FullDuplexTransporter.php b/hprose/lib/FullDuplexTransporter.php new file mode 100644 index 0000000..9fb219e --- /dev/null +++ b/hprose/lib/FullDuplexTransporter.php @@ -0,0 +1,102 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Transporter.php'; +class FullDuplexTransporter extends Transporter { + private $id = 0; + private function getId() { + return $this->id++; + } + protected function appendHeader($request, $id = null) { + if ($id === null) { + $id = $this->getId(); + } + return pack("NN", strlen($request) | 0x80000000, $id) . $request; + } + protected function createRequest($index, $request) { + $id = $this->getId(); + $buffer = $this->appendHeader($request, $id); + return new DataBuffer($index, $buffer, strlen($buffer), $id); + } + protected function afterWrite($request, $stream, $o) { + $response = new DataBuffer($request->index, '', 0, $request->id); + $stream_id = (integer)$stream; + unset($o->requests[$stream_id]); + if (empty($o->queue[$stream_id])) { + $o->queue[$stream_id] = array(); + $o->readpool[] = $stream; + } + $o->queue[$stream_id][$request->id] = $response; + } + protected function asyncReadError($o, $stream, $index = -1) { + $stream_id = (integer)$stream; + foreach ($o->queue[$stream_id] as $response) { + $index = $response->index; + $o->results[$index]->reject($this->getLastError('response read error')); + $this->free($o, $index); + } + unset($o->queue[$stream_id]); + unset($o->responses[$stream_id]); + @fclose($stream); + $this->removeStream($stream, $o->readpool); + $this->removeStream($stream, $o->writepool); + } + private function getHeaderInfo($stream) { + $header = $this->readHeader($stream, 8); + if ($header === false) return false; + list(, $length, $id) = unpack('N*', $header); + $length &= 0x7FFFFFFF; + return array($length, $id); + } + protected function getBodyLength($stream) { + $headerInfo = $this->getHeaderInfo($stream); + if ($headerInfo === false) return false; + return $headerInfo[0]; + } + protected function getResponse($stream, $o) { + $stream_id = (integer)$stream; + if (isset($o->responses[$stream_id])) { + $response = $o->responses[$stream_id]; + } + else { + $headerInfo = $this->getHeaderInfo($stream); + if ($headerInfo === false) return false; + $id = $headerInfo[1]; + if (isset($o->queue[$stream_id][$id])) { + $response = $o->queue[$stream_id][$id]; + } + else { + $response = new DataBuffer(-1, '', 0, $id); + } + $response->length = $headerInfo[0]; + $o->responses[$stream_id] = $response; + } + return $response; + } + protected function afterRead($stream, $o, $response) { + $stream_id = (integer)$stream; + if (isset($o->queue[$stream_id][$response->id])) { + unset($o->queue[$stream_id][$response->id]); + } + if (empty($o->queue[$stream_id])) { + $this->removeStream($stream, $o->readpool); + } + } +} \ No newline at end of file diff --git a/hprose/lib/Future.php b/hprose/lib/Future.php new file mode 100644 index 0000000..2948bb0 --- /dev/null +++ b/hprose/lib/Future.php @@ -0,0 +1,309 @@ + * + * * +\**********************************************************/ +class Future { + const PENDING = 0; + const FULFILLED = 1; + const REJECTED = 2; + + public $state = Future::PENDING; + public $value; + public $reason; + private $subscribers = array(); + + public function __construct($computation = NULL) { + if (is_callable($computation)) { + try { + $this->resolve(call_user_func($computation)); + } + catch (UncatchableException $e) { + throw $e->getPrevious(); + } + catch (Exception $e) { + $this->reject($e); + } + catch (Throwable $e) { + $this->reject($e); + } + } + } + + private function privateCall($callback, $next, $x) { + try { + $r = call_user_func($callback, $x); + $next->resolve($r); + } + catch (UncatchableException $e) { + throw $e->getPrevious(); + } + catch (Exception $e) { + $next->reject($e); + } + catch (Throwable $e) { + $next->reject($e); + } + } + + private function privateResolve($onfulfill, $next, $x) { + if (is_callable($onfulfill)) { + $this->privateCall($onfulfill, $next, $x); + } + else { + $next->resolve($x); + } + } + + private function privateReject($onreject, $next, $e) { + if (is_callable($onreject)) { + $this->privateCall($onreject, $next, $e); + } + else { + $next->reject($e); + } + } + + public function resolve($value) { + if ($value === $this) { + $this->reject(new TypeError('Self resolution')); + return; + } + if (isFuture($value)) { + $value->fill($this); + return; + } + if (($value !== NULL) and is_object($value) or is_string($value)) { + if (method_exists($value, 'then')) { + $then = array($value, 'then'); + $notrun = true; + $self = $this; + try { + call_user_func($then, + function($y) use (&$notrun, $self) { + if ($notrun) { + $notrun = false; + $self->resolve($y); + } + }, + function($r) use (&$notrun, $self) { + if ($notrun) { + $notrun = false; + $self->reject($r); + } + } + ); + } + catch (UncatchableException $e) { + throw $e->getPrevious(); + } + catch (Exception $e) { + if ($notrun) { + $notrun = false; + $this->reject($e); + } + } + catch (Throwable $e) { + if ($notrun) { + $notrun = false; + $this->reject($e); + } + } + return; + } + } + if ($this->state === self::PENDING) { + $this->state = self::FULFILLED; + $this->value = $value; + while (count($this->subscribers) > 0) { + $subscriber = array_shift($this->subscribers); + $this->privateResolve( + $subscriber['onfulfill'], + $subscriber['next'], + $value); + } + } + } + + public function reject($reason) { + if ($this->state === self::PENDING) { + $this->state = self::REJECTED; + $this->reason = $reason; + while (count($this->subscribers) > 0) { + $subscriber = array_shift($this->subscribers); + $this->privateReject( + $subscriber['onreject'], + $subscriber['next'], + $reason); + } + } + } + + public function then($onfulfill, $onreject = NULL) { + if (!is_callable($onfulfill)) { $onfulfill = NULL; } + if (!is_callable($onreject)) { $onreject = NULL; } + $next = new Future(); + if ($this->state === self::FULFILLED) { + $this->privateResolve($onfulfill, $next, $this->value); + } + elseif ($this->state === self::REJECTED) { + $this->privateReject($onreject, $next, $this->reason); + } + else { + array_push($this->subscribers, array( + 'onfulfill' => $onfulfill, + 'onreject' => $onreject, + 'next' => $next + )); + } + return $next; + } + + public function done($onfulfill, $onreject = NULL) { + $this->then($onfulfill, $onreject)->then(NULL, function($error) { + throw new UncatchableException("", 0, $error); + }); + } + + public function inspect() { + switch ($this->state) { + case self::PENDING: return array('state' => 'pending'); + case self::FULFILLED: return array('state' => 'fulfilled', 'value' => $this->value); + case self::REJECTED: return array('state' => 'rejected', 'reason' => $this->reason); + } + } + + public function catchError($onreject, $test = NULL) { + if (is_callable($test)) { + $self = $this; + return $this->then(NULL, + function($e) use ($self, $onreject, $test) { + if (call_user_func($test, $e)) { + return $self->then(NULL, $onreject); + } + else { + throw $e; + } + } + ); + } + return $this->then(NULL, $onreject); + } + + public function fail($onreject) { + $this->done(NULL, $onreject); + } + + public function whenComplete($action) { + return $this->then( + function($v) use ($action) { + call_user_func($action); + return $v; + }, + function($e) use ($action) { + call_user_func($action); + throw $e; + } + ); + } + + public function complete($oncomplete) { + return $this->then($oncomplete, $oncomplete); + } + + public function always($oncomplete) { + $this->done($oncomplete, $oncomplete); + } + + public function fill($future) { + $this->then(array($future, 'resolve'), array($future, 'reject')); + } + + public function tap($onfulfilledSideEffect) { + return $this->then( + function($result) use ($onfulfilledSideEffect) { + call_user_func($onfulfilledSideEffect, $result); + return $result; + } + ); + } + + public function spread($onfulfilledArray) { + return $this->then( + function($array) use ($onfulfilledArray) { + return call_user_func_array($onfulfilledArray, $array); + } + ); + } + + public function __get($key) { + return $this->then( + function($result) use ($key) { + return $result->$key; + } + ); + } + + public function __call($method, $args) { + if ($args === NULL) { + $args = array(); + } + return $this->then( + function($result) use ($method, $args) { + return all($args)->then( + function($args) use ($result, $method) { + return call_user_func_array(array($result, $method), $args); + } + ); + } + ); + } + + public function each($callback) { + return each($this, $callback); + } + + public function every($callback) { + return every($this, $callback); + } + + public function some($callback) { + return some($this, $callback); + } + + public function filter($callback, $preserveKeys = false) { + return filter($this, $callback, $preserveKeys); + } + + public function map($callback) { + return map($this, $callback); + } + + public function reduce($callback, $initial = NULL) { + return reduce($this, $callback, $initial); + } + + public function search($searchElement, $strict = false) { + return search($this, $searchElement, $strict); + } + + public function includes($searchElement, $strict = false) { + return includes($this, $searchElement, $strict); + } + +} diff --git a/hprose/lib/HalfDuplexTransporter.php b/hprose/lib/HalfDuplexTransporter.php new file mode 100644 index 0000000..32ab37c --- /dev/null +++ b/hprose/lib/HalfDuplexTransporter.php @@ -0,0 +1,68 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Transporter.php'; +class HalfDuplexTransporter extends Transporter { + protected function appendHeader($request) { + return pack("N", strlen($request)) . $request; + } + protected function createRequest($index, $request) { + $buffer = $this->appendHeader($request); + return new DataBuffer($index, $buffer, strlen($buffer)); + } + protected function afterWrite($request, $stream, $o) { + $stream_id = (integer)$stream; + $o->responses[$stream_id] = new DataBuffer($request->index, '', 0); + $o->readpool[] = $stream; + unset($o->requests[$stream_id]); + $this->removeStream($stream, $o->writepool); + } + protected function asyncReadError($o, $stream, $index) { + if (isset($o->results[$index])) { + $o->results[$index] + ->reject($this->getLastError('response read error')); + $this->free($o, $index); + } + unset($o->responses[(integer)$stream]); + @fclose($stream); + $this->removeStream($stream, $o->readpool); + } + protected function getBodyLength($stream) { + $header = $this->readHeader($stream, 4); + if ($header === false) return false; + list(, $length) = unpack('N', $header); + return $length; + } + protected function getResponse($stream, $o) { + $stream_id = (integer)$stream; + $response = $o->responses[$stream_id]; + if ($response->length === 0) { + $length = $this->getBodyLength($stream); + $response->length = $length; + } + return $response; + } + protected function afterRead($stream, $o, $response) { + if ($o->current < $o->count) { + $o->writepool[] = $stream; + } + $this->removeStream($stream, $o->readpool); + } +} \ No newline at end of file diff --git a/hprose/lib/HandlerManager.php b/hprose/lib/HandlerManager.php new file mode 100644 index 0000000..f20aa35 --- /dev/null +++ b/hprose/lib/HandlerManager.php @@ -0,0 +1,134 @@ + * + * * +\**********************************************************/ + +abstract class HandlerManager { + private $invokeHandlers = array(); + private $beforeFilterHandlers = array(); + private $afterFilterHandlers = array(); + private $defaultInvokeHandler; + private $defaultBeforeFilterHandler; + private $defaultAfterFilterHandler; + protected $invokeHandler; + protected $beforeFilterHandler; + protected $afterFilterHandler; + public function __construct() { + $self = $this; + $this->defaultInvokeHandler = function(/*string*/ $name, array &$args, stdClass $context) use ($self) { + return $self->invokeHandler($name, $args, $context); + }; + $this->defaultBeforeFilterHandler = function(/*string*/ $request, stdClass $context) use ($self) { + return $self->beforeFilterHandler($request, $context); + }; + $this->defaultAfterFilterHandler = function(/*string*/ $request, stdClass $context) use ($self) { + return $self->afterFilterHandler($request, $context); + }; + $this->invokeHandler = $this->defaultInvokeHandler; + $this->beforeFilterHandler = $this->defaultBeforeFilterHandler; + $this->afterFilterHandler = $this->defaultAfterFilterHandler; + } + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ abstract function invokeHandler(/*string*/ $name, array &$args, stdClass $context); + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ abstract function beforeFilterHandler(/*string*/ $request, stdClass $context); + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ abstract function afterFilterHandler(/*string*/ $request, stdClass $context); + protected function getNextInvokeHandler(Closure $next, /*callable*/ $handler) { + return function(/*string*/ $name, array &$args, stdClass $context) use ($next, $handler) { + try { + $array = array($name, &$args, $context, $next); + $result = call_user_func_array($handler, $array); + if (class_exists("\\Generator")) { + return Future\co($result); + } + else { + return Future\toFuture($result); + } + } + catch (Exception $e) { + return Future\error($e); + } + catch (Throwable $e) { + return Future\error($e); + } + }; + } + protected function getNextFilterHandler(Closure $next, /*callable*/ $handler) { + return function(/*string*/ $request, stdClass $context) use ($next, $handler) { + try { + $result = call_user_func($handler, $request, $context, $next); + if (class_exists("\\Generator")) { + return Future\co($result); + } + else { + return Future\toFuture($result); + } + } + catch (Exception $e) { + return Future\error($e); + } + catch (Throwable $e) { + return Future\error($e); + } + }; + } + public function addInvokeHandler(/*callable*/ $handler) { + if ($handler == null) return; + $this->invokeHandlers[] = $handler; + $next = $this->defaultInvokeHandler; + for ($i = count($this->invokeHandlers) - 1; $i >= 0; --$i) { + $next = $this->getNextInvokeHandler($next, $this->invokeHandlers[$i]); + } + $this->invokeHandler = $next; + return $this; + } + public function addBeforeFilterHandler(/*callable*/ $handler) { + if ($handler == null) return; + $this->beforeFilterHandlers[] = $handler; + $next = $this->defaultBeforeFilterHandler; + for ($i = count($this->beforeFilterHandlers) - 1; $i >= 0; --$i) { + $next = $this->getNextFilterHandler($next, $this->beforeFilterHandlers[$i]); + } + $this->beforeFilterHandler = $next; + return $this; + } + public function addAfterFilterHandler(/*callable*/ $handler) { + if ($handler == null) return; + $this->afterFilterHandlers[] = $handler; + $next = $this->defaultAfterFilterHandler; + for ($i = count($this->afterFilterHandlers) - 1; $i >= 0; --$i) { + $next = $this->getNextFilterHandler($next, $this->afterFilterHandlers[$i]); + } + $this->afterFilterHandler = $next; + return $this; + } +} diff --git a/hprose/lib/InvokeSettings.php b/hprose/lib/InvokeSettings.php new file mode 100644 index 0000000..5fe4cca --- /dev/null +++ b/hprose/lib/InvokeSettings.php @@ -0,0 +1,55 @@ + * + * * +\**********************************************************/ +class InvokeSettings implements ArrayAccess { + public $settings; + public function __construct(array $settings = array()) { + if ($settings !== null) { + $this->settings = $settings; + } + else { + $this->settings = array(); + } + } + public function __set($name, $value) { + $this->settings[$name] = $value; + } + public function __get($name) { + return isset($this->settings[$name]) ? $this->settings[$name] : null; + } + public function __isset($name) { + return isset($this->settings[$name]); + } + public function __unset($name) { + unset($this->settings[$name]); + } + public function offsetSet($offset, $value) { + $this->settings[$offset] = $value; + } + public function offsetGet($offset) { + return isset($this->settings[$offset]) ? $this->settings[$offset] : null; + } + public function offsetExists($offset) { + return isset($this->settings[$offset]); + } + public function offsetUnset($offset) { + unset($this->settings[$offset]); + } +} diff --git a/hprose/lib/RawReader.php b/hprose/lib/RawReader.php new file mode 100644 index 0000000..140b9de --- /dev/null +++ b/hprose/lib/RawReader.php @@ -0,0 +1,153 @@ + * + * * +\**********************************************************/ +class RawReader { + public $stream; + public function __construct(BytesIO $stream) { + $this->stream = $stream; + } + public function unexpectedTag($tag, $expectTags = '') { + if ($tag && $expectTags) { + return new Exception("Tag '" . $expectTags . "' expected, but '" . $tag . "' found in stream"); + } + else if ($tag) { + return new Exception("Unexpected serialize tag '" . $tag . "' in stream"); + } + else { + return new Exception('No byte found in stream'); + } + } + public function readRaw() { + $ostream = new BytesIO(); + $this->privateReadRaw($ostream); + return $ostream; + } + + private function privateReadRaw(BytesIO $ostream, $tag = '') { + if ($tag == '') { + $tag = $this->stream->getc(); + } + $ostream->write($tag); + switch ($tag) { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + case Tags::TagNull: + case Tags::TagEmpty: + case Tags::TagTrue: + case Tags::TagFalse: + case Tags::TagNaN: + break; + case Tags::TagInfinity: + $ostream->write($this->stream->getc()); + break; + case Tags::TagInteger: + case Tags::TagLong: + case Tags::TagDouble: + case Tags::TagRef: + $this->readNumberRaw($ostream); + break; + case Tags::TagDate: + case Tags::TagTime: + $this->readDateTimeRaw($ostream); + break; + case Tags::TagUTF8Char: + $this->readUTF8CharRaw($ostream); + break; + case Tags::TagBytes: + $this->readBytesRaw($ostream); + break; + case Tags::TagString: + $this->readStringRaw($ostream); + break; + case Tags::TagGuid: + $this->readGuidRaw($ostream); + break; + case Tags::TagList: + case Tags::TagMap: + case Tags::TagObject: + $this->readComplexRaw($ostream); + break; + case Tags::TagClass: + $this->readComplexRaw($ostream); + $this->privateReadRaw($ostream); + break; + case Tags::TagError: + $this->privateReadRaw($ostream); + break; + default: throw $this->unexpectedTag($tag); + } + } + + private function readNumberRaw(BytesIO $ostream) { + $s = $this->stream->readuntil(Tags::TagSemicolon) . + Tags::TagSemicolon; + $ostream->write($s); + } + + private function readDateTimeRaw(BytesIO $ostream) { + $s = ''; + do { + $tag = $this->stream->getc(); + $s .= $tag; + } while ($tag != Tags::TagSemicolon && + $tag != Tags::TagUTC); + $ostream->write($s); + } + + private function readUTF8CharRaw(BytesIO $ostream) { + $ostream->write($this->stream->readString(1)); + } + + private function readBytesRaw(BytesIO $ostream) { + $len = $this->stream->readuntil(Tags::TagQuote); + $s = $len . Tags::TagQuote . $this->stream->read((int)$len) . Tags::TagQuote; + $this->stream->skip(1); + $ostream->write($s); + } + + private function readStringRaw(BytesIO $ostream) { + $len = $this->stream->readuntil(Tags::TagQuote); + $s = $len . Tags::TagQuote . $this->stream->readString((int)$len) . Tags::TagQuote; + $this->stream->skip(1); + $ostream->write($s); + } + + private function readGuidRaw(BytesIO $ostream) { + $ostream->write($this->stream->read(38)); + } + + private function readComplexRaw(BytesIO $ostream) { + $s = $this->stream->readuntil(Tags::TagOpenbrace) . + Tags::TagOpenbrace; + $ostream->write($s); + while (($tag = $this->stream->getc()) != Tags::TagClosebrace) { + $this->privateReadRaw($ostream, $tag); + } + $ostream->write($tag); + } +} diff --git a/hprose/lib/Reader.php b/hprose/lib/Reader.php new file mode 100644 index 0000000..380eb65 --- /dev/null +++ b/hprose/lib/Reader.php @@ -0,0 +1,473 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/RawReader.php'; +require_once __DIR__ . '/FakeReaderRefer.php'; +require_once __DIR__ . '/RealReaderRefer.php'; +class Reader extends RawReader { + private $classref; + private $refer; + public function __construct(BytesIO $stream, $simple = false) { + parent::__construct($stream); + $this->classref = array(); + $this->refer = $simple ? new FakeReaderRefer() : new RealReaderRefer(); + } + public function unserialize() { + $tag = $this->stream->getc(); + switch ($tag) { + case '0': return 0; + case '1': return 1; + case '2': return 2; + case '3': return 3; + case '4': return 4; + case '5': return 5; + case '6': return 6; + case '7': return 7; + case '8': return 8; + case '9': return 9; + case Tags::TagInteger: return $this->readIntegerWithoutTag(); + case Tags::TagLong: return $this->readLongWithoutTag(); + case Tags::TagDouble: return $this->readDoubleWithoutTag(); + case Tags::TagNull: return null; + case Tags::TagEmpty: return ''; + case Tags::TagTrue: return true; + case Tags::TagFalse: return false; + case Tags::TagNaN: return log(-1); + case Tags::TagInfinity: return $this->readInfinityWithoutTag(); + case Tags::TagDate: return $this->readDateWithoutTag(); + case Tags::TagTime: return $this->readTimeWithoutTag(); + case Tags::TagBytes: return $this->readBytesWithoutTag(); + case Tags::TagUTF8Char: return $this->readUTF8CharWithoutTag(); + case Tags::TagString: return $this->readStringWithoutTag(); + case Tags::TagGuid: return $this->readGuidWithoutTag(); + case Tags::TagList: return $this->readListWithoutTag(); + case Tags::TagMap: return $this->readMapWithoutTag(); + case Tags::TagClass: $this->readClass(); return $this->readObject(); + case Tags::TagObject: return $this->readObjectWithoutTag(); + case Tags::TagRef: return $this->readRef(); + case Tags::TagError: throw new Exception($this->privateReadString()); + default: throw $this->unexpectedTag($tag); + } + } + private function unserializeKey() { + $tag = $this->stream->getc(); + switch ($tag) { + case '0': return 0; + case '1': return 1; + case '2': return 2; + case '3': return 3; + case '4': return 4; + case '5': return 5; + case '6': return 6; + case '7': return 7; + case '8': return 8; + case '9': return 9; + case Tags::TagInteger: return $this->readIntegerWithoutTag(); + case Tags::TagLong: + case Tags::TagDouble: return $this->readLongWithoutTag(); + case Tags::TagNull: return 'null'; + case Tags::TagEmpty: return ''; + case Tags::TagTrue: return 'true'; + case Tags::TagFalse: return 'false'; + case Tags::TagNaN: return (string)log(-1); + case Tags::TagInfinity: return (string)$this->readInfinityWithoutTag(); + case Tags::TagBytes: return $this->readBytesWithoutTag(); + case Tags::TagUTF8Char: return $this->readUTF8CharWithoutTag(); + case Tags::TagString: return $this->readStringWithoutTag(); + case Tags::TagGuid: return $this->readGuidWithoutTag(); + case Tags::TagRef: return (string)$this->readRef(); + case Tags::TagError: throw new Exception($this->privateReadString()); + default: throw $this->unexpectedTag($tag); + } + } + public function checkTag($expectTag, $tag = null) { + if ($tag === null) { + $tag = $this->stream->getc(); + } + if ($tag != $expectTag) { + throw $this->unexpectedTag($tag, $expectTag); + } + } + public function checkTags($expectTags, $tag = null) { + if ($tag === null) { + $tag = $this->stream->getc(); + } + if (!strchr($expectTags, $tag)) { + throw $this->unexpectedTag($tag, $expectTags); + } + return $tag; + } + public function readIntegerWithoutTag() { + return (int)($this->stream->readuntil(Tags::TagSemicolon)); + } + public function readInteger() { + $tag = $this->stream->getc(); + switch ($tag) { + case '0': return 0; + case '1': return 1; + case '2': return 2; + case '3': return 3; + case '4': return 4; + case '5': return 5; + case '6': return 6; + case '7': return 7; + case '8': return 8; + case '9': return 9; + case Tags::TagInteger: return $this->readIntegerWithoutTag(); + default: throw $this->unexpectedTag($tag); + } + } + public function readLongWithoutTag() { + return $this->stream->readuntil(Tags::TagSemicolon); + } + public function readLong() { + $tag = $this->stream->getc(); + switch ($tag) { + case '0': return '0'; + case '1': return '1'; + case '2': return '2'; + case '3': return '3'; + case '4': return '4'; + case '5': return '5'; + case '6': return '6'; + case '7': return '7'; + case '8': return '8'; + case '9': return '9'; + case Tags::TagInteger: + case Tags::TagLong: return $this->readLongWithoutTag(); + default: throw $this->unexpectedTag($tag); + } + } + public function readDoubleWithoutTag() { + return (float)($this->stream->readuntil(Tags::TagSemicolon)); + } + public function readDouble() { + $tag = $this->stream->getc(); + switch ($tag) { + case '0': return 0.0; + case '1': return 1.0; + case '2': return 2.0; + case '3': return 3.0; + case '4': return 4.0; + case '5': return 5.0; + case '6': return 6.0; + case '7': return 7.0; + case '8': return 8.0; + case '9': return 9.0; + case Tags::TagInteger: + case Tags::TagLong: + case Tags::TagDouble: return $this->readDoubleWithoutTag(); + case Tags::TagNaN: return log(-1); + case Tags::TagInfinity: return $this->readInfinityWithoutTag(); + default: throw $this->unexpectedTag($tag); + } + } + public function readNaN() { + $this->checkTag(Tags::TagNaN); + return log(-1); + } + public function readInfinityWithoutTag() { + return (($this->stream->getc() === Tags::TagNeg) ? log(0) : -log(0)); + } + public function readInfinity() { + $this->checkTag(Tags::TagInfinity); + return $this->readInfinityWithoutTag(); + } + public function readNull() { + $this->checkTag(Tags::TagNull); + return null; + } + public function readEmpty() { + $this->checkTag(Tags::TagEmpty); + return ''; + } + public function readBoolean() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagTrue: return true; + case Tags::TagFalse: return false; + default: throw $this->unexpectedTag($tag); + } + } + public function readDateWithoutTag() { + $ymd = $this->stream->read(8); + $hms = '000000'; + $u = '000000'; + $tag = $this->stream->getc(); + if ($tag == Tags::TagTime) { + $hms = $this->stream->read(6); + $tag = $this->stream->getc(); + if ($tag == Tags::TagPoint) { + $u = $this->stream->read(3); + $tag = $this->stream->getc(); + if (($tag >= '0') && ($tag <= '9')) { + $u .= $tag . $this->stream->read(2); + $tag = $this->stream->getc(); + if (($tag >= '0') && ($tag <= '9')) { + $this->stream->skip(2); + $tag = $this->stream->getc(); + } + } + else { + $u .= '000'; + } + } + } + if ($tag == Tags::TagUTC) { + $date = date_create_from_format('YmdHisu', $ymd.$hms.$u, timezone_open('UTC')); + } + else { + $date = date_create_from_format('YmdHisu', $ymd.$hms.$u); + } + $this->refer->set($date); + return $date; + } + public function readDate() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagDate: return $this->readDateWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readTimeWithoutTag() { + $hms = $this->stream->read(6); + $u = '000000'; + $tag = $this->stream->getc(); + if ($tag == Tags::TagPoint) { + $u = $this->stream->read(3); + $tag = $this->stream->getc(); + if (($tag >= '0') && ($tag <= '9')) { + $u .= $tag . $this->stream->read(2); + $tag = $this->stream->getc(); + if (($tag >= '0') && ($tag <= '9')) { + $this->stream->skip(2); + $tag = $this->stream->getc(); + } + } + else { + $u .= '000'; + } + } + if ($tag == Tags::TagUTC) { + $time = date_create_from_format('!Hisu', $hms.$u, timezone_open('UTC')); + } + else { + $time = date_create_from_format('!Hisu', $hms.$u); + } + $this->refer->set($time); + return $time; + } + public function readTime() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagTime: return $this->readTimeWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readBytesWithoutTag() { + $count = (int)($this->stream->readuntil(Tags::TagQuote)); + $bytes = $this->stream->read($count); + $this->stream->skip(1); + $this->refer->set($bytes); + return $bytes; + } + public function readBytes() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagEmpty: return ''; + case Tags::TagBytes: return $this->readBytesWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readUTF8CharWithoutTag() { + return $this->stream->readString(1); + } + public function readUTF8Char() { + $this->checkTag(Tags::TagUTF8Char); + return $this->readUTF8CharWithoutTag(); + } + private function privateReadStringWithoutTag() { + $len = (int)$this->stream->readuntil(Tags::TagQuote); + $s = $this->stream->readString($len); + $this->stream->skip(1); + return $s; + } + public function readStringWithoutTag() { + $s = $this->privateReadStringWithoutTag(); + $this->refer->set($s); + return $s; + } + private function privateReadString() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagUTF8Char: return $this->readUTF8CharWithoutTag(); + case Tags::TagString: return $this->readStringWithoutTag(); + case Tags::TagRef: return (string)$this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readString() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagEmpty: return ''; + case Tags::TagUTF8Char: return $this->readUTF8CharWithoutTag(); + case Tags::TagString: return $this->readStringWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readGuidWithoutTag() { + $this->stream->skip(1); + $s = $this->stream->read(36); + $this->stream->skip(1); + $this->refer->set($s); + return $s; + } + public function readGuid() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagGuid: return $this->readGuidWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readListWithoutTag() { + $list = array(); + $this->refer->set($list); + $count = (int)$this->stream->readuntil(Tags::TagOpenbrace); + for ($i = 0; $i < $count; ++$i) { + $list[$i] = $this->unserialize(); + } + $this->stream->skip(1); + return $list; + } + public function readList() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: $result = null; return $result; + case Tags::TagList: return $this->readListWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readMapWithoutTag() { + $map = array(); + $this->refer->set($map); + $count = (int)$this->stream->readuntil(Tags::TagOpenbrace); + for ($i = 0; $i < $count; ++$i) { + $key = $this->unserializeKey(); + $map[$key] = $this->unserialize(); + } + $this->stream->skip(1); + return $map; + } + public function readMap() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: $result = null; return $result; + case Tags::TagMap: return $this->readMapWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + public function readObjectWithoutTag() { + $index = (int)$this->stream->readuntil(Tags::TagOpenbrace); + list($classname, $props) = $this->classref[$index]; + if ($classname == 'stdClass') { + $object = new stdClass(); + $this->refer->set($object); + foreach ($props as $prop) { + $object->$prop = $this->unserialize(); + } + } + else { + $reflector = new ReflectionClass($classname); + if ($reflector->getConstructor() === null) { + if (version_compare(PHP_VERSION, '5.4.0', '>=')) { + $object = $reflector->newInstanceWithoutConstructor(); + } + else { + $object = new $classname(); + } + } + else { + $object = $reflector->newInstance(); + } + $this->refer->set($object); + foreach ($props as $prop) { + $value = $this->unserialize(); + if ($reflector->hasProperty($prop)) { + $property = $reflector->getProperty($prop); + $property->setAccessible(true); + $property->setValue($object, $value); + } + else { + $p = strtoupper($prop[0]) . substr($prop, 1); + if ($reflector->hasProperty($p)) { + $property = $reflector->getProperty($p); + $property->setAccessible(true); + $property->setValue($object, $value); + } + else { + $object->$prop = $value; + } + } + } + } + $this->stream->skip(1); + return $object; + } + public function readObject() { + $tag = $this->stream->getc(); + switch ($tag) { + case Tags::TagNull: return null; + case Tags::TagClass: $this->readclass(); return $this->readObject(); + case Tags::TagObject: return $this->readObjectWithoutTag(); + case Tags::TagRef: return $this->readRef(); + default: throw $this->unexpectedTag($tag); + } + } + protected function readClass() { + $classname = ClassManager::getClass($this->privateReadStringWithoutTag()); + $count = (int)$this->stream->readuntil(Tags::TagOpenbrace); + $props = new SplFixedArray($count); + for ($i = 0; $i < $count; ++$i) { + $props[$i] = $this->privateReadString(); + } + $this->stream->skip(1); + $this->classref[] = array($classname, $props); + } + protected function readRef() { + return $this->refer->read((int)$this->stream->readuntil(Tags::TagSemicolon)); + } + public function reset() { + $this->classref = array(); + $this->refer->reset(); + } +} + diff --git a/hprose/lib/ReaderRefer.php b/hprose/lib/ReaderRefer.php new file mode 100644 index 0000000..d186794 --- /dev/null +++ b/hprose/lib/ReaderRefer.php @@ -0,0 +1,26 @@ + * + * * +\**********************************************************/ + +interface ReaderRefer { + public function set($val); + public function read($index); + public function reset(); +} diff --git a/hprose/lib/RealReaderRefer.php b/hprose/lib/RealReaderRefer.php new file mode 100644 index 0000000..8c835a2 --- /dev/null +++ b/hprose/lib/RealReaderRefer.php @@ -0,0 +1,35 @@ + * + * * +\**********************************************************/ +class RealReaderRefer implements ReaderRefer { + private $ref; + public function __construct() { + $this->reset(); + } + public function set($val) { + $this->ref[] = $val; + } + public function read($index) { + return $this->ref[$index]; + } + public function reset() { + $this->ref = array(); + } +} diff --git a/hprose/lib/RealWriterRefer.php b/hprose/lib/RealWriterRefer.php new file mode 100644 index 0000000..008957b --- /dev/null +++ b/hprose/lib/RealWriterRefer.php @@ -0,0 +1,58 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/WriterRefer.php'; +class RealWriterRefer implements WriterRefer { + private $oref; + private $sref = array(); + private $refcount = 0; + + public function __construct() { + $this->oref = new SplObjectStorage(); + } + + private function writeRef(BytesIO $stream, $index) { + $stream->write(Tags::TagRef . $index . Tags::TagSemicolon); + return true; + } + public function set($val) { + if (is_string($val)) { + $this->sref[$val] = $this->refcount; + } + elseif (is_object($val)) { + $this->oref->attach($val, $this->refcount); + } + $this->refcount++; + } + public function write(BytesIO $stream, $val) { + if (is_string($val) && isset($this->sref[$val])) { + return $this->writeRef($stream, $this->sref[$val]); + } + elseif (is_object($val) && isset($this->oref[$val])) { + return $this->writeRef($stream, $this->oref[$val]); + } + return false; + } + public function reset() { + $this->oref = new \SplObjectStorage(); + $this->sref = array(); + $this->refcount = 0; + } +} diff --git a/hprose/lib/ResultMode.php b/hprose/lib/ResultMode.php new file mode 100644 index 0000000..396774e --- /dev/null +++ b/hprose/lib/ResultMode.php @@ -0,0 +1,26 @@ + * + * * +\**********************************************************/ +class ResultMode { + const Normal = 0; + const Serialized = 1; + const Raw = 2; + const RawWithEndTag = 3; +} diff --git a/hprose/lib/Server.php b/hprose/lib/Server.php new file mode 100644 index 0000000..45474a7 --- /dev/null +++ b/hprose/lib/Server.php @@ -0,0 +1,97 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Service.php'; +class Server extends Service { + public $server; + public $settings = array(); + public $noDelay = true; + private $type; + private function parseUrl($uri) { + $result = new stdClass(); + $p = parse_url($uri); + if ($p) { + switch (strtolower($p['scheme'])) { + case 'tcp': + case 'tcp4': + case 'ssl': + case 'sslv2': + case 'sslv3': + case 'tls': + $result->type = SWOOLE_SOCK_TCP; + $result->host = $p['host']; + $result->port = $p['port']; + break; + case 'tcp6': + $result->type = SWOOLE_SOCK_TCP6; + $result->host = $p['host']; + $result->port = $p['port']; + break; + case 'unix': + $result->type = SWOOLE_UNIX_STREAM; + $result->host = $p['path']; + $result->port = 0; + break; + default: + throw new Exception("Can't support this scheme: {$p['scheme']}"); + } + } + else { + throw new Exception("Can't parse this uri: " . $uri); + } + return $result; + } + public function __construct($uri, $mode = SWOOLE_BASE) { + parent::__construct(); + $url = $this->parseUrl($uri); + $this->type = $url->type; + $this->server = new swoole_server($url->host, $url->port, $mode, $url->type); + } + public function setNoDelay($value) { + $this->noDelay = $value; + } + public function isNoDelay() { + return $this->noDelay; + } + public function set($settings) { + $this->settings = array_replace($this->settings, $settings); + } + public function on($name, $callback) { + $this->server->on($name, $callback); + } + public function addListener($uri) { + $url = $this->parseUrl($uri); + $this->server->addListener($url->host, $url->port, $url->type); + } + public function listen($host, $port, $type = SWOOLE_SOCK_TCP) { + return $this->server->listen($host, $port, $type); + } + public function start() { + if ($this->type !== SWOOLE_UNIX_STREAM) { + $this->settings['open_tcp_nodelay'] = $this->noDelay; + } + $this->settings['open_eof_check'] = false; + $this->settings['open_length_check'] = false; + $this->settings['open_eof_split'] = false; + $this->server->set($this->settings); + $this->socketHandle($this->server); + $this->server->start(); + } +} diff --git a/hprose/lib/Service.php b/hprose/lib/Service.php new file mode 100644 index 0000000..dedc25e --- /dev/null +++ b/hprose/lib/Service.php @@ -0,0 +1,146 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Socket_Service.php'; +require_once __DIR__ . '/Timer.php'; +class Service extends Socket_Service { + const MAX_PACK_LEN = 0x200000; + public $onAccept = null; + public $onClose = null; + public function __construct() { + parent::__construct(); + $this->timer = new Timer(); + } + private function send($server, $socket, $data) { + if ($server->exist($socket)) { + return $server->send($socket, $data); + } + return false; + } + public function socketSend($server, $socket, $data, $id) { + $dataLength = strlen($data); + if ($id === null) { + $this->send($server, $socket, pack("N", $dataLength)); + } + else { + $this->send($server, $socket, pack("NN", $dataLength | 0x80000000, $id)); + } + if ($dataLength <= self::MAX_PACK_LEN) { + return $this->send($server, $socket, $data); + } + else { + for ($i = 0; $i < $dataLength; $i += self::MAX_PACK_LEN) { + if (!$this->send($server, $socket, substr($data, $i, min($dataLength - $i, self::MAX_PACK_LEN)))) { + return false; + } + } + return true; + } + } + public function getOnReceive() { + $self = $this; + $bytes = ''; + $headerLength = 4; + $dataLength = -1; + $id = null; + return function($server, $socket, $fromid, $data) + use ($self, &$bytes, &$headerLength, &$dataLength, &$id) { + $bytes .= $data; + while (true) { + $length = strlen($bytes); + if (($dataLength < 0) && ($length >= $headerLength)) { + list(, $dataLength) = unpack('N', substr($bytes, 0, 4)); + if (($dataLength & 0x80000000) !== 0) { + $dataLength &= 0x7FFFFFFF; + $headerLength = 8; + } + } + if (($headerLength === 8) && ($id === null) && ($length >= $headerLength)) { + list(, $id) = unpack('N', substr($bytes, 4, 4)); + } + if (($dataLength >= 0) && (($length - $headerLength) >= $dataLength)) { + $context = new stdClass(); + $context->server = $server; + $context->socket = $socket; + $context->fd = $socket; + $context->fromid = $fromid; + $context->userdata = new stdClass(); + $data = substr($bytes, $headerLength, $dataLength); + $self->userFatalErrorHandler = function($error) use ($self, $server, $socket, $id, $context) { + $self->socketSend($server, $socket, $self->endError($error, $context), $id); + }; + $self->defaultHandle($data, $context)->then(function($data) use ($self, $server, $socket, $id) { + $self->socketSend($server, $socket, $data, $id); + }); + $bytes = substr($bytes, $headerLength + $dataLength); + $id = null; + $headerLength = 4; + $dataLength = -1; + } + else { + break; + } + } + }; + } + public function socketHandle($server) { + $self = $this; + $onReceives = array(); + $server->on('connect', function($server, $socket, $fromid) use ($self, &$onReceives) { + $onReceives[$socket] = $self->getOnReceive(); + $context = new stdClass(); + $context->server = $server; + $context->socket = $socket; + $context->fd = $socket; + $context->fromid = $fromid; + $context->userdata = new stdClass(); + try { + $onAccept = $self->onAccept; + if (is_callable($onAccept)) { + call_user_func($onAccept, $context); + } + } + catch (Exception $e) { $server->close($socket); } + catch (Throwable $e) { $server->close($socket); } + }); + $server->on('close', function($server, $socket, $fromid) use ($self, &$onReceives) { + unset($onReceives[$socket]); + $context = new stdClass(); + $context->server = $server; + $context->socket = $socket; + $context->fd = $socket; + $context->fromid = $fromid; + $context->userdata = new stdClass(); + try { + $onClose = $self->onClose; + if (is_callable($onClose)) { + call_user_func($onClose, $context); + } + } + catch (Exception $e) {} + catch (Throwable $e) {} + }); + $server->on("receive", function ($server, $socket, $fromid, $data) use(&$onReceives) { + $onReceive = $onReceives[$socket]; + $onReceive($server, $socket, $fromid, $data); + }); + } +} + diff --git a/hprose/lib/Socket_Client.php b/hprose/lib/Socket_Client.php new file mode 100644 index 0000000..a0f0603 --- /dev/null +++ b/hprose/lib/Socket_Client.php @@ -0,0 +1,768 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/HandlerManager.php'; +require_once __DIR__ . '/InvokeSettings.php'; +require_once __DIR__ . '/ResultMode.php'; +require_once __DIR__ . '/BytesIO.php'; +require_once __DIR__ . '/Writer.php'; +require_once __DIR__ . '/Tags.php'; +require_once __DIR__ . '/Reader.php'; +require_once __DIR__ . '/functions.php'; +abstract class Socket_Client extends HandlerManager { + private $index = -1; + private $uriList = null; + protected $async = true; + public $uri = null; + public $filters = array(); + public $timeout = 30000; + public $retry = 10; + public $idempotent = false; + public $failswitch = false; + public $failround = 0; + public $byref = false; + public $simple = false; + public $onError = null; + public $onFailswitch = null; + private $methodCache = array(); + + private static $clientFactories = array(); + private static $clientFactoriesInited = false; + + public static function registerClientFactory($scheme, $clientFactory) { + self::$clientFactories[$scheme] = $clientFactory; + } + + public static function tryRegisterClientFactory($scheme, $clientFactory) { + if (empty(self::$clientFactories[$scheme])) { + self::$clientFactories[$scheme] = $clientFactory; + } + } + + private static function initClientFactories() { + self::tryRegisterClientFactory("http", "\\Hprose\\Http\\Client"); + self::tryRegisterClientFactory("https", "\\Hprose\\Http\\Client"); + self::tryRegisterClientFactory("tcp", "\\Hprose\\Socket\\Client"); + self::tryRegisterClientFactory("ssl", "\\Hprose\\Socket\\Client"); + self::tryRegisterClientFactory("sslv2", "\\Hprose\\Socket\\Client"); + self::tryRegisterClientFactory("sslv3", "\\Hprose\\Socket\\Client"); + self::tryRegisterClientFactory("tls", "\\Hprose\\Socket\\Client"); + self::tryRegisterClientFactory("unix", "\\Hprose\\Socket\\Client"); + self::$clientFactoriesInited = true; + } + + public static function create($uriList, $async = true) { + if (!self::$clientFactoriesInited) self::initClientFactories(); + if (is_string($uriList)) $uriList = array($uriList); + $scheme = strtolower(parse_url($uriList[0], PHP_URL_SCHEME)); + $n = count($uriList); + for ($i = 1; $i < $n; ++$i) { + if (strtolower(parse_url($uriList[$i], PHP_URL_SCHEME)) != $scheme) { + throw new Exception("Not support multiple protocol."); + } + } + $clientFactory = self::$clientFactories[$scheme]; + if (empty($clientFactory)) { + throw new Exception("This client doesn't support $scheme scheme."); + } + return new $clientFactory($uriList, $async); + } + + public function __construct($uriList = null, $async = true) { + parent::__construct(); + if ($uriList != null) { + $this->setUriList($uriList); + if (is_bool($uriList)) { + $async = $uriList; + } + } + $this->async = $async; + } + + public function __destruct() { + $this->close(); + } + + public function close() {} + + public final function getTimeout() { + return $this->timeout; + } + + public final function setTimeout($timeout) { + if ($timeout < 1) throw new Exception("timeout must be great than 0"); + $this->timeout = $timeout; + } + + public final function getRetry() { + return $this->retry; + } + + public final function setRetry($retry) { + $this->retry = $retry; + } + + public final function isIdempotent() { + return $this->idempotent; + } + + public final function setIdempotent($idempotent) { + $this->idempotent = $idempotent; + } + + public final function isFailswitch() { + return $this->failswitch; + } + + public final function setFailswitch($failswitch) { + $this->failswitch = $failswitch; + } + + public final function getFailround() { + return $this->failround; + } + + public final function isByref() { + return $this->byref; + } + + public final function setByref($byref) { + $this->byref = $byref; + } + + public final function isSimple() { + return $this->simple; + } + public final function setSimple($simple = true) { + $this->simple = $simple; + } + + public final function getFilter() { + if (empty($this->filters)) { + return null; + } + return $this->filters[0]; + } + + public final function setFilter(Filter $filter) { + $this->filters = array(); + if ($filter !== null) { + $this->filters[] = $filter; + } + } + + public final function addFilter(Filter $filter) { + if ($filter !== null) { + if (empty($this->filters)) { + $this->filters = array($filter); + } + else { + $this->filters[] = $filter; + } + } + return $this; + } + + public final function removeFilter(Filter $filter) { + if (empty($this->filters)) { + return false; + } + $i = array_search($filter, $this->filters); + if ($i === false || $i === null) { + return false; + } + $this->filters = array_splice($this->filters, $i, 1); + return true; + } + + protected function setUri($uri) { + $this->uri = $uri; + } + + public function getUriList() { + return $this->uriList; + } + + public function setUriList($uriList) { + if (is_string($uriList)) { + $uriList = array($uriList); + } + else if (is_array($uriList)) { + shuffle($uriList); + } + else { + return; + } + $this->index = 0; + $this->failround = 0; + $this->uriList = $uriList; + $this->setUri($uriList[$this->index]); + } + + public function useService($uriList = array(), $namespace = '') { + if (!empty($uriList)) { + $this->setUriList($uriList); + } + if ($namespace) { + $namespace .= "_"; + } + return new Proxy($this, $namespace); + } + + private function outputFilter($request, stdClass $context) { + if (empty($this->filters)) return $request; + $count = count($this->filters); + for ($i = 0; $i < $count; ++$i) { + $request = $this->filters[$i]->outputFilter($request, $context); + } + return $request; + } + + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function inputFilter($response, stdClass $context) { + if (empty($this->filters)) return $response; + $count = count($this->filters); + for ($i = $count - 1; $i >= 0; --$i) { + $response = $this->filters[$i]->inputFilter($response, $context); + } + return $response; + } + + protected function wait($interval, $callback) { + $seconds = floor($interval); + $nanoseconds = ($interval - $seconds) * 1000000000; + time_nanosleep($seconds, $nanoseconds); + return $callback(); + } + + private function failswitch() { + $n = count($this->uriList); + if ($n > 1) { + $i = $this->index + 1; + if ($i >= $n) { + $i = 0; + $this->failround++; + } + $this->index = $i; + $this->setUri($this->uriList[$i]); + } + else { + $this->failround++; + } + $onFailswitch = $this->onFailswitch; + if (is_callable($onFailswitch)) { + call_user_func($onFailswitch, $this); + } + } + + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function retry($request, stdClass $context) { + if ($context->failswitch) { + $this->failswitch(); + } + if ($context->idempotent && ($context->retried < $context->retry)) { + $interval = ++$context->retried * 0.5; + if ($context->failswitch) { + $interval -= (count($this->uriList) - 1) * 0.5; + } + if ($interval > 5) $interval = 5; + $self = $this; + if ($interval > 0) { + return $this->wait($interval, function() use ($self, $request, $context) { + return $self->afterFilterHandler($request, $context); + }); + } + else { + return $this->afterFilterHandler($request, $context); + } + } + return null; + } + + private function encode($name, array $args, stdClass $context) { + $stream = new BytesIO(Tags::TagCall); + $writer = new Writer($stream, $context->simple); + $writer->writeString($name); + if (count($args) > 0 || $context->byref) { + $writer->reset(); + $writer->writeArray($args); + if ($context->byref) { + $writer->writeBoolean(true); + } + } + $stream->write(Tags::TagEnd); + $request = $stream->toString(); + $stream->close(); + return $request; + } + + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function decode($response, array &$args, stdClass $context) { + if ($context->oneway) return null; + if (empty($response)) throw new Exception("EOF"); + if ($response[strlen($response) - 1] !== Tags::TagEnd) { + throw new Exception("Wrong Response: \r\n$response"); + } + $mode = $context->mode; + if ($mode === ResultMode::RawWithEndTag) { + return $response; + } + elseif ($mode === ResultMode::Raw) { + return substr($response, 0, -1); + } + $stream = new BytesIO($response); + $reader = new Reader($stream); + $result = null; + $tag = $stream->getc(); + if ($tag === Tags::TagResult) { + if ($mode === ResultMode::Normal) { + $result = $reader->unserialize(); + } + elseif ($mode === ResultMode::Serialized) { + $result = $reader->readRaw()->toString(); + } + $tag = $stream->getc(); + if ($tag === Tags::TagArgument) { + $reader->reset(); + $arguments = $reader->readList(); + $n = min(count($arguments), count($args)); + for ($i = 0; $i < $n; $i++) { + $args[$i] = $arguments[$i]; + } + $tag = $stream->getc(); + } + } + elseif ($tag === Tags::TagError) { + $e = new Exception($reader->readString()); + $stream->close(); + throw $e; + } + if ($tag !== Tags::TagEnd) { + $stream->close(); + throw new Exception("Wrong Response: \r\n$response"); + } + $stream->close(); + return $result; + } + + private function getContext(InvokeSettings $settings) { + $context = new stdClass(); + $context->client = $this; + $context->userdata = isset($settings->userdata) ? (object)($settings->userdata) : new stdClass(); + $context->mode = isset($settings->mode) ? $settings->mode : ResultMode::Normal; + $context->oneway = isset($settings->oneway) ? $settings->oneway : false; + $context->byref = isset($settings->byref) ? $settings->byref : $this->byref; + $context->simple = isset($settings->simple) ? $settings->simple : $this->simple; + $context->failswitch = isset($settings->failswitch) ? $settings->failswitch : $this->failswitch; + $context->idempotent = isset($settings->idempotent) ? $settings->idempotent : $this->idempotent; + $context->retry = isset($settings->retry) ? $settings->retry : $this->retry; + $context->retried = 0; + $context->timeout = isset($settings->timeout) ? $settings->timeout : $this->timeout; + return $context; + } + + public function __call($name, array $args) { + if (isset($this->methodCache[$name])) { + $method = $this->methodCache[$name]; + return call_user_func_array($method, $args); + } + $n = count($args); + if ($n > 0) { + if ($args[$n - 1] instanceof Closure) { + $callback = array_pop($args); + return $this->invoke($name, $args, $callback); + } + else if ($args[$n - 1] instanceof InvokeSettings) { + if (($n > 1) && ($args[$n - 2] instanceof Closure)) { + $settings = array_pop($args); + $callback = array_pop($args); + return $this->invoke($name, $args, $callback, $settings); + } + $settings = array_pop($args); + return $this->invoke($name, $args, $settings); + } + else if (($n > 1) && is_array($args[$n - 1]) && + ($args[$n - 2] instanceof Closure)) { + $settings = new InvokeSettings(array_pop($args)); + $callback = array_pop($args); + return $this->invoke($name, $args, $callback, $settings); + } + } + return $this->invoke($name, $args); + } + + public function __get($name) { + if (isset($this->methodCache[$name])) { + return $this->methodCache[$name]; + } + $method = new Proxy($this, $name . '_'); + $this->methodCache[$name] = $method; + return $method; + } + + protected function getNextInvokeHandler(Closure $next, /*callable*/ $handler) { + if ($this->async) return parent::getNextInvokeHandler($next, $handler); + return function($name, array &$args, stdClass $context) use ($next, $handler) { + $array = array($name, &$args, $context, $next); + return call_user_func_array($handler, $array); + }; + } + protected function getNextFilterHandler(Closure $next, /*callable*/ $handler) { + if ($this->async) return parent::getNextFilterHandler($next, $handler); + return function($request, stdClass $context) use ($next, $handler) { + return call_user_func($handler, $request, $context, $next); + }; + } + + private function asyncInvokeHandler($name, array &$args, stdClass $context) { + try { + $request = $this->encode($name, $args, $context); + } + catch (Exception $e) { + return error($e); + } + catch (Throwable $e) { + return error($e); + } + $self = $this; + $beforeFilterHandler = $this->beforeFilterHandler; + return $beforeFilterHandler($request, $context)->then(function($response) use ($self, &$args, $context) { + return $self->decode($response, $args, $context); + }); + } + + private function syncInvokeHandler($name, array &$args, stdClass $context) { + $request = $this->encode($name, $args, $context); + $beforeFilterHandler = $this->beforeFilterHandler; + $response = $beforeFilterHandler($request, $context); + return $this->decode($response, $args, $context); + } + + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function invokeHandler($name, array &$args, stdClass $context) { + if ($this->async) { + return $this->asyncInvokeHandler($name, $args, $context); + } + return $this->syncInvokeHandler($name, $args, $context); + } + + private function asyncBeforeFilterHandler($request, stdClass $context) { + $afterFilterHandler = $this->afterFilterHandler; + $self = $this; + return $afterFilterHandler($this->outputFilter($request, $context), $context) + ->then(function($response) use ($self, $context) { + if ($context->oneway) return null; + return $self->inputFilter($response, $context); + }); + } + + private function syncBeforeFilterHandler($request, stdClass $context) { + $afterFilterHandler = $this->afterFilterHandler; + $response = $afterFilterHandler($this->outputFilter($request, $context), $context); + if ($context->oneway) return null; + return $this->inputFilter($response, $context); + } + + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function beforeFilterHandler($request, stdClass $context) { + if ($this->async) { + return $this->asyncBeforeFilterHandler($request, $context); + } + return $this->syncBeforeFilterHandler($request, $context); + } + + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function afterFilterHandler($request, stdClass $context) { + if ($this->async) { + $self = $this; + return $this->sendAndReceive($request, $context)->catchError(function($e) use ($self, $request, $context) { + $response = $self->retry($request, $context); + if ($response !== null) { + return $response; + } + throw $e; + }); + } + $error = null; + try { + $response = $this->sendAndReceive($request, $context); + } + catch (Exception $e) { $error = $e; } + catch (Throwable $e) { $error = $e; } + if ($error !== null) { + $response = $this->retry($request, $context); + if ($response !== null) { + return $response; + } + throw $error; + } + return $response; + } + + public function invoke($name, array &$args = array(), $callback = null, InvokeSettings $settings = null) { + if ($callback instanceof InvokeSettings) { + $settings = $callback; + $callback = null; + } + if ($settings === null) $settings = new InvokeSettings(); + $context = $this->getContext($settings); + $invokeHandler = $this->invokeHandler; + if (is_callable($callback)) { + if (is_array($callback)) { + $f = new ReflectionMethod($callback[0], $callback[1]); + } + else { + $f = new ReflectionFunction($callback); + } + $n = $f->getNumberOfParameters(); + $onError = $this->onError; + return all($args)->then(function($args) use ($invokeHandler, $name, $context, $n, $callback, $onError) { + $result = toFuture($invokeHandler($name, $args, $context)); + $result->then( + function($result) use ($n, $callback, $args) { + switch($n) { + case 0: call_user_func($callback); break; + case 1: call_user_func($callback, $result); break; + case 2: call_user_func($callback, $result, $args); break; + case 3: call_user_func($callback, $result, $args, null); break; + } + }, + function($error) use ($n, $callback, $args, $name, $onError) { + switch($n) { + case 0: + call_user_func($callback); + if (is_callable($onError)) { + call_user_func($onError, $name, $error); + } + break; + case 1: call_user_func($callback, $error); break; + case 2: call_user_func($callback, $error, $args); break; + case 3: call_user_func($callback, null, $args, $error); break; + } + } + ); + return $result; + }); + } + else { + if ($this->async) { + $args = all($args); + return $args->then(function($args) use ($invokeHandler, $name, $context) { + return $invokeHandler($name, $args, $context); + }); + } + return $invokeHandler($name, $args, $context); + } + } + + protected abstract function sendAndReceive($request, stdClass $context); + + private $topics; + private $id; + private function autoId() { + $settings = new InvokeSettings(array( + 'idempotent' => true, + 'failswitch' => true + )); + $args = array(); + return toFuture($this->invoke('#', $args, $settings)); + } + public function getId() { + if ($this->id == null) { + $this->id = $this->autoId(); + } + return $this->id; + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function getTopic($name, $id) { + if (isset($this->topics[$name])) { + $topics = $this->topics[$name]; + if (isset($topics[$id])) { + return $topics[$id]; + } + } + return null; + } + // subscribe($name, $callback, $timeout, $failswitch) + // subscribe($name, $id, $callback, $timeout, $failswitch) + public function subscribe($name, $id = null, $callback = null, $timeout = null, $failswitch = false) { + $self = $this; + if (!is_string($name)) { + throw new TypeError('topic name must be a string'); + } + if (is_callable($id) && !is_callable($callback)) { + $timeout = $callback; + $callback = $id; + $id = null; + } + if (!is_callable($callback)) { + throw new TypeError('callback must be a function.'); + } + if (!isset($this->topics[$name])) { + $this->topics[$name] = array(); + } + if ($id === null) { + if ($this->id == null) { + $this->id = $this->autoId(); + } + $this->id->then(function($id) use ($self, $name, $callback, $timeout, $failswitch) { + $self->subscribe($name, $id, $callback, $timeout, $failswitch); + }); + return; + } + if (!is_int($timeout)) $timeout = $this->timeout; + $topic = $this->getTopic($name, $id); + if ($topic === null) { + $topic = new stdClass(); + $settings = new InvokeSettings(array( + 'idempotent' => true, + 'failswitch' => $failswitch, + 'timeout' => $timeout + )); + $cb = function() use ($self, &$cb, $topic, $name, $id, $settings) { + $args = array($id); + $self->invoke($name, $args, $settings) + ->then($topic->handler, $cb); + }; + $topic->handler = function($result) use ($self, $name, $id, $cb) { + $topic = $self->getTopic($name, $id); + if ($topic !== null) { + if ($result !== null) { + $callbacks = $topic->callbacks; + foreach ($callbacks as $callback) { + try { + call_user_func($callback, $result); + } + catch (Exception $ex) {} + catch (Throwable $ex) {} + } + } + if ($self->getTopic($name, $id) !== null) $cb(); + } + }; + $topic->callbacks = array($callback); + $this->topics[$name][$id] = $topic; + $cb(); + } + elseif (array_search($callback, $topic->callbacks, true) === false) { + $topic->callbacks[] = $callback; + } + } + private function delTopic(&$topics, $id, $callback) { + if ($topics !== null) { + if (is_callable($callback)) { + if (isset($topics[$id])) { + $topic = $topics[$id]; + $callbacks = array_diff($topic->callbacks, array($callback)); + if (count($callbacks) > 0) { + $topic->callbacks = $callbacks; + } + else { + unset($topics[$id]); + } + } + } + else { + unset($topics[$id]); + } + } + } + // unsubscribe($name) + // unsubscribe($name, $callback) + // unsubscribe($name, $id) + // unsubscribe($name, $id, $callback) + public function unsubscribe($name, $id = null, $callback = null) { + $self = $this; + if (!is_string($name)) { + throw new TypeError('topic name must be a string'); + } + if (($id === null) && ($callback === null)) { + unset($this->topics[$name]); + return; + } + if (is_callable($id) && !is_callable($callback)) { + $callback = $id; + $id = null; + } + if ($id === null) { + if ($this->id === null) { + if (isset($this->topics[$name])) { + $topics = $this->topics[$name]; + $ids = array_keys($topics); + foreach ($ids as $id) { + $this->delTopic($topics, $id, $callback); + } + } + } + else { + $this->id->then(function($id) use ($self, $name, $callback) { + $self->unsubscribe($name, $id, $callback); + }); + } + } + elseif (isFuture($id)) { + $id->then(function($id) use ($self, $name, $callback) { + $self->unsubscribe($name, $id, $callback); + }); + } + else { + $this->delTopic($this->topics[$name], $id, $callback); + } + if (isset($this->topics[$name]) && count($this->topics[$name]) === 0) { + unset($this->topics[$name]); + } + } + + public function isSubscribed($name) { + return isset($this->topics[$name]); + } + + public function subscribedList() { + return array_keys($this->topics); + } +} \ No newline at end of file diff --git a/hprose/lib/Socket_Service.php b/hprose/lib/Socket_Service.php new file mode 100644 index 0000000..198459b --- /dev/null +++ b/hprose/lib/Socket_Service.php @@ -0,0 +1,1216 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/HandlerManager.php'; +require_once __DIR__ . '/ResultMode.php'; +require_once __DIR__ . '/BytesIO.php'; +require_once __DIR__ . '/Writer.php'; +require_once __DIR__ . '/Tags.php'; +require_once __DIR__ . '/Reader.php'; +require_once __DIR__ . '/functions.php'; +abstract class Socket_Service extends HandlerManager { + private static $magicMethods = array( + "__construct", + "__destruct", + "__call", + "__callStatic", + "__get", + "__set", + "__isset", + "__unset", + "__sleep", + "__wakeup", + "__toString", + "__invoke", + "__set_state", + "__clone" + ); + private $calls = array(); + private $names = array(); + private $filters = array(); + protected $userFatalErrorHandler = null; + public $onBeforeInvoke = null; + public $onAfterInvoke = null; + public $onSendError = null; + public $errorDelay = 10000; + public $errorTypes; + public $simple = false; + public $debug = false; + public $passContext = false; + // for push service + protected $timer = null; + public $timeout = 120000; + public $heartbeat = 3000; + public $onSubscribe = null; + public $onUnsubscribe = null; + private $topics = array(); + private $nextid = 0; + public function __construct() { + parent::__construct(); + $this->errorTypes = error_reporting(); + register_shutdown_function(array($this, 'fatalErrorHandler')); + $this->addMethod('getNextId', $this, '#', array('simple' => true)); + } + public function getNextId() { + if (function_exists('com_create_guid')) { + return trim(com_create_guid(), '{}'); + } + return md5(uniqid(dechex(mt_rand()), true) . dechex(mt_rand())); + } + public function fatalErrorHandler() { + if (!is_callable($this->userFatalErrorHandler)) return; + $e = error_get_last(); + if ($e == null) return; + switch ($e['type']) { + case E_ERROR: + case E_PARSE: + case E_USER_ERROR: + case E_CORE_ERROR: + case E_COMPILE_ERROR: { + $error = new ErrorException($e['message'], 0, $e['type'], $e['file'], $e['line']); + @ob_end_clean(); + $userFatalErrorHandler = $this->userFatalErrorHandler; + call_user_func($userFatalErrorHandler, $error); + } + } + } + public final function getTimeout() { + return $this->timeout; + } + public final function setTimeout($value) { + $this->timeout = $value; + } + public final function getHeartbeat() { + return $this->heartbeat; + } + public final function setHeartbeat($value) { + $this->heartbeat = $value; + } + public final function getErrorDelay() { + return $this->errorDelay; + } + public final function setErrorDelay($value) { + $this->errorDelay = $value; + } + public final function getErrorTypes() { + return $this->errorTypes; + } + public final function setErrorTypes($value) { + $this->errorTypes = $value; + } + public final function isDebugEnabled() { + return $this->debug; + } + public final function setDebugEnabled($value = true) { + $this->debug = $value; + } + public final function isSimple() { + return $this->simple; + } + public final function setSimple($value = true) { + $this->simple = $value; + } + public final function isPassContext() { + return $this->passContext; + } + public final function setPassContext($value = true) { + $this->passContext = $value; + } + public final function getFilter() { + if (empty($this->filters)) { + return null; + } + return $this->filters[0]; + } + public final function setFilter(Filter $filter) { + $this->filters = array(); + if ($filter !== null) { + $this->filters[] = $filter; + } + } + public final function addFilter(Filter $filter) { + if ($filter !== null) { + if (empty($this->filters)) { + $this->filters = array($filter); + } + else { + $this->filters[] = $filter; + } + } + return $this; + } + public final function removeFilter(Filter $filter) { + if (empty($this->filters)) { + return false; + } + $i = array_search($filter, $this->filters); + if ($i === false || $i === null) { + return false; + } + $this->filters = array_splice($this->filters, $i, 1); + return true; + } + protected function nextTick($callback) { + $callback(); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function callService(array $args, stdClass $context) { + if ($context->oneway) { + $this->nextTick(function() use ($args, $context) { + try { + call_user_func_array($context->method, $args); + } + catch (Exception $e) {} + catch (Throwable $e) {} + }); + if ($context->async) { + call_user_func($args[count($args) - 1], null); + } + return null; + } + return call_user_func_array($context->method, $args); + } + private function inputFilter($data, stdClass $context) { + for ($i = count($this->filters) - 1; $i >= 0; $i--) { + $data = $this->filters[$i]->inputFilter($data, $context); + } + return $data; + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function outputFilter($data, stdClass $context) { + for ($i = 0, $n = count($this->filters); $i < $n; $i++) { + $data = $this->filters[$i]->outputFilter($data, $context); + } + return $data; + } + + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function sendError($error, stdClass $context) { + if (is_string($error)) { + $error = new Exception($error); + } + try { + if ($this->onSendError !== null) { + $onSendError = $this->onSendError; + $e = call_user_func_array($onSendError, array(&$error, $context)); + if ($e instanceof Exception || $e instanceof Throwable) { + $error = $e; + } + } + } + catch (Exception $e) { + $error = $e; + } + $stream = new BytesIO(); + $writer = new Writer($stream, true); + $stream->write(Tags::TagError); + $writer->writeString($this->debug ? $error->getTraceAsString() : $error->getMessage()); + return $stream; + } + public function endError($error, stdClass $context) { + $stream = $this->sendError($error, $context); + $stream->write(Tags::TagEnd); + $data = $stream->toString(); + $stream->close(); + return $data; + } + private function beforeInvoke($name, array &$args, stdClass $context) { + try { + $self = $this; + if ($this->onBeforeInvoke !== null) { + $onBeforeInvoke = $this->onBeforeInvoke; + $value = call_user_func_array($onBeforeInvoke, array($name, &$args, $context->byref, $context)); + if ($value instanceof Exception || $value instanceof Throwable) { + throw $value; + } + if (isFuture($value)) { + return $value->then(function($value) use ($self, $name, $args, $context) { + if ($value instanceof Exception || $value instanceof Throwable) { + throw $value; + } + return $self->invoke($name, $args, $context); + })->then(null, function($error) use ($self, $context) { + return $self->sendError($error, $context); + }); + } + } + return $this->invoke($name, $args, $context)->then(null, function($error) use ($self, $context) { + return $self->sendError($error, $context); + }); + } + catch (Exception $error) { + return $this->sendError($error, $context); + } + } + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function invokeHandler($name, array &$args, stdClass $context) { + if ($context->isMissingMethod) { + $args = array($name, $args); + } + $passContext = $context->passContext; + if ($passContext === null) { + $passContext = $this->passContext; + } + if ($context->async) { + $self = $this; + return promise(function($resolve, $reject) use ($self, $passContext, &$args, $context) { + if ($passContext) $args[] = $context; + $args[] = function($value) use ($resolve, $reject) { + if ($value instanceof Exception || $value instanceof Throwable) { + $reject($value); + } + else { + $resolve($value); + } + }; + $self->callService($args, $context); + }); + } + else { + if ($passContext) $args[] = $context; + return toFuture($this->callService($args, $context)); + } + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function invoke($name, array &$args, stdClass $context) { + $invokeHandler = $this->invokeHandler; + $self = $this; + return $invokeHandler($name, $args, $context) + ->then(function($value) use ($self, $name, &$args, $context) { + if ($value instanceof Exception || $value instanceof Throwable) { + throw $value; + } + return $self->afterInvoke($name, $args, $context, $value); + }); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function afterInvoke($name, array $args, stdClass $context, $result) { + if ($context->async && is_callable($args[count($args) - 1])) { + unset($args[count($args) - 1]); + } + if ($context->passContext && ($args[count($args) - 1] === $context)) { + unset($args[count($args) - 1]); + } + if ($this->onAfterInvoke !== null) { + $onAfterInvoke = $this->onAfterInvoke; + $value = call_user_func_array($onAfterInvoke, array($name, &$args, $context->byref, &$result, $context)); + if ($value instanceof Exception || $value instanceof Throwable) { + throw $value; + } + if (isFuture($value)) { + $self = $this; + return $value->then(function($value) use ($self, $args, $context, $result) { + if ($value instanceof Exception || $value instanceof Throwable) { + throw $value; + } + return $self->doOutput($args, $context, $result); + }); + } + } + return $this->doOutput($args, $context, $result); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function doOutput(array $args, stdClass $context, $result) { + $mode = $context->mode; + $simple = $context->simple; + if ($simple === null) { + $simple = $this->simple; + } + if ($mode === ResultMode::RawWithEndTag || $mode == ResultMode::Raw) { + return $result; + } + $stream = new BytesIO(); + $writer = new Writer($stream, $simple); + $stream->write(Tags::TagResult); + if ($mode === ResultMode::Serialized) { + $stream->write($result); + } + else { + $writer->reset(); + $writer->serialize($result); + } + if ($context->byref) { + $stream->write(Tags::TagArgument); + $writer->reset(); + $writer->writeArray($args); + } + $data = $stream->toString(); + $stream->close(); + return $data; + } + private function doInvoke(BytesIO $stream, stdClass $context) { + $results = array(); + $reader = new Reader($stream); + do { + $reader->reset(); + $name = $reader->readString(); + $alias = strtolower($name); + $cc = new stdClass(); + $cc->isMissingMethod = false; + foreach ($context as $key => $value) { + $cc->$key = $value; + } + if (isset($this->calls[$alias])) { + $call = $this->calls[$alias]; + } + else if (isset($this->calls['*'])) { + $call = $this->calls['*']; + $cc->isMissingMethod = true; + } + if ($call) { + foreach ($call as $key => $value) { + $cc->$key = $value; + } + } + $args = array(); + $cc->byref = false; + $tag = $stream->getc(); + if ($tag === Tags::TagList) { + $reader->reset(); + $args = $reader->readListWithoutTag(); + $tag = $stream->getc(); + if ($tag === Tags::TagTrue) { + $cc->byref = true; + $tag = $stream->getc(); + } + } + if ($tag !== Tags::TagEnd && $tag !== Tags::TagCall) { + $data = $stream->toString(); + throw new Exception("Unknown tag: $tag\r\nwith following data: $data"); + } + if ($call) { + $results[] = $this->beforeInvoke($name, $args, $cc); + } + else { + $results[] = $this->sendError(new Exception("Can\'t find this function $name()."), $cc); + } + } while($tag === Tags::TagCall); + return reduce($results, function($stream, $result) { + $stream->write($result); + return $stream; + }, new BytesIO())->then(function($stream) { + $stream->write(Tags::TagEnd); + $data = $stream->toString(); + $stream->close(); + return $data; + }); + } + protected function doFunctionList() { + $stream = new BytesIO(); + $writer = new Writer($stream, true); + $stream->write(Tags::TagFunctions); + $writer->writeArray($this->names); + $stream->write(Tags::TagEnd); + $data = $stream->toString(); + $stream->close(); + return $data; + } + protected function delay($interval, $data) { + $seconds = floor($interval); + $nanoseconds = ($interval - $seconds) * 1000000000; + time_nanosleep($seconds, $nanoseconds); + return value($data); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function delayError($error, $context) { + $err = $this->endError($error, $context); + if ($this->errorDelay > 0) { + return $this->delay($this->errorDelay, $err); + } + return value($err); + } + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function beforeFilterHandler($request, stdClass $context) { + $self = $this; + try { + $afterFilterHandler = $this->afterFilterHandler; + $response = $afterFilterHandler($this->inputFilter($request, $context), $context) + ->then(null, function($error) use ($self, $context) { + return $self->delayError($error, $context); + }); + } + catch (Exception $error) { + $response = $this->delayError($error, $context); + } + return $response->then(function($value) use ($self, $context) { + return $self->outputFilter($value, $context); + }); + } + /* + This method is a protected method. + But PHP 5.3 can't call protected method in closure, + so we comment the protected keyword. + */ + /*protected*/ function afterFilterHandler($request, stdClass $context) { + $stream = new BytesIO($request); + try { + switch ($stream->getc()) { + case Tags::TagCall: { + $data = $this->doInvoke($stream, $context); + $stream->close(); + return $data; + } + case Tags::TagEnd: { + $stream->close(); + return value($this->doFunctionList()); + } + default: throw new Exception("Wrong Request: \r\n$request"); + } + } + catch (Exception $e) { + $stream->close(); + return error($e); + } + } + public function defaultHandle($request, stdClass $context) { + $error = null; + set_error_handler(function($errno, $errstr, $errfile, $errline) use (&$error) { + $error = new ErrorException($errstr, 0, $errno, $errfile, $errline); + }, $this->errorTypes); + ob_start(); + ob_implicit_flush(0); + $context->clients = $this; + $context->methods = $this->calls; + $beforeFilterHandler = $this->beforeFilterHandler; + $response = $beforeFilterHandler($request, $context); + $self = $this; + return $response->then(function($result) use ($self, &$error, $context) { + @ob_end_clean(); + restore_error_handler(); + if ($error === null) { + return $result; + } + return $self->endError($error, $context); + }); + } + private static function getDeclaredOnlyMethods($class) { + $result = get_class_methods($class); + if (($parentClass = get_parent_class($class)) !== false) { + $inherit = get_class_methods($parentClass); + $result = array_diff($result, $inherit); + } + return array_diff($result, self::$magicMethods); + } + private static function getDeclaredOnlyInstanceMethods($class) { + $methods = self::getDeclaredOnlyMethods($class); + $instanceMethods = array(); + foreach ($methods as $name) { + $method = new ReflectionMethod($class, $name); + if ($method->isPublic() && + !$method->isStatic() && + !$method->isConstructor() && + !$method->isDestructor() && + !$method->isAbstract()) { + $instanceMethods[] = $name; + } + } + if (empty($instanceMethods)) { + throw new Exception('There is no pubic instance method in class $class.'); + } + return $instanceMethods; + } + private static function getDeclaredOnlyStaticMethods($class) { + $methods = self::getDeclaredOnlyMethods($class); + $instanceMethods = array(); + foreach ($methods as $name) { + $method = new ReflectionMethod($class, $name); + if ($method->isPublic() && + $method->isStatic() && + !$method->isAbstract()) { + $instanceMethods[] = $name; + } + } + if (empty($instanceMethods)) { + throw new Exception('There is no pubic static method in class $class.'); + } + return $instanceMethods; + } + public function addFunction($func, $alias = '', array $options = array()) { + if (!is_callable($func)) { + throw new Exception('Argument func must be callable.'); + } + if (is_array($alias) && empty($options)) { + $options = $alias; + $alias = ''; + } + if (empty($alias)) { + if (is_string($func)) { + $alias = $func; + } + elseif (is_array($func)) { + $alias = $func[1]; + } + else { + throw new Exception('Need an alias'); + } + } + $name = strtolower($alias); + if (!array_key_exists($name, $this->calls)) { + $this->names[] = $alias; + } + if (class_exists("\\Generator")) { + if (is_array($func)) { + $f = new ReflectionMethod($func[0], $func[1]); + } + else { + $f = new ReflectionFunction($func); + } + if ($f->isGenerator()) { + $func = wrap($func); + } + } + $call = new stdClass(); + $call->method = $func; + $call->mode = isset($options['mode']) ? $options['mode'] : ResultMode::Normal; + $call->simple = isset($options['simple']) ? $options['simple'] : null; + $call->oneway = isset($options['oneway']) ? $options['oneway'] : false; + $call->async = isset($options['async']) ? $options['async'] : false; + $call->passContext = isset($options['passContext']) ? $options['passContext']: null; + $this->calls[$name] = $call; + return $this; + } + public function addAsyncFunction($func, + $alias = '', + array $options = array()) { + if (is_array($alias) && empty($options)) { + $options = $alias; + $alias = ''; + } + $options['async'] = true; + return $this->addFunction($func, $alias, $options); + } + public function addMissingFunction($func, array $options = array()) { + return $this->addFunction($func, '*', $options); + } + public function addAsyncMissingFunction($func, array $options = array()) { + return $this->addAsyncFunction($func, '*', $options); + } + public function addFunctions(array $funcs, + array $aliases = array(), + array $options = array()) { + if (!empty($aliases) && empty($options) && (array_keys($funcs) != array_key($aliases))) { + $options = $aliases; + $aliases = array(); + } + $count = count($aliases); + if ($count == 0) { + foreach ($funcs as $func) { + $this->addFunction($func, '', $options); + } + } + elseif ($count == count($funcs)) { + foreach ($funcs as $i => $func) { + $this->addFunction($func, $aliases[$i], $options); + } + } + else { + throw new Exception('The count of functions is not matched with aliases'); + } + return $this; + } + public function addAsyncFunctions(array $funcs, + array $aliases = array(), + array $options = array()) { + if (!empty($aliases) && empty($options) && (array_keys($funcs) != array_key($aliases))) { + $options = $aliases; + $aliases = array(); + } + $options['async'] = true; + return $this->addFunctions($funcs, $aliases, $options); + } + public function addMethod($method, + $scope, + $alias = '', + array $options = array()) { + $func = array($scope, $method); + return $this->addFunction($func, $alias, $options); + } + public function addAsyncMethod($method, + $scope, + $alias = '', + array $options = array()) { + $func = array($scope, $method); + return $this->addAsyncFunction($func, $alias, $options); + } + public function addMissingMethod($method, $scope, array $options = array()) { + return $this->addMethod($method, $scope, '*', $options); + } + public function addAsyncMissingMethod($method, $scope, array $options = array()) { + return $this->addAsyncMethod($method, $scope, '*', $options); + } + public function addMethods($methods, + $scope, + $aliases = array(), + array $options = array()) { + $aliasPrefix = ''; + if (is_string($aliases)) { + $aliasPrefix = $aliases; + if ($aliasPrefix !== '') { + $aliasPrefix .= '_'; + } + $aliases = array(); + } + else if (!empty($aliases) && empty($options) && (array_keys($methods) != array_key($aliases))) { + $options = $aliases; + $aliases = array(); + } + if (empty($aliases)) { + foreach ($methods as $k => $method) { + $aliases[$k] = $aliasPrefix . $method; + } + } + if (count($methods) != count($aliases)) { + throw new Exception('The count of methods is not matched with aliases'); + } + foreach($methods as $k => $method) { + $func = array($scope, $method); + if (is_callable($func)) { + $this->addFunction($func, $aliases[$k], $options); + } + } + return $this; + } + public function addAsyncMethods($methods, + $scope, + $aliases = array(), + array $options = array()) { + $aliasPrefix = ''; + if (is_string($aliases)) { + $aliasPrefix = $aliases; + if ($aliasPrefix !== '') { + $aliasPrefix .= '_'; + } + $aliases = array(); + } + else if (!empty($aliases) && empty($options) && (array_keys($methods) != array_key($aliases))) { + $options = $aliases; + $aliases = array(); + } + if (empty($aliases)) { + foreach ($methods as $k => $method) { + $aliases[$k] = $aliasPrefix . $method; + } + } + if (count($methods) != count($aliases)) { + throw new Exception('The count of methods is not matched with aliases'); + } + foreach($methods as $k => $method) { + $func = array($scope, $method); + if (is_callable($func)) { + $this->addAsyncFunction($func, $aliases[$k], $options); + } + } + return $this; + } + public function addInstanceMethods($object, + $class = '', + $aliasPrefix = '', + array $options = array()) { + if ($class == '') { + $class = get_class($object); + } + return $this->addMethods(self::getDeclaredOnlyInstanceMethods($class), + $object, $aliasPrefix, $options); + } + public function addAsyncInstanceMethods($object, + $class = '', + $aliasPrefix = '', + array $options = array()) { + if ($class == '') { + $class = get_class($object); + } + return $this->addAsyncMethods(self::getDeclaredOnlyInstanceMethods($class), + $object, $aliasPrefix, $options); + } + public function addClassMethods($class, + $scope = '', + $aliasPrefix = '', + array $options = array()) { + if ($scope == '') { + $scope = $class; + } + return $this->addMethods(self::getDeclaredOnlyStaticMethods($class), + $scope, $aliasPrefix, $options); + } + public function addAsyncClassMethods($class, + $scope = '', + $aliasPrefix = '', + array $options = array()) { + if ($scope == '') { + $scope = $class; + } + return $this->addAsyncMethods(self::getDeclaredOnlyStaticMethods($class), + $scope, $aliasPrefix, $options); + } + public function add() { + $args_num = func_num_args(); + $args = func_get_args(); + switch ($args_num) { + case 1: { + if (is_callable($args[0])) { + return $this->addFunction($args[0]); + } + elseif (is_array($args[0])) { + return $this->addFunctions($args[0]); + } + elseif (is_object($args[0])) { + return $this->addInstanceMethods($args[0]); + } + elseif (is_string($args[0])) { + return $this->addClassMethods($args[0]); + } + break; + } + case 2: { + if (is_callable($args[0]) && is_string($args[1])) { + return $this->addFunction($args[0], $args[1]); + } + elseif (is_string($args[0])) { + if (is_string($args[1]) && !is_callable(array($args[1], $args[0]))) { + if (class_exists($args[1])) { + return $this->addClassMethods($args[0], $args[1]); + } + return $this->addClassMethods($args[0], '', $args[1]); + } + return $this->addMethod($args[0], $args[1]); + } + elseif (is_array($args[0])) { + if (is_array($args[1])) { + return $this->addFunctions($args[0], $args[1]); + } + return $this->addMethods($args[0], $args[1]); + } + elseif (is_object($args[0])) { + return $this->addInstanceMethods($args[0], $args[1]); + } + break; + } + case 3: { + if (is_callable($args[0]) && $args[1] == '' && is_string($args[2])) { + return $this->addFunction($args[0], $args[2]); + } + elseif (is_string($args[0]) && is_string($args[2])) { + if (is_string($args[1]) && !is_callable(array($args[1], $args[0]))) { + return $this->addClassMethods($args[0], $args[1], $args[2]); + } + return $this->addMethod($args[0], $args[1], $args[2]); + } + elseif (is_array($args[0])) { + if ($args[1] == '' && is_array($args[2])) { + return $this->addFunctions($args[0], $args[2]); + } + return $this->addMethods($args[0], $args[1], $args[2]); + } + elseif (is_object($args[0])) { + return $this->addInstanceMethods($args[0], $args[1], $args[2]); + } + break; + } + } + throw new Exception('Wrong arguments'); + } + public function addAsync() { + $args_num = func_num_args(); + $args = func_get_args(); + switch ($args_num) { + case 1: { + if (is_callable($args[0])) { + return $this->addAsyncFunction($args[0]); + } + elseif (is_array($args[0])) { + return $this->addAsyncFunctions($args[0]); + } + elseif (is_object($args[0])) { + return $this->addAsyncInstanceMethods($args[0]); + } + elseif (is_string($args[0])) { + return $this->addAsyncClassMethods($args[0]); + } + break; + } + case 2: { + if (is_callable($args[0]) && is_string($args[1])) { + return $this->addAsyncFunction($args[0], $args[1]); + } + elseif (is_string($args[0])) { + if (is_string($args[1]) && !is_callable(array($args[1], $args[0]))) { + if (class_exists($args[1])) { + return $this->addAsyncClassMethods($args[0], $args[1]); + } + return $this->addAsyncClassMethods($args[0], '', $args[1]); + } + return $this->addAsyncMethod($args[0], $args[1]); + } + elseif (is_array($args[0])) { + if (is_array($args[1])) { + return $this->addAsyncFunctions($args[0], $args[1]); + } + return $this->addAsyncMethods($args[0], $args[1]); + } + elseif (is_object($args[0])) { + return $this->addAsyncInstanceMethods($args[0], $args[1]); + } + break; + } + case 3: { + if (is_callable($args[0]) && $args[1] == '' && is_string($args[2])) { + return $this->addAsyncFunction($args[0], $args[2]); + } + elseif (is_string($args[0]) && is_string($args[2])) { + if (is_string($args[1]) && !is_callable(array($args[1], $args[0]))) { + return $this->addAsyncClassMethods($args[0], $args[1], $args[2]); + } + return $this->addAsyncMethod($args[0], $args[1], $args[2]); + } + elseif (is_array($args[0])) { + if ($args[1] == '' && is_array($args[2])) { + return $this->addAsyncFunctions($args[0], $args[2]); + } + return $this->addAsyncMethods($args[0], $args[1], $args[2]); + } + elseif (is_object($args[0])) { + return $this->addAsyncInstanceMethods($args[0], $args[1], $args[2]); + } + break; + } + } + throw new Exception('Wrong arguments'); + } + + public function remove($alias) { + $index = array_search($alias, $this->names, true); + if ($index !== false) { + array_splice($this->names, $index, 1); + unset($this->calls[strtolower($alias)]); + } + } + +// for push service + private function checkPushService() { + if ($this->timer === null) { + throw new Exception(get_class($this) . " can't support push service."); + } + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function getTopics($topic) { + if (empty($this->topics[$topic])) { + throw new Exception('topic "' + $topic + '" is not published.'); + } + return $this->topics[$topic]; + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function delTimer(ArrayObject $topics, $id) { + $t = $topics[$id]; + if (isset($t->timer)) { + $this->timer->clearTimeout($t->timer); + unset($t->timer); + } + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function offline(ArrayObject $topics, $topic, $id) { + $this->delTimer($topics, $id); + $messages = $topics[$id]->messages; + unset($topics[$id]); + foreach ($messages as $message) { + $message->detector->resolve(false); + } + $onUnsubscribe = $this->onUnsubscribe; + if (is_callable($onUnsubscribe)) { + call_user_func($onUnsubscribe, $topic, $id, $this); + } + } + private function setTimer(ArrayObject $topics, $topic, $id) { + $t = $topics[$id]; + if (!isset($t->timer)) { + $self = $this; + $t->timer = $this->timer->setTimeout(function() + use ($self, $topics, $topic, $id) { + $self->offline($topics, $topic, $id); + }, $t->heartbeat); + } + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function resetTimer(ArrayObject $topics, $topic, $id) { + $this->delTimer($topics, $id); + $this->setTimer($topics, $topic, $id); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function setRequestTimer($topic, $id, $request, $timeout) { + if ($timeout > 0) { + $self = $this; + $topics = $this->getTopics($topic); + $future = new Future(); + $timer = $this->timer->setTimeout(function() use ($future) { + $future->reject(new TimeoutException('timeout')); + }, $timeout); + $request->whenComplete(function() use ($self, $timer) { + $self->timer->clearTimeout($timer); + })->fill($future); + return $future->catchError(function($e) use ($self, $topics, $topic, $id) { + if ($e instanceof TimeoutException) { + $checkoffline = function() use ($self, &$checkoffline, $topics, $topic, $id) { + $t = $topics[$id]; + $t->timer = $self->timer->setTimeout($checkoffline, $t->heartbeat); + if ($t->count < 0) { + $self->offline($topics, $topic, $id); + } + else { + --$t->count; + } + }; + $checkoffline(); + } + }); + } + return $request; + } + public function publish($topic, array $options = array()) { + $this->checkPushService(); + if (is_array($topic)) { + foreach ($topic as $t) { + $this->publish($t, $options); + } + return $this; + } + $self = $this; + $timeout = isset($options['timeout']) ? $options['timeout'] : $this->timeout; + $heartbeat = isset($options['heartbeat']) ? $options['heartbeat'] : $this->heartbeat; + $this->topics[$topic] = new ArrayObject(); + return $this->addFunction(function($id) use ($self, $topic, $timeout, $heartbeat) { + $topics = $self->getTopics($topic); + if (isset($topics[$id])) { + if ($topics[$id]->count < 0) { + $topics[$id]->count = 0; + } + $messages = $topics[$id]->messages; + if (!$messages->isEmpty()) { + $message = $messages->shift(); + $message->detector->resolve(true); + $self->resetTimer($topics, $topic, $id); + return $message->result; + } + else { + $self->delTimer($topics, $id); + $topics[$id]->count++; + } + } + else { + $topics[$id] = new stdClass(); + $topics[$id]->messages = new SplQueue(); + $topics[$id]->count = 1; + $topics[$id]->heartbeat = $heartbeat; + $this->timer->setImmediate(function() use ($self, $topic, $id) { + $onSubscribe = $self->onSubscribe; + if (is_callable($onSubscribe)) { + call_user_func($onSubscribe, $topic, $id, $self); + } + }); + } + if (isset($topics[$id]->request)) { + $topics[$id]->request->resolve(null); + } + $request = new Future(); + $request->complete(function() use ($topics, $id) { + $topics[$id]->count--; + }); + $topics[$id]->request = $request; + return $self->setRequestTimer($topic, $id, $request, $timeout); + }, $topic); + } + /* + This method is a private method. + But PHP 5.3 can't call private method in closure, + so we comment the private keyword. + */ + /*private*/ function internalPush($topic, $id, $result) { + if (isFuture($result)) { + $self = $this; + return $result->complete(function($value) use ($self, $topic, $id) { + return $self->internalPush($topic, $id, $value); + }); + } + $topics = $this->getTopics($topic); + if (!isset($topics[$id])) { + return value(false); + } + if (isset($topics[$id]->request)) { + $topics[$id]->request->resolve($result); + unset($topics[$id]->request); + $this->setTimer($topics, $topic, $id); + return value(true); + } + else { + $detector = new Future(); + $message = new stdClass(); + $message->detector = $detector; + $message->result = $result; + $topics[$id]->messages->push($message); + $this->setTimer($topics, $topic, $id); + return $detector; + } + } + public function idlist($topic) { + return array_keys($this->getTopics($topic)->getArrayCopy()); + } + public function exist($topic, $id) { + $topics = $this->getTopics($topic); + return isset($topics[$id]); + } + public function broadcast($topic, $result, $callback = null) { + $this->checkPushService(); + $this->multicast($topic, $this->idlist($topic), $result, $callback); + } + public function multicast($topic, $ids, $result, $callback = null) { + $this->checkPushService(); + if (!is_callable($callback)) { + foreach ($ids as $id) { + $this->internalPush($topic, $id, $result); + } + return; + } + $sent = array(); + $unsent = array(); + $n = count($ids); + $count = $n; + $check = function($id) use (&$sent, &$unsent, &$count, $callback) { + return function($success) use ($id, &$sent, &$unsent, &$count, $callback) { + if ($success) { + $sent[] = $id; + } + else { + $unsent[] = $id; + } + if (--$count === 0) { + call_user_func($callback, $sent, $unsent); + } + }; + }; + for ($i = 0; $i < $n; ++$i) { + $id = $ids[$i]; + if ($id !== null) { + $this->internalPush($topic, $id, $result)->then($check($id)); + } + else { + --$count; + } + } + } + public function unicast($topic, $id, $result, $callback = null) { + $this->checkPushService(); + $detector = $this->internalPush($topic, $id, $result); + if (is_callable($callback)) { + $detector->then($callback); + } + } + // push($topic, $result) + // push($topic, $ids, $result) + // push($topic, $id, $result) + public function push($topic) { + $this->checkPushService(); + $args = func_get_args(); + $argc = func_num_args(); + $id = null; + $result = null; + if (($argc < 2) || ($argc > 3)) { + throw new Exception('Wrong number of arguments'); + } + if ($argc === 2) { + $result = $args[1]; + } + else { + $id = $args[1]; + $result = $args[2]; + } + if ($id === null) { + $topics = $this->getTopics($topic); + $iterator = $topics->getIterator(); + while($iterator->valid()) { + $id = $iterator->key(); + $this->internalPush($topic, $id, $result); + $iterator->next(); + } + } + elseif (is_array($id)) { + $ids = $id; + foreach ($ids as $id) { + $this->internalPush($topic, $id, $result); + } + } + else { + $this->internalPush($topic, $id, $result); + } + } +} diff --git a/hprose/lib/Tags.php b/hprose/lib/Tags.php new file mode 100644 index 0000000..e8f10f2 --- /dev/null +++ b/hprose/lib/Tags.php @@ -0,0 +1,59 @@ + * + * * +\**********************************************************/ +class Tags { + /* Serialize Tags */ + const TagInteger = 'i'; + const TagLong = 'l'; + const TagDouble = 'd'; + const TagNull = 'n'; + const TagEmpty = 'e'; + const TagTrue = 't'; + const TagFalse = 'f'; + const TagNaN = 'N'; + const TagInfinity = 'I'; + const TagDate = 'D'; + const TagTime = 'T'; + const TagUTC = 'Z'; + const TagBytes = 'b'; + const TagUTF8Char = 'u'; + const TagString = 's'; + const TagGuid = 'g'; + const TagList = 'a'; + const TagMap = 'm'; + const TagClass = 'c'; + const TagObject = 'o'; + const TagRef = 'r'; + /* Serialize Marks */ + const TagPos = '+'; + const TagNeg = '-'; + const TagSemicolon = ';'; + const TagOpenbrace = '{'; + const TagClosebrace = '}'; + const TagQuote = '"'; + const TagPoint = '.'; + /* Protocol Tags */ + const TagFunctions = 'F'; + const TagCall = 'C'; + const TagResult = 'R'; + const TagArgument = 'A'; + const TagError = 'E'; + const TagEnd = 'z'; +} diff --git a/hprose/lib/TimeoutException.php b/hprose/lib/TimeoutException.php new file mode 100644 index 0000000..219eaf2 --- /dev/null +++ b/hprose/lib/TimeoutException.php @@ -0,0 +1,22 @@ + * + * * +\**********************************************************/ + +class TimeoutException extends Exception {} diff --git a/hprose/lib/Timer.php b/hprose/lib/Timer.php new file mode 100644 index 0000000..045ad60 --- /dev/null +++ b/hprose/lib/Timer.php @@ -0,0 +1,31 @@ + * + * * +\**********************************************************/ +class Timer { + public function setTimeout($callback, $delay) { + return swoole_timer_after($delay, $callback); + } + public function clearTimeout($timerid) { + return swoole_timer_clear($timerid); + } + public function setImmediate($callback) { + swoole_event_defer($callback); + } +} diff --git a/hprose/lib/Transporter.php b/hprose/lib/Transporter.php new file mode 100644 index 0000000..7f40c56 --- /dev/null +++ b/hprose/lib/Transporter.php @@ -0,0 +1,361 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Future.php'; +require_once __DIR__ . '/TimeoutException.php'; +abstract class Transporter { + private $client; + private $requests = array(); + private $deadlines = array(); + private $results = array(); + private $stream = null; + private $async; + protected abstract function appendHeader($request); + protected abstract function createRequest($index, $request); + protected abstract function afterWrite($request, $stream, $o); + protected abstract function getBodyLength($stream); + protected abstract function asyncReadError($o, $stream, $index); + protected abstract function getResponse($stream, $o); + protected abstract function afterRead($stream, $o, $response); + + public function __construct(Client $client, $async) { + $this->client = $client; + $this->async = $async; + } + public function __destruct() { + if ($this->stream !== null) @fclose($this->stream); + } + protected function getLastError($error) { + $e = error_get_last(); + if ($e === null) { + return new Exception($error); + } + else { + return new ErrorException($e['message'], 0, $e['type'], $e['file'], $e['line']); + } + } + protected function removeStream($stream, &$pool) { + $index = array_search($stream, $pool, true); + if ($index !== false) { + unset($pool[$index]); + } + } + protected function readHeader($stream, $n) { + $header = ''; + do { + $buffer = @fread($stream, $n - strlen($header)); + $header .= $buffer; + } while (!empty($buffer) && (strlen($header) < $n)); + if (strlen($header) < $n) { + return false; + } + return $header; + } + protected function free($o, $index) { + unset($o->results[$index]); + unset($o->deadlines[$index]); + unset($o->buffers[$index]); + } + protected function asyncWrite($stream, $o) { + $stream_id = (integer)$stream; + if (isset($o->requests[$stream_id])) { + $request = $o->requests[$stream_id]; + } + else { + if ($o->current < $o->count) { + $request = $this->createRequest($o->current, $o->buffers[$o->current]); + $o->requests[$stream_id] = $request; + unset($o->buffers[$o->current]); + $o->current++; + } + else { + $this->removeStream($stream, $o->writepool); + return; + } + } + $sent = @fwrite($stream, $request->buffer, $request->length); + if ($sent === false) { + $o->results[$request->index]->reject($this->getLastError('request write error')); + $this->free($o, $request->index); + @fclose($stream); + $this->removeStream($stream, $o->writepool); + return; + } + if ($sent < $request->length) { + $request->buffer = substr($request->buffer, $sent); + $request->length -= $sent; + } + else { + $this->afterWrite($request, $stream, $o); + } + } + private function asyncRead($stream, $o) { + $response = $this->getResponse($stream, $o); + if ($response === false) { + $this->asyncReadError($o, $stream, -1); + return; + } + if ($response->length === false) { + $this->asyncReadError($o, $stream, $response->index); + return; + } + $remaining = $response->length - strlen($response->buffer); + $buffer = @fread($stream, $remaining); + if (empty($buffer)) { + $this->asyncReadError($o, $stream, $response->index); + return; + } + $response->buffer .= $buffer; + if (strlen($response->buffer) === $response->length) { + if (isset($o->results[$response->index])) { + $result = $o->results[$response->index]; + $this->free($o, $response->index); + } + $stream_id = (integer)$stream; + unset($o->responses[$stream_id]); + $this->afterRead($stream, $o, $response); + if (isset($result)) { + $result->resolve($response->buffer); + } + } + } + private function removeStreamById($stream_id, &$pool) { + foreach ($pool as $index => $stream) { + if ((integer)$stream == $stream_id) { + @fclose($stream); + unset($pool[$index]); + return; + } + } + } + private function closeTimeoutStream($o, $index) { + foreach ($o->requests as $stream_id => $request) { + if ($request->index == $index) { + unset($o->requests[$stream_id]); + if (!$this->client->fullDuplex) { + $this->removeStreamById($stream_id, $o->writepool); + } + } + } + foreach ($o->responses as $stream_id => $response) { + if ($response->index == $index) { + unset($o->responses[$stream_id]); + if (!$this->client->fullDuplex) { + $this->removeStreamById($stream_id, $o->readpool); + } + } + } + } + private function checkTimeout($o) { + foreach ($o->deadlines as $index => $deadline) { + if (microtime(true) > $deadline) { + $result = $o->results[$index]; + $this->free($o, $index); + $this->closeTimeoutStream($o, $index); + $result->reject(new TimeoutException("timeout")); + } + } + } + private function createPool($client, $o) { + $n = min(count($o->results), $client->maxPoolSize); + $pool = array(); + $errno = 0; + $errstr = ''; + $context = @stream_context_create($client->options); + for ($i = 0; $i < $n; $i++) { + $scheme = parse_url($client->uri, PHP_URL_SCHEME); + if ($scheme == 'unix') { + $stream = @fsockopen('unix://' . parse_url($client->uri, PHP_URL_PATH)); + } + else { + $stream = @stream_socket_client( + $client->uri . '/' . $i, + $errno, + $errstr, + max(0, $o->deadlines[$i] - microtime(true)), + STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT, + $context + ); + } + if (($stream !== false) && + (@stream_set_blocking($stream, false) !== false)) { + @stream_set_read_buffer($stream, $client->readBuffer); + @stream_set_write_buffer($stream, $client->writeBuffer); + if (function_exists('socket_import_stream')) { + if (($scheme === 'tcp') || ($scheme === 'unix')) { + $socket = socket_import_stream($stream); + socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, (int)$client->keepAlive); + if ($scheme === 'tcp') { + socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int)$client->noDelay); + } + } + } + $pool[] = $stream; + } + } + if (empty($pool)) { + $e = new Exception($errstr, $errno); + $results = $o->results; + $o->buffers = array(); + $o->deadlines = array(); + $o->results = array(); + foreach ($results as $result) { + $result->reject($e); + } + return false; + } + return $pool; + } + public function loop() { + $client = $this->client; + while (count($this->results) > 0) { + $pool = $this->createPool($client, $this); + if ($pool === false) continue; + $o = new stdClass(); + $o->current = 0; + $o->count = count($this->results); + $o->responses = array(); + $o->requests = array(); + $o->readpool = array(); + $o->writepool = $pool; + $o->buffers = $this->buffers; + $o->deadlines = $this->deadlines; + $o->results = $this->results; + $this->buffers = array(); + $this->deadlines = array(); + $this->results = array(); + while (count($o->results) > 0) { + $read = array_values($o->readpool); + $write = array_values($o->writepool); + $except = null; + $timeout = max(0, min($o->deadlines) - microtime(true)); + $tv_sec = floor($timeout); + $tv_usec = ($timeout - $tv_sec) * 1000; + $n = @stream_select($read, $write, $except, $tv_sec, $tv_usec); + if ($n === false) { + $e = $this->getLastError('unkown io error.'); + foreach ($o->results as $result) { + $result->reject($e); + } + $o->results = array(); + } + if ($n > 0) { + foreach ($write as $stream) $this->asyncWrite($stream, $o); + foreach ($read as $stream) $this->asyncRead($stream, $o); + } + $this->checkTimeout($o); + if (count($o->results) > 0 && + count($o->readpool) + count($o->writepool) === 0) { + $o->writepool = $this->createPool($client, $o); + } + } + foreach ($o->writepool as $stream) @fclose($stream); + foreach ($o->readpool as $stream) @fclose($stream); + } + } + public function asyncSendAndReceive($buffer, stdClass $context) { + $deadline = ($context->timeout / 1000) + microtime(true); + $result = new Future(); + $this->buffers[] = $buffer; + $this->deadlines[] = $deadline; + $this->results[] = $result; + return $result; + } + private function write($stream, $request) { + $buffer = $this->appendHeader($request); + $length = strlen($buffer); + while (true) { + $sent = @fwrite($stream, $buffer, $length); + if ($sent === false) { + return false; + } + if ($sent < $length) { + $buffer = substr($buffer, $sent); + $length -= $sent; + } + else { + return true; + } + } + } + private function read($stream) { + $length = $this->getBodyLength($stream); + if ($length === false) return false; + $response = ''; + while (($remaining = $length - strlen($response)) > 0) { + $buffer = @fread($stream, $remaining); + if ($buffer === false) { + return false; + } + $response .= $buffer; + } + return $response; + } + public function syncSendAndReceive($buffer, stdClass $context) { + $client = $this->client; + $timeout = ($context->timeout / 1000); + $sec = floor($timeout); + $usec = ($timeout - $sec) * 1000; + $trycount = 0; + $errno = 0; + $errstr = ''; + while ($trycount <= 1) { + if ($this->stream === null) { + $this->stream = @stream_socket_client( + $this->client->uri, + $errno, + $errstr, + $timeout, + STREAM_CLIENT_CONNECT, + stream_context_create($client->options)); + if ($this->stream === false) { + $this->stream = null; + throw new Exception($errstr, $errno); + } + } + $stream = $this->stream; + @stream_set_read_buffer($stream, $client->readBuffer); + @stream_set_write_buffer($stream, $client->writeBuffer); + if (@stream_set_timeout($stream, $sec, $usec) == false) { + if ($trycount > 0) { + throw $this->getLastError("unknown error"); + } + $trycount++; + } + else { + break; + } + } + if ($this->write($stream, $buffer) === false) { + throw $this->getLastError("request write error"); + } + $response = $this->read($stream, $buffer); + if ($response === false) { + throw $this->getLastError("response read error"); + } + return $response; + } + public function sendAndReceive($buffer, stdClass $context) { + if ($this->async) { + return $this->asyncSendAndReceive($buffer, $context); + } + return $this->syncSendAndReceive($buffer, $context); + } +} diff --git a/hprose/lib/Writer.php b/hprose/lib/Writer.php new file mode 100644 index 0000000..6675375 --- /dev/null +++ b/hprose/lib/Writer.php @@ -0,0 +1,348 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/FakeWriterRefer.php'; +require_once __DIR__ . '/RealWriterRefer.php'; +class Writer { + public $stream; + private $classref = array(); + private $propsref = array(); + private $refer; + public function __construct(BytesIO $stream, $simple = false) { + $this->stream = $stream; + $this->refer = $simple ? + new FakeWriterRefer() : + new RealWriterRefer(); + } + private static function isUTF8($s) { + return mb_detect_encoding($s, 'UTF-8', true) !== false; + } + private static function ustrlen($s) { + return strlen(iconv('UTF-8', 'UTF-16LE', $s)) >> 1; + } + private static function isList(array $a) { + $count = count($a); + return ($count === 0) || + ((isset($a[0]) || array_key_exists(0, $a)) && (($count === 1) || + (isset($a[$count - 1]) || array_key_exists($count - 1, $a)))); + } + public function serialize($val) { + if ($val === null) { + $this->writeNull(); + } + elseif (is_scalar($val)) { + if (is_int($val)) { + if ($val >= 0 && $val <= 9) { + $this->stream->write((string)$val); + } + elseif ($val >= -2147483648 && $val <= 2147483647) { + $this->writeInteger($val); + } + else { + $this->writeLong((string)$val); + } + } + elseif (is_bool($val)) { + $this->writeBoolean($val); + } + elseif (is_float($val)) { + $this->writeDouble($val); + } + elseif (is_string($val)) { + if ($val === '') { + $this->writeEmpty(); + } + elseif (strlen($val) < 4 && + self::isUTF8($val) && + self::ustrlen($val) == 1) { + $this->writeUTF8Char($val); + } + elseif (self::isUTF8($val)) { + $this->writeStringWithRef($val); + } + else { + $this->writeBytesWithRef($val); + } + } + } + elseif (is_array($val)) { + if (self::isList($val)) { + $this->writeArray($val); + } + else { + $this->writeAssocArray($val); + } + } + elseif (is_object($val)) { + if ($val instanceof BytesIO) { + $this->writeBytesIOWithRef($val); + } + elseif ($val instanceof DateTime) { + $this->writeDateTimeWithRef($val); + } + elseif ($val instanceof SplObjectStorage) { + $this->writeMapWithRef($val); + } + elseif ($val instanceof Traversable) { + $this->writeListWithRef($val); + } + elseif ($val instanceof stdClass) { + $this->writeStdClassWithRef($val); + } + else { + $this->writeObjectWithRef($val); + } + } + else { + throw new Exception('Not support to serialize this data'); + } + } + public function writeInteger($int) { + $this->stream->write(Tags::TagInteger . $int . Tags::TagSemicolon); + } + public function writeLong($long) { + $this->stream->write(Tags::TagLong . $long . Tags::TagSemicolon); + } + public function writeDouble($double) { + if (is_nan($double)) { + $this->writeNaN(); + } + elseif (is_infinite($double)) { + $this->writeInfinity($double > 0); + } + else { + $this->stream->write(Tags::TagDouble . $double . Tags::TagSemicolon); + } + } + public function writeNaN() { + $this->stream->write(Tags::TagNaN); + } + public function writeInfinity($positive = true) { + $this->stream->write(Tags::TagInfinity . ($positive ? Tags::TagPos : Tags::TagNeg)); + } + public function writeNull() { + $this->stream->write(Tags::TagNull); + } + public function writeEmpty() { + $this->stream->write(Tags::TagEmpty); + } + public function writeBoolean($bool) { + $this->stream->write($bool ? Tags::TagTrue : Tags::TagFalse); + } + public function writeUTF8Char($char) { + $this->stream->write(Tags::TagUTF8Char . $char); + } + public function writeString($str) { + $this->refer->set($str); + $len = self::ustrlen($str); + $this->stream->write(Tags::TagString); + if ($len > 0) { + $this->stream->write((string)$len); + } + $this->stream->write(Tags::TagQuote . $str . Tags::TagQuote); + } + public function writeStringWithRef($str) { + if (!$this->refer->write($this->stream, $str)) { + $this->writeString($str); + } + } + public function writeBytes($bytes) { + $this->refer->set($bytes); + $len = strlen($bytes); + $this->stream->write(Tags::TagBytes); + if ($len > 0) { + $this->stream->write((string)$len); + } + $this->stream->write(Tags::TagQuote . $bytes . Tags::TagQuote); + } + public function writeBytesWithRef($bytes) { + if (!$this->refer->write($this->stream, $bytes)) { + $this->writeBytes($bytes); + } + } + public function writeBytesIO(BytesIO $bytes) { + $this->refer->set($bytes); + $len = $bytes->length(); + $this->stream->write(Tags::TagBytes); + if ($len > 0) { + $this->stream->write((string)$len); + } + $this->stream->write(Tags::TagQuote . $bytes->toString() . Tags::TagQuote); + } + public function writeBytesIOWithRef(BytesIO $bytes) { + if (!$this->refer->write($this->stream, $bytes)) { + $this->writeBytesIO($bytes); + } + } + public function writeDateTime(DateTime $datetime) { + $this->refer->set($datetime); + if ($datetime->getOffset() == 0) { + $this->stream->write($datetime->format('\DYmd\THis.u\Z')); + } + else { + $this->stream->write($datetime->format('\DYmd\THis.u;')); + } + } + public function writeDateTimeWithRef(DateTime $datetime) { + if (!$this->refer->write($this->stream, $datetime)) { + $this->writeDateTime($datetime); + } + } + public function writeArray(array $array) { + $this->refer->set($array); + $count = count($array); + $this->stream->write(Tags::TagList); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + for ($i = 0; $i < $count; $i++) { + $this->serialize($array[$i]); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeAssocArray(array $map) { + $this->refer->set($map); + $count = count($map); + $this->stream->write(Tags::TagMap); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + foreach ($map as $key => $value) { + $this->serialize($key); + $this->serialize($value); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeList(Traversable $list) { + $this->refer->set($list); + $count = count($list); + $this->stream->write(Tags::TagList); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + foreach ($list as $e) { + $this->serialize($e); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeListWithRef(Traversable $list) { + if (!$this->refer->write($this->stream, $list)) { + $this->writeList($list); + } + } + public function writeMap(SplObjectStorage $map) { + $this->refer->set($map); + $count = count($map); + $this->stream->write(Tags::TagMap); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + foreach ($map as $o) { + $this->serialize($o); + $this->serialize($map[$o]); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeMapWithRef(SplObjectStorage $map) { + if (!$this->refer->write($this->stream, $map)) { + $this->writeMap($map); + } + } + public function writeStdClass(stdClass $obj) { + $this->refer->set($obj); + $vars = get_object_vars($obj); + $count = count($vars); + $this->stream->write(Tags::TagMap); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + foreach ($vars as $key => $value) { + $this->serialize($key); + $this->serialize($value); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeStdClassWithRef(stdClass $obj) { + if (!$this->refer->write($this->stream, $obj)) { + $this->writeStdClass($obj); + } + } + public function writeObject($obj) { + $class = get_class($obj); + $alias = ClassManager::getClassAlias($class); + if (isset($this->classref[$alias])) { + $index = $this->classref[$alias]; + } + else { + $reflector = new ReflectionClass($obj); + $props = $reflector->getProperties( + ReflectionProperty::IS_PUBLIC | + ReflectionProperty::IS_PROTECTED | + ReflectionProperty::IS_PRIVATE); + $index = $this->writeClass($alias, $props); + } + $this->refer->set($obj); + $props = $this->propsref[$index]; + $this->stream->write(Tags::TagObject . $index . Tags::TagOpenbrace); + foreach ($props as $prop) { + $this->serialize($prop->getValue($obj)); + } + $this->stream->write(Tags::TagClosebrace); + } + public function writeObjectWithRef($obj) { + if (!$this->refer->write($this->stream, $obj)) { + $this->writeObject($obj); + } + } + protected function writeClass($alias, array $props) { + $len = self::ustrlen($alias); + $this->stream->write(Tags::TagClass . $len . + Tags::TagQuote . $alias . Tags::TagQuote); + $count = count($props); + if ($count > 0) { + $this->stream->write((string)$count); + } + $this->stream->write(Tags::TagOpenbrace); + foreach ($props as $prop) { + $prop->setAccessible(true); + $name = $prop->getName(); + $fl = ord($name[0]); + if ($fl >= ord('A') && $fl <= ord('Z')) { + $name = strtolower($name[0]) . substr($name, 1); + } + $this->writeString($name); + } + $this->stream->write(Tags::TagClosebrace); + $index = count($this->propsref); + $this->classref[$alias] = $index; + $this->propsref[] = $props; + return $index; + } + public function reset() { + $this->classref = array(); + $this->propsref = array(); + $this->refer->reset(); + } +} diff --git a/hprose/lib/WriterRefer.php b/hprose/lib/WriterRefer.php new file mode 100644 index 0000000..56c6ac9 --- /dev/null +++ b/hprose/lib/WriterRefer.php @@ -0,0 +1,25 @@ + * + * * +\**********************************************************/ +interface WriterRefer { + public function set($val); + public function write(BytesIO $stream, $val); + public function reset(); +} \ No newline at end of file diff --git a/hprose/lib/functions.php b/hprose/lib/functions.php new file mode 100644 index 0000000..fb9e76a --- /dev/null +++ b/hprose/lib/functions.php @@ -0,0 +1,505 @@ + * + * * +\**********************************************************/ +require_once __DIR__ . '/Future.php'; +function isFuture($obj) { + return $obj instanceof Future; +} + +function error($e) { + $future = new Future(); + $future->reject($e); + return $future; +} + +function value($v) { + $future = new Future(); + $future->resolve($v); + return $future; +} + +function resolve($value) { + return value($value); +} +function reject($reason) { + return error($reason); +} + +function sync($computation) { + try { + return toPromise(call_user_func($computation)); + } + catch (UncatchableException $e) { + throw $e->getPrevious(); + } + catch (Exception $e) { + return error($e); + } + catch (Throwable $e) { + return error($e); + } +} + +function promise($executor) { + $future = new Future(); + call_user_func($executor, + function($value) use ($future) { + $future->resolve($value); + }, + function($reason) use ($future) { + $future->reject($reason); + } + ); + return $future; +} + +function toFuture($obj) { + return isFuture($obj) ? $obj : value($obj); +} + +function all($array) { + return toFuture($array)->then( + function($array) { + $keys = array_keys($array); + $n = count($array); + $result = array(); + if ($n === 0) { + return value($result); + } + $future = new Future(); + $onfulfilled = function($index) use ($future, &$result, &$n, $keys) { + return function($value) use ($index, $future, &$result, &$n, $keys) { + $result[$index] = $value; + if (--$n === 0) { + $array = array(); + foreach($keys as $key) { + $array[$key] = $result[$key]; + } + $future->resolve($array); + } + }; + }; + $onrejected = array($future, "reject"); + foreach ($array as $index => $element) { + toFuture($element)->then($onfulfilled($index), $onrejected); + } + return $future; + } + ); +} + +function race($array) { + return toFuture($array)->then( + function($array) { + $future = new Future(); + foreach ($array as $element) { + toFuture($element)->fill($future); + } + return $future; + } + ); +} + +function any($array) { + return toFuture($array)->then( + function($array) { + $keys = array_keys($array); + $n = count($array); + if ($n === 0) { + throw new RangeException('any(): $array must not be empty'); + } + $reasons = array(); + $future = new Future(); + $onfulfilled = array($future, "resolve"); + $onrejected = function($index) use ($future, &$reasons, &$n, $keys) { + return function($reason) use ($index, $future, &$reasons, &$n, $keys) { + $reasons[$index] = $reason; + if (--$n === 0) { + $array = array(); + foreach($keys as $key) { + $array[$key] = $reasons[$key]; + } + $future->reject($array); + } + }; + }; + foreach ($array as $index => $element) { + $f = toFuture($element); + $f->then($onfulfilled, $onrejected($index)); + } + return $future; + } + ); +} + +function settle($array) { + return toFuture($array)->then( + function($array) { + $keys = array_keys($array); + $n = count($array); + $result = array(); + if ($n === 0) { + return value($result); + } + $future = new Future(); + $oncomplete = function($index, $f) use ($future, &$result, &$n, $keys) { + return function() use ($index, $f, $future, &$result, &$n, $keys) { + $result[$index] = $f->inspect(); + if (--$n === 0) { + $array = array(); + foreach($keys as $key) { + $array[$key] = $result[$key]; + } + $future->resolve($array); + } + }; + }; + foreach ($array as $index => $element) { + $f = toFuture($element); + $f->whenComplete($oncomplete($index, $f)); + } + return $future; + } + ); +} + +function run($handler/*, arg1, arg2, ... */) { + $args = array_slice(func_get_args(), 1); + return all($args)->then( + function($args) use ($handler) { + return call_user_func_array($handler, $args); + } + ); +} + +function wrap($handler) { + if (class_exists("\\Generator") && is_callable($handler)) { + if (is_array($handler)) { + $m = new ReflectionMethod($handler[0], $handler[1]); + } + else { + $m = new ReflectionFunction($handler); + } + if ($m->isGenerator()) { + return function() use ($handler) { + return all(func_get_args())->then( + function($args) use ($handler) { + array_splice($args, 0, 0, array($handler)); + return call_user_func_array('\\Hprose\\Future\\co', $args); + } + ); + }; + } + } + if (is_object($handler)) { + if (is_callable($handler)) { + return new CallableWrapper($handler); + } + return new Wrapper($handler); + } + if (is_callable($handler)) { + return function() use ($handler) { + return all(func_get_args())->then( + function($args) use ($handler) { + return call_user_func_array($handler, $args); + } + ); + }; + } + return $handler; +} + + +function every($array, $callback) { + if (is_array($callback)) { + $f = new ReflectionMethod($callback[0], $callback[1]); + } + else { + $f = new ReflectionFunction($callback); + } + $n = $f->getNumberOfParameters(); + return all($array)->then( + function($array) use ($n, $callback) { + foreach ($array as $key => $value) { + switch ($n) { + case 1: { + if (!call_user_func($callback, $value)) return false; + break; + } + case 2: { + if (!call_user_func($callback, $value, $key)) return false; + break; + } + default: { + if (!call_user_func($callback, $value, $key, $array)) return false; + break; + } + } + } + return true; + } + ); +} + +function some($array, $callback) { + if (is_array($callback)) { + $f = new ReflectionMethod($callback[0], $callback[1]); + } + else { + $f = new ReflectionFunction($callback); + } + $n = $f->getNumberOfParameters(); + return all($array)->then( + function($array) use ($n, $callback) { + foreach ($array as $key => $value) { + switch ($n) { + case 1: { + if (call_user_func($callback, $value)) return true; + break; + } + case 2: { + if (call_user_func($callback, $value, $key)) return true; + break; + } + default: { + if (call_user_func($callback, $value, $key, $array)) return true; + break; + } + } + } + return false; + } + ); +} + +function filter($array, $callback, $preserveKeys = false) { + if (is_array($callback)) { + $f = new ReflectionMethod($callback[0], $callback[1]); + } + else { + $f = new ReflectionFunction($callback); + } + $n = $f->getNumberOfParameters(); + return all($array)->then( + function($array) use ($n, $callback, $preserveKeys) { + $result = array(); + $setResult = function($key, $value) use (&$result, $preserveKeys) { + if ($preserveKeys) { + $result[$key] = $value; + } + else { + $result[] = $value; + } + }; + foreach ($array as $key => $value) { + switch ($n) { + case 1: { + if (call_user_func($callback, $value)) { + $setResult($key, $value); + } + break; + } + case 2: { + if (call_user_func($callback, $value, $key)) { + $setResult($key, $value); + } + break; + } + default: { + if (call_user_func($callback, $value, $key, $array)) { + $setResult($key, $value); + } + break; + } + } + } + return $result; + } + ); +} + +function map($array, $callback) { + if (is_array($callback)) { + $f = new ReflectionMethod($callback[0], $callback[1]); + } + else { + $f = new ReflectionFunction($callback); + } + $n = $f->getNumberOfParameters(); + return all($array)->then( + function($array) use ($n, $callback) { + switch ($n) { + case 1: return array_map($callback, $array); + case 2: return array_map($callback, $array, array_keys($array)); + default: { + $result = array(); + foreach ($array as $key => $value) { + $result[$key] = call_user_func($callback, $value, $key, $array); + } + return $result; + } + } + } + ); +} + +function reduce($array, $callback, $initial = NULL) { + if ($initial !== NULL) { + return all($array)->then( + function($array) use ($callback, $initial) { + $initial = toFuture($initial); + return $initial->then( + function($initial) use ($array, $callback) { + return array_reduce($array, $callback, $initial); + } + ); + } + ); + } + return all($array)->then( + function($array) use ($callback) { + return array_reduce($array, $callback); + } + ); +} + +function search($array, $searchElement, $strict = false) { + return all($array)->then( + function($array) use ($searchElement, $strict) { + $searchElement = toFuture($searchElement); + return $searchElement->then( + function($searchElement) use ($array, $strict) { + return array_search($searchElement, $array, $strict); + } + ); + } + ); +} + +function includes($array, $searchElement, $strict = false) { + return all($array)->then( + function($array) use ($searchElement, $strict) { + $searchElement = toFuture($searchElement); + return $searchElement->then( + function($searchElement) use ($array, $strict) { + return in_array($searchElement, $array, $strict); + } + ); + } + ); +} + +function diff(/*$array1, $array2, ...*/) { + $args = func_get_args(); + for ($i = 0, $n = func_num_args(); $i < $n; ++$i) { + $args[$i] = all($args[$i]); + } + return all($args)->then( + function($array) { + return call_user_func_array("array_diff", $array); + } + ); +} + +function udiff(/*$array1, $array2, $...*/) { + $args = func_get_args(); + $callback = array_pop($args); + for ($i = 0, $n = func_num_args() - 1; $i < $n; ++$i) { + $args[$i] = all($args[$i]); + } + return all($args)->then( + function($array) use ($callback) { + array_push($array, $callback); + return call_user_func_array("array_udiff", $array); + } + ); +} + +function toPromise($obj) { + if (isFuture($obj)) return $obj; + if (class_exists("\\Generator") && ($obj instanceof \Generator)) return co($obj); + if (is_array($obj)) return arrayToPromise($obj); + if (is_object($obj)) return objectToPromise($obj); + return value($obj); +} + +function arrayToPromise(array $array) { + $result = array(); + foreach ($array as $key => $value) { + $result[$key] = toPromise($value); + } + return all($result); +} + +function objectToPromise($obj) { + $r = new ReflectionObject($obj); + if ($r->isCloneable()) { + $result = clone $obj; + $values = array(); + foreach ($result as $key => $value) { + $values[] = toPromise($value)->then(function($v) use ($result, $key) { + $result->$key = $v; + }); + } + return all($values)->then(function() use ($result) { + return $result; + }); + } + return $obj; +} + +if (class_exists("\\Generator")) { + function co($generator/*, arg1, arg2...*/) { + if (is_callable($generator)) { + $args = array_slice(func_get_args(), 1); + $generator = call_user_func_array($generator, $args); + } + if (!($generator instanceof \Generator)) { + return toFuture($generator); + } + $next = function($yield) use ($generator, &$next) { + if ($generator->valid()) { + return co($yield)->then(function($value) use ($generator, &$next) { + $yield = $generator->send($value); + if ($generator->valid()) { + return $next($yield); + } + if (method_exists($generator, "getReturn")) { + $result = $generator->getReturn(); + return ($result === null) ? $value : $result; + } + return $value; + }, + function($e) use ($generator, &$next) { + return $next($generator->throw($e)); + }); + } + else { + if (method_exists($generator, "getReturn")) { + return value($generator->getReturn()); + } + else { + return value(null); + } + } + }; + return $next($generator->current()); + } +} diff --git a/hprose/server.php b/hprose/server.php new file mode 100644 index 0000000..6e19d9c --- /dev/null +++ b/hprose/server.php @@ -0,0 +1,34 @@ +start(); +function zys($name) +{ + return "$name is high performance service framework based on yaf and swoole\r\n"; +} +function hproseserver_call(swoole_process $worker) +{ + define('APPLICATION_PATH', dirname(__DIR__) . "/application"); + define('MYPATH', dirname(APPLICATION_PATH)); + $application = new Yaf_Application(dirname(APPLICATION_PATH) . "/conf/application.ini"); + $application->bootstrap(); + $config_obj = Yaf_Registry::get("config"); + $hprose_config = $config_obj->hprose->toArray(); + $server = new Server("tcp://" . $hprose_config['ServerIp'] . ":" . $hprose_config['port']); + $server->setErrorTypes(E_ALL); + $server->setDebugEnabled(); + $server->addFunction('zys'); + $server->start(); +} +swoole_process::daemon(true); +swoole_process::wait(); \ No newline at end of file diff --git a/hprose/test/TcpClient.php b/hprose/test/TcpClient.php new file mode 100644 index 0000000..036f221 --- /dev/null +++ b/hprose/test/TcpClient.php @@ -0,0 +1,23 @@ + * + * * +\**********************************************************/ +require_once dirname(__DIR__) . '/lib/Client.php'; +$client = new Client('tcp://192.168.102.163:1314', false); +echo $client->zys("zys"); \ No newline at end of file diff --git a/hprose/test/TcpServer.php b/hprose/test/TcpServer.php new file mode 100644 index 0000000..17e662f --- /dev/null +++ b/hprose/test/TcpServer.php @@ -0,0 +1,11 @@ +setErrorTypes(E_ALL); +$server->setDebugEnabled(); +$server->addFunction('zys'); +$server->start(); diff --git a/server/hprose/Server.php b/server/hprose/Server.php new file mode 100644 index 0000000..e00f0de --- /dev/null +++ b/server/hprose/Server.php @@ -0,0 +1,13 @@ + diff --git a/server/server.php b/server/server.php index 31200b3..4a5d529 100644 --- a/server/server.php +++ b/server/server.php @@ -31,6 +31,7 @@ function syncServer() echo (yield ['swoolelive']) ."\n"; echo (yield ['task']) ."\n"; echo (yield ['distributed']) ."\n"; + echo (yield ['hprose']) ."\n"; } //异步调用器 function asyncCaller(Generator $gen) @@ -93,6 +94,15 @@ function asyncCaller(Generator $gen) $gen->send('distributed SERVEICE SUCCESS!'); asyncCaller($gen); break; + case 'hprose': + foreach(glob(__DIR__.'/hprose/*.php') as $start_file) + { + exec($cmd.' '.$start_file); + } + echo "hprose SERVEICE START ...\n";//hprose提供rpc服务 + $gen->send('hprose SERVEICE SUCCESS!'); + asyncCaller($gen); + break; default: $gen->send('no method'); asyncCaller($gen);