C語言中文網 目錄

goroutine(Go語言并發)如何使用才更加高效?

Go 語言原生支持并發是被眾人津津樂道的特性。goroutine 早期是 Inferno 操作系統的一個試驗性特性,而現在這個特性與操作系統一起,將開發變得越來越簡單。

很多剛開始使用 Go 語言開發的人都很喜歡使用并發特性,而沒有考慮并發是否真正能解決他們的問題。

了解goroutine的生命期時再創建goroutine

在 Go 語言中,開發者習慣將并發內容與 goroutine 一一對應地創建 goroutine。開發者很少會考慮 goroutine 在什么時候能退出和控制 goroutine 生命期,這就會造成 goroutine 失控的情況。下面來看一段代碼。

失控的 goroutine:
package main

import (
    "fmt"
    "runtime"
)

// 一段耗時的計算函數
func consumer(ch chan int) {

    // 無限獲取數據的循環
    for {

        // 從通道獲取數據
        data := <-ch

        // 打印數據
        fmt.Println(data)
    }

}

func main() {

    // 創建一個傳遞數據用的通道
    ch := make(chan int)

    for {

        // 空變量, 什么也不做
        var dummy string

        // 獲取輸入, 模擬進程持續運行
        fmt.Scan(&dummy)

        // 啟動并發執行consumer()函數
        go consumer(ch)

        // 輸出現在的goroutine數量
        fmt.Println("goroutines:", runtime.NumGoroutine())
    }

}
代碼說明如下:
  • 第 9 行,consumer() 函數模擬平時業務中放到 goroutine 中執行的耗時操作。該函數從其他 goroutine 中獲取和接收數據或者指令,處理后返回結果。
  • 第 12 行,需要通過無限循環不停地獲取數據。
  • 第 15 行,每次從通道中獲取數據。
  • 第 18 行,模擬處理完數據后的返回數據。
  • 第 26 行,創建一個整型通道。
  • 第 34 行,使用 fmt.Scan() 函數接收數據時,需要提供變量地址。如果輸入匹配的變量類型,將會成功賦值給變量。
  • 第 37 行,啟動并發執行 consumer() 函數,并傳入 ch 通道。
  • 第 40 行,每啟動一個 goroutine,使用 runtime.NumGoroutine 檢查進程創建的 goroutine 數量總數。

運行程序,每輸入一個字符串+回車,將會創建一個 goroutine,結果如下:

a
goroutines: 2
b
goroutines: 3
c
goroutines: 4

注意,結果中 a、b、c 為通過鍵盤輸入的字符,其他為打印字符。

這個程序實際在模擬一個進程根據需要創建 goroutine 的情況。運行后,問題已經被暴露出來:隨著輸入的字符串越來越多,goroutine 將會無限制地被創建,但并不會結束。這種情況如果發生在生產環境中,將會造成內存大量分配,最終使進程崩潰。現實的情況也許比這段代碼更加隱蔽:也許你設置了一個退出的條件,但是條件永遠不會被滿足或者觸發。

為了避免這種情況,在這個例子中,需要為 consumer() 函數添加合理的退出條件,修改代碼后如下:
package main

import (
    "fmt"
    "runtime"
)

// 一段耗時的計算函數
func consumer(ch chan int) {

    // 無限獲取數據的循環
    for {

        // 從通道獲取數據
        data := <-ch

        if data == 0 {
            break
        }

        // 打印數據
        fmt.Println(data)
    }

    fmt.Println("goroutine exit")
}

func main() {

    // 傳遞數據用的通道
    ch := make(chan int)

    for {

        // 空變量, 什么也不做
        var dummy string

        // 獲取輸入, 模擬進程持續運行
        fmt.Scan(&dummy)

        if dummy == "quit" {

            for i := 0; i < runtime.NumGoroutine()-1; i++ {
                ch <- 0
            }

            continue
        }

        // 啟動并發執行consumer()函數
        go consumer(ch)

        // 輸出現在的goroutine數量
        fmt.Println("goroutines:", runtime.NumGoroutine())
    }
}
代碼中加粗部分是新添加的代碼,具體說明如下:
  • 第 17 行,為無限循環設置退出條件,這里設置 0 為退出。
  • 第 41 行,當命令行輸入 quit 時,進入退出處理的流程。
  • 第 43 行,runtime.NumGoroutine 返回一個進程的所有 goroutine 數,main() 的 goroutine 也被算在里面。因此需要扣除 main() 的 goroutine 數。剩下的 goroutine 為實際創建的 goroutine 數,對這些 goroutine 進行遍歷。
  • 第 44 行,并發開啟的 goroutine 都在競爭獲取通道中的數據,因此只要知道有多少個 goroutine 需要退出,就給通道里發多少個 0。

修改程序并運行,結果如下:

a
goroutines: 2
b
goroutines: 3
quit
goroutine exit
goroutine exit
c
goroutines: 2

避免在不必要的地方使用通道

通道(channel)和 map、切片一樣,也是由 Go 源碼編寫而成。為了保證兩個 goroutine 并發訪問的安全性,通道也需要做一些鎖操作,因此通道其實并不比鎖高效。

下面的例子展示套接字的接收和并發管理。對于 TCP 來說,一般是接收過程創建 goroutine 并發處理。當套接字結束時,就要正常退出這些 goroutine。

本例完整代碼請參考./src/chapter12/exitnotify/exitnotify.go
本套教程所有源碼下載地址:https://pan.baidu.com/s/1ORFVTOLEYYqDhRzeq0zIiQ    提取密碼:hfyf
下面是對各個部分的詳細分析。

1) 套接字接收部分

套接字在連接后,就需要不停地接收數據,代碼如下:
// 套接字接收過程
func socketRecv(conn net.Conn, exitChan chan string) {

// 創建一個接收的緩沖
    buff := make([]byte, 1024)

    // 不停地接收數據
    for {

        // 從套接字中讀取數據
        _, err := conn.Read(buff)

        // 需要結束接收, 退出循環
        if err != nil {
            break
        }

    }

    // 函數已經結束, 發送通知
    exitChan <- "recv exit"
}
代碼說明如下:
  • 第 2 行傳入的 net.Conn 是套接字的接口,exitChan 為退出發送同步通道。
  • 第 5 行為套接字的接收數據創建一個緩沖。
  • 第 8 行構建一個接收的循環,不停地接收數據。
  • 第 11 行,從套接字中取出數據。這個例子中,不關注具體接收到的數據,只是關注錯誤,這里將接收到的字節數做匿名處理。
  • 第 14 行,當套接字調用了 Close 方法時,會觸發錯誤,這時需要結束接收循環。
  • 第 21 行,結束函數時,與函數綁定的 goroutine 會同時結束,此時需要通知 main() 的 goroutine。

2) 連接、關閉、同步 goroutine 主流程部分

下面代碼中嘗試使用套接字的 TCP 協議連接一個網址,連接上后,進行數據接收,等待一段時間后主動關閉套接字,等待套接字所在的 goroutine 自然結束,代碼如下:
func main() {

    // 連接一個地址
    conn, err := net.Dial("tcp", "www.163.com:80")

    // 發生錯誤時打印錯誤退出
    if err != nil {
        fmt.Println(err)
        return
    }

    // 創建退出通道
    exit := make(chan string)

    // 并發執行套接字接收
    go socketRecv(conn, exit)

    // 在接收時, 等待1秒
    time.Sleep(time.Second)

    // 主動關閉套接字
    conn.Close()

    // 等待goroutine退出完畢
    fmt.Println(<-exit)
}
代碼說明如下:
  • 第 4 行,使用 net.Dial 發起 TCP 協議的連接,調用函數就會發送阻塞直到連接超時或者連接完成。
  • 第 7 行,如果連接發生錯誤,將會打印錯誤并退出。
  • 第 13 行,創建一個通道用于退出信號同步,這個通道會在接收用的 goroutine 中使用。
  • 第 16 行,并發執行接收函數,傳入套接字和用于退出通知的通道。
  • 第 19 行,接收需要一個過程,使用 time.Sleep() 等待一段時間。
  • 第 22 行,主動關閉套接字,此時會觸發套接字接收錯誤。
  • 第 25 行,從 exit 通道接收退出數據,也就是等待接收 goroutine 結束。

在這個例子中,goroutine 退出使用通道來通知,這種做法可以解決問題,但是實際上通道中的數據并沒有完全使用。

3) 優化:使用等待組替代通道簡化同步

通道的內部實現代碼在 Go 語言開發包的 src/runtime/chan.go 中,經過分析后大概了解到通道也是用常見的互斥量等進行同步。因此通道雖然是一個語言級特性,但也不是被神化的特性,通道的運行和使用都要比傳統互斥量、等待組(sync.WaitGroup)有一定的消耗。

所以在這個例子中,更建議使用等待組來實現同步,調整后的代碼如下:
package main

import (
    "fmt"
    "net"
    "sync"
    "time"
)

// 套接字接收過程
func socketRecv(conn net.Conn, wg *sync.WaitGroup) {

    // 創建一個接收的緩沖
    buff := make([]byte, 1024)

    // 不停地接收數據
    for {

        // 從套接字中讀取數據
        _, err := conn.Read(buff)

        // 需要結束接收, 退出循環
        if err != nil {
            break
        }

    }

    // 函數已經結束, 發送通知
    wg.Done()
}

func main() {

    // 連接一個地址
    conn, err := net.Dial("tcp", "www.163.com:80")

    // 發生錯誤時打印錯誤退出
    if err != nil {
        fmt.Println(err)
        return
    }

    // 退出通道
    var wg sync.WaitGroup
   
    // 添加一個任務
    wg.Add(1)

    // 并發執行接收套接字
    go socketRecv(conn, &wg)

    // 在接收時, 等待1秒
    time.Sleep(time.Second)

    // 主動關閉套接字
    conn.Close()

    // 等待goroutine退出完畢
    wg.Wait()
    fmt.Println("recv done")
}
調整后的代碼說明如下:
  • 第 45 行,聲明退出同步用的等待組。
  • 第 48 行,為等待組的計數器加 1,表示需要完成一個任務。
  • 第 51 行,將等待組的指針傳入接收函數。
  • 第 60 行,等待等待組的完成,完成后打印提示。
  • 第 30 行,接收完成后,使用 wg.Done() 方法將等待組計數器減一。

精美而實用的網站,提供C語言C++STLLinuxShellJavaGo語言等教程,以及socketGCCviSwing設計模式JSP等專題。

Copyright ?2011-2018 biancheng.net, 陜ICP備15000209號

底部Logo