MQTTサーバーを実装しながらGoを学ぶ - その8 goroutine, pprof

前回までで、クライアントがPUBLISH、またはSUBSCRIBEすることができるようになりました。次は、クライアントからPUBLISHされたメッセージを、SUBSCRIBEしている別のクライアントへ送る処理を実装していきます。が、その前にgoroutineを使ってサーバーが複数クライアントと同時に接続できるようにします。

目次。

複数クライアントからの接続

クライアントが接続した状態で、もう1つ別のクライアントから接続してみる。例えば、Subscribeしてるクライアントが存在している状態で、Publishしてみる。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
Error: Unknown error.

すぐには何も出力されず、しばらく放っておくと上記の Error: Unknown error. が出力される。まだサーバーが複数クライアントを受けられるように実装していないから、タイムアウトする。

具体的にコードで確認する。

// server.go
func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            fmt.Printf("%v", err)
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        ...
    }
}

最初のクライアントから接続が来ると、 Accept() が net.Conn を返してきてhandle内で処理する。handle内では for { } でループすることで、クライアントとやりとりする。

この時、2つめのクライアントが接続してきてもサーバーは、handle内の for { } でループしているので、 Accept() が処理されない。そのため、1つのクライアントしか捌けない。

並行処理を使って複数のクライアントを捌くようにすれば良い。Goではgoroutineを使って並行処理を実現する。

1つのクライアントを1つのgoroutineで取り扱うようにする。

--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -23,11 +23,13 @@ func Run() {
                        panic(err)
                }
 
-               err = handle(conn)
-               if err != nil {
-                       fmt.Printf("%v", err)
-                       panic(err)
-               }
+               go func(conn net.Conn) {
+                       err = handle(conn)
+                       if err != nil {
+                               fmt.Printf("%v", err)
+                               panic(err)
+                       }
+               }(conn)
        }
 }

goroutine

以前、Goでechoサーバーを書いたときにもgoroutineを使った。goroutineは並行処理を実現したいときに生成するが、このとき生成されるのはGoランタイム上のgoroutineでありOSのスレッドではない。

goroutineの特徴を ASCII.jp:Go言語と並列処理(2) から引用させていただく。

スレッドがCPUコアに対してマッピングされるのに対し、goroutineはOSのスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。 この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。


両者にはほかにも次のような細かい違いがあります。

・OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
・OSスレッドの1〜2MBと比べると、初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
・優先度を持たない
・タイムスライスで強制的に処理が切り替わることがないが、コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
・(IDで一意にgoroutineで特定できないため)外部から終了のリクエストを送る仕組みがない

Goのランタイムがどのようにgoroutineを切り替えているかは以下の記事が参考になります。

こちらの本の「6章 ゴルーチンとGoランタイム」にも丁寧に書かれていた。

pprof

goroutineがOSのスレッドと異なることは分かった。じゃあ、今goroutineがいくつ作られていてどういう状態なのかはどうやって調べれば良いのだろう?OSのスレッドではないのでpsコマンドなどでは確認できない。

pprofというのがGoには用意されている。これを使うとGoで書いたアプリケーションのプロファイリングができる。

pprofを使うとCPUプロファイリング以外にも色々できる。以下の連載記事が分かりやすかった。

pprofを使ってgoroutineの情報をダンプしてみる。準備は以下のように import _ "net/http/pprof" してHTTPサーバーを起動するだけ。

 package main
 
-import "github.com/bati11/oreno-mqtt/mqtt"
+import (
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+
+       "github.com/bati11/oreno-mqtt/mqtt"
+)
 
 func main() {
+       go func() {
+               log.Println(http.ListenAndServe("localhost:6060", nil))
+       }()
        mqtt.Run()
 }

http://localhost:6060/debug/pprof/goroutine?debug=1 でgoroutineのダンプが取得できる。簡単!

さらに、Go1.10以降ではGUIでグラフを確認することもできる。

以下のコマンドを実行すると

$ go tool pprof -http=":8888" http://localhost:6060/debug/pprof/goroutine?debug=1

ブラウザが立ち上がり、こんなグラフが確認できる!

おしまい

goroutineを使って複数クライアントと同時に接続することができるようになりました。次回は、パブリッシャと接続してるgoroutineからサブスクライバと接続してるgoroutineへメッセージを送り、パブリッシャからサブスクライバへのメッセージ配信を実現します。goroutine間のメッセージ送信は、Goのchannelを使って実装していきます。

今回の学び