こんにちは・こんばんは・おはようございます、エンジニアのid:aerealです。
この記事では筆者が開発に参加しているサービスの監視フレームワークをOpenTelemetryへ移行した際の体験を紹介します。
OpenTelemetryとは
OpenTelemetry is an Observability framework and toolkit designed to create and manage telemetry data such as traces, metrics, and logs.
What is OpenTelemetry?
サイトの説明にある通り分散トレースやメトリクス、ログなどの指標を扱う監視フレームワークです。
OpenTracingやOpenCensusなどを継承・統合したプロジェクトと言うと合点がいく方も多いのではないでしょうか。
OpenTelemetryは、指標や属性などの意味論やプロトコルを定めた仕様、それら仕様を実現する各言語の実装、そしてプロトコルに従いデータを処理・送受信するCollectorの計3つからおおまかに成り立っています。
アプリケーションにSDKを組み込み、サイドカーもしくはゲートウェイとして配置したCollectorに対し各指標を送り、Collectorは設定で記述された通りに指標を加工し外部サービスへ送信するという流れになっています。
アプリケーションはSDKを介してOpenTelemetryプロトコルでCollectorに送信することだけに関心を持てばよく、その指標がどのように外部サービスへ送信されるかはCollectorへ任せられるという点が各ベンダのSDKをそれぞれ利用する場合に対する利点です。
またCollectorはバッファリングだけではなく、属性 (attributes) と呼ばれる指標のメタデータを追加・加工・削除する仕組みや、属性をもとに送信先を変えたり、送信された指標を集積してカウンタ値を生成するといった高度な仕組みも備えています。
Collectorは公式に提供されるリリースビルドのほか、組み込むコンポーネントをカスタマイズしてビルドする方法が提供されており、組織内部で共通して利用したいコンポーネントの組み込みが比較的簡単になっています。
この点に関してもまた機会を改めて紹介したいと思います。
Classiで採用した理由
当社で新規開発したサービスはAWSのマネージドサービスを積極的に取り入れており、コードを書いて構築するアプリケーションとマネージドサービスの境界は曖昧になってきているくらい対等な存在になっています。
refs. dron: クラウドネイティブなcron代替の紹介 - Classi開発者ブログ
そんな複数のコンポーネントからなるアプリケーションを監視する上で分散トレーシングは欠かせません。
Classiでは監視ソリューションとしてDatadogを導入・活用しています:
分散トレーシングもDatadog APMを活用しています。実際、先の記事で紹介したdronをはじめとしたすべてのClassiのサービスではDatadogのSDKを用いてAPMへトレースを送っていました。
一方、Step FunctionsなどのAWSのマネージドサービスは分散トレーシングに対応していますが、トレースの収集先はAWS X-Rayのみです。
トレースをDatadog APMのみに送っているとX-Rayのみに存在するマネージドサービスのトレースと断絶してしまいます。
仕方がなくアプリケーションはAWS X-Ray SDKとDatadog APM SDKの 両方 を導入し、X-RayとDatadog APMの両方へトレースを送ることにしていました。
しかし、トレースのメタデータ追加などあらゆる実装が二度手間になり、実装の抜け漏れも起きやすくなります。
実際、アプリケーションの不調を調べるためにAPMを見たらDatadog APMにはほしいメタデータを送っていなかったということが起き、ほとほと嫌気が差したので二重の計装をやめるべく調査をはじめ、OpenTelemetryに行き着きました。
なおX-Rayへ統一するという選択肢は、アラートの設定など監視全般をDatadogに寄せる方針であったこと・それを差し引いてもDatadogのほうが使い勝手が良かったことから、検討していません。
アプリケーションへの導入
監視SDKとしてOpenTelemetryへ移行することに決めたあとにやることは至って単純で、ドキュメントに従いライブラリを導入し、OpenTelemetry Collectorをサイドカーに追加し、動作確認をします。
移行中の基本的な動作確認する際は、実際にX-RayやDatadogに送るよりもローカルで起動したZipkinを送信先に設定したCollectorを動かすと素早くトライアンドエラーを繰り返すようになりとても重宝しました。
もちろん各exporterの変換処理が挟まること・それら変換が設定によって異なることから、最終的には実際にX-RayやDatadogに送った上で確認すべきですが、たとえばそもそも収集したいスパンが送られているかとか、きちんと属性を追加できているかとか、そういった基本的な確認はZipkinでも十分です。
属性の管理
OpenTelemetryは属性 (attributes) と呼ばれるメタデータがトレースやメトリクスなどの各指標に紐付けられます。
これらはX-Rayではアノテーション・Datadogではタグにそれぞれ変換されます。
属性は指標の情報量を増やし監視を大いに助けてくれますが、アプリケーション全体で統一しておかないといざという時に使えません。
そこで属性を設定ファイルで管理し、属性を注入する実装は設定ファイルから生成した関数だけが行えるようにすることで、アプリケーション全体で使われている属性を統一することにしました。
以下に実際の設定ファイルの例を紹介します:
属性の設定ファイル (telemetry.yml) 例
---
attributes:
db_migration.is_dirty: { type: bool }
db_migration.step: { type: int }
git.commit.sha:
type: string
description: |
現在動いているコミットハッシュを表す属性名。
Datadogのactive commit linkingに使う。
refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
git.repository_url:
type: string
description: |
サービスのソースコードがホストされているリポジトリを指す属性値。
Datadogのactive commit linkingに使う。
refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
measurements:
test_import_count: { type: Int64Gauge }
またスキーマは以下の通りです:
telemetry.schema.json
{
"type": "object",
"properties": {
"attributes": {
"$ref": "#/definitions/attributes"
},
"measurements": {
"$ref": "#/definitions/measurements"
}
},
"definitions": {
"deprecationDescription": {
"type": "object",
"properties": {
"reason": {
"type": "string"
}
},
"required": [
"reason"
]
},
"attributeValueType": {
"enum": [
"string",
"bool",
"int",
"int64"
]
},
"attributeDefinition": {
"type": "object",
"properties": {
"type": {
"$ref": "#/definitions/attributeValueType"
},
"deprecated": {
"$ref": "#/definitions/deprecationDescription"
},
"description": {
"type": "string"
}
},
"required": [
"type"
]
},
"attributes": {
"type": "object",
"patternProperties": {
".+": {
"$ref": "#/definitions/attributeDefinition"
}
}
},
"measurementType": {
"enum": [
"Float64Counter",
"Float64Gauge",
"Float64Histogram",
"Float64UpDownCounter",
"Int64Counter",
"Int64Gauge",
"Int64Histogram",
"Int64UpDownCounter"
]
},
"measurementDefinition": {
"type": "object",
"properties": {
"type": {
"$ref": "#/definitions/measurementType"
}
}
},
"measurements": {
"type": "object",
"patternProperties": {
".+": {
"$ref": "#/definitions/measurementDefinition"
}
}
}
}
}
attributes.$name.type
: 属性の型。stringやintなどOpenTelemetryの仕様に従う
attributes.$name.description
: 属性の説明。省略可能で、生成されたコードのドキュメントにも含まれる
VS Codeにredhat.vscode-yamlを入れておくと、ローカルに配置したJSON Schemaを読み込んでフィールドの補完を行ってくれます。
以下に上記ファイルから生成されたGoのコードを載せます:
生成されたGoのコード
var KeyDbMigrationIsDirty = attribute.Key("db_migration.is_dirty")
func AttrDbMigrationIsDirty(v ...bool) attribute.KeyValue {
switch {
case len(v) == 0:
return emptyKeyValue
case len(v) == 1:
return KeyDbMigrationIsDirty.Bool(v[0])
default:
return KeyDbMigrationIsDirty.BoolSlice(v)
}
}
var KeyDbMigrationStep = attribute.Key("db_migration.step")
func AttrDbMigrationStep(v ...int) attribute.KeyValue {
switch {
case len(v) == 0:
return emptyKeyValue
case len(v) == 1:
return KeyDbMigrationStep.Int(v[0])
default:
return KeyDbMigrationStep.IntSlice(v)
}
}
var KeyGitCommitSha = attribute.Key("git.commit.sha")
func AttrGitCommitSha(v ...string) attribute.KeyValue {
switch {
case len(v) == 0:
return emptyKeyValue
case len(v) == 1:
return KeyGitCommitSha.String(v[0])
default:
return KeyGitCommitSha.StringSlice(v)
}
}
var KeyGitRepositoryUrl = attribute.Key("git.repository_url")
func AttrGitRepositoryUrl(v ...string) attribute.KeyValue {
switch {
case len(v) == 0:
return emptyKeyValue
case len(v) == 1:
return KeyGitRepositoryUrl.String(v[0])
default:
return KeyGitRepositoryUrl.StringSlice(v)
}
}
ご覧の通り設定ファイルに記述した description
が関数のコメントに含まれています。
最後に以下のようにアプリケーション内の任意のファイルから go.opentelemetry.io/otel/attribute
を用いて設定ファイルで管理されていない属性の追加ができないようgolangci-lintを設定します:
linters:
disable-all: true
enable:
- forbidigo
linters-settings:
forbidigo:
forbid:
- "^attribute[.]"
厳密には attribute.
で始まる式が違反となるもので想定よりファジーですが実用上困っていないのでこのままとします。
最後にコード生成に使ったスクリプトを以下に示します:
generate-signals/main.go
package main
import (
"bytes"
_ "embed"
"errors"
"flag"
"fmt"
"go/format"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/template"
"unicode"
"gopkg.in/yaml.v3"
)
var (
tmplBody string
pkg string
typeName string
doFormat bool
configPath string
outPath string
errPkgRequired = errors.New("-pkg is required")
errOutRequired = errors.New("-out is required")
)
func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "%+v\n", err)
os.Exit(1)
}
}
func run() error {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of %s:\n", filepath.Base(os.Args[0]))
flag.PrintDefaults()
}
flag.Parse()
if pkg == "" {
return errPkgRequired
}
if outPath == "" {
return errOutRequired
}
parsed, err := template.New("src.gotpl").
Funcs(template.FuncMap{"quote": strconv.Quote, "prefixLines": prefixLines}).
Parse(tmplBody)
if err != nil {
return fmt.Errorf("template.Parse: %w", err)
}
cfg, err := readConfig(configPath)
if err != nil {
return fmt.Errorf("readConfig: %w", err)
}
attrs := make([]*attribute, 0, len(cfg.Attributes))
for name, def := range cfg.Attributes {
def := def
a := &attribute{Name: name, Type: def.Type, Description: def.Description}
if def.Deprecation != nil {
a.DeprecationReason = def.Deprecation.Reason
}
attrs = append(attrs, a)
}
measurements := make([]*measurement, 0, len(cfg.Measurements))
for name, def := range cfg.Measurements {
def := def
measurements = append(measurements, &measurement{Name: name, DataType: def.DataType})
}
sort.Slice(attrs, func(i, j int) bool { return attrs[i].Name < attrs[j].Name })
sort.Slice(measurements, func(i, j int) bool { return measurements[i].Name < measurements[j].Name })
out := new(bytes.Buffer)
data := struct {
Package string
Attributes []*attribute
Measurements []*measurement
TypeName string
SingletonName string
}{
Package: pkg,
Attributes: attrs,
Measurements: measurements,
TypeName: typeName,
SingletonName: "Measurements",
}
if err := parsed.Execute(out, data); err != nil {
return fmt.Errorf("template.Execute: %w", err)
}
body := out.Bytes()
if doFormat {
var err error
body, err = format.Source(body)
if err != nil {
return fmt.Errorf("format.Source: %w", err)
}
}
if err := os.WriteFile(outPath, body, 0600); err != nil {
return fmt.Errorf("os.WriteFile(%s): %w", outPath, err)
}
return nil
}
type attribute struct {
Name string
Type string
DeprecationReason string
Description string
}
func (a *attribute) inputType() (elmType string, isSlice bool) {
t, ok := strings.CutPrefix(a.Type, "[]")
return t, ok
}
func (a *attribute) ElemType() string {
et, _ := a.inputType()
return et
}
func (a *attribute) AttributeConstructor() string {
cn, _ := a.inputType()
return strings.ToUpper(string(cn[0])) + cn[1:]
}
func (a *attribute) AttributeSliceConstructor() string {
return a.AttributeConstructor() + "Slice"
}
func (a *attribute) GoType() string {
return a.Type
}
func (a *attribute) Identifier() string {
b := new(strings.Builder)
shouldUpNextRune := true
for _, r := range a.Name {
switch {
case shouldUpNextRune:
b.WriteRune(unicode.ToUpper(r))
shouldUpNextRune = false
case r == '_' || r == '.':
shouldUpNextRune = true
default:
b.WriteRune(r)
}
}
return b.String()
}
type measurement struct {
Name string
DataType string
}
func (m *measurement) QualifiedName() string {
return "app." + m.Name
}
func (m *measurement) FieldName() string {
b := new(strings.Builder)
shouldUpNextRune := true
for _, r := range m.Name {
switch {
case shouldUpNextRune:
b.WriteRune(unicode.ToUpper(r))
shouldUpNextRune = false
case r == '_':
shouldUpNextRune = true
default:
b.WriteRune(r)
}
}
return b.String()
}
func (m *measurement) SDKReturnType() string {
switch m.DataType {
case "Int64Gauge":
return "Int64ObservableGauge"
case "Float64Gauge":
return "Float64ObservableGauge"
default:
return m.DataType
}
}
type deprecationDescription struct {
Reason string `yaml:"reason"`
}
type attributeDefinition struct {
Type string `yaml:"type"`
Deprecation *deprecationDescription `yaml:"deprecated"`
Description string `yaml:"description"`
}
type measurementDefinition struct {
DataType string `yaml:"type"`
}
type telemetryConfig struct {
Attributes map[string]*attributeDefinition `yaml:"attributes"`
Measurements map[string]*measurementDefinition `yaml:"measurements"`
}
func readConfig(cfgPath string) (*telemetryConfig, error) {
f, err := os.Open(cfgPath)
if err != nil {
return nil, err
}
defer f.Close()
cfg := new(telemetryConfig)
if err := yaml.NewDecoder(f).Decode(cfg); err != nil {
return nil, err
}
return cfg, nil
}
func prefixLines(prefix, s string) string {
return prefix + strings.ReplaceAll(s, "\n", "\n"+prefix)
}
func init() {
flag.StringVar(&pkg, "package", "observability", "package name")
flag.StringVar(&typeName, "type", "MeasurementDefinition", "type name")
flag.StringVar(&configPath, "config", "etc/telemetry.yml", "config file path")
flag.BoolVar(&doFormat, "format", true, "whether do format")
flag.StringVar(&outPath, "out", "", "output file path")
}
src.gotpl
package {{ .Package }}
// Code generated by tools/gen-telemetry-signal/main.go; DO NOT EDIT.
import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
var emptyKeyValue attribute.KeyValue
{{ range $attr := .Attributes }}
// Key{{ $attr.Identifier }} is an attribute key that means {{ $attr.Name | quote }}.
{{- with $attr.Description }}
//
{{ . | prefixLines "// " }}{{ end }}
{{- with $attr.DeprecationReason }}
//
{{ (. | printf "Deprecated: %s") | prefixLines "// " }}{{ end }}
var Key{{ $attr.Identifier }} = attribute.Key({{ $attr.Name | quote }})
// Attr{{ $attr.Identifier }} returns a new attribute that named {{ $attr.Name | quote }}.
{{- with $attr.Description }}
//
{{ . | prefixLines "// " }}{{ end }}
{{- with $attr.DeprecationReason }}
//
{{ (. | printf "Deprecated: %s") | prefixLines "// " }}{{ end }}
func Attr{{ $attr.Identifier }}(v ...{{ $attr.ElemType }}) attribute.KeyValue {
switch {
case len(v) == 0:
return emptyKeyValue
case len(v) == 1:
return Key{{ $attr.Identifier }}.{{ $attr.AttributeConstructor }}(v[0])
default:
return Key{{ $attr.Identifier }}.{{ $attr.AttributeSliceConstructor }}(v)
}
}
{{ end -}}
// measurements
func {{ .SingletonName }}() {{ .TypeName }} {
return __singleton_{{ .SingletonName }}
}
var __singleton_{{ .SingletonName }} = {{ .TypeName }}{
{{- range $measurement := .Measurements -}}
{{ $measurement.FieldName }}: {{ $measurement.QualifiedName | quote }},
{{- end -}}
}
type {{ .TypeName }} struct {
{{ range $measurement := .Measurements }}
{{ $measurement.FieldName }} string
{{- end -}}
}
{{ range $measurement := .Measurements }}
func Measure{{ $measurement.FieldName }}(meter metric.Meter) (metric.{{ $measurement.SDKReturnType }}, error) {
return meter.{{ $measurement.SDKReturnType }}({{ $measurement.QualifiedName | quote }})
}
{{ end -}}
むすび
AWS X-RayおよびDatadog APMへトレースを送る仕組みとしてOpenTelemetryを採用・移行した経緯と具体的なエピソードを紹介しました。
他にもOpenTelemetryを活用する上で様々な取り組みをしているので今後も紹介したいと思います。