MQTTサーバーを実装しながらGoを学ぶ - その10 type, "chan<-"と"<-chan"

前回、goroutine, channel, selectを使って Broker を実装しました。この Broker をサーバーで利用し、MQTTでメッセージを配送してみます。

その前に <-chan chan<- []byte という型が読みづらいので type を使って対応します。その後、読み書き可能なchannelを、 chan<- , <-chan という型に変換しgoroutineへ渡しメッセージの配送を実現します。

目次

typeで型をつくる

現在の実装だとサブスクリプションchan<- []byte で表現している。channelのchannel( <-chan chan<- []byte )やchannelの配列( []chan<- []byte )が登場したので少々読みづらい。

type を使って chan<- []byte という型から Subscription という型を作ることにする。

type Subscription chan<- []byte

Broker の実装はこうなる。 <-chan chan<- []byte<-chan Subscription[]chan<- []byte[]Subscription となって読みやすい。

// broker.go
package mqtt

type Subscription chan<- []byte

func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
    // サブスクリプションの配列
    var ss []Subscription
    for {
        select {
        case sub := <- subscriptions:
            // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加
            ss = append(ss, sub)
        case message := <- fromPub:
            // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送
            for _, sub := range ss {
                sub <- message
            }
        }
    }
}

サーバーに組み込む

[]byteからPublishへ変更

サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker で配送してるメッセージの型を []byte から packet.Publish に変更する。 packet.Publish を生成するための packet.NewPublish() という関数を追加。

func NewPublish(topicName string, message []byte) Publish {
    variableHeader := NewPublishVariableHeader(topicName)
    fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message)))
    return Publish{fixedHeader, variableHeader, message}
}

SubscriptionBroker の定義を変更する。

-type Subscription chan<- []byte
+import "github.com/bati11/oreno-mqtt/mqtt/packet"
 
-func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) {
+type Subscription chan<- *packet.Publish
+
+func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) {
     // サブスクリプションの配列
     var ss []Subscription
     for {

server.go

実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。

package mqtt

import (
    "bufio"
    "fmt"
    "io"
    "net"

    "github.com/bati11/oreno-mqtt/mqtt/handler"
    "github.com/bati11/oreno-mqtt/mqtt/packet"
)

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    pub := make(chan *packet.Publish)
    defer close(pub)
    subscriptions := make(chan Subscription)
    defer close(subscriptions)

    go Broker(pub, subscriptions)

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        go func() {
            err = handle(conn, pub, subscriptions)
            if err != nil {
                panic(err)
            }
        }()
    }
}

func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        mqttReader := packet.NewMQTTReader(r)
        packetType, err := mqttReader.ReadPacketType()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        switch packetType {
        case packet.PUBLISH:
            err = handler.HandlePublish(mqttReader)
            if err != nil {
                return err
            }
            // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む
        case packet.CONNECT:
            connack, err := handler.HandleConnect(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(connack.ToBytes())
            if err != nil {
                return err
            }
        case packet.SUBSCRIBE:
            suback, err := handler.HandleSubscribe(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(suback.ToBytes())
            if err != nil {
                return err
            }
            // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む
        case packet.PINGREQ:
            pingresp, err := handler.HandlePingreq(mqttReader)
            if err != nil {
                return err
            }
            _, err = conn.Write(pingresp.ToBytes())
            if err != nil {
                return err
            }
        case packet.DISCONNECT:
            return nil
        }
    }
}

後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。

PUBLISHのハンドリング

PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker へ配送する。

                case packet.PUBLISH:
-                       err = handler.HandlePublish(mqttReader)
+                       publish, err := handler.HandlePublish(mqttReader)
                        if err != nil {
                                return err
                        }
+                       publishToBroker <- publish

SUBSCRIBEのハンドリング

SUBSCRIBEのハンドリングでは、Broker からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub 関数を実行する。

                case packet.SUBSCRIBE:
                        suback, err := handler.HandleSubscribe(mqttReader)
                        if err != nil {
                            return err
                        }
                        _, err = conn.Write(suback.ToBytes())
                        if err != nil {
                                return err
                        }
+                       sub := make(chan *packet.Publish)
+                       subscriptionToBroker <- sub
+                       go handleSub(conn, sub)

sub というchannelを生成している。このchannelは2つのgoroutineに渡す。

  • 1: subscriptionToBroker channelに書き込むことで Broker goroutineに渡す。受け取り側では、型が chan *packet.Publish から chan<- *packet.Publish (つまり Subscription )になる
  • 2: handleSub 関数に引数で sub を渡す。 handleSub 関数では <-chan *packet.Publish として引数の型を定義しておく

これで Broker から配送されてくるメッセージを handleSub goroutineで受け取れる。

channelの所有権の観点から見ると、 Broker goroutineが sub channelの所有者であり、 handleSub goroutineが sub channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish )と読み取り専用channel( <-chan *packet.Publish )は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。

handleSub 関数の実装は以下。

func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) {
    for publishMessage := range fromBroker {
        bs := publishMessage.ToBytes()
        _, err := conn.Write(bs)
        if err != nil {
            // FIXME
            fmt.Println(err)
        }
    }
}

Broker から配送されてくるメッセージを channelから読み込んで、 net.Conn に書き込む。 conn に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。

動かす

実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。

$ mosquitto_sub -i custom-client-id -t hoge

mosquittoクライアントでパブリッシュする。

$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"

サブスクライバの方に "Hello" と表示された!

$ mosquitto_sub -i custom-client-id -t hoge
Hello

ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。

Topicのフィルタを無視していますが、そのうち・・・。

おしまい

ここまでの実装は以下にあります(tagが0.0.2)。

github.com

server.goで sub というchannelを生成し Broker goroutineへ渡しました。 Broker goroutineはこのchannelの所有者ですが、channelを close していません。サブスクライバへ送信するために net.Conn に書き込みましたが、このときエラーが発生した場合は Broker goroutineでchannelを close したいです。

次回は、 handleSub goroutineで発生するエラーのハンドリングを実装してみたいと思います。

今回の学び