php中socket服务的模型下的编程方式(同步和异步)

作者:袖梨 2022-06-24

    前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。

    如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:

    1:多启动进程,提高并发数

    2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数

    3:异步编程,避免阻塞,提高并发数

    这里我们重点介绍第三种方法,以访问第三方http为例。

    代码如下:

//同步读取
function get_data_blocking(){
    $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
    fwrite($socket, "GET /sleep1.php HTTP/1.0rnHost: test.raventech.cnrnAccept: */*rnrn");
    $str = "";
    while (!feof($socket)) {
        $str .= fgets($socket, 1024);
    }
    fclose($socket);
    return $str;
}

//异步读取
function get_data_unblocking(){
    $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
    stream_set_blocking($socket, 0);
    fwrite($socket, "GET /sleep1.php HTTP/1.0rnHost: test.raventech.cnrnAccept: */*rnrn");
    $write  = NULL;
    $except = NULL;
    while( $socket ){
        $read   = array($socket);
        $num_changed_streams = stream_select($read, $write, $except, 0);
        if ( $num_changed_streams > 0 ) {
            foreach($read as $r){
                $str = fread($r,2048);
                fclose($socket);
                $socket = false;
                return $str;
            }
        }
        usleep(100);
    }
}

//真正的异步读取--利用server的IO复用事件来提高并发
class Get_data_event{

    public $onMessage = null;
    private $str='';

    function __construct(&$server){
        $socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6);
        stream_set_blocking($socket, 0);
        fwrite($socket, "GET /sleep1.php HTTP/1.0rnHost: test.xtgxiso.cnrnAccept: */*rnrn");
        $server->add_socket($socket, array($this, 'read'));
    }

    public function read($socket){
        while (1) {
            $buffer = fread($socket, 1024);
            if ($buffer === '' || $buffer === false) {
                break;
            }
            $this->str .= $buffer;
        }
        if( $this->onMessage && $this->str ) {
            call_user_func($this->onMessage, $this->str);
        }
        $this->str = '';
        return false;
    }

}

/**
 * 单进程IO复用select
 */
class Xtgxiso_server
{
    public $socket = false;
    public $master = array();
    public $onConnect = null;
    public $onMessage = null;
    public $other_socket_callback = array();

    function __construct($host="0.0.0.0",$port=1215)
    {
        $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
        if (!$this->socket) die($errstr."--".$errno);
        stream_set_blocking($this->socket,0);
        $id = (int)$this->socket;
        $this->master[$id] = $this->socket;
    }

    public function add_socket($socket,$callback){
        $id = (int)$socket;
        $this->master[$id] = $socket;
        $this->other_socket_callback[$id] = $callback;
    }

    public function run(){
        $read = $this->master;
        $receive = array();
        echo  "start run...n";
        while ( 1 ) {
            $read = $this->master;
            //echo  "waiting...n";
            $mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
            if ($mod_fd === FALSE) {
                break;
            }
            foreach ( $read as $k => $v ) {
                $id = (int)$v;
                if ( $v === $this->socket ) {
                    //echo "new connn";
                    $conn = stream_socket_accept($this->socket);
                    if ($this->onConnect) {
                        call_user_func($this->onConnect, $conn);
                    }
                    $id = (int)$conn;
                    $this->master[$id] = $conn;
                } else if ( @$this->other_socket_callback[$id] ){
                    call_user_func_array($this->other_socket_callback[$id], array($v));
                } else {
                    //echo "read datan";
                    if ( !isset($receive[$k]) ){
                        $receive[$k]="";
                    }
                    $buffer = fread($v, 1024);
                    //echo $buffer."n";
                    if ( strlen($buffer) === 0 ) {
                        if ( $this->onClose ){
                            call_user_func($this->onClose,$v);
                        }
                        fclose($v);
                        $id = (int)$v;
                        unset($this->master[$id]);
                    } else if ( $buffer === FALSE ) {
                        if ( $this->onClose ){
                            call_user_func($this->onClose, $this->master[$key_to_del]);
                        }
                        fclose($v);
                        $id = (int)$v;
                        unset($this->master[$id]);
                    } else {
                        $pos = strpos($buffer, "rnrn");
                        if ( $pos === false) {
                            $receive[$k] .= $buffer;
                            //echo "received:".$buffer.";not all package,continue recdiveingn";
                        }else{
                            $receive[$k] .= trim(substr ($buffer,0,$pos+4));
                            $buffer = substr($buffer,$pos+4);
                            if($this->onMessage) {
                                call_user_func($this->onMessage,$v,$receive[$k]);
                            }
                            $receive[$k]='';
                        }
                    }
                }
            }
            usleep(10000);
        }
    }
}


$server =  new Xtgxiso_server();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "n";
};

$server->onMessage = function($conn,$msg) use ( $server ) {
    /*
    $respone ="";//响应内容
    $respone = "HTTP/1.1 200 OKrn";
    $respone .= "Server: openrestyrn";
    $respone .= "Content-Type: text/html; charset=utf-8rn";
    $body = time().rand(111111,999999);
    $len = strlen($body);
    $respone .= "Content-Length:$lenrn";
    $respone .= "Connection: closern";
    $respone .= "rn$bodyrnrn";
    echo "onMessage --" . $msg . "n";
    */

    //同步读取
    //$respone = get_data_blocking();
    //fwrite($conn,$respone);

    //异步读取
    //$respone = get_data_unblocking();
    //fwrite($conn,$respone);

    //真正异步
    $data = new Get_data_event($server);
    $data->onMessage = function($str) use($conn){
        fwrite($conn,$str);
    };

};

$server->onClose = function($conn){
    echo "onClose --" . "n";
};

$server->run();
    第三方服务sleep1.php的代码比较简单

sleep(1);//模拟耗时
echo "OK";
    通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.




    通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.

相关文章

精彩推荐