Classi開発者ブログ

教育プラットフォーム「Classi」を開発・運営するClassi株式会社の開発者ブログです。

Cloud Composer 2へのupgradeでどハマりした話

この記事は Classi developers Advent Calendar 2021 の10日目の記事です。

こんにちは、データプラットフォームチームの滑川(@tomoyanamekawa)です。
Google CloudのCloud Composerのversion2(Cloud Composer 2)がpreview公開され、Terraformでも10月末から作成可能になりました*1

「Cloud Composer 2ならworker数をautoscalingしてくれるらしい。そんなに設定変わらないだろうからサクッと移行しよう。」 くらいの軽い気持ちでCloud Composer 1からupgradeをしましたが、てこずってだいぶ時間を溶かしてしまいました。

そのハマったポイント4つとCloud Composer 2へupgrade完了した上での所感をまとめた記事です。

※追記:
2021年12月16日にgenerally available (GA)になりました。 https://cloud.google.com/composer/docs/release-notes#December_16_2021

前提

Cloud Composer 2について

  • そもそもCloud Composerとは

  • Cloud Composer 2の変更点

    • 裏側で動いているGKEがGKE Autopilotになった
      • 合わせて、各インスタンスがGCE外で管理されるようになった
    • Apache AirflowのWorker数をautoscalingしてくれるようになった
      • Cloud Composer 1の頃もGKEをいじれば出来たかもしれないが、推奨されていなかった
  • Cloud Composer 2の環境作成方法
    • Terraformの場合、既存のResourceのgoogle_composer_environmentで作成できる
    • 必要な引数がCloud Composer 1から変わっているため、Cloud Composer 1で使っていたものをそのまま使い回すことは出来ないが、何箇所か書き換えるだけ
  • その他
    • Airflow 2も1年前くらいに出たが、本記事での「2」はCloud Composerのversionなので別のもの
    • Cloud Composerの裏側のGKE立ち上げに25分程度かかるので、環境構築の試行錯誤が辛い
      • しかも失敗した時はtimeoutになるが、timeoutになるまで1時間以上かかる

ハマったポイント

Cloud Composer 1で動かしていたDAGをCloud Composer 2で同じように動作させるまでに詰まったポイントを4つ紹介します。

1. サービスアカウントの権限が足りないとPyPI Packagesの追加・更新ができない

起きたこと

環境自体は作成できるが、環境へのPyPI Packagesの追加・更新が出来ず、下記のようなエラーメッセージが出る。

UPDATE operation on this environment failed 5 minutes ago with the following error message:
Composer Backend timed out. Currently running tasks are [stage: CP_COMPOSER_AGENT_RUNNING
description: "Composer Agent Running. Latest Agent Stage: stage: DEPLOYMENTS_UPDATED\nunblock_gae_creation: false\ndescription: \"\"\n ."

原因

Cloud Composer 2では3種類のサービスアカウントが必要になるが、その権限が足りていなかったため。

  • Cloud Composer 1ではCloud Composerの環境に直接付与するサービスアカウントのみで十分だった
  • Cloud Composer 2ではさらにこの2つのサービスアカウントに権限が必要になる
    • Compute Engine default service account
      • ${project_number}-compute@developer.gserviceaccount.com
    • Cloud Composer Service Agent
      • service-${project_number}@cloudcomposer-accounts.iam.gserviceaccount.com
  • Compute Engine default service accountの権限が足りないと上記のエラーなどが発生する

対処方法

3つサービスアカウントに必要な権限付与する。
※ この例ではまだ権限調整しきれておらず大きめに権限付与しているので、このあとIAM Recommenderなどを使って権限調整する必要あり。

  • Cloud Composerの環境で利用するサービスアカウントに付与するRole
    • Composer Worker
    • Private Logs Viewer
    • Airflow内のTaskで必要になるIAM Role
  • Cloud Composer Service Agentに付与するRole
    • Cloud Composer API Service Agent
    • Cloud Composer v2 API Service Agent Extension
    • Environment and Storage Object Administrator
  • Compute Engine default service accountに付与するRole
    • ちょうど良いRoleが見つけられず、一時的に強めの権限を渡している
      • 必要な権限はこのように書いてある
      • Caution: Cloud Composer 2 uses the default Compute Engine service account to pull images for your environment from Container Registry. Make sure that this service account is present in your project and has enough permissions to perform this operation. https://cloud.google.com/composer/docs/composer-2/access-control

2. 全てのtaskがLog file is not foundで失敗する

起きたこと

  • DAGを追加して実行すると、下記のエラーで全taskがfailしている

      *** Log file is not found: gs://${DAG Bucket}/logs/{DAG名}/{task名}/${日付}/1.log. The task might not have been executed or worker executing it might have finished abnormally (e.g. was evicted)
      *** 404 GET https://storage.googleapis.com/download/storage/v1/b/${logへのpath}?alt=media: No such object: ${logへのpath}: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
    
  • GCSを見に行くとlogが生成されていない
  • GKEを見に行くとworkerでこのエラーが発生していた
    • ログを書き込む処理(logging_mixin.py)でOSError: [Errno 107] Transport endpoint is not connected

原因

単純にスペックが足りていなかった。

対処方法

下記の2つの調整を行なった。

  • Cloud Composerのスペック上げ
    おそらくworkerのmemoryが足りていなかったと思うが、パフォーマンスも低かったので全体的にスペックをあげて対応した。
    公式ドキュメントはこちら。

    • Workloads configuration

      • Scheduler、Web server、Workerのスペックを設定する項目
    • Environment size

      • database等上記以外のスペックを設定する項目
    • ※Google CloudのConsoleから作成すると下記のようにEnvironment resourcesというまとめてスペック設定できる項目があるが、Terraformでは存在しない
      • Environment sizeと混同しがちなので注意
        Cloud Composer環境作成画面
  • AirflowのConfigで1つのworkerでの同時処理数を調整する

    • celery.worker_concurrencyがそのconfigパラメータ
    • Terraformの場合はairflow_config_overridesブロックで設定できる
    • 調整の目安はこちらのブログを参考

      celery.worker_concurrency
      Workerノード(1台あたり)のWorkerプロセス数を指定します。Workerが同時に処理できるタスク数を意味します。デフォルト値は16です。Workerノードの6-8 * core数または6-8 * memory/3.75GB程度が推奨値です。2coreであればデフォルトの設定でもおおむね問題なく、4core/16GBあたりのマシンであれば32くらいに設定します。

workerのspecを小さめにして、celery.worker_concurrencyも小さくしておくのが、autoscalingによる料金削減の恩恵を一番受けられるのでオススメです。

3. KubernetesPodOperatorが動かない

起きたこと

  • Cloud Composer 1の頃は動いていたKubernetesPodOperatorを使ったtaskでエラーが発生した
  • エラーはpods is forbidden

原因

  • Cloud Composer2ではGKEのautopilotを使っていて、workload identityを設定をして、明示的にPodでの権限設定する必要がある
    • Cloud Composer 1の時はなにもしなくてもCloud Composerで使っているサービスアカウントを使うことができた

対処方法

ここに書いてあるように

  • Kubernetes側で
    • namespaceの作成
    • 作成したnamespaceでKubernetes Service Account(KSA)の作成
  • Google Cloud側で
    • Google Cloud Service Account(GSA)の作成
    • 作成したGSAに必要な権限を付与
  • KSAとGSAをお互いにbinding(annotation)する

をすれば使う準備ができます。
あとはこのようにKubernetesPodOperatorでnamespaceとservice_account_nameを指定すればOKです。

...
task = KubernetesPodOperator(
    ...
    namespace="{作成したnamespace名}",
    service_account_name="{作成したKSA名}",
    ...
)

注意点

この作業の中で私が引っかかったポイントも紹介しておきます。
GSAからKSAへのiamのbindingをするときにTerraformを使う場合は注意が必要です。
Cloud Composerで使っているサービスアカウントをKubernetesPodOperatorでも使おうとして、下記のようにするとAirflowが動かなくなります。

resource "google_service_account_iam_binding" "pod_operator" {
  service_account_id = ${Google Cloud Service Account ID}
  role               = "roles/iam.workloadIdentityUser"
  members = ["serviceAccount:${var.project_id}.svc.id.goog[${var.K8S_NAMESPACE}/${var.KSA_NAME}]"]
}

なぜなら、実はすでにAirflowを動かすための他のKSAとのbindingが存在していて、それを上書きして消してしまうからです。

そのため本当はTerraformで状態をうまく管理したかったのですが、Cloud Composerの裏側のGKEはTerraformの管理外だったので諦めました。
そこで作業量も少ないし、実行頻度も高くないので下記のようなshell scriptを実行する形で一旦落ち着きました。

# Cloud Composerのpod operatorでGoogle Cloud Service Account使うための設定するやつ
# Cloud Composerの環境を作り替えた時に実行する必要がある
# GSAは作成済みの前提
# 参考: https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity#authenticating_to

# 該当の環境にkubectlできる状態にしてから実行する

PROJECT_ID="プロジェクト名"
GSA="Google Cloudのサービスアカウント名"

K8S_NAMESPACE="namespace"
KSA_NAME="K8Sのサービスアカウント名"

kubectl create namespace ${K8S_NAMESPACE}
kubectl create serviceaccount --namespace=${K8S_NAMESPACE} ${KSA_NAME}
kubectl annotate serviceaccount \
    --namespace ${K8S_NAMESPACE} ${KSA_NAME} \
    iam.gke.io/gcp-service-account=${GSA}

gcloud --project=${PROJECT_ID} iam service-accounts add-iam-policy-binding ${GSA} \
    --role roles/iam.workloadIdentityUser \
    --member "serviceAccount:${PROJECT_ID}.svc.id.goog[${K8S_NAMESPACE}/${KSA_NAME}]"

4. 外部からAirflowのAPIを使ってDAGをtriggerしていた処理が動かなくなった

  • もともとCloud Composer 1 時点で外部からDAGをtriggerするためのCloud Functionsを用意していた
  • そのためにAirflow Experimental Rest APIを使っていた

起きたこと

  • Cloud Functionsでエラー起きた

原因

  • upgradeによってそもそもAirflow Web serverのURLのフォーマットや認証方法が変わったため

対処方法

  • 公式ドキュメントに従ってAirflow stabel REST APIで実装しなおした
    • Experimental Rest APIは非推奨になっていたこともあり、この機会にAirflow stabel REST APIへ移行した
    • 参考: APIの仕様
  • 公式ドキュメントの例ではAirflow Web server URLを別途取得しているので、下記のような関数で自動で取ってくるようにした
def fetch_webserver_url(
    project_id: str, env_name: str, location: str = "asia-northeast1"
) -> str:
    composer_client = discovery.build("composer", "v1")
    request = (
        composer_client.projects()
        .locations()
        .environments()
        .get(name=f"projects/{project_id}/locations/{location}/environments/{env_name}")
    )
    response = request.execute()
    airflow_url = response.get("config").get("airflowUri")
    return airflow_url

upgradeした結果

以上のようなポイントに対処して、無事にCloud Composer 2へのupgradeが完了しました。 upgradeしての所感は下記です。

  • Cloud Composer2になったことで、Cloud Composer 1の時よりも裏側のGKEを意識しないといけない機会が多くなった

    • ハマったポイントで紹介した点や、そのためのエラー調査でCloud Composer側では出てこないログをGKEの中で探すことが増えた
    • 普段からGKEになれている場合は問題ないと思うが、そうではない人にはハードルが上がった印象
  • Cloud Composer 1の時よりも料金は安くなり、taskの負荷に対して柔軟になった

    • スペック調整してautoscalingの恩恵を受けられるようにした結果、ピーク時以外のworker数が抑えられた
  • GCEを管理しなくて良くなったので、インスタンスの管理コストが下がった

    • Cloud Composer 1の時はGKE内の各インスタンスがGCEで管理されていて、OS Loginの対応など気にする必要があった
    • しかもGKEのnode pool versionによってはそもそもOS Loginに対応していなかった

総じてupgradeして良かったですが、まだpreviewでGAになっていないことやドキュメントがまだ少ないことに比べるとupgradeのメリットは大きくないので、大規模に利用していてよほど料金が高い人以外はまだ様子見をしてもいいかなと思いました。

※ちなみに公式ドキュメントはCloud Composerに限らず、Englishのページの方が更新が早いので、新機能を試すときや細かい設定をするときはEnglishのページを見ることを推奨します。

*1:v3.90.0からnode_configを含めたCloud Composer 2をTerraformで作成できるようになった

composer: removed config.node_config.zone requirement on google_composer_environment (#10353) https://github.com/hashicorp/terraform-provider-google/releases/tag/v3.90.0

© 2020 Classi Corp.