swoole_process应用之master-worker-tasker

年初时候为了给其他部门推送数据,所以利用swoole_process开发了一个数据推送的服务。最近发觉好些童鞋都需要开发类似功能的东西,PHP大会休息时间Rango也被问了同样的东西,所以就聊聊我们这边的这个服务吧。自我感觉功能和进程模型做的挺不错。

不多说,先上图,然后对着图慢慢解释swoole_process应用

从上面的图来看,从功能上来看分为3种进程,分别为master、worker、tasker进程,很熟悉吧,swoole的server模型就是这样的。接下来分别说说每种进程是怎么工作的。

Master

//daemon
\swoole_process::daemon(false, true);

//注册共享内存计数器
Table::registery();

//fork tasker子进程
$this->initTaskerProcess();

//fork worker子进程
$this->initWorkerProcess();

//主进程中注册监听的信号量
$this->initSignal();

//主进程中注册定时器
$this->initTimer();

//记录以及修改一些进程信息
$this->info();

//loop...
$this->loop();

这里的主进程就是swoole_process的父进程,它不干具体的活,他的主要工作有

fork产生worker与tasker子进程 监听信号:SIGCHLD与SIGTERM 注册定时器 修改主进程名并将主进程pid写入到pid文件 基于swoole_event实现一个纯异步的tcp server,让主进程彻底的异步化

这里有个重点是,我们的进程分工是worker进程从外部获取数据(Kafka/Reids/Mysql…)然后分给tasker去处理,所以这里worker进程必须得拿到属于它的tasker进程组,因此worker进程必须在tasker进程fork完成后它才能fork出来,也就是它后fork的话他可以获取到TakserProcessObjList。这样的话它就可以给每一个tasker发送数据了。

private function initTaskerProcess()
{
    for ($i = 0; $i < Main::$taskerConf['process_num']; $i++) {
        $processObj = new \swoole_process('\Action\Tasker::processCallback', false, false);
        //每个tasker进程一个独立的队列
        $processObj->useQueue($i + 1);
        $processObj->id = $i;
        $processObj->start();

        $this->taskerProcessList[] = $processObj;
    }
}

private function initWorkerProcess()
{
    for ($i = 0; $i < Main::$workerConf['process_num']; $i++) {
        $processObj = new \swoole_process('\Action\Worker::processCallback', false, false);
        $processObj->id = $i;
        $processObj->start();

        $this->workerProcessList[] = $processObj;
    }
}

这里使用的是msgqueue的通信方式,使用这种方式主要有两种考虑:

第一,任务队列对我们来说比较好理解与使用,在一个个的任务的情况下用socket的话没有消息队列好使用 第二,我们还有个非常重要的功能,也就是进程异常退出重新拉起,这里就需要即使进程退出了,但是我们的队列和任务还存在,不会消失,新的进程拉起后还能在原消息队列上继续工作

关于信号监听的话,监听SIGCHLD的作用是当worker子进程或者tasker子进程异常退出的时候,主进程会接收到这个信号,然后我们的主进程就可以知道是哪个进程退出了,然后重新拉起它,看代码吧

\swoole_process::signal(SIGCHLD, array($this, 'sigchldHandle'));

public function sigchldHandle($signal)
{
    //回收结束子进程
    while ($ret = \swoole_process::wait(false)) {
        //log
        Log::info("child process pid[{$ret['pid']}] exit", 'master');

        //重新拉起worker进程
        foreach (Master::getInstance()->workerProcessList as $id => $processObj) {
            if ($processObj->pid == $ret['pid']) {
                $processObj->start();

                Master::getInstance()->workerProcessList[$id] = $processObj;

                Log::info("child worker process id[{$processObj->id}] pid[{$processObj->pid}] start", 'master');
                break;
            }
        }

        //重新拉起tasker进程
        foreach (Master::getInstance()->taskerProcessList as $id => $processObj) {
            if ($processObj->pid == $ret['pid']) {
                $processObj->start();

                Master::getInstance()->taskerProcessList[$id] = $processObj;

                Log::info("child tasker process id[{$processObj->id}] pid[{$processObj->pid}] start", 'master');
                break;
            }
        }
    }
}
//SIGTERM 进程正常退出
\swoole_process::signal(SIGTERM, array($this, 'sigtermHandle'));

public function sigtermHandle($signal)
{
    $allProcess = array();

    //退出worker进程
    foreach (Master::getInstance()->workerProcessList as $id => $processObj) {
        \swoole_process::kill($processObj->pid, SIGKILL);

        $allProcess[$processObj->pid] = $processObj;
    }

    //检测系统中所有任务完成并退出tasker进程
    $allTaskerProcess = Master::getInstance()->taskerProcessList;

    //获取内存表的对象
    $tableObj = GlobalRegistery::get('table_obj');
    while (true) {
        //所有内部状态
        $allStatus = $tableObj->get(Main::PROGRAM_NAME);

        foreach ($allTaskerProcess as $id => $processObj) {
            //tasker总共收到的task个数
            $taskerReciveKey = sprintf(Table::TASKER_RECIVE_COUNT, $id);
            //tasker总共完成的task个数
            $taskerFinishKey = sprintf(Table::TASKER_FINISH_COUNT, $id);

            $reciveCount  = $allStatus[$taskerReciveKey];
            $finishCount  = $allStatus[$taskerFinishKey];
            $unfinshCount = $reciveCount - $finishCount;
            if (0 == $unfinshCount) {
                \swoole_process::kill($processObj->pid, SIGKILL);

                $allProcess[$processObj->pid] = $processObj;
                unset($allTaskerProcess[$id]);
            }
        }

        if (empty($allTaskerProcess)) {
            break;
        }
    }

    //等待捕获所有子进程退出
    while (!empty($allProcess)) {
        while ($result = \swoole_process::wait(false)) {
            $pid = $result['pid'];
            unset($allProcess[$pid]);

            //log
            Log::info("child process #{$pid} exit", 'master');
        }
    }

    //delete pid file
    unlink(Main::getPidConf());

    //退出主进程的socket
    socket_close(Master::getInstance()->masterListenSock);

    //删除系统的msg_queue
    shell_exec('ipcs -q | awk \'{ print "ipcrm -q "$2}\' | sh > /dev/null 2>&1;');

    //log
    Log::info('ddps shutdown success', 'master');

    //主进程退出
    swoole_event_exit();
}

上面的这套逻辑,由监听信号SIGTERM来保证了系统能平稳安全的退出,而不至于丢失数据。

下面是主进程的异步server的实现,它的功能只有一个,输出当前系统的运行状态

private function loop()
{
    $this->masterListenSock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    //绑定要监听的端口
    $result = socket_bind($this->masterListenSock, Main::MONITOR_LISTEN_IP, Main::MONITOR_LISTEN_PORT);
    //监听端口
    $result = socket_listen($this->masterListenSock);

    swoole_event_add($this->masterListenSock,
        function($socket) {  //accept
            $connSocket = socket_accept($socket);
            swoole_event_add($connSocket, null,
                function($socket){
                    $tableObj = GlobalRegistery::get('table_obj');
                    $allStatus = $tableObj->get(Main::PROGRAM_NAME);

                    socket_write($socket, json_encode($allStatus));
                    socket_close($socket);
                },
                SWOOLE_EVENT_WRITE);
        },
        null, SWOOLE_EVENT_READ
    );
}

Worker

worker进程是由Master进程fork产生的,它本身没有开任务的通信通道,但是它手握着所有的Tasker进程的对象,这样它就可以给任意的Tasker进程通过消息队列分配和发送任务。下面来一点Worker进程的代码

if (null !== Master::getInstance()->masterListenSock) {
    socket_close(Master::getInstance()->masterListenSock);
}

$id  = $workerProcess->id;
$pid = $workerProcess->pid;

//rename master process
\swoole_set_process_name(sprintf(Main::WORKER_PROCESS_NAME, $id));

//计数器对象与Key
$tableObj     = GlobalRegistery::get('table_obj');
$sendCountKey = sprintf(Table::WORKER_SEND_COUNT, $id);

//worker config
$workerConfig = Main::$workerConf['process_list'][$id];
$Dataconfig   = $workerConfig['data_config'];
$stroageObj   = Factory::getInstance($Dataconfig);

//cpu亲和性绑定
if (is_array($workerConfig['affinity']) && !empty($workerConfig['affinity'])) {
    \swoole_process::setaffinity($workerConfig['affinity']);
}

//tasker process list
$taskerList        = Master::getInstance()->taskerProcessList;
$taskerGroupConfig = Main::$taskerConf['group_list'][$workerConfig['tasker_group_id']];
GlobalRegistery::set('tasker_group_config', $taskerGroupConfig);

while (true) {
    foreach ($stroageObj->getDataList() as $data) {
        //无休止的干活、、、、干活、、、
        $result = $taskerProcess->push($dataStr);
    }
}

要说明的一个就是,最上面的socket_close只会在当进程被重新拉起时候有用,上面说了,Master进程在fork完子进程后开启了一个纯异步的socket_server,所以这里得关掉fork后拿到的socket。由于worker进程效率比较高,对CPU利用也比较好,所以开启了CPU亲和性绑定,然后下面就是一个死循环的拉取数据,然后将数据分发给tsker进程的过程,当中中间会有一些计数器,记录当前进程发送了多少数据给tasker进程。

Tasker

//清理工作
if (null !== Master::getInstance()->masterListenSock) {
    socket_close(Master::getInstance()->masterListenSock);
}

$id  = $taskerProcess->id;
$pid = $taskerProcess->pid;

//找出当前tasker所属的配置组
$taskerConfig = self::getTaskerConfig($id);
if (false === $taskerConfig) {
    Log::warning("tasker[{$pid}] get config error", 'tasker', $id);
    exit;
}
GlobalRegistery::set('current_tasker_config', $taskerConfig);

//rename master process
\swoole_set_process_name(sprintf(Main::TASKER_PROCESS_NAME, $id));

//计数器对象与Key
$tableObj        = GlobalRegistery::get('table_obj');
$finishCountKey  = sprintf(Table::TASKER_FINISH_COUNT, $id);
$successCountKey = sprintf(Table::TASKER_SUCCESS_COUNT, $id);
$failCountKey    = sprintf(Table::TASKER_FAIL_COUNT, $id);

while(true) {
    $orderInfoStr = $taskerProcess->pop();

    //处理数据。。。。。。。。。。。。。。。。

    //计数
    $tableObj->incr(Main::PROGRAM_NAME, $finishCountKey);
    if (false === $result) {    //fail
        $tableObj->incr(Main::PROGRAM_NAME, $failCountKey);
        continue;
    }
    //success
    $tableObj->incr(Main::PROGRAM_NAME, $successCountKey);
    Log::info("tasker[{$pid}] post sucess", 'tasker', $orderInfoStr);
}

tasker进程也是一个死循环,无休止的从消息队列中pop数据出来处理,然后它里面的计数器更多,包括记录了总的完成量、处理成功的数量、失败的数量,这些数据的统计都在由swoole_table完成的,然后由主进程的异步server对外输出工作状态。

大概的就这些了。这个模型将数据获取与处理分割开,让每个进程的效率都达到最高,而且让worker进程和tasker进程的匹配更加可控可调。系统中包含了一个内部计数器,这样系统外部就能通过tcp server知道系统内部的状态,也能让系统内部通过这些计数知道worker与tasker进程之间的消息队列的任务积压,从而能保护系统不被压垮。当子进程异常退出时候还能将进程重新拉起,保证系统的稳定运行。还有一套让整个系统从容安全退出的机制,不至于丢失数据

秀一下我们线上的1 Worker对应50 Tasker

文章来源:

Author:花生
link:http://wenjun.org/?p=1236