簡介
PHP 的異步進(jìn)程助手,借助于 AMQP 實(shí)現(xiàn)異步執(zhí)行 PHP 的方法,將一些很耗時(shí)、追求高可用、需要重試機(jī)制的操作放到異步進(jìn)程中去執(zhí)行,將你的 HTTP 服務(wù)從繁重的業(yè)務(wù)邏輯中解脫出來。以一個(gè)較低的成本將傳統(tǒng) PHP 業(yè)務(wù)邏輯轉(zhuǎn)換成非阻塞、高可用、可擴(kuò)展的異步模式。
依賴
- php 5.6+
- ext-bcmath
- ext-amqp 1.9.1+
- ext-memcached 3.0.3+
安裝
通過 composer 安裝
composer require l669/async-helper
或直接下載項(xiàng)目源碼
wget https://github.com/l669306630/async-helper/archive/master.zip
使用范例
業(yè)務(wù)邏輯:這里定義了很多等待被調(diào)用的類和方法,在你的項(xiàng)目中這可能是數(shù)據(jù)模型、或是一個(gè)發(fā)送郵件的類。
?php
class SendMailHelper
{
/**
* @param array $mail
* @throws Exception
*/
public static function request($mail)
{
// 在這里發(fā)送郵件,或是通過調(diào)用第三方提供的服務(wù)發(fā)送郵件
// 發(fā)送失敗的時(shí)候你拋出了異常,希望被進(jìn)程捕獲,并按設(shè)定的規(guī)則進(jìn)行重試
}
}
生產(chǎn)者:通常是 HTTP 服務(wù),傳統(tǒng)的 PHP 項(xiàng)目或是一個(gè)命令行程序,接收到某個(gè)請(qǐng)求或指令后進(jìn)行一系列的操作。
?php
use l669\AsyncHelper;
class UserController
{
public function register()
{
// 假設(shè)這是一個(gè)用戶注冊(cè)的請(qǐng)求,用戶提交了姓名、郵箱、驗(yàn)證碼
// 第一步、校驗(yàn)用戶信息
// 第二步、實(shí)例化異步助手,這時(shí)候會(huì)連接 AMQP
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/'
]);
// 第三步、保存用戶信息到數(shù)據(jù)庫
$mail = [
'from' => 'service@yourdomain.com',
'to' => 'username@163.com',
'subject' => '恭喜你注冊(cè)成功',
'body' => '請(qǐng)點(diǎn)擊郵件中的鏈接完成驗(yàn)證....'
];
// 第四步、通過異步助手發(fā)送郵件
$async_helper->run('\\SendMailHelper', 'request', [$mail]);
// 這是同步的模式去發(fā)送郵件,如果郵件服務(wù)響應(yīng)遲緩或異常,就會(huì)直接影響該請(qǐng)求的響應(yīng)時(shí)間,甚至丟失這封重要郵件
// SendMailHelper::request($mail);
}
}
消費(fèi)者:PHP 的異步進(jìn)程,監(jiān)聽消息隊(duì)列,執(zhí)行你指定的方法。并且該消費(fèi)者進(jìn)程是可擴(kuò)展的高可用的服務(wù),這一切都得益于 AMQP,這是系統(tǒng)解耦、布局微服務(wù)的最佳方案。
consume.php
?php
require_once('vendor/autoload.php');
require_once('SendMailHelper.php');
use l669\AsyncHelper;
use l669\CacheHelper;
$cache_helper = new CacheHelper('127.0.0.1', 11211);
while(true){
try{
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/',
'cacheHelper' => $cache_helper
]);
$async_helper->consume();
}catch(Exception $e){
// 可以在這里記錄一些日志
sleep(2);
}
}
# 在命令行下啟動(dòng)消費(fèi)者進(jìn)程,推薦使用 supervisor 來管理進(jìn)程
php consume.php
支持事務(wù):需要一次提交執(zhí)行多個(gè)異步方法,事務(wù)可以確保完成性。
// 接著上面的示例來說,這里省略了一些重復(fù)的代碼,下同
$async_helper->beginTransaction();
try{
$async_helper->run('\\SendMailHelper', 'request', [$mail1]);
$async_helper->run('\\SendMailHelper', 'request', [$mail2]);
$async_helper->run('\\SendMailHelper', 'request', [$mail3]);
$async_helper->commit();
}catch(\Exception $e){
$async_helper->rollback();
}
阻塞式重試:當(dāng)異步進(jìn)程執(zhí)行一個(gè)方法,方法內(nèi)部拋出異常時(shí)進(jìn)行重試,一些必須遵循執(zhí)行順序的業(yè)務(wù)就要采用阻塞式的重試,通過指定重試最大阻塞時(shí)長來控制。
use l669\CacheHelper;
use l669\AsyncHelper;
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/',
'cacheHelper' => new CacheHelper('127.0.0.1', 11211),
'retryMode' => AsyncHelper::RETRY_MODE_REJECT, // 阻塞式重試
'maxDuration' => 600 // 最長重試 10 分鐘
]);
$send_mail_helper = new \SendMailHelper();
$mail = new \stdClass();
$mail->from = 'service@yourdomain.com';
$mail->to = 'username@163.com';
$mail->subject = '恭喜你注冊(cè)成功';
$mail->body = '請(qǐng)點(diǎn)擊郵件中的鏈接完成驗(yàn)證....';
$async_helper->run($send_mail_helper, 'request', [$mail]);
// 如果方法中需要拋出異常來結(jié)束程序,又不希望被異步進(jìn)程重試,可以拋出以下幾種錯(cuò)誤碼,進(jìn)程捕獲到這些異常后會(huì)放棄重試:
// l669\AsyncException::PARAMS_ERROR
// l669\AsyncException::METHOD_DOES_NOT_EXIST
// l669\AsyncException::KNOWN_ERROR
非阻塞式重試:當(dāng)異步執(zhí)行的方法內(nèi)部拋出異常,async-helper 會(huì)將該方法重新放進(jìn)隊(duì)列的尾部,先執(zhí)行新進(jìn)入隊(duì)列的方法,回頭再重試剛才執(zhí)行失敗的方法,通過指定最大重試次數(shù)來控制。
use l669\CacheHelper;
use l669\AsyncHelper;
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => 'new',
'cacheHelper' => new CacheHelper('127.0.0.1', 11211),
'queueName' => 'emails.vip', // 給付費(fèi)的大爺走 VIP 隊(duì)列
'retryMode' => AsyncHelper::RETRY_MODE_TTL, // 非阻塞式重試
'maxRetries' => 10 // 最多重試 10 次
]);
$mail = new \stdClass();
$mail->from = 'service@yourdomain.com';
$mail->to = 'username@163.com';
$mail->subject = '恭喜你注冊(cè)成功';
$mail->body = '請(qǐng)點(diǎn)擊郵件中的鏈接完成驗(yàn)證....';
$async_helper->run('\\SendMailHelper', 'request', [$mail]);
應(yīng)用和解惑
- 我們采用的是開源的 RabbitMQ 來為我們提供的 AMQP 服務(wù)。
- 你的項(xiàng)目部署在擁有很多服務(wù)器節(jié)點(diǎn)的集群上,每個(gè)節(jié)點(diǎn)的程序都需要寫日志文件,現(xiàn)在的問題就是要收集所有節(jié)點(diǎn)上面的日志到一個(gè)地方,方便我們及時(shí)發(fā)現(xiàn)問題或是做一些統(tǒng)計(jì)。所有節(jié)點(diǎn)都可以使用 async-helper 異步調(diào)用一個(gè)寫日志的方法,而執(zhí)行這個(gè)寫日志的方法的進(jìn)程只需要在一臺(tái)機(jī)器上啟動(dòng)就可以了,這樣所有節(jié)點(diǎn)的日志就都實(shí)時(shí)掌握在手里了。
- 做過微信公眾號(hào)開發(fā)的都知道,騰訊微信可以將用戶的消息推送到我們的服務(wù)器,如果我們?cè)?5s 內(nèi)未及時(shí)響應(yīng),騰訊微信會(huì)重試 3 次,其實(shí)這就是消息隊(duì)列的應(yīng)用,使用 async-helper 可以輕松的做和這一樣的事情。
- 得益于 RabbitMQ,你可以輕松的橫向擴(kuò)展你的消費(fèi)者進(jìn)程的能力,因?yàn)?RabbitMQ 天生就支持集群部署,你可以輕松的啟動(dòng)多個(gè)消費(fèi)者進(jìn)程,或是將消費(fèi)者進(jìn)程分布到多臺(tái)機(jī)器上。
- 如果 RabbitMQ 服務(wù)不可用怎么辦呢?部署 RabbitMQ 高可用服務(wù)是容易的,對(duì)外提供單一 IP,這個(gè) IP 是個(gè)負(fù)載均衡,背后是 RabbitMQ 集群,負(fù)載均衡承擔(dān)對(duì)后端集群節(jié)點(diǎn)的健康檢查。
- async-helper 能否承受高并發(fā)請(qǐng)求?async-helper 生產(chǎn)者使用的是短連接,也就說在你的 HTTP 還沒有響應(yīng)瀏覽器的時(shí)候 async-helper 就已經(jīng)結(jié)束了工作,你連接 RabbitMQ 的時(shí)間是百分之百小于 HTTP 請(qǐng)求的時(shí)間的,換言之,只要 RabbitMQ 承受并發(fā)的能力超過你的 HTTP 服務(wù)的承受并發(fā)的能力,RabbitMQ 就永遠(yuǎn)不會(huì)崩,通過橫向擴(kuò)展 RabbitMQ 很容易做到的。
和傳統(tǒng) PHP 相比
- 對(duì)任何 PHP 方法通過反射進(jìn)行異步執(zhí)行;
- 高可用,執(zhí)行方法進(jìn)入消息隊(duì)列,可持久化,即使服務(wù)器宕機(jī),執(zhí)行任務(wù)也不丟失;
- 高可用,對(duì)異常可以進(jìn)行不限次數(shù)和時(shí)間的重試,重試次數(shù)和時(shí)間可配置;
- 支持對(duì)多個(gè)異步方法包含在事務(wù)中執(zhí)行,支持回滾事務(wù);
- 方法的參數(shù)類型支持除資源類型(resource)和回調(diào)函數(shù)(callable)外的任意類型的參數(shù);
- 得益于 AMQP,異步方法可以承受高并發(fā)、高負(fù)載,支持集群部署、橫向擴(kuò)展;
- 低延時(shí),實(shí)測延時(shí)時(shí)間 0.016 ~ 0.021s;
- 適用于:日常數(shù)據(jù)庫操作、日志收集、金融交易、消息推送、發(fā)送郵件和短信、數(shù)據(jù)導(dǎo)入導(dǎo)出、計(jì)算大量數(shù)據(jù)生成報(bào)表;