swoole_process之进程间通信

博客好久没更新了,这大半年来主要精力放在了折腾swoole上,公司项目也上线了几个swoole的服务于中间件,最近马上也要上线一个中间件。swoole_server、swoole_process等都用的比较多了,现在就来慢慢总结。

从 swoole_process文档 中可以看出,swoole_process进程间支持3种通信方式:

1、管道pipe 2、IPC msgqueue 3、信号

接下来就详细介绍下每一种通信的原理以及实现。

1、管道pipe

关于管道文档中是这么说的

int swoole_process::__construct(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true)

* $redirect_stdin_stdout,重定向子进程的标准输入和输出。 启用此选项后,在进程内echo将不是打印屏幕,而是写入到管道。读取键盘输入将变为从管道中读取数据。 默认为阻塞读取。
* $create_pipe,是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true 如果子进程内没有进程间通信,可以设置为false

* $process对象在销毁时会自动关闭管道,子进程内如果监听了管道会收到CLOSE事件
* 1.7.22或更高版本允许设置管道的类型,默认为SOCK_STREAM流式
* 参数$create_pipe为2时,管道类型将设置为SOCK_DGRAM

int swoole_process->write(string $data)

* swoole底层使用Unix Socket实现通信,UnixSocket是内核实现的全内存通信,无任何IO消耗。在1进程write,1进程read,每次读写1024字节数据的测试中,100万次通信仅需1.02秒。
* 管道通信默认的方式是流式,write写入的数据在read可能会被底层合并。可以设置swoole_process构造函数的第三个参数为2改变为数据报式。

从上面摘自文档中的部分就可以看出,管道有两种类型:SOCK_STREAM和SOCK_DGRAM,而这两个正是建立socket时候需要的类型参数。我们最常见的linux命令中使用管道操作符“|”,而在标准unix编程中,管道的创建也是通过函数 int pipe(int filedes[2]) 创建的匿名管道;或者通过 int mkfifo(const char *pathname, mode_t mode) 创建的命名管道,这两种方式创建的管道都没有SOCK_*参数的。

下面是摘自swoole扩展中创建pipe的代码

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|bl", &callback, &redirect_stdin_and_stdout, &pipe_type) == FAILURE)
{
        RETURN_FALSE;
}

......

if (pipe_type > 0)
{
    swPipe *_pipe = emalloc(sizeof(swWorker));
    int socket_type = pipe_type == 1 ? SOCK_STREAM : SOCK_DGRAM;
    if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
    {
        RETURN_FALSE;
    }

    process->pipe_object = _pipe;
    process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
    process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER);
    process->pipe = process->pipe_master;

    zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC);
}

从上面的代码中可以看出,这里的pipe是通过函数 swPipeUnsock_create 创建的,下面是这个函数的完整实现

int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{
    int ret;
    swPipeUnsock *object = sw_malloc(sizeof(swPipeUnsock));
    if (object == NULL)
    {
        swWarn("malloc() failed.");
        return SW_ERR;
    }
    p->blocking = blocking;
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    else
    {
        //Nonblock
        if (blocking == 0)
        {
            swSetNonBlock(object->socks[0]);
            swSetNonBlock(object->socks[1]);
        }

        int sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(object->socks[0], sbsize);
        swSocket_set_buffer_size(object->socks[1], sbsize);

        p->object = object;
        p->read = swPipeUnsock_read;
        p->write = swPipeUnsock_write;
        p->getFd = swPipeUnsock_getFd;
        p->close = swPipeUnsock_close;
    }
    return 0;
}

从上面的代码就可以很直观的看到,是通过函数 socketpair 创建了一对已连接的(UNIX族)无名socket。在Linux中,完全可以把这一对socket当成pipe返回的文件描述符一样使用,唯一的区别就是这一对文件描述符中的任何一个都可读和可写。所以这样主进程和子进程间的通信就完全是在进行socket通信。

相关资料:
Linux 上实现双向进程间通信管道
socketpair

2、IPC msgqueue

bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2);

* $msgkey是消息队列的key,默认会使用ftok(FILE)
* $mode通信模式,默认为2,表示争抢模式,所有创建的子进程都会从队列中取数据

* 使用模式2后,创建的子进程无法进行单独通信,比如发给特定子进程。
* $process对象并未执行start,也可以执行push/pop向队列推送/提取数据
* 消息队列通信方式与管道不可公用。消息队列不支持EventLoop,使用消息队列后只能使用同步阻塞模式

上面是摘自文档中的一些描述。还是直接看源码怎么实现的

static PHP_METHOD(swoole_process, useQueue)
{
    long msgkey = 0;
    long mode = 2;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &msgkey, &mode) == FAILURE)
    {
        RETURN_FALSE;
    }

    swWorker *process = swoole_get_object(getThis());

    if (msgkey <= 0)
    {
#if PHP_MAJOR_VERSION == 7
        msgkey = ftok(execute_data->func->op_array.filename->val, 0);
#else
        msgkey = ftok(EG(active_op_array)->filename, 0);
#endif
    }

    swMsgQueue *queue = emalloc(sizeof(swMsgQueue));
    if (swMsgQueue_create(queue, 1, msgkey, 0) < 0)
    {
        RETURN_FALSE;
    }
    queue->delete = 0;
    process->queue = queue;
    process->ipc_mode = mode;
    RETURN_TRUE;
}

从源码可以看到,$msgkey的默认值为0,且当它的值小于等于0的时候,就会用使用默认值 ftok(FILE)。而mode在这里并没有直接使用,而是将它赋值给了process对象的ipc_mode属性。
但是这里使用 swMsgQueue_create 创建了队列,下面是这个函数的原型

int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, long type)
{
    int msg_id;
    if (blocking == 0)
    {
        q->ipc_wait = IPC_NOWAIT;
    }
    else
    {
        q->ipc_wait = 0;
    }
    q->blocking = blocking;
    msg_id = msgget(msg_key, IPC_CREAT | O_EXCL | 0666);
    if (msg_id < 0)
    {
        swWarn("msgget() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }
    else
    {
        q->msg_id = msg_id;
        q->type = type;
    }
    return 0;
}

这个代码很简单,创建了一个key为$msg_key并且是阻塞性的msgqueue。这里就是完整的msgqueue的创建逻辑,当你没有给它设置默认的key的时候,系统取一个默认的值作为key,当然你也能根据你的需要设置任意的key来创建一个或者多个msgqueue。

上面说了第一个参数$msgkey,下面来看看第二个参数$mode的作用。上面的代码中将mode赋值给了process对象的ipc_mode属性,即系看它都在什么地方被用到。

首先看数据如队列 pop

static PHP_METHOD(swoole_process, push)
{
    ......

    struct
    {
        long type;
        char data[65536];
    } message;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &length) == FAILURE)
    {
        RETURN_FALSE;
    }

    ......

    message.type = process->id;
    memcpy(message.data, data, length);

    if (swMsgQueue_push(process->queue, (swQueue_data *)&message, length) < 0)
    {
        php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgsnd() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    RETURN_TRUE;
}

从上面的代码可以看出,我们PHP调用push时候传入的值被放入了一个 message 结构体中,然后再调用 swMsgQueue_push将数据入队列的。
下面是 swMsgQueue_push 的实现

int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length)
{
    int ret;

    while (1)
    {
        ret = msgsnd(q->msg_id, in, length, q->ipc_wait);

        if (ret < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN)
            {
                swYield();
                continue;
            }
            else
            {
                return -1;
            }
        }
        else
        {
            return ret;
        }
    }
    return 0;
}

这里的代码很简单,就是调用系统调用 msgsnd 将数据放入队列中。

现在看数据出队列 pop

static PHP_METHOD(swoole_process, pop)
{
    ......

    struct
    {
        long type;
        char data[SW_MSGMAX];
    } message;

    if (process->ipc_mode == 2)
    {
        message.type = 0;
    }
    else
    {
        message.type = process->id;
    }

    int n = swMsgQueue_pop(process->queue, (swQueue_data *) &message, maxsize);
    if (n < 0)
    {
        php_error_docref(NULL TSRMLS_CC, E_WARNING, "msgrcv() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    SW_RETURN_STRINGL(message.data, n, 1);
}

看到这里,终于看到我们之前传入的$mode参数的作用了。如果我们如文档中所说默认值为2的话, message结构体的type就被设置为0,否则的话取当前进程的ID。说到这里有个要说的是,每个process进程在它的对象被new的时候,它的构造函数会将它自己的属性ID设置一个值,这个值是一个自增计数器,也就是会将一次初始化的process对象排队,所以每个process的id的值是不一样的。
重点是看 swMsgQueue_pop

int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length)
{
    int flag = q->ipc_wait;
    long type = data->mtype;

    return msgrcv(q->msg_id, data, length, type, flag);
}

看到了吧,type是 msgrcv系统调用需要的第4个参数。下面是对这个参数的解释:

1. =0:接收第一个消息
2. >0:接收类型等于msgtyp的第一个消息
3.

文章来源:

Author:花生
link:http://wenjun.org/?p=1190