以前の記事では、Prometheusでサービスディスカバリを行うためのさまざまな新しい方法を紹介しました。それ以来、多くのことが起こりました。内部実装を改善し、コミュニティから素晴らしい貢献を受け、KubernetesとMarathonでのサービスディスカバリのサポートを追加しました。これらはバージョン0.16のリリースで利用可能になります。
また、カスタムサービスディスカバリについても触れました。
すべてのタイプのサービスディスカバリが、Prometheusに直接含めることができるほど汎用的であるとは限りません。あなたの組織が独自のシステムを導入していて、Prometheusで動作させる必要がある可能性があります。これは、新しい監視対象を自動的に検出するというメリットを享受できないことを意味するものではありません。
この記事では、高整合性分散キーバリューストアであるetcdに基づくカスタムサービスディスカバリアプローチをPrometheusに接続する小さなユーティリティプログラムを実装します。
etcdとPrometheusのターゲット
架空のサービスディスカバリシステムは、サービスとそのインスタンスを明確に定義されたキースキーマの下に格納します。
/services/<service_name>/<instance_id> = <instance_address>
Prometheusは、既存のすべてのサービスのターゲットが追加または削除されると、自動的にそれらを追加および削除する必要があります。Prometheusのファイルベースのサービスディスカバリと統合できます。これは、ターゲットをJSON形式のターゲットグループのリストとして記述する一連のファイルを監視します。
単一のターゲットグループは、一連のラベルに関連付けられたアドレスのリストで構成されます。これらのラベルは、それらのターゲットから取得されたすべての時系列に添付されます。etcdのサービスディスカバリから抽出された1つのターゲットグループの例を次に示します。
{
"targets": ["10.0.33.1:54423", "10.0.34.12:32535"],
"labels": {
"job": "node_exporter"
}
}
プログラム
必要なのは、etcdクラスタに接続し、/services
パスにあるすべてのサービスのルックアップを実行し、それらをターゲットグループのファイルに書き出す小さなプログラムです。
いくつかの準備から始めましょう。ツールには2つのフラグがあります。接続するetcdサーバーと、ターゲットグループが書き込まれるファイルです。内部的には、サービスはサービス名からインスタンスへのマップとして表されます。インスタンスは、etcdパス内のインスタンス識別子からそのアドレスへのマップです。
const servicesPrefix = "/services"
type (
instances map[string]string
services map[string]instances
)
var (
etcdServer = flag.String("server", "http://127.0.0.1:4001", "etcd server to connect to")
targetFile = flag.String("target-file", "tgroups.json", "the file that contains the target groups")
)
main
関数はフラグを解析し、現在のサービスを保持するオブジェクトを初期化します。次に、etcdサーバーに接続し、/services
パスの再帰的な読み取りを行います。結果として、指定されたパスのサブツリーを受け取り、srvs.handle
を呼び出します。これは、サブツリー内の各ノードに対してsrvs.update
メソッドを再帰的に実行します。 update
メソッドは、srvs
オブジェクトの状態をetcdのサブツリーの状態と一致するように変更します。最後に、srvs.persist
を呼び出します。これは、srvs
オブジェクトをターゲットグループのリストに変換し、-target-file
フラグで指定されたファイルに書き出します。
func main() {
flag.Parse()
var (
client = etcd.NewClient([]string{*etcdServer})
srvs = services{}
)
// Retrieve the subtree of the /services path.
res, err := client.Get(servicesPrefix, false, true)
if err != nil {
log.Fatalf("Error on initial retrieval: %s", err)
}
srvs.handle(res.Node, srvs.update)
srvs.persist()
}
これを動作する実装として考えてみましょう。このツールを30秒ごとに実行して、サービスディスカバリの現在のターゲットのほぼ正確なビューを取得できます。
しかし、もっとうまくできるでしょうか?
答えは*はい*です。etcdはウォッチを提供します。これにより、任意のパスとそのサブパスの更新をリッスンできます。これにより、変更がすぐに通知され、すぐに適用できます。また、/services
サブツリー全体を何度も処理する必要はありません。これは、多数のサービスとインスタンスにとって重要になる可能性があります。
main
関数を次のように拡張します。
func main() {
// ...
updates := make(chan *etcd.Response)
// Start recursively watching for updates.
go func() {
_, err := client.Watch(servicesPrefix, 0, true, updates, nil)
if err != nil {
log.Errorln(err)
}
}()
// Apply updates sent on the channel.
for res := range updates {
log.Infoln(res.Action, res.Node.Key, res.Node.Value)
handler := srvs.update
if res.Action == "delete" {
handler = srvs.delete
}
srvs.handle(res.Node, handler)
srvs.persist()
}
}
/services
のエントリの変更を再帰的に監視するゴルーチンを開始します。これは永遠にブロックし、すべての変更をupdates
チャネルに送信します。次に、チャネルから更新を読み取り、前と同じように適用します。ただし、インスタンスまたはサービス全体が消えた場合は、srvs.delete
メソッドを使用してsrvs.handle
を呼び出します。
Prometheusが監視しているファイルに変更を書き出すために、srvs.persist
を再度呼び出して各更新を終了します。
変更メソッド
ここまでは順調です - 概念的にはこれは機能します。残っているのは、update
およびdelete
ハンドラーメソッドと、persist
メソッドです。
パスが有効であれば、サブツリー内の各ノードに対してそれらを呼び出すだけのhandle
メソッドによって、update
とdelete
が呼び出されます。
var pathPat = regexp.MustCompile(`/services/([^/]+)(?:/(\d+))?`)
func (srvs services) handle(node *etcd.Node, handler func(*etcd.Node)) {
if pathPat.MatchString(node.Key) {
handler(node)
} else {
log.Warnf("unhandled key %q", node.Key)
}
if node.Dir {
for _, n := range node.Nodes {
srvs.handle(n, handler)
}
}
}
update
updateメソッドは、etcdで更新されたノードに基づいて、services
オブジェクトの状態を変更します。
func (srvs services) update(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
// Creating a new job directory does not require any action.
if match[2] == "" {
return
}
srv := match[1]
instanceID := match[2]
// We received an update for an instance.
insts, ok := srvs[srv]
if !ok {
insts = instances{}
srvs[srv] = insts
}
insts[instanceID] = node.Value
}
delete
deleteメソッドは、etcdからどのノードが削除されたかに応じて、インスタンスまたはジョブ全体をservices
オブジェクトから削除します。
func (srvs services) delete(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
srv := match[1]
instanceID := match[2]
// Deletion of an entire service.
if instanceID == "" {
delete(srvs, srv)
return
}
// Delete a single instance from the service.
delete(srvs[srv], instanceID)
}
persist
persistメソッドは、services
オブジェクトの状態をTargetGroup
のリストに変換します。次に、このリストをJSON形式で-target-file
に書き込みます。
type TargetGroup struct {
Targets []string `json:"targets,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
func (srvs services) persist() {
var tgroups []*TargetGroup
// Write files for current services.
for job, instances := range srvs {
var targets []string
for _, addr := range instances {
targets = append(targets, addr)
}
tgroups = append(tgroups, &TargetGroup{
Targets: targets,
Labels: map[string]string{"job": job},
})
}
content, err := json.Marshal(tgroups)
if err != nil {
log.Errorln(err)
return
}
f, err := create(*targetFile)
if err != nil {
log.Errorln(err)
return
}
defer f.Close()
if _, err := f.Write(content); err != nil {
log.Errorln(err)
}
}
実際に運用する
すべて完了しました。では、どのように実行するのでしょうか?
設定された出力ファイルを使用してツールを起動するだけです。
./etcd_sd -target-file /etc/prometheus/tgroups.json
次に、同じファイルを使用して、ファイルベースのサービスディスカバリでPrometheusを設定します。最も単純な構成は次のようになります。
scrape_configs:
- job_name: 'default' # Will be overwritten by job label of target groups.
file_sd_configs:
- names: ['/etc/prometheus/tgroups.json']
これで完了です。これで、Prometheusは、etcdでサービスディスカバリに出入りするサービスとそのインスタンスと同期を維持します。
結論
Prometheusが組織のサービスディスカバリをネイティブにサポートしていない場合でも、落胆しないでください。小さなユーティリティプログラムを使用すると、ギャップを簡単に埋めることができ、監視対象のシームレスな更新から利益を得ることができます。したがって、デプロイメント方程式から監視構成の変更を削除できます。
KubernetesとMarathonのネイティブサポートを追加してくれた貢献者であるJimmy DysonとRobert Jacobに感謝します。また、Keegan C Smithによるファイルに基づくEC2サービスディスカバリについてもご覧ください。
このブログ記事の完全なソースコードはGitHubにあります。