go的并發(fā)量是很厲害的,goroutine創(chuàng)建的代價極小,其中一個重要的原因是因為go采用了分段棧技術(shù),每一個goroutine只占極小的空間。與此同時,goroutine是語言層面的,減少了內(nèi)核態(tài)到用戶態(tài)的切換開銷,并且goroutine摒棄了一些golang用不到的一些os thread的系統(tǒng)調(diào)用,創(chuàng)建代價小。
我們可以一瞬間創(chuàng)建很多個goroutine,這是相當(dāng)容易的。
乍一看,這與題目完全不符,前面說了那么多,難道不是鼓勵我們多創(chuàng)建goroutine嗎?不不不,goroutine確實很好用,但是如果不加以限制,很有可能出現(xiàn)其他的不可預(yù)料的錯誤。
比如在web領(lǐng)域中, 一個連接,在linux/unix下就相當(dāng)于是打開了一個文件,占用一個文件描述符。但是系統(tǒng)會規(guī)定文件描述符的上限,我們可以使用ulimit -n來進(jìn)行查看,如果我們遵循量大就好的話,那么一擁而上的請求連接會瞬間報錯。
2018/06/30 10:09:54 dial tcp :8080: socket: too many open files
上面這條報錯信息源于我寫的一個循環(huán)請求的工具
package main
import (
"sync"
"net"
"strconv"
"fmt"
"log"
)
const (
MAX_CONCURRENCY = 10000
)
var waitGroup sync.WaitGroup
func main(){
concurrency()
waitGroup.Wait()
}
//進(jìn)行網(wǎng)絡(luò)io
func request(currentCount int){
fmt.Println("request" + strconv.Itoa(currentCount) + "\r")
conn, err := net.Dial("tcp",":8080")
if err != nil { log.Fatal(err) }
defer conn.Close()
defer waitGroup.Done()
}
//并發(fā)請求
func concurrency(){
for i := 0;i MAX_CONCURRENCY;i++ {
waitGroup.Add(1)
go request(i)
}
}
用go建立一個服務(wù)端很簡單,我這里簡單的貼下server的代碼
package main
import (
"io"
"os"
"fmt"
"net"
)
func checkErr(err error){
if err != nil { fmt.Fprintln(os.Stderr, err) }
}
func main() {
listener, err := net.Listen("tcp",":8080")
checkErr(err)
for {
conn, err := listener.Accept()
checkErr(err)
go func(conn net.Conn){
_, err := io.WriteString(conn, "welcome!")
checkErr(err)
defer conn.Close()
}(conn)
}
}
現(xiàn)在回到主題,我們可以看到一擁而上其實也有壞處,想要解決這一問題,我們可以限制同一時間的并發(fā)數(shù)量,可以利用channel來達(dá)到這一點,這有點類似于信號量(Semaphore)
創(chuàng)建一個帶緩存的channel,其中CHANNEL_CACHE為同一時間的最大并發(fā)量
想簡單的說一下為什么這里chan的類型要用一個空的struct,這是因為在這個場景下(限制同一時間的并發(fā)量),通過channel傳輸?shù)臄?shù)據(jù)的類型并不重要,我們只需要通過做一個通知效果就行了(就像你通知你朋友起床,你只用閃個電話,而不用實際的接通,省去了電話費(fèi)的開銷),這里的空的struct實際上是不占任何空間的,因此這里選用空的struct
const (
CHANNEL_CACHE = 200
)
var tmpChannel = make(chan struct{}, CHANNEL_CACHE)
在與服務(wù)器建立連接的地方這樣寫(是不是很類似于信號量)
tmpChan - struct{}{}
conn, err := net.Dial("tcp",":8080")
- tmpChan
這樣同一時間的并發(fā)量就由CHANNEL_CACHE限制下來
經(jīng)過循環(huán)開啟的goroutine在請求服務(wù)器之前會向channel發(fā)送消息,如果緩存滿了,那么說明已經(jīng)有CHANNEL_CACHE個goroutine在進(jìn)行與服務(wù)器的連接,接著就會阻塞在這里,等待其中一個goroutine處理完之后,從channel中讀出一個空的struct,這時阻塞的地方向channel發(fā)送一個空struct,就可以與服務(wù)器建立連接了
下面貼一下全部的代碼
package main
import (
"sync"
"net"
"strconv"
"fmt"
"log"
)
const (
MAX_CONCURRENCY = 10000
CHANNEL_CACHE = 200
)
var tmpChan = make(chan struct{}, MAX_CONCURRENCY)
var waitGroup sync.WaitGroup
func main(){
concurrency()
waitGroup.Wait()
}
//進(jìn)行網(wǎng)絡(luò)io
func request(currentCount int){
fmt.Println("request" + strconv.Itoa(currentCount) + "\r")
tmpChan - struct{}{}
conn, err := net.Dial("tcp",":8080")
- tmpChan
if err != nil { log.Fatal(err) }
defer conn.Close()
defer waitGroup.Done()
}
//并發(fā)
func concurrency(){
for i := 0;i MAX_CONCURRENCY;i++ {
waitGroup.Add(1)
go request(i)
}
}
這樣就可以愉快的進(jìn)行并發(fā)了?。。?/p>
補(bǔ)充:Golang限制N個并發(fā)同時運(yùn)行
我就廢話不多說了,大家還是直接看代碼吧~
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
var wg sync.WaitGroup
sem := make(chan struct{}, 2) // 最多允許2個并發(fā)同時執(zhí)行
taskNum := 10
for i := 0; i taskNum; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem - struct{}{} // 獲取信號
defer func() { -sem }() // 釋放信號
// do something for task
time.Sleep(time.Second * 2)
fmt.Println(id, time.Now())
}(i)
}
wg.Wait()
}
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
您可能感興趣的文章:- 快速解決Golang Map 并發(fā)讀寫安全的問題
- 淺談golang并發(fā)操作變量安全的問題
- golang高并發(fā)限流操作 ping / telnet
- golang gin 框架 異步同步 goroutine 并發(fā)操作
- Golang 實現(xiàn)分片讀取http超大文件流和并發(fā)控制
- golang-gin-mgo高并發(fā)服務(wù)器搭建教程
- golang并發(fā)編程的實現(xiàn)
- golang通過context控制并發(fā)的應(yīng)用場景實現(xiàn)
- Golang 并發(fā)以及通道的使用方式