濮阳杆衣贸易有限公司

主頁 > 知識庫 > 如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)

如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)

熱門標(biāo)簽:凱立德地鐵站地圖標(biāo)注 天津電銷外呼系統(tǒng)違法嗎 上海400客服電話怎么申請 滄州電銷外呼系統(tǒng)價格 手機外呼系統(tǒng)什么原理 銀行信貸電話機器人 合肥ai電銷機器人費用 溫州外呼系統(tǒng)招商 400電話個人能不能辦理

系統(tǒng)介紹

從圖中可以看到,我們這個系統(tǒng)是一個基于事件的異步任務(wù)系統(tǒng)。就是說當(dāng)一個事件產(chǎn)生時,生產(chǎn)者將事件拋給調(diào)度器,調(diào)度器負(fù)責(zé)查詢事件下有哪些任務(wù),然后將這些任務(wù)丟到相應(yīng)的隊列中,最后由消費者消費任務(wù)隊列中的任務(wù)。

在整個系統(tǒng)中主要分為三大部分

1.事件生產(chǎn)者,即產(chǎn)生消息事件的一方。

2.任務(wù)調(diào)度器(Scheduler),負(fù)責(zé)注冊事件并調(diào)度任務(wù)。

3.消費者(Worker),負(fù)責(zé)消費任務(wù)隊列中的任務(wù)。

事件生產(chǎn)者

事件生產(chǎn)者很簡單,在業(yè)務(wù)系統(tǒng)中直接調(diào)用即可,代碼如下。

?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Event;
 
try{
 
    $event = new Event('order_paied');  //定義事件
 
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件產(chǎn)生的參數(shù)
 
    $event->publish();
 
}catch (Exception $exc){
 
    echo $exc->getMessage();
 
}

任務(wù)調(diào)度器

調(diào)度器主要做兩件事,一是注冊事件,另一個是調(diào)度任務(wù)。

注冊事件代碼如下:

//注冊事件
 
EventManager::register('order_create', 'closeOrder', 'demo', 10);//關(guān)閉未付款訂單(延遲任務(wù))
 
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動發(fā)貨

這樣就注冊了兩個事件,事件下各有一個任務(wù)。

具體調(diào)度部分代碼很簡單,就不多贅述,有興趣的可以去看代碼。

消費者

重頭戲來了,一個異步任務(wù)系統(tǒng)最重要的就是消費端了,現(xiàn)在讓我們來看下Worker的流程圖。

可以看到,在這里我們采用了兩個交換器和兩個隊列,一個負(fù)責(zé)處理正常的任務(wù)即ntask,另一個負(fù)責(zé)處理需要延遲執(zhí)行的任務(wù)即dtask。簡單描述下一個任務(wù)的生命周期。

正常任務(wù)

1、task產(chǎn)生,進入正常任務(wù)的交換器Exchange[ebats_core_ntask]

2、交換器根據(jù)topic將任務(wù)分發(fā)到對應(yīng)的隊列中

3、子進程ntask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)

4、執(zhí)行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException

5、子進程ntask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]

6、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

延遲任務(wù)

1、子進程dtask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
2、執(zhí)行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子進程dtask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
4、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看

消費者代碼如下:

require_once DIR.'/../autoload.php';
 
require_once DIR.'/task/TaskDemoModel.php';
 
use Asynclib\Ebats\Worker;
 
  
 
//執(zhí)行結(jié)果回調(diào)函數(shù)
 
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
 
  
 
};
 
$worker = new Worker($callback);  //支持多進程消費默認(rèn)為1
 
$worker->setQueue('demo');  //隊列名和事件的topic一一對應(yīng)
 
$worker->run();

自定義調(diào)度器

一般來說這是一個基于事件的任務(wù)系統(tǒng),那么能不能直接產(chǎn)生任務(wù)呢。答案是肯定的。

只需要創(chuàng)建一個自定義調(diào)度器,由您自行實現(xiàn)調(diào)度邏輯,最終生成一個任務(wù)即可。代碼如下:

?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Task;
 
use Asynclib\Core\Consumer;
 
use Asynclib\Amq\ExchangeTypes;
 
use Asynclib\Exception\ExceptionInterface;
 
  
 
/**
 
 * 本示例演示了如何創(chuàng)建一個自定義調(diào)度器,開發(fā)者可以根據(jù)自身需求開發(fā)自己的任務(wù)調(diào)度器
 
 */
 
try{
 
    $worker = new Consumer();
 
    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
 
    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
 
    $worker->run(function($key, $msg){
 
        $order_data = json_encode($msg);
 
        echo " [$key] $order_data \n";
 
        Task::create('demo', 'orderAsync', $msg);//創(chuàng)建任務(wù),之后消息將作為參數(shù)由任務(wù)接管處理
 
    });
 
}catch (ExceptionInterface $exc){
 
    echo $exc->getMessage();
 
}

這樣,當(dāng)接收到消息時就會產(chǎn)生一個orderAsync的任務(wù),您只需要啟動一個用來消費這個Topic的Worker即可。

也許你會覺得這里直接寫業(yè)務(wù)邏輯的代碼就可以了,實際上也確實可以。當(dāng)你可以忍受一個進程慢慢消費的時候是可以這樣做的。但大多數(shù)情況下我們還是希望它能夠盡快的消費掉,所以建議這里只負(fù)責(zé)創(chuàng)建任務(wù),具體任務(wù)的業(yè)務(wù)邏輯由worker去執(zhí)行。

以上就是如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!

您可能感興趣的文章:
  • PHP swoole中使用task進程異步的處理耗時任務(wù)應(yīng)用案例分析
  • PHP Swoole異步讀取、寫入文件操作示例
  • PHP Swoole異步Redis客戶端實現(xiàn)方法示例
  • PHP Swoole異步MySQL客戶端實現(xiàn)方法示例
  • php基于 swoole 實現(xiàn)的異步處理任務(wù)功能示例
  • PHP swoole和redis異步任務(wù)實現(xiàn)方法分析
  • PHP擴展Swoole實現(xiàn)實時異步任務(wù)隊列示例
  • Swoole實現(xiàn)異步投遞task任務(wù)案例詳解
  • 詳解thinkphp5+swoole實現(xiàn)異步郵件群發(fā)(SMTP方式)
  • php異步多線程swoole用法實例

標(biāo)簽:溫州 七臺河 金華 怒江 酒泉 洛陽 白城 赤峰

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)》,本文關(guān)鍵詞  如,何用,RabbitMQ,和,Swoole,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)》相關(guān)的同類信息!
  • 本頁收集關(guān)于如何用RabbitMQ和Swoole實現(xiàn)一個異步任務(wù)系統(tǒng)的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    正镶白旗| 平和县| 湖北省| 湖州市| 彩票| 夏邑县| 武安市| 郓城县| 凌云县| 昌平区| 富平县| 汽车| 乳山市| 鄢陵县| 舒兰市| 广汉市| 柞水县| 滕州市| 金平| 淮阳县| 成武县| 青龙| 无为县| 南丰县| 孟村| 腾冲县| 汝城县| 托克逊县| 北辰区| 门头沟区| 上栗县| 徐水县| 通道| 滨海县| 汉源县| 垣曲县| 阳谷县| 新沂市| 陆河县| 商南县| 合阳县|