swoole —— 重新定義PHP
swoole 的進(jìn)程之間有兩種通信方式,一種是消息隊(duì)列(queue),另一種是管道(pipe),對(duì)swoole_process 的研究在swoole中顯得尤為重要。
預(yù)備知識(shí)
IO多路復(fù)用
swoole 中的io多路復(fù)用表現(xiàn)為底層的 epoll進(jìn)程模型,在C語(yǔ)言中表現(xiàn)為 epoll 函數(shù)。
- epoll 模型下會(huì)持續(xù)監(jiān)聽(tīng)自己名下的素有socket 描述符 fd
- 當(dāng)觸發(fā)了 socket 監(jiān)聽(tīng)的事件時(shí),epoll 函數(shù)才會(huì)響應(yīng),并返回所有監(jiān)聽(tīng)該時(shí)間的 socket 集合
- epoll 的本質(zhì)是阻塞IO,它的優(yōu)點(diǎn)在于能同事處理大量socket連接
Event loop 事件循環(huán)
swoole 對(duì) epoll 實(shí)現(xiàn)了一個(gè)Reactor線程模型封裝,設(shè)置了read事件和write事件的監(jiān)聽(tīng)回調(diào)函數(shù)。(詳見(jiàn)swoole_event_add)
- Event loop 是一個(gè)Reactor線程,其中運(yùn)行了一個(gè)epoll實(shí)例。
- 通過(guò)swoole_event_add將socket描述符的一個(gè)事件添加到epoll監(jiān)聽(tīng)中,事件發(fā)生時(shí)將執(zhí)行回調(diào)函數(shù)
- 不可用于fpm環(huán)境下,因?yàn)閒pm在任務(wù)結(jié)束時(shí)可能會(huì)關(guān)掉進(jìn)程。
swoole_process
- 基于C語(yǔ)言封裝的進(jìn)程管理模塊,方便php來(lái)調(diào)用
- 內(nèi)置管道、消息隊(duì)列接口,方便實(shí)現(xiàn)進(jìn)程間通信
我們?cè)趐hp-fpm.conf配置文件中發(fā)現(xiàn),php-fpm中有兩種進(jìn)程池管理設(shè)置。
- 靜態(tài)模式 即初始化固定的進(jìn)程數(shù),當(dāng)來(lái)了一個(gè)請(qǐng)求時(shí),從中選取一個(gè)進(jìn)程來(lái)處理。
- 動(dòng)態(tài)模式 指定最小、最大進(jìn)程數(shù),當(dāng)請(qǐng)求量過(guò)大,進(jìn)程數(shù)不超過(guò)最大限制時(shí),新增線程去處理請(qǐng)求
接下來(lái)用swoole代碼來(lái)實(shí)現(xiàn),這里只是為理解swoole_process、進(jìn)程間通信、定時(shí)器等使用,實(shí)際情況使用封裝好的swoole_server來(lái)實(shí)現(xiàn)task任務(wù)隊(duì)列池會(huì)更方便。
假如有個(gè)定時(shí)投遞的任務(wù)隊(duì)列:
?php
/**
* 動(dòng)態(tài)進(jìn)程池,類(lèi)似fpm
* 動(dòng)態(tài)新建進(jìn)程
* 有初始進(jìn)程數(shù),最小進(jìn)程數(shù),進(jìn)程不夠處理時(shí)候新建進(jìn)程,不超過(guò)最大進(jìn)程數(shù)
*/
// 一個(gè)進(jìn)程定時(shí)投遞任務(wù)
/**
* 1. tick
* 2. process及其管道通訊
* 3. event loop 事件循環(huán)
*/
class processPool
{
private $pool;
/**
* @var swoole_process[] 記錄所有worker的process對(duì)象
*/
private $workers = [];
/**
* @var array 記錄worker工作狀態(tài)
*/
private $used_workers = [];
/**
* @var int 最小進(jìn)程數(shù)
*/
private $min_woker_num = 5;
/**
* @var int 初始進(jìn)程數(shù)
*/
private $start_worker_num = 10;
/**
* @var int 最大進(jìn)程數(shù)
*/
private $max_woker_num = 20;
/**
* 進(jìn)程閑置銷(xiāo)毀秒數(shù)
* @var int
*/
private $idle_seconds = 5;
/**
* @var int 當(dāng)前進(jìn)程數(shù)
*/
private $curr_num;
/**
* 閑置進(jìn)程時(shí)間戳
* @var array
*/
private $active_time = [];
public function __construct()
{
$this->pool = new swoole_process(function () {
// 循環(huán)建立worker進(jìn)程
for ($i = 0; $i $this->start_worker_num; $i++) {
$this->createWorker();
}
echo '初始化進(jìn)程數(shù):' . $this->curr_num . PHP_EOL;
// 每秒定時(shí)往閑置的worker的管道中投遞任務(wù)
swoole_timer_tick(1000, function ($timer_id) {
static $count = 0;
$count++;
$need_create = true;
foreach ($this->used_workers as $pid => $used) {
if ($used == 0) {
$need_create = false;
$this->workers[$pid]->write($count . ' job');
// 標(biāo)記使用中
$this->used_workers[$pid] = 1;
$this->active_time[$pid] = time();
break;
}
}
foreach ($this->used_workers as $pid => $used)
// 如果所有worker隊(duì)列都沒(méi)有閑置的,則新建一個(gè)worker來(lái)處理
if ($need_create $this->curr_num $this->max_woker_num) {
$new_pid = $this->createWorker();
$this->workers[$new_pid]->write($count . ' job');
$this->used_workers[$new_pid] = 1;
$this->active_time[$new_pid] = time();
}
// 閑置超過(guò)一段時(shí)間則銷(xiāo)毀進(jìn)程
foreach ($this->active_time as $pid => $timestamp) {
if ((time() - $timestamp) > $this->idle_seconds $this->curr_num > $this->min_woker_num) {
// 銷(xiāo)毀該進(jìn)程
if (isset($this->workers[$pid]) $this->workers[$pid] instanceof swoole_process) {
$this->workers[$pid]->write('exit');
unset($this->workers[$pid]);
$this->curr_num = count($this->workers);
unset($this->used_workers[$pid]);
unset($this->active_time[$pid]);
echo "{$pid} destroyed\n";
break;
}
}
}
echo "任務(wù){(diào)$count}/{$this->curr_num}\n";
if ($count == 20) {
foreach ($this->workers as $pid => $worker) {
$worker->write('exit');
}
// 關(guān)閉定時(shí)器
swoole_timer_clear($timer_id);
// 退出進(jìn)程池
$this->pool->exit(0);
exit();
}
});
});
$master_pid = $this->pool->start();
echo "Master $master_pid start\n";
while ($ret = swoole_process::wait()) {
$pid = $ret['pid'];
echo "process {$pid} existed\n";
}
}
/**
* 創(chuàng)建一個(gè)新進(jìn)程
* @return int 新進(jìn)程的pid
*/
public function createWorker()
{
$worker_process = new swoole_process(function (swoole_process $worker) {
// 給子進(jìn)程管道綁定事件
swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
$data = trim($worker->read());
if ($data == 'exit') {
$worker->exit(0);
exit();
}
echo "{$worker->pid} 正在處理 {$data}\n";
sleep(5);
// 返回結(jié)果,表示空閑
$worker->write("complete");
});
});
$worker_pid = $worker_process->start();
// 給父進(jìn)程管道綁定事件
swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
$data = trim($worker_process->read());
if ($data == 'complete') {
// 標(biāo)記為空閑
// echo "{$worker_process->pid} 空閑了\n";
$this->used_workers[$worker_process->pid] = 0;
}
});
// 保存process對(duì)象
$this->workers[$worker_pid] = $worker_process;
// 標(biāo)記為空閑
$this->used_workers[$worker_pid] = 0;
$this->active_time[$worker_pid] = time();
$this->curr_num = count($this->workers);
return $worker_pid;
}
}
new processPool();
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- 詳解PHP swoole process的使用方法