Classi開発者ブログ

教育プラットフォーム「Classi」を開発・運営するClassi株式会社の開発者ブログです。

実践OpenTelemetry

こんにちは・こんばんは・おはようございます、エンジニアのid:aerealです。

この記事では筆者が開発に参加しているサービスの監視フレームワークをOpenTelemetryへ移行した際の体験を紹介します。

OpenTelemetryとは

OpenTelemetry is an Observability framework and toolkit designed to create and manage telemetry data such as traces, metrics, and logs.

What is OpenTelemetry?

サイトの説明にある通り分散トレースやメトリクス、ログなどの指標を扱う監視フレームワークです。

OpenTracingやOpenCensusなどを継承・統合したプロジェクトと言うと合点がいく方も多いのではないでしょうか。

OpenTelemetryは、指標や属性などの意味論やプロトコルを定めた仕様、それら仕様を実現する各言語の実装、そしてプロトコルに従いデータを処理・送受信するCollectorの計3つからおおまかに成り立っています。

アプリケーションにSDKを組み込み、サイドカーもしくはゲートウェイとして配置したCollectorに対し各指標を送り、Collectorは設定で記述された通りに指標を加工し外部サービスへ送信するという流れになっています。

アプリケーションはSDKを介してOpenTelemetryプロトコルでCollectorに送信することだけに関心を持てばよく、その指標がどのように外部サービスへ送信されるかはCollectorへ任せられるという点が各ベンダのSDKをそれぞれ利用する場合に対する利点です。

またCollectorはバッファリングだけではなく、属性 (attributes) と呼ばれる指標のメタデータを追加・加工・削除する仕組みや、属性をもとに送信先を変えたり、送信された指標を集積してカウンタ値を生成するといった高度な仕組みも備えています。

Collectorは公式に提供されるリリースビルドのほか、組み込むコンポーネントをカスタマイズしてビルドする方法が提供されており、組織内部で共通して利用したいコンポーネントの組み込みが比較的簡単になっています。 この点に関してもまた機会を改めて紹介したいと思います。

Classiで採用した理由

当社で新規開発したサービスはAWSのマネージドサービスを積極的に取り入れており、コードを書いて構築するアプリケーションとマネージドサービスの境界は曖昧になってきているくらい対等な存在になっています。

refs. dron: クラウドネイティブなcron代替の紹介 - Classi開発者ブログ

そんな複数のコンポーネントからなるアプリケーションを監視する上で分散トレーシングは欠かせません。

Classiでは監視ソリューションとしてDatadogを導入・活用しています:

分散トレーシングもDatadog APMを活用しています。実際、先の記事で紹介したdronをはじめとしたすべてのClassiのサービスではDatadogのSDKを用いてAPMへトレースを送っていました。

一方、Step FunctionsなどのAWSのマネージドサービスは分散トレーシングに対応していますが、トレースの収集先はAWS X-Rayのみです。 トレースをDatadog APMのみに送っているとX-Rayのみに存在するマネージドサービスのトレースと断絶してしまいます。

仕方がなくアプリケーションはAWS X-Ray SDKとDatadog APM SDKの 両方 を導入し、X-RayとDatadog APMの両方へトレースを送ることにしていました。 しかし、トレースのメタデータ追加などあらゆる実装が二度手間になり、実装の抜け漏れも起きやすくなります。

実際、アプリケーションの不調を調べるためにAPMを見たらDatadog APMにはほしいメタデータを送っていなかったということが起き、ほとほと嫌気が差したので二重の計装をやめるべく調査をはじめ、OpenTelemetryに行き着きました。

なおX-Rayへ統一するという選択肢は、アラートの設定など監視全般をDatadogに寄せる方針であったこと・それを差し引いてもDatadogのほうが使い勝手が良かったことから、検討していません。

アプリケーションへの導入

監視SDKとしてOpenTelemetryへ移行することに決めたあとにやることは至って単純で、ドキュメントに従いライブラリを導入し、OpenTelemetry Collectorをサイドカーに追加し、動作確認をします。

移行中の基本的な動作確認する際は、実際にX-RayやDatadogに送るよりもローカルで起動したZipkinを送信先に設定したCollectorを動かすと素早くトライアンドエラーを繰り返すようになりとても重宝しました。

もちろん各exporterの変換処理が挟まること・それら変換が設定によって異なることから、最終的には実際にX-RayやDatadogに送った上で確認すべきですが、たとえばそもそも収集したいスパンが送られているかとか、きちんと属性を追加できているかとか、そういった基本的な確認はZipkinでも十分です。

属性の管理

OpenTelemetryは属性 (attributes) と呼ばれるメタデータがトレースやメトリクスなどの各指標に紐付けられます。

これらはX-Rayではアノテーション・Datadogではタグにそれぞれ変換されます。

属性は指標の情報量を増やし監視を大いに助けてくれますが、アプリケーション全体で統一しておかないといざという時に使えません。

そこで属性を設定ファイルで管理し、属性を注入する実装は設定ファイルから生成した関数だけが行えるようにすることで、アプリケーション全体で使われている属性を統一することにしました。

以下に実際の設定ファイルの例を紹介します:

属性の設定ファイル (telemetry.yml) 例

---
# yaml-language-server: $schema=./telemetry.schema.json
attributes:
  db_migration.is_dirty: { type: bool }
  db_migration.step: { type: int }
  git.commit.sha:
    type: string
    description: |
      現在動いているコミットハッシュを表す属性名。
      Datadogのactive commit linkingに使う。

      refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
  git.repository_url:
    type: string
    description: |
      サービスのソースコードがホストされているリポジトリを指す属性値。
      Datadogのactive commit linkingに使う。

      refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
measurements:
  test_import_count: { type: Int64Gauge }

またスキーマは以下の通りです:

telemetry.schema.json

{
  "type": "object",
  "properties": {
    "attributes": {
      "$ref": "#/definitions/attributes"
    },
    "measurements": {
      "$ref": "#/definitions/measurements"
    }
  },
  "definitions": {
    "deprecationDescription": {
      "type": "object",
      "properties": {
        "reason": {
          "type": "string"
        }
      },
      "required": [
        "reason"
      ]
    },
    "attributeValueType": {
      "enum": [
        "string",
        "bool",
        "int",
        "int64"
      ]
    },
    "attributeDefinition": {
      "type": "object",
      "properties": {
        "type": {
          "$ref": "#/definitions/attributeValueType"
        },
        "deprecated": {
          "$ref": "#/definitions/deprecationDescription"
        },
        "description": {
          "type": "string"
        }
      },
      "required": [
        "type"
      ]
    },
    "attributes": {
      "type": "object",
      "patternProperties": {
        ".+": {
          "$ref": "#/definitions/attributeDefinition"
        }
      }
    },
    "measurementType": {
      "enum": [
        "Float64Counter",
        "Float64Gauge",
        "Float64Histogram",
        "Float64UpDownCounter",
        "Int64Counter",
        "Int64Gauge",
        "Int64Histogram",
        "Int64UpDownCounter"
      ]
    },
    "measurementDefinition": {
      "type": "object",
      "properties": {
        "type": {
          "$ref": "#/definitions/measurementType"
        }
      }
    },
    "measurements": {
      "type": "object",
      "patternProperties": {
        ".+": {
          "$ref": "#/definitions/measurementDefinition"
        }
      }
    }
  }
}

  • attributes.$name.type: 属性の型。stringやintなどOpenTelemetryの仕様に従う
  • attributes.$name.description: 属性の説明。省略可能で、生成されたコードのドキュメントにも含まれる

VS Codeにredhat.vscode-yamlを入れておくと、ローカルに配置したJSON Schemaを読み込んでフィールドの補完を行ってくれます。

以下に上記ファイルから生成されたGoのコードを載せます:

生成されたGoのコード

// KeyDbMigrationIsDirty is an attribute key that means "db_migration.is_dirty".
var KeyDbMigrationIsDirty = attribute.Key("db_migration.is_dirty")

// AttrDbMigrationIsDirty returns a new attribute that named "db_migration.is_dirty".
func AttrDbMigrationIsDirty(v ...bool) attribute.KeyValue {
    switch {
    case len(v) == 0:
        return emptyKeyValue
    case len(v) == 1:
        return KeyDbMigrationIsDirty.Bool(v[0])
    default:
        return KeyDbMigrationIsDirty.BoolSlice(v)
    }
}

// KeyDbMigrationStep is an attribute key that means "db_migration.step".
var KeyDbMigrationStep = attribute.Key("db_migration.step")

// AttrDbMigrationStep returns a new attribute that named "db_migration.step".
func AttrDbMigrationStep(v ...int) attribute.KeyValue {
    switch {
    case len(v) == 0:
        return emptyKeyValue
    case len(v) == 1:
        return KeyDbMigrationStep.Int(v[0])
    default:
        return KeyDbMigrationStep.IntSlice(v)
    }
}

// KeyGitCommitSha is an attribute key that means "git.commit.sha".
//
// 現在動いているコミットハッシュを表す属性名。
// Datadogのactive commit linkingに使う。
//
// refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
var KeyGitCommitSha = attribute.Key("git.commit.sha")

// AttrGitCommitSha returns a new attribute that named "git.commit.sha".
//
// 現在動いているコミットハッシュを表す属性名。
// Datadogのactive commit linkingに使う。
//
// refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
func AttrGitCommitSha(v ...string) attribute.KeyValue {
    switch {
    case len(v) == 0:
        return emptyKeyValue
    case len(v) == 1:
        return KeyGitCommitSha.String(v[0])
    default:
        return KeyGitCommitSha.StringSlice(v)
    }
}

// KeyGitRepositoryUrl is an attribute key that means "git.repository_url".
//
// サービスのソースコードがホストされているリポジトリを指す属性値。
// Datadogのactive commit linkingに使う。
//
// refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
var KeyGitRepositoryUrl = attribute.Key("git.repository_url")

// AttrGitRepositoryUrl returns a new attribute that named "git.repository_url".
//
// サービスのソースコードがホストされているリポジトリを指す属性値。
// Datadogのactive commit linkingに使う。
//
// refs. https://docs.datadoghq.com/integrations/guide/source-code-integration/?tab=dockerruntime#tag-your-telemetry
func AttrGitRepositoryUrl(v ...string) attribute.KeyValue {
    switch {
    case len(v) == 0:
        return emptyKeyValue
    case len(v) == 1:
        return KeyGitRepositoryUrl.String(v[0])
    default:
        return KeyGitRepositoryUrl.StringSlice(v)
    }
}

ご覧の通り設定ファイルに記述した description が関数のコメントに含まれています。

最後に以下のようにアプリケーション内の任意のファイルから go.opentelemetry.io/otel/attribute を用いて設定ファイルで管理されていない属性の追加ができないようgolangci-lintを設定します:

linters:
  disable-all: true
  enable:
    - forbidigo
linters-settings:
  forbidigo:
    forbid:
      - "^attribute[.]"

厳密には attribute. で始まる式が違反となるもので想定よりファジーですが実用上困っていないのでこのままとします。

最後にコード生成に使ったスクリプトを以下に示します:

generate-signals/main.go

package main

import (
    "bytes"
    _ "embed"
    "errors"
    "flag"
    "fmt"
    "go/format"
    "os"
    "path/filepath"
    "sort"
    "strconv"
    "strings"
    "text/template"
    "unicode"

    "gopkg.in/yaml.v3"
)

var (
    //go:embed src.gotpl
    tmplBody string

    pkg        string
    typeName   string
    doFormat   bool
    configPath string
    outPath    string

    errPkgRequired = errors.New("-pkg is required")
    errOutRequired = errors.New("-out is required")
)

func main() {
    if err := run(); err != nil {
        fmt.Fprintf(os.Stderr, "%+v\n", err)
        os.Exit(1)
    }
}

func run() error {
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage of %s:\n", filepath.Base(os.Args[0]))
        flag.PrintDefaults()
    }
    flag.Parse()
    if pkg == "" {
        return errPkgRequired
    }
    if outPath == "" {
        return errOutRequired
    }
    parsed, err := template.New("src.gotpl").
        Funcs(template.FuncMap{"quote": strconv.Quote, "prefixLines": prefixLines}).
        Parse(tmplBody)
    if err != nil {
        return fmt.Errorf("template.Parse: %w", err)
    }
    cfg, err := readConfig(configPath)
    if err != nil {
        return fmt.Errorf("readConfig: %w", err)
    }
    attrs := make([]*attribute, 0, len(cfg.Attributes))
    for name, def := range cfg.Attributes {
        def := def
        a := &attribute{Name: name, Type: def.Type, Description: def.Description}
        if def.Deprecation != nil {
            a.DeprecationReason = def.Deprecation.Reason
        }
        attrs = append(attrs, a)
    }
    measurements := make([]*measurement, 0, len(cfg.Measurements))
    for name, def := range cfg.Measurements {
        def := def
        measurements = append(measurements, &measurement{Name: name, DataType: def.DataType})
    }
    sort.Slice(attrs, func(i, j int) bool { return attrs[i].Name < attrs[j].Name })
    sort.Slice(measurements, func(i, j int) bool { return measurements[i].Name < measurements[j].Name })
    out := new(bytes.Buffer)
    data := struct {
        Package       string
        Attributes    []*attribute
        Measurements  []*measurement
        TypeName      string
        SingletonName string
    }{
        Package:       pkg,
        Attributes:    attrs,
        Measurements:  measurements,
        TypeName:      typeName,
        SingletonName: "Measurements",
    }
    if err := parsed.Execute(out, data); err != nil {
        return fmt.Errorf("template.Execute: %w", err)
    }
    body := out.Bytes()
    if doFormat {
        var err error
        body, err = format.Source(body)
        if err != nil {
            return fmt.Errorf("format.Source: %w", err)
        }
    }
    if err := os.WriteFile(outPath, body, 0600); err != nil {
        return fmt.Errorf("os.WriteFile(%s): %w", outPath, err)
    }
    return nil
}

type attribute struct {
    Name              string
    Type              string
    DeprecationReason string
    Description       string
}

func (a *attribute) inputType() (elmType string, isSlice bool) {
    t, ok := strings.CutPrefix(a.Type, "[]")
    return t, ok
}

func (a *attribute) ElemType() string {
    et, _ := a.inputType()
    return et
}

func (a *attribute) AttributeConstructor() string {
    cn, _ := a.inputType()
    return strings.ToUpper(string(cn[0])) + cn[1:]
}

func (a *attribute) AttributeSliceConstructor() string {
    return a.AttributeConstructor() + "Slice"
}

func (a *attribute) GoType() string {
    return a.Type
}

func (a *attribute) Identifier() string {
    b := new(strings.Builder)
    shouldUpNextRune := true
    for _, r := range a.Name {
        switch {
        case shouldUpNextRune:
            b.WriteRune(unicode.ToUpper(r))
            shouldUpNextRune = false
        case r == '_' || r == '.':
            shouldUpNextRune = true
        default:
            b.WriteRune(r)
        }
    }
    return b.String()
}

type measurement struct {
    Name     string
    DataType string
}

func (m *measurement) QualifiedName() string {
    return "app." + m.Name
}

func (m *measurement) FieldName() string {
    b := new(strings.Builder)
    shouldUpNextRune := true
    for _, r := range m.Name {
        switch {
        case shouldUpNextRune:
            b.WriteRune(unicode.ToUpper(r))
            shouldUpNextRune = false
        case r == '_':
            shouldUpNextRune = true
        default:
            b.WriteRune(r)
        }
    }
    return b.String()
}

func (m *measurement) SDKReturnType() string {
    switch m.DataType {
    case "Int64Gauge":
        return "Int64ObservableGauge"
    case "Float64Gauge":
        return "Float64ObservableGauge"
    default:
        return m.DataType
    }
}

type deprecationDescription struct {
    Reason string `yaml:"reason"`
}

type attributeDefinition struct {
    Type        string                  `yaml:"type"`
    Deprecation *deprecationDescription `yaml:"deprecated"`
    Description string                  `yaml:"description"`
}

type measurementDefinition struct {
    DataType string `yaml:"type"`
}

type telemetryConfig struct {
    Attributes   map[string]*attributeDefinition   `yaml:"attributes"`
    Measurements map[string]*measurementDefinition `yaml:"measurements"`
}

func readConfig(cfgPath string) (*telemetryConfig, error) {
    f, err := os.Open(cfgPath)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    cfg := new(telemetryConfig)
    if err := yaml.NewDecoder(f).Decode(cfg); err != nil {
        return nil, err
    }
    return cfg, nil
}

func prefixLines(prefix, s string) string {
    return prefix + strings.ReplaceAll(s, "\n", "\n"+prefix)
}

func init() {
    flag.StringVar(&pkg, "package", "observability", "package name")
    flag.StringVar(&typeName, "type", "MeasurementDefinition", "type name")
    flag.StringVar(&configPath, "config", "etc/telemetry.yml", "config file path")
    flag.BoolVar(&doFormat, "format", true, "whether do format")
    flag.StringVar(&outPath, "out", "", "output file path")
}

src.gotpl

package {{ .Package }}

// Code generated by tools/gen-telemetry-signal/main.go; DO NOT EDIT.

import (
  "go.opentelemetry.io/otel/attribute"
  "go.opentelemetry.io/otel/metric"
)

var emptyKeyValue attribute.KeyValue

{{ range $attr := .Attributes }}
// Key{{ $attr.Identifier }} is an attribute key that means {{ $attr.Name | quote }}.
{{- with $attr.Description }}
//
{{ . | prefixLines "// " }}{{ end }}
{{- with $attr.DeprecationReason }}
//
{{ (. | printf "Deprecated: %s") | prefixLines "// " }}{{ end }}
var Key{{ $attr.Identifier }} = attribute.Key({{ $attr.Name | quote }})

// Attr{{ $attr.Identifier }} returns a new attribute that named {{ $attr.Name | quote }}.
{{- with $attr.Description }}
//
{{ . | prefixLines "// " }}{{ end }}
{{- with $attr.DeprecationReason }}
//
{{ (. | printf "Deprecated: %s") | prefixLines "// " }}{{ end }}
func Attr{{ $attr.Identifier }}(v ...{{ $attr.ElemType }}) attribute.KeyValue {
  switch {
  case len(v) == 0:
    return emptyKeyValue
  case len(v) == 1:
    return Key{{ $attr.Identifier }}.{{ $attr.AttributeConstructor }}(v[0])
  default:
    return Key{{ $attr.Identifier }}.{{ $attr.AttributeSliceConstructor }}(v)
  }
}
{{ end -}}

// measurements

func {{ .SingletonName }}() {{ .TypeName }} {
  return __singleton_{{ .SingletonName }}
}

var __singleton_{{ .SingletonName }} = {{ .TypeName }}{
  {{- range $measurement := .Measurements -}}
    {{ $measurement.FieldName }}: {{ $measurement.QualifiedName | quote }},
  {{- end -}}
}

type {{ .TypeName }} struct {
  {{ range $measurement := .Measurements }}
    {{ $measurement.FieldName }} string
  {{- end -}}
}

{{ range $measurement := .Measurements }}
func Measure{{ $measurement.FieldName }}(meter metric.Meter) (metric.{{ $measurement.SDKReturnType }}, error) {
  return meter.{{ $measurement.SDKReturnType }}({{ $measurement.QualifiedName | quote }})
}
{{ end -}}

むすび

AWS X-RayおよびDatadog APMへトレースを送る仕組みとしてOpenTelemetryを採用・移行した経緯と具体的なエピソードを紹介しました。

他にもOpenTelemetryを活用する上で様々な取り組みをしているので今後も紹介したいと思います。

© 2020 Classi Corp.