前回の続きです。handlerのエラーハンドリングからやります。その後、mosquitto_clientからPUBLISHパケットを自作サーバーで受け取れるようにしました。最後にhandlerのリファクタリングで Untyped constant declaration というconstの便利な使い方を知りました。
今回学ぶこと。
- handlerのエラーハンドリング
- PUBLISHパケットとDISCONNECTパケット
- Goのconstとiota
handlerでのエラーハンドリング
handlerでのエラーハンドリングを実装する。前々回調べた通りでError TypeもしくはOpaque Patternを使う。
すでに handler → packet
という依存関係ができているので、 packet
パッケージにError Typeを作ることにする。 ConnectError
というインタフェースを用意する。このインタフェースに Error()
を持たせてError Typeとして、さらに Connack
を取得するためのメソッドも追加する。
--- a/study/packet/connack.go +++ b/study/packet/connack.go @@ -28,29 +28,47 @@ func (c Connack) ToBytes() []byte { return result } +func newConnack() Connack { + fixedHeader := FixedHeader{ + PacketType: 2, + RemainingLength: 2, + } + variableHeader := ConnackVariableHeader{SessionPresent: false} + return Connack{fixedHeader, variableHeader} +} + func NewConnackForAccepted() Connack { result := newConnack() result.ReturnCode = 0 return result } -func NewConnackForRefusedByUnacceptableProtocolVersion() Connack { - result := newConnack() - result.ReturnCode = 1 - return result +type ConnectError interface { + Connack() Connack + Error() string } -func NewConnackForRefusedByIdentifierRejected() Connack { - result := newConnack() - result.ReturnCode = 2 - return result +type connectError struct { + connack Connack + msg string } -func newConnack() Connack { - fixedHeader := FixedHeader{ - PacketType: 2, - RemainingLength: 2, - } - variableHeader := ConnackVariableHeader{SessionPresent: false} - return Connack{fixedHeader, variableHeader} +func (e connectError) Connack() Connack { + return e.connack +} + +func (e connectError) Error() string { + return e.msg +} + +func RefusedByUnacceptableProtocolVersion(s string) ConnectError { + connack := newConnack() + connack.ReturnCode = 1 + return connectError{connack, s} +} + +func RefusedByIdentifierRejected(s string) ConnectError { + connack := newConnack() + connack.ReturnCode = 2 + return connectError{connack, s} }
--- a/study/packet/connect_payload.go +++ b/study/packet/connect_payload.go @@ -5,8 +5,6 @@ import ( "encoding/binary" "io" "regexp" - - "github.com/pkg/errors" ) type ConnectPayload struct { @@ -30,10 +28,10 @@ func ToConnectPayload(r *bufio.Reader) (ConnectPayload, error) { } clientID := string(clientIDBytes) if len(clientID) < 1 || len(clientID) > 23 { - return ConnectPayload{}, errors.New("ClientID length is invalid") + return ConnectPayload{}, RefusedByIdentifierRejected("ClientID length is invalid") } if !clientIDRegex.MatchString(clientID) { - return ConnectPayload{}, errors.New("clientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"") + return ConnectPayload{}, RefusedByIdentifierRejected("ClientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"") } return ConnectPayload{ClientID: clientID}, nil }
--- a/study/packet/connect_variable_header.go +++ b/study/packet/connect_variable_header.go @@ -30,11 +30,11 @@ func ToConnectVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (ConnectV protocolName := make([]byte, 6) _, err := io.ReadFull(r, protocolName) if err != nil || !isValidProtocolName(protocolName) { - return ConnectVariableHeader{}, errors.New("protocol name is invalid") + return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol name is invalid") } protocolLevel, err := r.ReadByte() if err != nil || protocolLevel != 4 { - return ConnectVariableHeader{}, errors.New("protocol level must be 4") + return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol level must be 4") } // TODO
handlerパッケージの変更。返された error
が packet.ConnectError
だった場合は、 Connack
を取得して返すように変更する。
--- a/study/handler/connect_handler.go +++ b/study/handler/connect_handler.go @@ -13,14 +13,18 @@ var variableHeaderLength = 10 func HandleConnect(fixedHeader packet.FixedHeader, r *bufio.Reader) (packet.Connack, error) { variableHeader, err := packet.ToConnectVariableHeader(fixedHeader, r) if err != nil { - // TODO err応じたCONNACKを生成して返す - return packet.NewConnackForRefusedByUnacceptableProtocolVersion(), nil + if ce, ok := err.(packet.ConnectError); ok { + return ce.Connack(), nil + } + return packet.Connack{}, err } payload, err := packet.ToConnectPayload(r) if err != nil { - // TODO err応じたCONNACKを生成して返す - return packet.NewConnackForRefusedByIdentifierRejected(), nil + if ce, ok := err.(packet.ConnectError); ok { + return ce.Connack(), nil + } + return packet.Connack{}, err } // TODO variableHeaderとpayloadを使って何かしらの処理
PUBLISHパケットとDISCONNECTパケット
やりたかったことを思い出す。最初の回に書いたように、まず実現したいのは以下のフロー。
クライアント → Connect Command → サーバー
クライアント ← Connect Ack ← サーバー
クライアント → Publish Message → サーバー
クライアント → Disconnect Req → サーバー
ここまでで、1と2のCONNECT(Connect Command)とCONNACK(Connect Ack)はできた。
次は、PUBLISH(Publish Message)とDISSCONNECT(Disconnect Req)に取り掛かる。
- PUBLISH
- DISCONNECT
PUBLISHパケット
PUBLISHパケットの可変ヘッダー
PUBLISHパケットの可変ヘッダーは以下の情報を持つ。
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039
- Topic Name
- Packet Identifier
クライアントはPUBLISHパケットの Topic Name
でトピックを指定してメッセージをサーバー(MQTT Broker)に送る。サーバーは、(まだ未実装だけど)PUBLISHで指定されたトピックをサブスクライブしてるクライアントにメッセージを転送する。
例えば以下のようなイメージ。
引用元:https://www.appcelerator.com/blog/2018/03/api-builder-and-mqtt-for-iot-part-1/
今はサブスクライブしてるクライアントのことは考えてないので、上の図でいうと左側の "temp" というトピックに対して "75°F" というメッセージをPUBLISHしてる部分を実装する。
Topic Name
について以下のような記述がある。Topic Nameのワイルドカードというのは #
と +
の2文字。サブスクライブ時にワイルドカードを指定することで複数のTopicをサブスクライブすることができる。Publishの可変ヘッダーにはこれらの文字を含んではいけない。
The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
Packet Identifier
はQoS1かQoS2の場合に使う。今はQoS0固定で考えているので後回し。
可変ヘッダーを実装する。
package packet import ( "bufio" "fmt" "io" "strings" ) type PublishVariableHeader struct { TopicName string PacketIdentifier *uint16 } func ToPublishVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (PublishVariableHeader, error) { if fixedHeader.PacketType != 3 { return PublishVariableHeader{}, fmt.Errorf("packet type is invalid. it got is %v", fixedHeader.PacketType) } _, err := r.ReadByte() if err != nil { return PublishVariableHeader{}, err } lengthLSB, err := r.ReadByte() if err != nil { return PublishVariableHeader{}, err } if lengthLSB == 0 { return PublishVariableHeader{}, fmt.Errorf("length LSB should be > 0") } topicNameBytes := make([]byte, lengthLSB) _, err = io.ReadFull(r, topicNameBytes) if err != nil { return PublishVariableHeader{}, err } topicName := string(topicNameBytes) if strings.ContainsAny(topicName, "# +") { return PublishVariableHeader{}, fmt.Errorf("topic name must not contain wildcard. it got is %v", topicName) } result := PublishVariableHeader{string(topicNameBytes), nil} return result, nil }
PUBLISHパケットのペイロード
ペイロードは、サブスクライバーに対して送信するメッセージそのもの。 bufio.Reader
をそのまま使うことにする。
PUBLISHパケットに対するレスポンス
QoS0の時はレスポンスなし。
DISCONNECTパケット
DISCONNECTパケットの仕様はこちら
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090
DISCONNECTパケットには可変ヘッダーもペイロードもない。
serverとhandler実装
前回のserver.goはCONNECTパケットしか想定していなかった。PUBLISHパケットとDISCONNECTパケットも想定した実装にする。
server.goの実装。 Accept()
でクライアントからの接続を待つ。接続が来て net.Conn
を取得できたら handle()
関数へ渡す。 handle()
関数内でループしてクライアントからのMQTTパケットを受け取り FixedHeader
を生成。パケットタイプによるswitchで処理を分岐し、それぞれのパケットに対応したhandlerを呼ぶ。DISCONNECTパケットの場合はコネクションを切るだけなので return nil
だけして defer
で接続を切る。
package study import ( "bufio" "fmt" "io" "net" "github.com/bati11/oreno-mqtt/study/handler" "github.com/bati11/oreno-mqtt/study/packet" ) func Run() { ln, err := net.Listen("tcp", "localhost:1883") if err != nil { panic(err) } fmt.Println("server starts at localhost:1883") for { conn, err := ln.Accept() if err != nil { panic(err) } err = handle(conn) if err != nil { panic(err) } } } func handle(conn net.Conn) error { defer conn.Close() for { r := bufio.NewReader(conn) fixedHeader, err := packet.ToFixedHeader(r) if err != nil { if err == io.EOF { // クライアント側から既に切断してる場合 return nil } return err } fmt.Printf("-----\n%+v\n", fixedHeader) switch fixedHeader.PacketType { // CONNECT case 1: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } // PUBLISH case 3: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } // DISCONNECT case 14: return nil } } }
HandleConnect()
は前回から変更なし。
HandlePublish()
は以下のように実装。
package handler import ( "bufio" "fmt" "io" "github.com/bati11/oreno-mqtt/study/packet" ) func HandlePublish(fixedHeader packet.FixedHeader, r *bufio.Reader) error { fmt.Printf(" HandlePublish\n") variableHeader, err := packet.ToPublishVariableHeader(fixedHeader, r) if err != nil { return err } fmt.Printf(" %#v\n", variableHeader) payloadLength := fixedHeader.RemainingLength - variableHeader.Length payload := make([]byte, payloadLength) _, err = io.ReadFull(r, payload) if err != nil { return err } fmt.Printf(" Payload: %v\n", string(payload)) // TODO QoS0なのでレスポンスなし return nil }
mosquittoクライアントからメッセージを送ってみる
サーバーを起動する。
$ go run app/main.go server starts at localhost:1883
Wiresharkを起動しておく。
mosquittoクライアントからpublishする。
$ mosquitto_pub -t hoge -m "Hello"
結果を見てみる。
お、できてそう!サーバーの標準出力も確認。
----- {PacketType:1 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:28} HandleConnect packet.ConnectVariableHeader{ProtocolName:"MQTT", ProtocolLevel:0x4, ConnectFlags:packet.ConnectFlags{CleanSession:true, WillFlag:true, WillQoS:0x1, WillRetain:false, PasswordFlag:true, UserNameFlag:true}, KeepAlive:0xa} packet.ConnectPayload{ClientID:"custom-client-id"} ----- {PacketType:3 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:11} HandlePublish packet.PublishVariableHeader{TopicName:"hoge", PacketIdentifier:(*uint16)(nil), Length:0x6} Payload: Hello
ちゃんとPUBLISHパケットを解釈できてる!
これで最初の目標の以下の流れが実現できた。
クライアント → Connect Command → サーバー
クライアント ← Connect Ack ← サーバー
クライアント → Publish Message → サーバー
クライアント → Disconnect Req → サーバー
const
ところで、server.goの fixedHeader.PacketType
の値、マジックナンバーでswitchしてるところを分かりやすくしたい。
switch fixedHeader.PacketType { // CONNECT case 1: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } // PUBLISH case 3: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } // DISCONNECT case 14: return nil }
マジックナンバーをconstで置き換える。定数化することでコードが読みやすくなる。また、定数は実行時ではなくコンパイル時にコンパイラが最適化してくれて、パフォーマンスが良くなる場合もある。
fixed_header.goに定数を定義。
+const ( + CONNECT byte = 1 + PUBLISH byte = 3 + DISCONNECT byte = 14 +) + type FixedHeader struct { PacketType byte Dup byte
server.goのswitchは以下のようになる。
switch fixedHeader.PacketType { case packet.CONNECT: connack, err := handler.HandleConnect(fixedHeader, r) if err != nil { return err } _, err = conn.Write(connack.ToBytes()) if err != nil { return err } case packet.PUBLISH: err := handler.HandlePublish(fixedHeader, r) if err != nil { return err } case packet.DISCONNECT: return nil }
Untyped constant declaration
定数の型を byte
ではなく int
にするとどうなるか?
const ( CONNECT int = 1 PUBLISH byte = 3 DISCONNECT byte = 14 )
server.goのswitch-caseのところで以下のようなコンパイルエラーになる。fixedHeader.PacketType
の型が byte
なのに、 int
型の定数と比較してるためコンパイルエラー。
server.go:49:3: invalid case packet.CONNECT in switch on fixedHeader.PacketType (mismatched types int and byte)
では、定数の型を 書かない 場合はどうなるか?
const ( CONNECT = 1 PUBLISH byte = 3 DISCONNECT byte = 14 )
これだとコンパイルエラーにならない。 しかも、先程と異なり、 int
と比較してる箇所もコンパイルエラーにならない!
コンパイル時に適切な精度の型として埋め込んでくれるらしい。先ほどの記事ではこれを「 Untyped constant declaration 」と呼んでいる。
必要がない限り、型指定なしでconstを定義した方が良い。
const ( CONNECT = 1 PUBLISH = 3 DISCONNECT = 14 )
iota
constsについて、Effective Goも読んでみると以下のように書いてある。
「 In Go, enumerated constants are created using the iota enumerator. 」
enumというとJavaの列挙型を思い出す。けど、それは一旦置いておいてGoでは iota
という演算子を使うことで定数の定義を簡単かつ柔軟にできる。
PacketTypeの値の定数定義は iota
を使って以下のように書ける。
const ( _ = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
さらに、型を作ってメソッドを定義すれば、Enum(列挙型)のように使うこともできる。
type PacketType byte const ( _ PacketType = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT ) func (v PacketType) String() string { names := [...]string{ "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT"} if v < CONNECT || v > DISCONNECT { return "Unknown" } return names[v] }
Enumについては以下の記事が詳しい。
どういう時にEnumが必要なのか?が書いてある。
Why do we need enums?
Grouping and expecting only some related values Sharing common behavior Avoids using invalid values To increase the code readability and the maintainability
また、iotaを使うと定義する順番を間違えると値が変わってしまうため、値に意味があると問題になる場合もある。
いまのところ型や共通のメソッドは不要であるので、Enumはいらない気がする。値を定義する順番については、プロトコルで決まっている値で変更される頻度がほとんどないので気にしないでおく。結果、 iota
を使って型指定なしのconstを定義する形にする。
const ( _ = iota CONNECT CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
おしまい
ここまでで、MQTTクライアントからメッセージをPublishするところまでができました。次はいよいよ別のクライアントがSubscribeところに着手します。goroutineをどう使ってクライアントを管理するのかを考えていきます。
今回の学び。
- handlerのエラーハンドリング
- 前々回の記事
- PUBLISHパケットとDISCONNECTパケット
- Goのconstとiota