前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。
引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。
目次。
- SUBSCRIBE
- CONNECTとCONNACK
- SUBSCRIBEとSUBACK
- "Accept interfaces, Return structs"
- 改めてSUBSCRIBEとSUBACK
- SUBSCRIBE, SUBACKを試してみる
- PINGREQ, PINGRESP
- おしまい
SUBSCRIBE
SUBSCRIBEパケットを見てみる
実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。
$ /usr/local/sbin/mosquitto -v
mosquittoクライアントでサブスクライブする。-i
で指定しているのは Client Identifier。 -t
で指定しているのはトピック名。
$ mosquitto_sub -i sample-subscriber -t hoge
Wiresharkを見てみる。
ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。
しばらく放っておくと...pingしてる。
実装する通信の流れ
以下の通信を実現できるようにMQTTサーバーを実装していく。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
以下も実装する。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
CONNECTとCONNACK
実装済み。
SUBSCRIBEとSUBACK
固定ヘッダーの仕様を勘違いしていたことに気がつく...
SUBSCRIBEパケットを実装する。
SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。
固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。
Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection
固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type (8) | Reserved | ||||||
1 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | |
byte2... | Remaining Length |
ふむふむ。つまり、固定ヘッダーの Dup
QoS
Retain
フィールドが定数なのかな、と思ったけれども・・・。
あれ?と思って改めて固定ヘッダーの仕様を見てみる。
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte1 | MQTT Control Packet type | Flags specific to each MQTT Control Packet type | ||||||
byte2... | Remaining Length |
3,2,1,0ビット目は Flags specific to each MQTT Control Packet type
って書いてある!!!パケットタイプによって意味が変わるのか!
interfaceとtype switchesを使った修正方針
今の固定ヘッダーの実装は以下のようになっている。 Dup
QoS
Retain
フィールドがあるので、これは PUBLISH
パケット用の固定ヘッダーだ。 PUBLISH
用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。
type FixedHeader struct { PacketType byte Dup byte QoS1 byte QoS2 byte Retain byte RemainingLength uint }
仕様を改めて読み直すと、 PUBLISH
パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。
修正方針としては以下のようにする。
- 既存のFixedHeaderをinterfaceにする
- そして、sturctを
DefaultFixedHeader
とPublishFixedHeader
に分ける - server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。
と、思ったけどちょっと考え直す。
"Accept interfaces, Return structs"
Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。
なぜか?
以下の記事のまとめにこう書いてある。
The primary reasons listed are:
- Remove unneeded abstractions
- Ambiguity of a user’s need on function input
- Simplify function inputs
本文を読んで自分なりにまとめると
- Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
- 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
- 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)
さっき自分で立てた方針は異なっている。 packet.ToFixedHeader()
がinterfaceを返し、 handler.HandlePublish()
がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。
fixedHeader, err := packet.ToFixedHeader(r) ... switch fh := fixedHeader.(type) { case PublishFixedHeader: err := handler.HandlePublish(fh, r) ... case packet.DefaultFixedHeader: switch fh.PacketType { case packet.CONNECT: ... }
先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。
しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。
抽象化しない
もっと愚直に実装してみよう。
やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。
packetType := PacketType(r) if packetType == PUBLISH { publishFixedHeader := ToPublishFixedHeader(r) ... } else { defaultFixedHeader := ToDefaultFixedHeader(r) ... }
こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。
bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。
type MQTTReader struct { byte1 *byte r *bufio.Reader } func NewMQTTReader(r io.Reader) *MQTTReader { bufr := bufio.NewReader(r) return &MQTTReader{r: bufr} } func (d *MQTTReader) ReadPacketType() (PacketType, error) { if d.byte1 == nil { byte1, err := d.r.ReadByte() if err != nil { return PacketType(0), err } d.byte1 = &byte1 } return PacketType(*d.byte1 >> 4), nil }
server.goでは以下のように分岐する。
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() ... if packetType == packet.PUBLISH { fixedHeader, err := packet.ToPublishFixedHeader(mqttReader) ... err = handler.HandlePublish(fixedHeader, r) ... } else { fixedHeader, err := packet.ToFixedHeader(mqttReader) ... switch packetType { case packet.CONNECT: connack, err := handler.HandleConnect(fixedHeader, r) ... }
ゴリゴリ変更。差分はこちら
今の実装だと
- server.goでFixedHeaderを取得
- handlerで
- VariableHeaderとPayloadを取得
- なんらかの処理
- レスポンス生成
という流れになってる。readerから FixedHeader
と VariableHeader
と Palyload
を保持するstruct(例えば Connect
とか)を作成してパケットを表現した方が良さそう。
server.go
r := bufio.NewReader(conn) mqttReader := packet.NewMQTTReader(r) packetType, err := mqttReader.ReadPacketType() if err != nil { if err == io.EOF { return nil } return err } switch packetType { case packet.PUBLISH: err = handler.HandlePublish(mqttReader) if err != nil { return err } case packet.CONNECT: connack, err := handler.HandleConnect(mqttReader) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.DISCONNECT: return nil }
例えば、CONNECTパケットなら以下のような実装をする。
// packet/connect.go package packet type Connect struct { FixedHeader *FixedHeader VariableHeader *ConnectVariableHeader Payload *ConnectPayload } func (reader *MQTTReader) ReadConnect() (*Connect, error) { fixedHeader, err := reader.readFixedHeader() if err != nil { return nil, err } variableHeader, err := reader.readConnectVariableHeader() if err != nil { return nil, err } payload, err := reader.readConnectPayload() if err != nil { return nil, err } return &Connect{fixedHeader, variableHeader, payload}, nil } // handler/connect_handle.go package handler func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) { fmt.Printf("HandleConnect\n") connect, err := reader.ReadConnect() if err != nil { if ce, ok := err.(packet.ConnectError); ok { return ce.Connack(), nil } return packet.Connack{}, err } // TODO variableHeaderとpayloadを使って何かしらの処理 fmt.Printf(" %#v\n", connect.VariableHeader) fmt.Printf(" %#v\n", connect.Payload) return packet.NewConnackForAccepted(), nil }
良い感じ、なんだけどテストで困った。
プライベートな関数のテスト
FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader()
からプライベートな readFixedHeader()
というプライベートなメソッドに変更したので、 packet_test
という別パッケージから参照することができず、テストできなくなってしまった。
プライベートな関数などのテストをするパターンがある。
まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。
package packet var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader
structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。
あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder
を呼ぶように書き換えてテストすれば良い。
- got, err := packet.ToPublishFixedHeader(tt.args.r) + got, err := packet.ExportReadPublishFixedHeader(tt.args.r)
なるほどー。export_test.goという _test
というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。
MQTTReader
に実装を寄せていった差分はこちら
改めてSUBSCRIBEとSUBACK
SUBSCRIBEパケットの仕様。
- 固定ヘッダー
- PacketType
8
- Reservedの部分は 0010 である。チェックして違ったら接続を切る
- PacketType
- 可変ヘッダー
- Packet Identifer 2byte
- Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
- ペイロード
ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。
実装したものはこちら
SUBSCRIBE, SUBACKを試してみる
よし、試すぞー。
実装してるサーバーを起動。
$ go run app/main.go` server starts at localhost:1883
SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
Wiresharkで見てみる。
ちゃんと動いてるー!!
これで以下の通信が実現できた。
- クライアント → CONNECT → サーバー
- クライアント ← CONNACK ← サーバー
- クライアント → SUBSCRIBE → サーバー
- クライアント ← SUBACK ← サーバー
PINGREQ, PINGRESP
次は以下の通信。
- クライアント → PINGREQ → サーバー
- クライアント ← PINGRESP ← サーバー
PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。
実装したものはこちら
試してみる。SUBSCRIBEを送信。
$ mosquitto_sub -i custom-client-id -t hoge
その後、接続したまま放っておくと・・・
できました!!
おしまい
クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。
今回の学び。
- interfaceとtype switch
- "Accept interfaces, Return structs"
- プライベートな関数などのテスト