MQTTサーバーを実装しながらGoを学ぶ - その12 Contextを使ったgoroutineの停止

前回、別goroutineで発生したエラーハンドリングをしました。具体的にはサブスクライバへの書き込みでエラーが発生した場合にサブスクリプションの削除処理するようにしました。今回は、 context.Context を使ってgoroutineを停止することで、クライアントが接続を切ったタイミングでサブスクリプションの削除をするようにしてみます。

目次。

今のgoroutine

現状の実装だと、クライアントからSUBSCRIBEパケットが送信されてくると、いくつかのgoroutineが生成され以下のような親子関係になる。

https://i.gyazo.com/6bf617a5406e91b099915ce86e0aff09.png

  • main
    • サーバーのメインgoroutine
    • Run 関数内の conn, err := ln.Accept() でブロックしてる
  • Broker
    • "main" goroutineによって1つだけ生成される
    • パブリッシャからのメッセージをサブスクリプションへ配送するgoroutine
    • Broker 関数内の for ... select ... で無限ループしてる
  • handle
    • "main" goroutineによってクライアントからのTCP接続がある度に生成される
    • MQTTパケットを受け取り処理する
    • handle 関数内から mqttReader.ReadPacketType() の呼び出し、最終的には net.Conn に対する Read でブロックしてる
  • handleSub
    • "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
    • Broker からchannel経由でPUBLISHを受け取り、 net.Conn に書き込む
    • channelの読み取りでブロックしてる
  • handle内の無名関数
    • "handle" goroutineによってクライアントからのSUBSCRIBEパケットにより生成される
    • handleSub 関数で発生した error をerror channel経由で読み取り Broker に伝える
    • channelの読み取りでブロックしてる

net.Conn に対する Read をしているのは"handle" goroutineなので、サブスクライバが切断したことは"handle" goroutineで処理できるはず。

goroutineリーク

サブスクライバが切断、つまりmosquitto_subをCtrl-Cで終了したとき、handleSubのgoroutineが減らない。

2つmosquitto_subを実行してる状態でpprofを使いgoroutieの状態を見てみる。pprofは導入済みなので http://localhost:6060/debug/pprof/goroutine?debug=1 にアクセスすれば良い。

結果の一部は以下。

2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327a6a   github.com/bati11/oreno-mqtt/mqtt.handle+0x13a                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54
#   0x1328456   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31
#   0x1328500   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31
#   0x132865b   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118

3つのブロックがあるが、最初のブロックは packet.(*MQTTReader).ReadPacketType+0x76 とあり mqttReader.ReadPacketType() の呼び出しのことなので "handle" goroutineである。先頭の 2 という数値から "handle" goroutineが2つ存在してることになる。

2つめのブロックは、"handle" goroutineの数は 1 に減っているが、"handle 関数内の無名関数" goroutine、3つめのブロックは "handleSub" goroutineである。2つサブスクライバがいるので、goroutineの数もそれぞれ2つである。

1つCtrl-Cで終了してから再度状態を確認。"handle内の無名関数" goroutineと"handleSub" goroutineが減ってない。

goroutine profile: total 10
2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x1328501 0x105ca31
#   0x1328500   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0x40 /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:91

2 @ 0x102f20b 0x102f2b3 0x100758e 0x10072bb 0x132865c 0x105ca31
#   0x132865b   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0x6b  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:118

1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327a6b 0x1328457 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327a6a   github.com/bati11/oreno-mqtt/mqtt.handle+0x13a                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:54
#   0x1328456   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:38

goroutineが残り続けてしまうことを、goroutineリークというらしい。

"handleSub" goroutineの親goroutineである "handle" goroutineが終了するときに、子goroutineを終了させたい。

おなじみの「Go言語による並行処理」に以下のように書いてある。

Go言語による並行処理

Go言語による並行処理

  • 作者: Katherine Cox-Buday,山口能迪
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2018/10/26
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

4.3 ゴルーチンリークを避ける

もしあるゴルーチンがゴルーチンの生成の責任を持っているのであれば、そのゴルーチンを停止できるようにする責任もあります。

書籍では done チャネルを使った方式と、それがGo1.7からは context パッケージとして標準化されたことが書いてある。

contextパッケージ

context.Contextを使う。

context.Contextを使うことで、親goroutineから子goroutineを停止することができる。

diff --git a/mqtt/server.go b/mqtt/server.go
index 5f53e6f..a050e0a 100644
--- a/mqtt/server.go
+++ b/mqtt/server.go
@@ -2,6 +2,7 @@ package mqtt
 
 import (
        "bufio"
+       "context"
        "fmt"
        "io"
        "net"
@@ -48,6 +49,9 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
 
        var clientID string
 
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        for {
                r := bufio.NewReader(conn)
                mqttReader := packet.NewMQTTReader(r)
@@ -85,16 +89,21 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
                        if err != nil {
                                return err
                        }
-                       subscription, errCh := handleSub(clientID, conn)
+                       subscription, errCh := handleSub(ctx, clientID, conn)
                        subscriptionToBroker <- subscription
-                       go func() {
-                               err, ok := <-errCh
-                               if !ok {
-                                       return
+                       go func(ctx context.Context) {
+                               var result *DoneSubscriptionResult
+                               select {
+                               case <-ctx.Done():
+                                       result = NewDoneSubscriptionResult(subscription.clientID, nil)
+                               case err, ok := <-errCh:
+                                       if !ok {
+                                               return
+                                       }
+                                       result = NewDoneSubscriptionResult(subscription.clientID, err)
                                }
-                               done := NewDoneSubscriptionResult(subscription.clientID, err)
-                               doneSubscriptions <- done
-                       }()
+                               doneSubscriptions <- result
+                       }(ctx)
                case packet.PINGREQ:
                        pingresp, err := handler.HandlePingreq(mqttReader)
                        if err != nil {
@@ -110,16 +119,24 @@ func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionT
        }
 }
 
-func handleSub(clientID string, conn net.Conn) (*Subscription, <-chan error) {
+func handleSub(ctx context.Context, clientID string, conn net.Conn) (*Subscription, <-chan error) {
        errCh := make(chan error)
        subscription, pubFromBroker := NewSubscription(clientID)
        go func() {
                defer close(errCh)
-               for publishMessage := range pubFromBroker {
-                       bs := publishMessage.ToBytes()
-                       _, err := conn.Write(bs)
-                       if err != nil {
-                               errCh <- err
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case publishedMessage, ok := <-pubFromBroker:
+                               if !ok {
+                                       return
+                               }
+                               bs := publishedMessage.ToBytes()
+                               _, err := conn.Write(bs)
+                               if err != nil {
+                                       errCh <- err
+                               }
                        }
                }
        }()

試してみる。

2つsub

goroutine profile: total 12
2 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327abb   github.com/bati11/oreno-mqtt/mqtt.handle+0x18b                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58
#   0x1328526   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39

2 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31
#   0x132865e   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95

2 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31
#   0x132888f   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127

1つをCtrl-Cで閉じる

goroutine profile: total 8
1 @ 0x102f20b 0x102a6a9 0x1029d56 0x108e22a 0x108e33d 0x108f0e6 0x118011f 0x11926f8 0x11330ef 0x1133989 0x13239b7 0x1327abc 0x1328527 0x105ca31
#   0x1029d55   internal/poll.runtime_pollWait+0x65                     /Users/bati11/.goenv/versions/1.11.4/src/runtime/netpoll.go:173
#   0x108e229   internal/poll.(*pollDesc).wait+0x99                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:85
#   0x108e33c   internal/poll.(*pollDesc).waitRead+0x3c                     /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_poll_runtime.go:90
#   0x108f0e5   internal/poll.(*FD).Read+0x1d5                          /Users/bati11/.goenv/versions/1.11.4/src/internal/poll/fd_unix.go:169
#   0x118011e   net.(*netFD).Read+0x4e                              /Users/bati11/.goenv/versions/1.11.4/src/net/fd_unix.go:202
#   0x11926f7   net.(*conn).Read+0x67                               /Users/bati11/.goenv/versions/1.11.4/src/net/net.go:177
#   0x11330ee   bufio.(*Reader).fill+0x10e                          /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:100
#   0x1133988   bufio.(*Reader).ReadByte+0x38                           /Users/bati11/.goenv/versions/1.11.4/src/bufio/bufio.go:242
#   0x13239b6   github.com/bati11/oreno-mqtt/mqtt/packet.(*MQTTReader).ReadPacketType+0x76  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/packet/mqtt_reader.go:20
#   0x1327abb   github.com/bati11/oreno-mqtt/mqtt.handle+0x18b                  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:58
#   0x1328526   github.com/bati11/oreno-mqtt/mqtt.Run.func1+0x56                /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:39

1 @ 0x102f20b 0x103ef16 0x132865f 0x105ca31
#   0x132865e   github.com/bati11/oreno-mqtt/mqtt.handle.func1+0xce /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:95

1 @ 0x102f20b 0x103ef16 0x1328890 0x105ca31
#   0x132888f   github.com/bati11/oreno-mqtt/mqtt.handleSub.func1+0xff  /Users/bati11/dev/src/github.com/bati11/oreno-mqtt/mqtt/server.go:127

ちゃんと"handle内の無名関数" goroutineと"handleSub" goroutineが減ってる。

おしまい

context.Context を使って、子goroutineを停止させることができました。 ctx をどんどん渡していけば孫goroutineやひ孫goroutineなども同じように停止させることができます。Goでは複数の並行処理を協調させる仕組みが色々あって良いですね!

コードはこちら。

github.com

今回の学び。