etcd によるカスタムサービスディスカバリ
2015年8月17日筆者: Fabian Reinartz
以前の投稿では、Prometheus における多数の新しいサービスディスカバリの方法を紹介しました。それ以降、多くのことが起こりました。内部実装を改善し、コミュニティから素晴らしい貢献を受けて、Kubernetes および Marathon によるサービスディスカバリのサポートが追加されました。これらはバージョン0.16のリリースで利用可能になります。
また、カスタムサービスディスカバリのトピックにも触れました。
すべてのサービスディスカバリタイプが Prometheus に直接組み込まれるほど汎用的であるとは限りません。お使いの組織には独自のシステムが導入されており、それを Prometheus で動作させる必要があるかもしれません。だからといって、新しい監視ターゲットを自動的に検出するメリットを享受できないわけではありません。
この投稿では、高一貫性分散キーバリューストアであるetcd に基づくカスタムサービスディスカバリアプローチを Prometheus に接続する小さなユーティリティプログラムを実装します。
etcd と Prometheus におけるターゲット
私たちの架空のサービスディスカバリシステムは、明確に定義されたキーのスキーマの下にサービスとそのインスタンスを格納します。
/services/<service_name>/<instance_id> = <instance_address>
Prometheus は、サービスが出現したり消滅したりするたびに、すべての既存サービスのターゲットを自動的に追加および削除する必要があります。Prometheus のファイルベースのサービスディスカバリと統合できます。これは、JSON形式のターゲットグループのリストとしてターゲットを記述する一連のファイルを監視します。
単一のターゲットグループは、一連のラベルに関連付けられたアドレスのリストで構成されます。これらのラベルは、それらのターゲットから取得されたすべての時系列にアタッチされます。etcd におけるサービスディスカバリから抽出されたターゲットグループの例を次に示します。
{
"targets": ["10.0.33.1:54423", "10.0.34.12:32535"],
"labels": {
"job": "node_exporter"
}
}
プログラム
必要なのは、etcd クラスタに接続し、`/services` パスで見つかったすべてのサービスのルックアップを実行して、ターゲットグループのファイルに書き出す小さなプログラムです。
配線から始めましょう。このツールには2つのフラグがあります。接続する etcd サーバーと、ターゲットグループが書き込まれるファイルです。内部的には、サービスはサービス名からインスタンスへのマップとして表されます。インスタンスは、etcd パス内のインスタンス識別子からアドレスへのマップです。
const servicesPrefix = "/services"
type (
instances map[string]string
services map[string]instances
)
var (
etcdServer = flag.String("server", "http://127.0.0.1:4001", "etcd server to connect to")
targetFile = flag.String("target-file", "tgroups.json", "the file that contains the target groups")
)
私たちの `main` 関数はフラグを解析し、現在のサービスを保持するオブジェクトを初期化します。次に etcd サーバーに接続し、`/services` パスの再帰的な読み取りを実行します。結果として指定されたパスのサブツリーを受け取り、`srvs.handle` を呼び出します。これは、サブツリー内の各ノードに対して `srvs.update` メソッドを再帰的に実行します。`update` メソッドは、etcd のサブツリーの状態と一致するように `srvs` オブジェクトの状態を変更します。最後に、`srvs.persist` を呼び出します。これは `srvs` オブジェクトをターゲットグループのリストに変換し、`-target-file` フラグで指定されたファイルに書き出します。
func main() {
flag.Parse()
var (
client = etcd.NewClient([]string{*etcdServer})
srvs = services{}
)
// Retrieve the subtree of the /services path.
res, err := client.Get(servicesPrefix, false, true)
if err != nil {
log.Fatalf("Error on initial retrieval: %s", err)
}
srvs.handle(res.Node, srvs.update)
srvs.persist()
}
これが動作する実装であると仮定しましょう。このツールを30秒ごとに実行することで、サービスディスカバリにおける現在のターゲットのほぼ正確なビューを得ることができます。
しかし、もっと良くできるでしょうか?
答えははいです。etcd はウォッチ機能を提供しており、これにより任意のパスとそのサブパスの更新をリッスンできます。これにより、変更が加えられたらすぐに通知され、すぐに適用できます。また、多数のサービスとインスタンスの場合に重要になる可能性がある、`/services` サブツリー全体を繰り返し処理する必要もなくなります。
`main` 関数を次のように拡張します。
func main() {
// ...
updates := make(chan *etcd.Response)
// Start recursively watching for updates.
go func() {
_, err := client.Watch(servicesPrefix, 0, true, updates, nil)
if err != nil {
log.Errorln(err)
}
}()
// Apply updates sent on the channel.
for res := range updates {
log.Infoln(res.Action, res.Node.Key, res.Node.Value)
handler := srvs.update
if res.Action == "delete" {
handler = srvs.delete
}
srvs.handle(res.Node, handler)
srvs.persist()
}
}
`/services` のエントリへの変更を再帰的に監視するゴルーチンを開始します。これは永久にブロックされ、すべての変更を `updates` チャネルに送信します。次にチャネルから更新を読み取り、以前と同様に適用します。ただし、インスタンスまたはサービス全体が消失した場合は、代わりに `srvs.delete` メソッドを使用して `srvs.handle` を呼び出します。
各更新を `srvs.persist` の別の呼び出しで終了させ、Prometheus が監視しているファイルに変更を書き出します。
変更メソッド
ここまでは概念的にうまく機能しています。残りは `update` および `delete` ハンドラメソッドと `persist` メソッドです。
`update` および `delete` は `handle` メソッドによって呼び出されます。これは、パスが有効であると仮定して、サブツリー内の各ノードに対してこれらのメソッドを単純に呼び出します。
var pathPat = regexp.MustCompile(`/services/([^/]+)(?:/(\d+))?`)
func (srvs services) handle(node *etcd.Node, handler func(*etcd.Node)) {
if pathPat.MatchString(node.Key) {
handler(node)
} else {
log.Warnf("unhandled key %q", node.Key)
}
if node.Dir {
for _, n := range node.Nodes {
srvs.handle(n, handler)
}
}
}
`update`
update メソッドは、etcd で更新されたノードに基づいて、`services` オブジェクトの状態を変更します。
func (srvs services) update(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
// Creating a new job directory does not require any action.
if match[2] == "" {
return
}
srv := match[1]
instanceID := match[2]
// We received an update for an instance.
insts, ok := srvs[srv]
if !ok {
insts = instances{}
srvs[srv] = insts
}
insts[instanceID] = node.Value
}
`delete`
delete メソッドは、etcd から削除されたノードに応じて、`services` オブジェクトからインスタンスまたはジョブ全体を削除します。
func (srvs services) delete(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
srv := match[1]
instanceID := match[2]
// Deletion of an entire service.
if instanceID == "" {
delete(srvs, srv)
return
}
// Delete a single instance from the service.
delete(srvs[srv], instanceID)
}
`persist`
persist メソッドは、`services` オブジェクトの状態を `TargetGroup` のリストに変換します。次に、このリストを JSON 形式で `-target-file` に書き込みます。
type TargetGroup struct {
Targets []string `json:"targets,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
func (srvs services) persist() {
var tgroups []*TargetGroup
// Write files for current services.
for job, instances := range srvs {
var targets []string
for _, addr := range instances {
targets = append(targets, addr)
}
tgroups = append(tgroups, &TargetGroup{
Targets: targets,
Labels: map[string]string{"job": job},
})
}
content, err := json.Marshal(tgroups)
if err != nil {
log.Errorln(err)
return
}
f, err := create(*targetFile)
if err != nil {
log.Errorln(err)
return
}
defer f.Close()
if _, err := f.Write(content); err != nil {
log.Errorln(err)
}
}
本番環境への導入
すべて完了しましたが、どのように実行すればよいでしょうか?
設定された出力ファイルでツールを起動するだけです。
./etcd_sd -target-file /etc/prometheus/tgroups.json
次に、同じファイルを使用して Prometheus をファイルベースのサービスディスカバリで構成します。最も単純な構成は次のようになります。
scrape_configs:
- job_name: 'default' # Will be overwritten by job label of target groups.
file_sd_configs:
- names: ['/etc/prometheus/tgroups.json']
これで完了です。これで、Prometheus はサービスディスカバリ (etcd) でサービスとそのインスタンスが出入りする状況と同期し続けます。
結論
Prometheus がお使いの組織のサービスディスカバリをネイティブでサポートしていない場合でも、落胆しないでください。小さなユーティリティプログラムを使用することで、簡単にギャップを埋め、監視対象ターゲットのシームレスな更新の恩恵を受けることができます。これにより、デプロイメントの考慮事項から監視構成の変更を削除できます。
貢献者のJimmy Dyson および Robert Jacob に大いに感謝します。Kubernetes および Marathon のネイティブサポートを追加していただきありがとうございます。また、Keegan C Smith によるファイルベースのEC2 サービスディスカバリ のアプローチも確認してください。
このブログ投稿の全ソースコードはこちらで確認できます (GitHub)。