swoole_process应用之master-worker-tasker
年初时候为了给其他部门推送数据,所以利用swoole_process开发了一个数据推送的服务。最近发觉好些童鞋都需要开发类似功能的东西,PHP大会休息时间Rango也被问了同样的东西,所以就聊聊我们这边的这个服务吧。自我感觉功能和进程模型做的挺不错。
不多说,先上图,然后对着图慢慢解释
从上面的图来看,从功能上来看分为3种进程,分别为master、worker、tasker进程,很熟悉吧,swoole的server模型就是这样的。接下来分别说说每种进程是怎么工作的。
Master
//daemon \swoole_process::daemon(false, true); //注册共享内存计数器 Table::registery(); //fork tasker子进程 $this->initTaskerProcess(); //fork worker子进程 $this->initWorkerProcess(); //主进程中注册监听的信号量 $this->initSignal(); //主进程中注册定时器 $this->initTimer(); //记录以及修改一些进程信息 $this->info(); //loop... $this->loop();
这里的主进程就是swoole_process的父进程,它不干具体的活,他的主要工作有
fork产生worker与tasker子进程 监听信号:SIGCHLD与SIGTERM 注册定时器 修改主进程名并将主进程pid写入到pid文件 基于swoole_event实现一个纯异步的tcp server,让主进程彻底的异步化这里有个重点是,我们的进程分工是worker进程从外部获取数据(Kafka/Reids/Mysql…)然后分给tasker去处理,所以这里worker进程必须得拿到属于它的tasker进程组,因此worker进程必须在tasker进程fork完成后它才能fork出来,也就是它后fork的话他可以获取到TakserProcessObjList。这样的话它就可以给每一个tasker发送数据了。
private function initTaskerProcess() { for ($i = 0; $i < Main::$taskerConf['process_num']; $i++) { $processObj = new \swoole_process('\Action\Tasker::processCallback', false, false); //每个tasker进程一个独立的队列 $processObj->useQueue($i + 1); $processObj->id = $i; $processObj->start(); $this->taskerProcessList[] = $processObj; } } private function initWorkerProcess() { for ($i = 0; $i < Main::$workerConf['process_num']; $i++) { $processObj = new \swoole_process('\Action\Worker::processCallback', false, false); $processObj->id = $i; $processObj->start(); $this->workerProcessList[] = $processObj; } }
这里使用的是msgqueue的通信方式,使用这种方式主要有两种考虑:
第一,任务队列对我们来说比较好理解与使用,在一个个的任务的情况下用socket的话没有消息队列好使用 第二,我们还有个非常重要的功能,也就是进程异常退出重新拉起,这里就需要即使进程退出了,但是我们的队列和任务还存在,不会消失,新的进程拉起后还能在原消息队列上继续工作关于信号监听的话,监听SIGCHLD的作用是当worker子进程或者tasker子进程异常退出的时候,主进程会接收到这个信号,然后我们的主进程就可以知道是哪个进程退出了,然后重新拉起它,看代码吧
\swoole_process::signal(SIGCHLD, array($this, 'sigchldHandle')); public function sigchldHandle($signal) { //回收结束子进程 while ($ret = \swoole_process::wait(false)) { //log Log::info("child process pid[{$ret['pid']}] exit", 'master'); //重新拉起worker进程 foreach (Master::getInstance()->workerProcessList as $id => $processObj) { if ($processObj->pid == $ret['pid']) { $processObj->start(); Master::getInstance()->workerProcessList[$id] = $processObj; Log::info("child worker process id[{$processObj->id}] pid[{$processObj->pid}] start", 'master'); break; } } //重新拉起tasker进程 foreach (Master::getInstance()->taskerProcessList as $id => $processObj) { if ($processObj->pid == $ret['pid']) { $processObj->start(); Master::getInstance()->taskerProcessList[$id] = $processObj; Log::info("child tasker process id[{$processObj->id}] pid[{$processObj->pid}] start", 'master'); break; } } } }
//SIGTERM 进程正常退出 \swoole_process::signal(SIGTERM, array($this, 'sigtermHandle')); public function sigtermHandle($signal) { $allProcess = array(); //退出worker进程 foreach (Master::getInstance()->workerProcessList as $id => $processObj) { \swoole_process::kill($processObj->pid, SIGKILL); $allProcess[$processObj->pid] = $processObj; } //检测系统中所有任务完成并退出tasker进程 $allTaskerProcess = Master::getInstance()->taskerProcessList; //获取内存表的对象 $tableObj = GlobalRegistery::get('table_obj'); while (true) { //所有内部状态 $allStatus = $tableObj->get(Main::PROGRAM_NAME); foreach ($allTaskerProcess as $id => $processObj) { //tasker总共收到的task个数 $taskerReciveKey = sprintf(Table::TASKER_RECIVE_COUNT, $id); //tasker总共完成的task个数 $taskerFinishKey = sprintf(Table::TASKER_FINISH_COUNT, $id); $reciveCount = $allStatus[$taskerReciveKey]; $finishCount = $allStatus[$taskerFinishKey]; $unfinshCount = $reciveCount - $finishCount; if (0 == $unfinshCount) { \swoole_process::kill($processObj->pid, SIGKILL); $allProcess[$processObj->pid] = $processObj; unset($allTaskerProcess[$id]); } } if (empty($allTaskerProcess)) { break; } } //等待捕获所有子进程退出 while (!empty($allProcess)) { while ($result = \swoole_process::wait(false)) { $pid = $result['pid']; unset($allProcess[$pid]); //log Log::info("child process #{$pid} exit", 'master'); } } //delete pid file unlink(Main::getPidConf()); //退出主进程的socket socket_close(Master::getInstance()->masterListenSock); //删除系统的msg_queue shell_exec('ipcs -q | awk \'{ print "ipcrm -q "$2}\' | sh > /dev/null 2>&1;'); //log Log::info('ddps shutdown success', 'master'); //主进程退出 swoole_event_exit(); }
上面的这套逻辑,由监听信号SIGTERM来保证了系统能平稳安全的退出,而不至于丢失数据。
下面是主进程的异步server的实现,它的功能只有一个,输出当前系统的运行状态
private function loop() { $this->masterListenSock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); //绑定要监听的端口 $result = socket_bind($this->masterListenSock, Main::MONITOR_LISTEN_IP, Main::MONITOR_LISTEN_PORT); //监听端口 $result = socket_listen($this->masterListenSock); swoole_event_add($this->masterListenSock, function($socket) { //accept $connSocket = socket_accept($socket); swoole_event_add($connSocket, null, function($socket){ $tableObj = GlobalRegistery::get('table_obj'); $allStatus = $tableObj->get(Main::PROGRAM_NAME); socket_write($socket, json_encode($allStatus)); socket_close($socket); }, SWOOLE_EVENT_WRITE); }, null, SWOOLE_EVENT_READ ); }
Worker
worker进程是由Master进程fork产生的,它本身没有开任务的通信通道,但是它手握着所有的Tasker进程的对象,这样它就可以给任意的Tasker进程通过消息队列分配和发送任务。下面来一点Worker进程的代码
if (null !== Master::getInstance()->masterListenSock) { socket_close(Master::getInstance()->masterListenSock); } $id = $workerProcess->id; $pid = $workerProcess->pid; //rename master process \swoole_set_process_name(sprintf(Main::WORKER_PROCESS_NAME, $id)); //计数器对象与Key $tableObj = GlobalRegistery::get('table_obj'); $sendCountKey = sprintf(Table::WORKER_SEND_COUNT, $id); //worker config $workerConfig = Main::$workerConf['process_list'][$id]; $Dataconfig = $workerConfig['data_config']; $stroageObj = Factory::getInstance($Dataconfig); //cpu亲和性绑定 if (is_array($workerConfig['affinity']) && !empty($workerConfig['affinity'])) { \swoole_process::setaffinity($workerConfig['affinity']); } //tasker process list $taskerList = Master::getInstance()->taskerProcessList; $taskerGroupConfig = Main::$taskerConf['group_list'][$workerConfig['tasker_group_id']]; GlobalRegistery::set('tasker_group_config', $taskerGroupConfig); while (true) { foreach ($stroageObj->getDataList() as $data) { //无休止的干活、、、、干活、、、 $result = $taskerProcess->push($dataStr); } }
要说明的一个就是,最上面的socket_close只会在当进程被重新拉起时候有用,上面说了,Master进程在fork完子进程后开启了一个纯异步的socket_server,所以这里得关掉fork后拿到的socket。由于worker进程效率比较高,对CPU利用也比较好,所以开启了CPU亲和性绑定,然后下面就是一个死循环的拉取数据,然后将数据分发给tsker进程的过程,当中中间会有一些计数器,记录当前进程发送了多少数据给tasker进程。
Tasker
//清理工作 if (null !== Master::getInstance()->masterListenSock) { socket_close(Master::getInstance()->masterListenSock); } $id = $taskerProcess->id; $pid = $taskerProcess->pid; //找出当前tasker所属的配置组 $taskerConfig = self::getTaskerConfig($id); if (false === $taskerConfig) { Log::warning("tasker[{$pid}] get config error", 'tasker', $id); exit; } GlobalRegistery::set('current_tasker_config', $taskerConfig); //rename master process \swoole_set_process_name(sprintf(Main::TASKER_PROCESS_NAME, $id)); //计数器对象与Key $tableObj = GlobalRegistery::get('table_obj'); $finishCountKey = sprintf(Table::TASKER_FINISH_COUNT, $id); $successCountKey = sprintf(Table::TASKER_SUCCESS_COUNT, $id); $failCountKey = sprintf(Table::TASKER_FAIL_COUNT, $id); while(true) { $orderInfoStr = $taskerProcess->pop(); //处理数据。。。。。。。。。。。。。。。。 //计数 $tableObj->incr(Main::PROGRAM_NAME, $finishCountKey); if (false === $result) { //fail $tableObj->incr(Main::PROGRAM_NAME, $failCountKey); continue; } //success $tableObj->incr(Main::PROGRAM_NAME, $successCountKey); Log::info("tasker[{$pid}] post sucess", 'tasker', $orderInfoStr); }
tasker进程也是一个死循环,无休止的从消息队列中pop数据出来处理,然后它里面的计数器更多,包括记录了总的完成量、处理成功的数量、失败的数量,这些数据的统计都在由swoole_table完成的,然后由主进程的异步server对外输出工作状态。
大概的就这些了。这个模型将数据获取与处理分割开,让每个进程的效率都达到最高,而且让worker进程和tasker进程的匹配更加可控可调。系统中包含了一个内部计数器,这样系统外部就能通过tcp server知道系统内部的状态,也能让系统内部通过这些计数知道worker与tasker进程之间的消息队列的任务积压,从而能保护系统不被压垮。当子进程异常退出时候还能将进程重新拉起,保证系统的稳定运行。还有一套让整个系统从容安全退出的机制,不至于丢失数据
秀一下我们线上的1 Worker对应50 Tasker
文章来源:
Author:花生
link:http://wenjun.org/?p=1236