etcdによるカスタムサービスディスカバリ

2015年8月17日筆者: Fabian Reinartz

以前の投稿で、Prometheusでのサービスディスカバリの多くの新しい方法を紹介しました。それ以来、多くのことが起こりました。内部実装を改善し、コミュニティから素晴らしい貢献を受け、KubernetesとMarathonでのサービスディスカバリのサポートが追加されました。これらはバージョン0.16のリリースで利用可能になります。

また、カスタムサービスディスカバリについても触れました。

すべてのタイプのサービスディスカバリがPrometheusに直接含まれるほど汎用的であるわけではありません。おそらく、あなたの組織には独自のシステムがあり、Prometheusで機能させる必要があるでしょう。しかし、これは新しい監視ターゲットを自動的に検出する利点を享受できないという意味ではありません。

この投稿では、一貫性の高い分散キーバリューストアであるetcdに基づくカスタムサービスディスカバリ手法をPrometheusに接続する小さなユーティリティプログラムを実装します。

etcdとPrometheusのターゲット

我々の架空のサービスディスカバリシステムは、サービスとそのインスタンスを明確に定義されたキー・スキーマの下に保存します。

/services/<service_name>/<instance_id> = <instance_address>

Prometheusは、既存のすべてのサービスが追加されたり削除されたりする際に、それらのターゲットを自動的に追加および削除するはずです。Prometheusのファイルベースのサービスディスカバリと統合できます。これは、JSON形式のターゲットグループのリストとしてターゲットを記述する一連のファイルを監視します。

単一のターゲットグループは、一連のラベルに関連付けられたアドレスのリストで構成されます。これらのラベルは、それらのターゲットから取得されたすべての時系列にアタッチされます。etcdのサービスディスカバリから抽出されたターゲットグループの例は次のようになります。

{
  "targets": ["10.0.33.1:54423", "10.0.34.12:32535"],
  "labels": {
    "job": "node_exporter"
  }
}

プログラム

etcdクラスターに接続し、`/services`パスで見つかったすべてのサービスを検索し、それらをターゲットグループのファイルに書き出す小さなプログラムが必要です。

配線から始めましょう。私たちのツールには2つのフラグがあります。etcdサーバーに接続するフラグと、ターゲットグループを書き込むファイルです。内部的には、サービスはサービス名からインスタンスへのマップとして表現されます。インスタンスは、etcdパス内のインスタンス識別子からそのアドレスへのマップです。

const servicesPrefix = "/services"

type (
  instances map[string]string
  services  map[string]instances
)

var (
  etcdServer = flag.String("server", "http://127.0.0.1:4001", "etcd server to connect to")
  targetFile = flag.String("target-file", "tgroups.json", "the file that contains the target groups")
)

`main`関数はフラグを解析し、現在のサービスを保持するオブジェクトを初期化します。次に、etcdサーバーに接続し、`/services`パスを再帰的に読み取ります。指定されたパスのサブツリーを結果として受け取り、`srvs.handle`を呼び出します。これは、サブツリー内の各ノードに対して`srvs.update`メソッドを再帰的に実行します。`update`メソッドは、`srvs`オブジェクトの状態をetcd内のサブツリーの状態と一致するように変更します。最後に、`srvs.persist`を呼び出し、`srvs`オブジェクトをターゲットグループのリストに変換し、`-target-file`フラグで指定されたファイルに書き出します。

func main() {
  flag.Parse()

  var (
    client  = etcd.NewClient([]string{*etcdServer})
    srvs    = services{}
  )

  // Retrieve the subtree of the /services path.
  res, err := client.Get(servicesPrefix, false, true)
  if err != nil {
    log.Fatalf("Error on initial retrieval: %s", err)
  }
  srvs.handle(res.Node, srvs.update)
  srvs.persist()
}

これが動作する実装であると仮定しましょう。このツールを30秒ごとに実行することで、サービスディスカバリの現在のターゲットのほとんど正確なビューを得ることができます。

しかし、もっと良くすることはできないでしょうか?

答えは「はい」です。etcdはウォッチ機能を提供しており、任意のパスとそのサブパスの更新をリッスンできます。これにより、変更がすぐに通知され、すぐに適用できます。また、`/services`サブツリー全体を何度も処理する必要がなくなり、多数のサービスやインスタンスにとって重要になる可能性があります。

`main`関数を次のように拡張します。

func main() {
  // ...

  updates := make(chan *etcd.Response)

  // Start recursively watching for updates.
  go func() {
    _, err := client.Watch(servicesPrefix, 0, true, updates, nil)
    if err != nil {
      log.Errorln(err)
    }
  }()

  // Apply updates sent on the channel.
  for res := range updates {
    log.Infoln(res.Action, res.Node.Key, res.Node.Value)

    handler := srvs.update
    if res.Action == "delete" {
      handler = srvs.delete
    }
    srvs.handle(res.Node, handler)
    srvs.persist()
  }
}

私たちは、`/services`内のエントリの変更を再帰的に監視するゴルーチンを開始します。これは永久にブロックし、すべての変更を`updates`チャネルに送信します。その後、チャネルから更新を読み取り、以前と同じように適用します。ただし、インスタンスまたはサービス全体が消滅した場合は、代わりに`srvs.delete`メソッドを使用して`srvs.handle`を呼び出します。

変更をPrometheusが監視しているファイルに書き出すために、各更新の最後に`srvs.persist`を再度呼び出します。

修正メソッド

ここまでは順調です。概念的にはこれで機能します。残るのは`update`と`delete`ハンドラーメソッド、そして`persist`メソッドです。

`update`と`delete`は、`handle`メソッドによって呼び出されます。`handle`メソッドは、パスが有効であると仮定して、サブツリー内の各ノードに対して単純にそれらを呼び出します。

var pathPat = regexp.MustCompile(`/services/([^/]+)(?:/(\d+))?`)

func (srvs services) handle(node *etcd.Node, handler func(*etcd.Node)) {
  if pathPat.MatchString(node.Key) {
    handler(node)
  } else {
    log.Warnf("unhandled key %q", node.Key)
  }

  if node.Dir {
    for _, n := range node.Nodes {
      srvs.handle(n, handler)
    }
  }
}

更新

更新メソッドは、etcdで更新されたノードに基づいて、`services`オブジェクトの状態を変更します。

func (srvs services) update(node *etcd.Node) {
  match := pathPat.FindStringSubmatch(node.Key)
  // Creating a new job directory does not require any action.
  if match[2] == "" {
    return
  }
  srv := match[1]
  instanceID := match[2]

  // We received an update for an instance.
  insts, ok := srvs[srv]
  if !ok {
    insts = instances{}
    srvs[srv] = insts
  }
  insts[instanceID] = node.Value
}

削除

削除メソッドは、etcdから削除されたノードに応じて、`services`オブジェクトからインスタンスまたはジョブ全体を削除します。

func (srvs services) delete(node *etcd.Node) {
  match := pathPat.FindStringSubmatch(node.Key)
  srv := match[1]
  instanceID := match[2]

  // Deletion of an entire service.
  if instanceID == "" {
    delete(srvs, srv)
    return
  }

  // Delete a single instance from the service.
  delete(srvs[srv], instanceID)
}

永続化

永続化メソッドは、`services`オブジェクトの状態を`TargetGroup`のリストに変換します。その後、このリストをJSON形式で`-target-file`に書き込みます。

type TargetGroup struct {
  Targets []string          `json:"targets,omitempty"`
  Labels  map[string]string `json:"labels,omitempty"`
}

func (srvs services) persist() {
  var tgroups []*TargetGroup
  // Write files for current services.
  for job, instances := range srvs {
    var targets []string
    for _, addr := range instances {
      targets = append(targets, addr)
    }

    tgroups = append(tgroups, &TargetGroup{
      Targets: targets,
      Labels:  map[string]string{"job": job},
    })
  }

  content, err := json.Marshal(tgroups)
  if err != nil {
    log.Errorln(err)
    return
  }

  f, err := create(*targetFile)
  if err != nil {
    log.Errorln(err)
    return
  }
  defer f.Close()

  if _, err := f.Write(content); err != nil {
    log.Errorln(err)
  }
}

実稼働で利用する

すべて完了しました。では、どのように実行するのでしょうか?

設定済みの出力ファイルを使用してツールを起動するだけです。

./etcd_sd -target-file /etc/prometheus/tgroups.json

次に、同じファイルを使用して、ファイルベースのサービスディスカバリでPrometheusを設定します。最も簡単な構成は次のようになります。

scrape_configs:
- job_name: 'default' # Will be overwritten by job label of target groups.
  file_sd_configs:
  - names: ['/etc/prometheus/tgroups.json']

これで完了です。これで、Prometheusはetcdのサービスディスカバリに出入りするサービスとそのインスタンスと同期を保ちます。

結論

Prometheusがあなたの組織のサービスディスカバリをネイティブでサポートしていなくても、絶望しないでください。小さなユーティリティプログラムを使用することで、ギャップを簡単に埋め、監視ターゲットのシームレスな更新から利益を得ることができます。これにより、デプロイメントの計算から監視設定への変更を削除できます。

Kubernetes と Marathon のネイティブサポートを追加してくれた貢献者の Jimmy DysonRobert Jacob に感謝します。また、ファイルベースの EC2 サービスディスカバリ に関する Keegan C Smith の見解 もチェックしてください。

このブログ投稿の全ソースはGitHubで確認できます