MQTTサーバーを実装しながらGoを学ぶ - その6 const, iota

前回の続きです。handlerのエラーハンドリングからやります。その後、mosquitto_clientからPUBLISHパケットを自作サーバーで受け取れるようにしました。最後にhandlerのリファクタリングで Untyped constant declaration というconstの便利な使い方を知りました。

今回学ぶこと。

  • handlerのエラーハンドリング
  • PUBLISHパケットとDISCONNECTパケット
  • Goのconstとiota

handlerでのエラーハンドリング

handlerでのエラーハンドリングを実装する。前々回調べた通りでError TypeもしくはOpaque Patternを使う。

すでに handler → packet という依存関係ができているので、 packet パッケージにError Typeを作ることにする。 ConnectError というインタフェースを用意する。このインタフェースに Error() を持たせてError Typeとして、さらに Connack を取得するためのメソッドも追加する。

--- a/study/packet/connack.go
+++ b/study/packet/connack.go
@@ -28,29 +28,47 @@ func (c Connack) ToBytes() []byte {
        return result
 }
 
+func newConnack() Connack {
+       fixedHeader := FixedHeader{
+               PacketType:      2,
+               RemainingLength: 2,
+       }
+       variableHeader := ConnackVariableHeader{SessionPresent: false}
+       return Connack{fixedHeader, variableHeader}
+}
+
 func NewConnackForAccepted() Connack {
        result := newConnack()
        result.ReturnCode = 0
        return result
 }
 
-func NewConnackForRefusedByUnacceptableProtocolVersion() Connack {
-       result := newConnack()
-       result.ReturnCode = 1
-       return result
+type ConnectError interface {
+       Connack() Connack
+       Error() string
 }
 
-func NewConnackForRefusedByIdentifierRejected() Connack {
-       result := newConnack()
-       result.ReturnCode = 2
-       return result
+type connectError struct {
+       connack Connack
+       msg     string
 }
 
-func newConnack() Connack {
-       fixedHeader := FixedHeader{
-               PacketType:      2,
-               RemainingLength: 2,
-       }
-       variableHeader := ConnackVariableHeader{SessionPresent: false}
-       return Connack{fixedHeader, variableHeader}
+func (e connectError) Connack() Connack {
+       return e.connack
+}
+
+func (e connectError) Error() string {
+       return e.msg
+}
+
+func RefusedByUnacceptableProtocolVersion(s string) ConnectError {
+       connack := newConnack()
+       connack.ReturnCode = 1
+       return connectError{connack, s}
+}
+
+func RefusedByIdentifierRejected(s string) ConnectError {
+       connack := newConnack()
+       connack.ReturnCode = 2
+       return connectError{connack, s}
 }
--- a/study/packet/connect_payload.go
+++ b/study/packet/connect_payload.go
@@ -5,8 +5,6 @@ import (
        "encoding/binary"
        "io"
        "regexp"
-
-       "github.com/pkg/errors"
 )
 
 type ConnectPayload struct {
@@ -30,10 +28,10 @@ func ToConnectPayload(r *bufio.Reader) (ConnectPayload, error) {
        }
        clientID := string(clientIDBytes)
        if len(clientID) < 1 || len(clientID) > 23 {
-               return ConnectPayload{}, errors.New("ClientID length is invalid")
+               return ConnectPayload{}, RefusedByIdentifierRejected("ClientID length is invalid")
        }
        if !clientIDRegex.MatchString(clientID) {
-               return ConnectPayload{}, errors.New("clientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"")
+               return ConnectPayload{}, RefusedByIdentifierRejected("ClientId format shoud be \"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"")
        }
        return ConnectPayload{ClientID: clientID}, nil
 }
--- a/study/packet/connect_variable_header.go
+++ b/study/packet/connect_variable_header.go
@@ -30,11 +30,11 @@ func ToConnectVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (ConnectV
        protocolName := make([]byte, 6)
        _, err := io.ReadFull(r, protocolName)
        if err != nil || !isValidProtocolName(protocolName) {
-               return ConnectVariableHeader{}, errors.New("protocol name is invalid")
+               return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol name is invalid")
        }
        protocolLevel, err := r.ReadByte()
        if err != nil || protocolLevel != 4 {
-               return ConnectVariableHeader{}, errors.New("protocol level must be 4")
+               return ConnectVariableHeader{}, RefusedByUnacceptableProtocolVersion("protocol level must be 4")
        }
 
        // TODO

handlerパッケージの変更。返された errorpacket.ConnectError だった場合は、 Connack を取得して返すように変更する。

--- a/study/handler/connect_handler.go
+++ b/study/handler/connect_handler.go
@@ -13,14 +13,18 @@ var variableHeaderLength = 10
 func HandleConnect(fixedHeader packet.FixedHeader, r *bufio.Reader) (packet.Connack, error) {
        variableHeader, err := packet.ToConnectVariableHeader(fixedHeader, r)
        if err != nil {
-               // TODO err応じたCONNACKを生成して返す
-               return packet.NewConnackForRefusedByUnacceptableProtocolVersion(), nil
+               if ce, ok := err.(packet.ConnectError); ok {
+                       return ce.Connack(), nil
+               }
+               return packet.Connack{}, err
        }

        payload, err := packet.ToConnectPayload(r)
        if err != nil {
-               // TODO err応じたCONNACKを生成して返す
-               return packet.NewConnackForRefusedByIdentifierRejected(), nil
+               if ce, ok := err.(packet.ConnectError); ok {
+                       return ce.Connack(), nil
+               }
+               return packet.Connack{}, err
        }

        // TODO variableHeaderとpayloadを使って何かしらの処理

PUBLISHパケットとDISCONNECTパケット

やりたかったことを思い出す。最初の回に書いたように、まず実現したいのは以下のフロー。

  1. クライアント → Connect Command → サーバー
  2. クライアント ← Connect Ack ← サーバー
  3. クライアント → Publish Message → サーバー
  4. クライアント → Disconnect Req → サーバー

ここまでで、1と2のCONNECT(Connect Command)とCONNACK(Connect Ack)はできた。

次は、PUBLISH(Publish Message)とDISSCONNECT(Disconnect Req)に取り掛かる。

PUBLISHパケット

PUBLISHパケットの可変ヘッダー

PUBLISHパケットの可変ヘッダーは以下の情報を持つ。

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718039

  • Topic Name
  • Packet Identifier

クライアントはPUBLISHパケットの Topic Name でトピックを指定してメッセージをサーバー(MQTT Broker)に送る。サーバーは、(まだ未実装だけど)PUBLISHで指定されたトピックをサブスクライブしてるクライアントにメッセージを転送する。

例えば以下のようなイメージ。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元:API Builder and MQTT for IoT - Part 1

今はサブスクライブしてるクライアントのことは考えてないので、上の図でいうと左側の "temp" というトピックに対して "75°F" というメッセージをPUBLISHしてる部分を実装する。

Topic Name について以下のような記述がある。Topic Nameのワイルドカードというのは #+ の2文字。サブスクライブ時にワイルドカードを指定することで複数のTopicをサブスクライブすることができる。Publishの可変ヘッダーにはこれらの文字を含んではいけない。

The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters

Packet Identifier はQoS1かQoS2の場合に使う。今はQoS0固定で考えているので後回し。

可変ヘッダーを実装する。

package packet

import (
    "bufio"
    "fmt"
    "io"
    "strings"
)

type PublishVariableHeader struct {
    TopicName        string
    PacketIdentifier *uint16
}

func ToPublishVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (PublishVariableHeader, error) {
    if fixedHeader.PacketType != 3 {
        return PublishVariableHeader{}, fmt.Errorf("packet type is invalid. it got is %v", fixedHeader.PacketType)
    }

    _, err := r.ReadByte()
    if err != nil {
        return PublishVariableHeader{}, err
    }
    lengthLSB, err := r.ReadByte()
    if err != nil {
        return PublishVariableHeader{}, err
    }
    if lengthLSB == 0 {
        return PublishVariableHeader{}, fmt.Errorf("length LSB should be > 0")
    }
    topicNameBytes := make([]byte, lengthLSB)
    _, err = io.ReadFull(r, topicNameBytes)
    if err != nil {
        return PublishVariableHeader{}, err
    }
    topicName := string(topicNameBytes)
    if strings.ContainsAny(topicName, "# +") {
        return PublishVariableHeader{}, fmt.Errorf("topic name must not contain wildcard. it got is %v", topicName)
    }

    result := PublishVariableHeader{string(topicNameBytes), nil}
    return result, nil
}

PUBLISHパケットのペイロード

ペイロードは、サブスクライバーに対して送信するメッセージそのもの。 bufio.Reader をそのまま使うことにする。

PUBLISHパケットに対するレスポンス

QoS0の時はレスポンスなし。

DISCONNECTパケット

DISCONNECTパケットの仕様はこちら

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090

DISCONNECTパケットには可変ヘッダーもペイロードもない。

serverとhandler実装

前回のserver.goはCONNECTパケットしか想定していなかった。PUBLISHパケットとDISCONNECTパケットも想定した実装にする。

server.goの実装。 Accept() でクライアントからの接続を待つ。接続が来て net.Conn を取得できたら handle() 関数へ渡す。 handle() 関数内でループしてクライアントからのMQTTパケットを受け取り FixedHeader を生成。パケットタイプによるswitchで処理を分岐し、それぞれのパケットに対応したhandlerを呼ぶ。DISCONNECTパケットの場合はコネクションを切るだけなので return nil だけして defer で接続を切る。

package study

import (
    "bufio"
    "fmt"
    "io"
    "net"

    "github.com/bati11/oreno-mqtt/study/handler"
    "github.com/bati11/oreno-mqtt/study/packet"
)

func Run() {
    ln, err := net.Listen("tcp", "localhost:1883")
    if err != nil {
        panic(err)
    }
    fmt.Println("server starts at localhost:1883")

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        err = handle(conn)
        if err != nil {
            panic(err)
        }
    }
}

func handle(conn net.Conn) error {
    defer conn.Close()

    for {
        r := bufio.NewReader(conn)
        fixedHeader, err := packet.ToFixedHeader(r)
        if err != nil {
            if err == io.EOF {
                // クライアント側から既に切断してる場合
                return nil
            }
            return err
        }
        fmt.Printf("-----\n%+v\n", fixedHeader)

        switch fixedHeader.PacketType {
        // CONNECT
        case 1:
            connack, err := handler.HandleConnect(fixedHeader, r)
            if err != nil {
                return err
            }
            _, err = conn.Write(connack.ToBytes())
            if err != nil {
                return err
            }
        // PUBLISH
        case 3:
            err := handler.HandlePublish(fixedHeader, r)
            if err != nil {
                return err
            }
        // DISCONNECT
        case 14:
            return nil
        }
    }
}

HandleConnect() は前回から変更なし。

HandlePublish() は以下のように実装。

package handler

import (
    "bufio"
    "fmt"
    "io"

    "github.com/bati11/oreno-mqtt/study/packet"
)

func HandlePublish(fixedHeader packet.FixedHeader, r *bufio.Reader) error {
    fmt.Printf("  HandlePublish\n")
    variableHeader, err := packet.ToPublishVariableHeader(fixedHeader, r)
    if err != nil {
        return err
    }
    fmt.Printf("  %#v\n", variableHeader)

    payloadLength := fixedHeader.RemainingLength - variableHeader.Length
    payload := make([]byte, payloadLength)
    _, err = io.ReadFull(r, payload)
    if err != nil {
        return err
    }
    fmt.Printf("  Payload: %v\n", string(payload))

    // TODO QoS0なのでレスポンスなし
    return nil
}

mosquittoクライアントからメッセージを送ってみる

サーバーを起動する。

$ go run app/main.go 
server starts at localhost:1883

Wiresharkを起動しておく。

mosquittoクライアントからpublishする。

$ mosquitto_pub -t hoge -m "Hello"

結果を見てみる。

https://i.gyazo.com/38af18d974d97c8694564f9eb8f2320a.png

お、できてそう!サーバーの標準出力も確認。

-----
{PacketType:1 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:28}
HandleConnect
  packet.ConnectVariableHeader{ProtocolName:"MQTT", ProtocolLevel:0x4, ConnectFlags:packet.ConnectFlags{CleanSession:true, WillFlag:true, WillQoS:0x1, WillRetain:false, PasswordFlag:true, UserNameFlag:true}, KeepAlive:0xa}
  packet.ConnectPayload{ClientID:"custom-client-id"}
-----
{PacketType:3 Dup:0 QoS1:0 QoS2:0 Retain:0 RemainingLength:11}
  HandlePublish
  packet.PublishVariableHeader{TopicName:"hoge", PacketIdentifier:(*uint16)(nil), Length:0x6}
  Payload: Hello

ちゃんとPUBLISHパケットを解釈できてる!

これで最初の目標の以下の流れが実現できた。

  1. クライアント → Connect Command → サーバー
  2. クライアント ← Connect Ack ← サーバー
  3. クライアント → Publish Message → サーバー
  4. クライアント → Disconnect Req → サーバー

const

ところで、server.goの fixedHeader.PacketType の値、マジックナンバーでswitchしてるところを分かりやすくしたい。

switch fixedHeader.PacketType {
// CONNECT
case 1:
    connack, err := handler.HandleConnect(fixedHeader, r)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
// PUBLISH
case 3:
    err := handler.HandlePublish(fixedHeader, r)
    if err != nil {
        return err
    }
// DISCONNECT
case 14:
    return nil
}

マジックナンバーをconstで置き換える。定数化することでコードが読みやすくなる。また、定数は実行時ではなくコンパイル時にコンパイラが最適化してくれて、パフォーマンスが良くなる場合もある。

fixed_header.goに定数を定義。

+const (
+       CONNECT    byte = 1
+       PUBLISH    byte = 3
+       DISCONNECT byte = 14
+)
+
 type FixedHeader struct {
        PacketType      byte
        Dup             byte

server.goのswitchは以下のようになる。

switch fixedHeader.PacketType {
case packet.CONNECT:
    connack, err := handler.HandleConnect(fixedHeader, r)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
case packet.PUBLISH:
    err := handler.HandlePublish(fixedHeader, r)
    if err != nil {
        return err
    }
case packet.DISCONNECT:
    return nil
}

Untyped constant declaration

定数の型を byte ではなく int にするとどうなるか?

const (
       CONNECT    int = 1
       PUBLISH    byte = 3
       DISCONNECT byte = 14
)

server.goのswitch-caseのところで以下のようなコンパイルエラーになる。fixedHeader.PacketType の型が byte なのに、 int 型の定数と比較してるためコンパイルエラー。

server.go:49:3: invalid case packet.CONNECT in switch on fixedHeader.PacketType (mismatched types int and byte)

では、定数の型を 書かない 場合はどうなるか?

const (
    CONNECT         = 1
    PUBLISH    byte = 3
    DISCONNECT byte = 14
)

これだとコンパイルエラーにならない。 しかも、先程と異なり、 int と比較してる箇所もコンパイルエラーにならない!

コンパイル時に適切な精度の型として埋め込んでくれるらしい。先ほどの記事ではこれを「 Untyped constant declaration 」と呼んでいる。

必要がない限り、型指定なしでconstを定義した方が良い。

const (
    CONNECT    = 1
    PUBLISH    = 3
    DISCONNECT = 14
)

iota

constsについて、Effective Goも読んでみると以下のように書いてある。

In Go, enumerated constants are created using the iota enumerator.

enumというとJavaの列挙型を思い出す。けど、それは一旦置いておいてGoでは iota という演算子を使うことで定数の定義を簡単かつ柔軟にできる。

PacketTypeの値の定数定義は iota を使って以下のように書ける。

const (
    _ = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

さらに、型を作ってメソッドを定義すれば、Enum(列挙型)のように使うこともできる。

type PacketType byte

const (
    _ PacketType = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

func (v PacketType) String() string {
    names := [...]string{
        "CONNECT",
        "CONNACK",
        "PUBLISH",
        "PUBACK",
        "PUBREC",
        "PUBREL",
        "PUBCOMP",
        "SUBSCRIBE",
        "SUBACK",
        "UNSUBSCRIBE",
        "UNSUBACK",
        "PINGREQ",
        "PINGRESP",
        "DISCONNECT"}
    if v < CONNECT || v > DISCONNECT {
        return "Unknown"
    }
    return names[v]
}

Enumについては以下の記事が詳しい。

どういう時にEnumが必要なのか?が書いてある。

Why do we need enums?

Grouping and expecting only some related values
Sharing common behavior
Avoids using invalid values
To increase the code readability and the maintainability

また、iotaを使うと定義する順番を間違えると値が変わってしまうため、値に意味があると問題になる場合もある。

いまのところ型や共通のメソッドは不要であるので、Enumはいらない気がする。値を定義する順番については、プロトコルで決まっている値で変更される頻度がほとんどないので気にしないでおく。結果、 iota を使って型指定なしのconstを定義する形にする。

const (
    _ = iota
    CONNECT
    CONNACK
    PUBLISH
    PUBACK
    PUBREC
    PUBREL
    PUBCOMP
    SUBSCRIBE
    SUBACK
    UNSUBSCRIBE
    UNSUBACK
    PINGREQ
    PINGRESP
    DISCONNECT
)

おしまい

ここまでで、MQTTクライアントからメッセージをPublishするところまでができました。次はいよいよ別のクライアントがSubscribeところに着手します。goroutineをどう使ってクライアントを管理するのかを考えていきます。

今回の学び。