MQTTサーバーを実装しながらGoを学ぶ - その7 type switch, interface再考, プライベートな関数のテスト

前回で、クライアントからのPUBLISHを受け取るところまで実装できました。今回はSUBSCRIBEを受け取るところを実装します。いつも引用してる図で言うと、右側の「Computer」や「Mobile device」からMQTT Brokerへの通信です。

https://s3.amazonaws.com/www.appcelerator.com.images/MQTT_1.png

引用元:API Builder and MQTT for IoT - Part 1

サブスクライブしているクライアントにパブリッシュされたメッセージを送るところまでやりたかったのですが、Goのinterfaceに対する考え方 "Accept interfaces, Return structs" について考えていたらそこまでいきませんでした・・・。

目次。

SUBSCRIBE

SUBSCRIBEパケットを見てみる

実装に入る前に、mosquittoを使ってSUBSCRIBEパケットを眺めてみる。まずはWiresharkを起動。その後、mosquittoサーバーを起動。

$ /usr/local/sbin/mosquitto -v

mosquittoクライアントでサブスクライブする。-iで指定しているのは Client Identifier。 -t で指定しているのはトピック名。

$ mosquitto_sub -i sample-subscriber -t hoge

Wiresharkを見てみる。

https://i.gyazo.com/45685fa968124810406d4a52c0826e5d.png

ふむふむ。クライアントからサーバーへ Subscribe Requestが送られ、サーバーからクライアントへSubscribe Ackを応答してる。

しばらく放っておくと...pingしてる。

https://i.gyazo.com/bc3c0136e9868024ff86543b58b2bbea.png

実装する通信の流れ

以下の通信を実現できるようにMQTTサーバーを実装していく。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

以下も実装する。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

CONNECTとCONNACK

実装済み。

SUBSCRIBEとSUBACK

固定ヘッダーの仕様を勘違いしていたことに気がつく...

SUBSCRIBEパケットを実装する。

SUBSCRIBEパケットは固定ヘッダー、可変ヘッダー、ペイロードで構成される。

固定ヘッダーの仕様を読んでると、大きな勘違いをしてることに気がついた・・・。

Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection

固定ヘッダーの3,2,1,0ビットは予約されてる。以下のようになってるらしい。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type (8) Reserved
1 0 0 0 0 0 0 1
byte2... Remaining Length

ふむふむ。つまり、固定ヘッダーの Dup QoS Retain フィールドが定数なのかな、と思ったけれども・・・。

あれ?と思って改めて固定ヘッダーの仕様を見てみる。

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Control Packet type Flags specific to each MQTT Control Packet type
byte2... Remaining Length

3,2,1,0ビット目は Flags specific to each MQTT Control Packet type って書いてある!!!パケットタイプによって意味が変わるのか!

interfaceとtype switchesを使った修正方針

今の固定ヘッダーの実装は以下のようになっている。 Dup QoS Retain フィールドがあるので、これは PUBLISH パケット用の固定ヘッダーだ。 PUBLISH 用の固定ヘッダーを全てのパケットタイプで使うような実装をしていた・・・。

type FixedHeader struct {
    PacketType      byte
    Dup             byte
    QoS1            byte
    QoS2            byte
    Retain          byte
    RemainingLength uint
}

仕様を改めて読み直すと、 PUBLISH パケットのみ3,2,1,0ビットを使っており、他のパケットタイプは全てReservedであり実質使われていない。

修正方針としては以下のようにする。

  1. 既存のFixedHeaderをinterfaceにする
  2. そして、sturctを DefaultFixedHeaderPublishFixedHeader に分ける
  3. server.goでは以下のコードのようにtype switchesを使って分岐する
fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

この書き方は、以前エラーハンドリングの部分でやったError Typeパターンと同じ考え方になる。

と、思ったけどちょっと考え直す。

"Accept interfaces, Return structs"

Goには "Accept interfaces, Return structs" という言葉がある。関数の引数をinterfaceにして返り値はstructにしよう、という考え方。

なぜか?

以下の記事のまとめにこう書いてある。

The primary reasons listed are:

  • Remove unneeded abstractions
  • Ambiguity of a user’s need on function input
  • Simplify function inputs

本文を読んで自分なりにまとめると

  • Goでは、interfaceを満たすために明示的にimplementsというような記述をする必要はない。メソッドのシグニチャさえ合っていれば適合するinterfaceとして扱うことができる。そのため、本当に必要になるまでinterfaceを使って抽象化した型を作らなくて良い。呼び出される関数側で抽象化が必要なければstructを返せばいい
  • 関数を利用する側の都合を全て把握するのは難しい。そのため引数で受け取るのはinterfaceで抽象化しておくと良い。そうすれば関数を利用する側はinterfaceを満たす色々なstructを渡せるようになる
  • 関数は利用しない引数を受け取るべきではない。同様に、利用しない振る舞い(メソッド)はいらないので、最小のinterfaceを受け取るようにするべき(インタフェース分離の法則)

さっき自分で立てた方針は異なっている。 packet.ToFixedHeader() がinterfaceを返し、 handler.HandlePublish() がstructを引数にとるという "Accept interfaces, Return structs" とは逆の構造。

fixedHeader, err := packet.ToFixedHeader(r)
...
switch fh := fixedHeader.(type) {
case PublishFixedHeader:
    err := handler.HandlePublish(fh, r)
    ...
case packet.DefaultFixedHeader:
    switch fh.PacketType {
    case packet.CONNECT:
        ...
}

先ほどの記事の最後にも書いてあるが、関数が複数の型を返す場合はinterfaceを返すしかないし、structの振る舞い(関数)ではなくフィールドの値が必要な場合は、structとして受け取る必要がある。そのため、現在の実装は "Accept interfaces, Return structs" に従っていないが、それでも良い。

しかし、関数が複数の型を返したいという理由でinterfaceを返して、呼び出し元がtype switchで処理を分岐する、というのがGoのinterfaceの使い方として微妙な気がしてきた。interfaceはあくまで振る舞いを定義するためのもので、型をまとめるためのものではないのでは(じゃあなぜtype switchという構文があるの?というのもあるけど)。

抽象化しない

もっと愚直に実装してみよう。

やりたいことはPacketTypeがPUBLISHかどうかで、FixedHeaderの構造を変えたいだけ。

packetType := PacketType(r)
if packetType == PUBLISH {
    publishFixedHeader := ToPublishFixedHeader(r)
    ...
} else {
    defaultFixedHeader := ToDefaultFixedHeader(r)
    ...
}

こんな感じのコードが書ければいい。ただ、packetTypeを取得するには1バイトreaderから読み取る必要があって、この1バイトにはPacketType以外の情報も含まれている。そのため、readerを1バイト分巻き戻すか、取得した1バイトをFixedHeaderを生成する関数に渡す必要がある。

bufio.Readerを含んだMQTTReaderという独自のstructを用意することにしよう。このstructに読み込んだ1バイトとreaderとを保持させる。

type MQTTReader struct {
    byte1 *byte
    r     *bufio.Reader
}

func NewMQTTReader(r io.Reader) *MQTTReader {
    bufr := bufio.NewReader(r)
    return &MQTTReader{r: bufr}
}

func (d *MQTTReader) ReadPacketType() (PacketType, error) {
    if d.byte1 == nil {
        byte1, err := d.r.ReadByte()
        if err != nil {
            return PacketType(0), err
        }
        d.byte1 = &byte1
    }
    return PacketType(*d.byte1 >> 4), nil
}

server.goでは以下のように分岐する。

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
...
if packetType == packet.PUBLISH {
    fixedHeader, err := packet.ToPublishFixedHeader(mqttReader)
    ...
    err = handler.HandlePublish(fixedHeader, r)
    ...
} else {
    fixedHeader, err := packet.ToFixedHeader(mqttReader)
    ...
    switch packetType {
    case packet.CONNECT:
        connack, err := handler.HandleConnect(fixedHeader, r)
        ...
}

ゴリゴリ変更。差分はこちら

今の実装だと

  • server.goでFixedHeaderを取得
  • handlerで
    • VariableHeaderとPayloadを取得
    • なんらかの処理
    • レスポンス生成

という流れになってる。readerから FixedHeaderVariableHeaderPalyload を保持するstruct(例えば Connect とか)を作成してパケットを表現した方が良さそう。

server.go

r := bufio.NewReader(conn)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
if err != nil {
    if err == io.EOF {
        return nil
    }
    return err
}
switch packetType {
case packet.PUBLISH:
    err = handler.HandlePublish(mqttReader)
    if err != nil {
        return err
    }
case packet.CONNECT:
    connack, err := handler.HandleConnect(mqttReader)
    if err != nil {
        return err
    }
    _, err = conn.Write(connack.ToBytes())
    if err != nil {
        return err
    }
case packet.DISCONNECT:
    return nil
}

例えば、CONNECTパケットなら以下のような実装をする。

// packet/connect.go
package packet

type Connect struct {
    FixedHeader    *FixedHeader
    VariableHeader *ConnectVariableHeader
    Payload        *ConnectPayload
}

func (reader *MQTTReader) ReadConnect() (*Connect, error) {
    fixedHeader, err := reader.readFixedHeader()
    if err != nil {
        return nil, err
    }
    variableHeader, err := reader.readConnectVariableHeader()
    if err != nil {
        return nil, err
    }
    payload, err := reader.readConnectPayload()
    if err != nil {
        return nil, err
    }
    return &Connect{fixedHeader, variableHeader, payload}, nil
}

// handler/connect_handle.go
package handler

func HandleConnect(reader *packet.MQTTReader) (packet.Connack, error) {
    fmt.Printf("HandleConnect\n")
    connect, err := reader.ReadConnect()
    if err != nil {
        if ce, ok := err.(packet.ConnectError); ok {
            return ce.Connack(), nil
        }
        return packet.Connack{}, err
    }

    // TODO variableHeaderとpayloadを使って何かしらの処理
    fmt.Printf("  %#v\n", connect.VariableHeader)
    fmt.Printf("  %#v\n", connect.Payload)

    return packet.NewConnackForAccepted(), nil
}

良い感じ、なんだけどテストで困った。

プライベートな関数のテスト

FixedHeaderを生成するのがパブリックな packet.ToPublishFixedHeader() からプライベートな readFixedHeader() というプライベートなメソッドに変更したので、 packet_test という別パッケージから参照することができず、テストできなくなってしまった。

プライベートな関数などのテストをするパターンがある。

まず、packet/export_test.goというファイルを用意して、テストしたいプライベートメソッドをパブリックにする。

package packet

var ExportReadPublishFixedHeader = (*MQTTReader).readPublishFixedHeader

structの実態がないメソッドを変数にセットして、あとからレシーバーを渡してあげる。こんなことができたんですねー、Method valuesというらしい。

あとは、fixed_header_test.goからパブリックに ExportReadPublishFixeder を呼ぶように書き換えてテストすれば良い。

-                       got, err := packet.ToPublishFixedHeader(tt.args.r)
+                       got, err := packet.ExportReadPublishFixedHeader(tt.args.r)

なるほどー。export_test.goという _test というファイル名なのでテストの時のみビルド対象に含まれるため、本番ビルドには影響がないのでパブリックにしても良いでしょう、と。

MQTTReader に実装を寄せていった差分はこちら

改めてSUBSCRIBEとSUBACK

SUBSCRIBEパケットの仕様。

  • 固定ヘッダー
    • PacketType 8
    • Reservedの部分は 0010 である。チェックして違ったら接続を切る
  • 可変ヘッダー
    • Packet Identifer 2byte
    • Packet Identiferは、PUBLISHパケットの可変ヘッダーにもあった。ただし、PUBLISHパケットは省略可能だったのに対して、SUBSCRIBEパケットでは必須
  • ペイロード
    • Packet FilterとQoSのペアのリスト
    • Packet Fileterとは、サブスクライブするトピックを指定する文字列のこと。ワイルドカードを指定することができる。今の実装ではワイルドカードはサポートしないことにする
    • ワイルドカードをサポートしない場合は、ワイルドカード文字を拒否しないといけない
    • PacketFilterとQoSのペアが1つもない場合は拒否しないといけない

ペイロードの仕様を読むと、1回のSUBSCRIBEパケットで複数のトピックをサブスクライブでき、なおかつそれぞれQoSの指定ができるようだ。今のところQoSのことは無視して実装してきてるので後で考えることにする。

実装したものはこちら

SUBSCRIBE, SUBACKを試してみる

よし、試すぞー。

実装してるサーバーを起動。

$ go run app/main.go`
  server starts at localhost:1883

SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

Wiresharkで見てみる。

https://i.gyazo.com/0c25a55702a02e1f94d651bd3dc64402.png

ちゃんと動いてるー!!

これで以下の通信が実現できた。

  1. クライアント → CONNECT → サーバー
  2. クライアント ← CONNACK ← サーバー
  3. クライアント → SUBSCRIBE → サーバー
  4. クライアント ← SUBACK ← サーバー

PINGREQ, PINGRESP

次は以下の通信。

  1. クライアント → PINGREQ → サーバー
  2. クライアント ← PINGRESP ← サーバー

PINGREQ, PINGRESP共に固定ヘッダーのみ。可変ヘッダーとペイロードはなし。

実装したものはこちら

試してみる。SUBSCRIBEを送信。

$ mosquitto_sub -i custom-client-id -t hoge

その後、接続したまま放っておくと・・・

https://i.gyazo.com/1585f3d8b58d8cfd84d53fa4248652f7.png

できました!!

おしまい

クライアントがSubscribeするところまでできました。次回こそは、goroutineを使って複数クライアントを捌き、PublishされたメッセージをSubscribeしてるクライアントに送信する実装をします。

今回の学び。