其實golang用一個函數(shù)可以構(gòu)建一個并發(fā)隊列,現(xiàn)在編寫一個靈活可控的隊列程序
先定義一個工作
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
}
包含了workid和執(zhí)行任務(wù)的id,上面的SM只是任務(wù)具體內(nèi)容,這個和具體業(yè)務(wù)相關(guān),大家自己編寫自己的SM業(yè)務(wù)邏輯
然后定義工作池
type workerPool struct {
workerChan chan *Worker
workerList []*Worker
}
這個里面定義了一個工作隊列的切片,可以自定義工作隊列的個數(shù),甚至后期還可以添加work,還定義了一個隊列類型的管道。
定義完成過后就可以初始化工作池了
func InitWorkerPool() error {
n := 3
WorkerPool = workerPool{
workerChan: make(chan *Worker, n),
workerList: make([]*Worker, 0, n),
}
for i := 0; i n; i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
return nil
}
這個里面我寫死了worker的個數(shù)是3,當(dāng)然這個可以通過讀取配置文件或者參數(shù)傳遞的方式;這個里面逐一啟動work
worker.Start(),這個是關(guān)鍵
func (w *Worker) Start() {
go func() {
for {
WorkerPool.workerChan - w
select {
case jobID := -w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case q := -w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
這個就是go 啟動一個協(xié)程,先把自己放到workerChan中,然后不斷從w.RepJobs管道中獲取任務(wù)并執(zhí)行,如果執(zhí)行完成后又把自己放回到隊列中。
所以如果你要有任務(wù)需要執(zhí)行,放到這個管道中即可
func Dispatch() {
for {
select {
case job := -jobQueue:
go func(jobID int64) {
println("Trying to dispatch job: %d", jobID)
worker := -WorkerPool.workerChan
worker.RepJobs - jobID
}(job)
}
}
}
從管道中拿出一個worker并把任務(wù)id放到worker中去執(zhí)行。
當(dāng)然你可以停止worker,甚至可以停止job
func (w *Worker) Stop() {
go func() {
w.quit - true
}()
}
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
for _, w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
w.SM.Stop(id)
}
}
}
}
補充一下,int64和字符串轉(zhuǎn)換。
string到int
int,err:=strconv.Atoi(string)
string到int64
int64, err := strconv.ParseInt(string, 10, 64)
int到string
string:=strconv.Itoa(int)
int64到string
string:=strconv.FormatInt(int64,10)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
您可能感興趣的文章:- 解決golang 關(guān)于全局變量的坑
- 深入淺析golang zap 日志庫使用(含文件切割、分級別存儲和全局使用等)
- 關(guān)于golang高并發(fā)的實現(xiàn)與注意事項說明
- 基于Golang 高并發(fā)問題的解決方案
- golang 并發(fā)編程之生產(chǎn)者消費者詳解
- golang 對私有函數(shù)進行單元測試的實例
- Golang全局變量加鎖的問題解決