前回、goroutine, channel, selectを使って Broker
を実装しました。この Broker
をサーバーで利用し、MQTTでメッセージを配送してみます。
その前に <-chan chan<- []byte
という型が読みづらいので type
を使って対応します。その後、読み書き可能なchannelを、 chan<-
, <-chan
という型に変換しgoroutineへ渡しメッセージの配送を実現します。
目次
typeで型をつくる
現在の実装だとサブスクリプションを chan<- []byte
で表現している。channelのchannel( <-chan chan<- []byte
)やchannelの配列( []chan<- []byte
)が登場したので少々読みづらい。
type
を使って chan<- []byte
という型から Subscription
という型を作ることにする。
type Subscription chan<- []byte
Broker
の実装はこうなる。 <-chan chan<- []byte
が <-chan Subscription
、 []chan<- []byte
が []Subscription
となって読みやすい。
// broker.go package mqtt type Subscription chan<- []byte func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for { select { case sub := <- subscriptions: // channelからサブスクリプション(chan<- []byte)を読み取ったら配列に追加 ss = append(ss, sub) case message := <- fromPub: // fromPub channelからメッセージを読み取ったら全てのサブスクリプションへ配送 for _, sub := range ss { sub <- message } } } }
サーバーに組み込む
[]byteからPublishへ変更
サブスクライバへメッセージを配送する時、サーバーからサブスクライバへMQTTのPUBLISHパケットを送る。 Broker
で配送してるメッセージの型を []byte
から packet.Publish
に変更する。 packet.Publish を生成するための packet.NewPublish()
という関数を追加。
func NewPublish(topicName string, message []byte) Publish { variableHeader := NewPublishVariableHeader(topicName) fixedHeader := NewPublishFixedHeader(PUBLISH, variableHeader.Length()+uint(len(message))) return Publish{fixedHeader, variableHeader, message} }
Subscription
と Broker
の定義を変更する。
-type Subscription chan<- []byte +import "github.com/bati11/oreno-mqtt/mqtt/packet" -func Broker(fromPub <-chan []byte, subscriptions <-chan Subscription) { +type Subscription chan<- *packet.Publish + +func Broker(fromPub <-chan *packet.Publish, subscriptions <-chan Subscription) { // サブスクリプションの配列 var ss []Subscription for {
server.go
実装したBrokerをserver.goに組み込む。途中の状態だけどserver.goのコードは以下のようになる。
package mqtt import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/mqtt/handler" "github.com/bati11/oreno-mqtt/mqtt/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } defer ln.Close() pub := make(chan *packet.Publish) defer close(pub) subscriptions := make(chan Subscription) defer close(subscriptions) go Broker(pub, subscriptions) for { conn, err := ln.Accept() if err != nil { panic(err) } go func() { err = handle(conn, pub, subscriptions) if err != nil { panic(err) } }() } } func handle(conn net.Conn, publishToBroker chan<- *packet.Publish, subscriptionToBroker chan<- Subscription) error { defer conn.Close() for { r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } // TODO "publishToBroker" channelにPUBLISHメッセージを書き込む case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } // TODO "subscriptionToBroker" channelに新たにSUBSCRIPTIONを生成し書き込む case packet.PINGREQ: pingresp, err := handler.HandlePingreq(mqttReader) if err != nil { return err } _, err = conn.Write(pingresp.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil } } }
後はTODOコメントにある通り、PUBLISHとSUBSCRIBEのハンドリングを実装していく。
PUBLISHのハンドリング
PUBLISHのハンドリングは簡単。publishToBrokerチャネルにクライアントから送信されてきたPUBLISHメッセージを書き込み、 Broker
へ配送する。
case packet.PUBLISH: - err = handler.HandlePublish(mqttReader) + publish, err := handler.HandlePublish(mqttReader) if err != nil { return err } + publishToBroker <- publish
SUBSCRIBEのハンドリング
SUBSCRIBEのハンドリングでは、Broker
からPUBLISHメッセージを受け取るためのチャネルを読み取るgoroutineをもう1つ作る。このgoroutineは handleSub
関数を実行する。
case packet.SUBSCRIBE: suback, err := handler.HandleSubscribe(mqttReader) if err != nil { return err } _, err = conn.Write(suback.ToBytes()) if err != nil { return err } + sub := make(chan *packet.Publish) + subscriptionToBroker <- sub + go handleSub(conn, sub)
sub
というchannelを生成している。このchannelは2つのgoroutineに渡す。
- 1:
subscriptionToBroker
channelに書き込むことでBroker
goroutineに渡す。受け取り側では、型がchan *packet.Publish
からchan<- *packet.Publish
(つまりSubscription
)になる - 2:
handleSub
関数に引数でsub
を渡す。handleSub
関数では<-chan *packet.Publish
として引数の型を定義しておく
これで Broker
から配送されてくるメッセージを handleSub
goroutineで受け取れる。
channelの所有権の観点から見ると、 Broker
goroutineが sub
channelの所有者であり、 handleSub
goroutineが sub
channelの利用者である。生成したchannelは読み書きできるけど、それぞれのgoroutineでは書き込み専用、読み取り専用として型を定義しておくことでchannelの所有者が分かりやすくなる。また 書き込み専用channel( chan<- *packet.Publish
)と読み取り専用channel( <-chan *packet.Publish
)は型が違うので、誤ったコードを書いた場合にコンパイルエラーになってくれる。
handleSub
関数の実装は以下。
func handleSub(conn net.Conn, fromBroker <-chan *packet.Publish) { for publishMessage := range fromBroker { bs := publishMessage.ToBytes() _, err := conn.Write(bs) if err != nil { // FIXME fmt.Println(err) } } }
Broker
から配送されてくるメッセージを channelから読み込んで、 net.Conn
に書き込む。 conn
に書き込めなかった場合、例えばサブスクライバの接続が切れてる場合などのエラーハンドリングはあとで考える。
動かす
実際に動かす。自作サーバーを起動してから、mosquittoクライアントでサブスクライブする。
$ mosquitto_sub -i custom-client-id -t hoge
mosquittoクライアントでパブリッシュする。
$ mosquitto_pub -i sample-publisher -t hoge -m "Hello"
サブスクライバの方に "Hello" と表示された!
$ mosquitto_sub -i custom-client-id -t hoge Hello
ついにパブリッシャからのメッセージをサブスクライバに配信できました!複数サブスクライバがいる場合も全てに配送されます。
Topicのフィルタを無視していますが、そのうち・・・。
おしまい
ここまでの実装は以下にあります(tagが0.0.2)。
server.goで sub
というchannelを生成し Broker
goroutineへ渡しました。 Broker
goroutineはこのchannelの所有者ですが、channelを close
していません。サブスクライバへ送信するために net.Conn
に書き込みましたが、このときエラーが発生した場合は Broker
goroutineでchannelを close
したいです。
次回は、 handleSub
goroutineで発生するエラーのハンドリングを実装してみたいと思います。
今回の学び