Argo WorkflowsとGKEで構築するLLMを使った要約サービスの機械学習パイプライン

こんにちは。

AIチームの干飯(@hosimesi11_)です。今回はAI Shiftで取り組んでいる新規サービスであるAI Messenger Summaryの機械学習パイプラインと、Proof of Concept(PoC)から実際のプロダクトへと展開する過程についてご紹介します。

AI Messenger Summaryとは

AI Messenger Summary

AI Messenger Summaryはコールセンター事業における、会話内容の要約サービスになります(プレスリリース)。コールセンターでは、顧客との会話内容をまとめるアフターコールワークに多くの時間が割かれるという課題があります。この状況を解決するため、オペレーターと顧客の会話データや録音データを音声認識し、大規模言語モデル(LLM)で処理して要約し、導入企業に合わせたフォーマットで出力するようなサービスとなっています。要約の他、応対品質評価やラベリングなど複数のタスクに対応しています。

初期の課題感

AI Messenger Summaryでは、各企業ごとの要約が実現可能かどうかを判別するために、PoC(Proof of Concept)フェーズを設けていました。
初期ではGradioを使った一つのサーバ内で、音声認識や要約処理など全ての処理を同期的に行っていました。処理の例として以下のようなものがあります。

  1. 音声ファイルアップロード
  2. 前処理
  3. 音声認識
  4. 後処理
  5. 要約
Display of Gradio
初期のPoC用のGradioの画面

元々データサイエンティスト一人で環境を整備しており、PoC初期では十分な機能を果たせていましたが、導入企業が増えてくるにつれ徐々に課題感が出てきました。音声認識のパラメータの変更や、前処理の変更、複数のプロンプトの試行など個社ごとに試行錯誤する必要があります。また処理の特性上、一ファイルの一回の実行に数十分から数時間かかる場合もあります。PoCを行う企業が増えるにつれ、PoCの負担はさらに増すことが予想されました。
PoCの負担を下げつつ、素早くMVP(Minimum Viable Product)としてリリースできるようにするため、開発フェーズを以下の2つに定義し、段階的に達成するように進めることにしました。

Phase 1. PoCの負担削減と高速化
Phase 2. 実プロダクト実装

 

プロジェクトの進め方

Phase 1

Phase 1では、PoCの負担削減と高速化にだけ観点を置き、リッチな構成は取らないようにすることで実装コストを大幅に下げました。具体的には、要約処理などの画面に関係のない処理の中心をCloud Run Jobsに載せて処理を分離しました。これによって下記のことが可能になりました。

  1. 複数音声ファイルの処理を並列で同時実行
  2. 処理を分離し、画面から非同期で実行
  3. slackへの失敗通知

処理のリトライや細かい部分での並列実行にworkflowツールを導入したかったのですが、スピード感とPhase 2の実プロダクト実装を踏まえて、スコープ外としました。また今のUIであるGradioはリプレイスされることが予期されたため、手をつけずにジョブの分離に注力しました。

Old System Architecture
元のシステム構成図
Phase 1 System Architecture
Phase 1システム構成図

PoCの高速化をしつつ、この段階から実プロダクトの設計も開始しました。こうすることで、大部分の実装を本番時にそのまま再利用でき、実プロダクトのリリースまでの時間を短縮することができました。

Phase 2

Phase 2は実プロダクトの実装になります。実プロダクトでは各企業のCTI(Computer Telephony Integration)やCRM(Customer Relationship Management)と自動連携することになります。さらにPoCの処理も全てこのシステムに載ってくることが予期されました。本番運用とPoCの処理の共通化を考慮するとリトライ処理なども完備したかったため、ワークフローツールを導入しました。インフラ基盤とセットでワークフローツールをいくつか検討しました。

インフラ基盤とワークフローツールの選定

ここでは以下のような方針を採用することを避けました。

  • 流行っているという理由での技術選定
  • 必要以上にリッチな構成
  • ML独自のツールの採用

これらの方針を避ける理由は次の通りです。

  • プロダクト立ち上げなのでシンプルで管理しやすいシステムを目指したい
  • ML独自のツールでは運用もMLエンジニアが全て見る必要があり、ソフトウェアエンジニアと共同して運用できない

そのため、以下を選定の基準としました。

  • 達成したい目的を整理し、それを実現できる点
  • ソフトウェアエンジニアと共同で運用できる点

AI ShiftではクラウドサービスとしてGoogle Cloudを採用しており、ほとんどのアプリケーションがGKE上で動作しています。インフラ基盤の選定に関して、元々は素早く立ち上げるためにCloud Run Jobsを使用する予定でしたが、ワークフローツールとの兼ね合いや音声認識などをGPU上で動かしたい場面が想像できたため、既に動いているGKEを選定しました。他のものに比べて初期実装コストが少し高いですが、チームとしての運用の知見アセットがある点も含めて選定しました。

ワークフローツールの選定に関しては社内の別プロダクトの方にもヒアリングしつつ、柔軟性や既存システムとの相性、運用ノウハウの点からArgo Workflowsを選定しました。AI ShiftがArgo CDをCDツールとして使用していることも、ソフトウェアエンジニアと一緒に運用する上で大きな利点となりました。

上記を含めて、Phase 2(プロダクト)のインフラ構成は以下になります。

Phase 2 System Architecture
Phase 2構成図

機械学習パイプラインの構築

AI Shiftの本番環境のGKEの管理にはKustomizeが使用されているため、検証が終わり次第デプロイするまでの差分を減らすため、ローカルでもKustomizeを使用しました。本番ではCTIなどと自動連携し、PubSubへのパブリッシュをトリガーにワークフローを動かしたかったため、Argo Eventsも使用しています。

Argo Workflowsとは

Kubernetes 上で並列ジョブを調整するためのオープンソースのワークフローツールです。柔軟なDAGを組むことができます。

Argo Eventsとは

Kubernetes 用のイベント駆動型ワークフロー実行ツールで、Webhook・MQなど様々なツールに対応しています。Sensor、EventSource、EventBusの3つのコンポーネントからなります。

Argo Events System Architecture
イメージ図

Google Cloudリソースの作成

まずは、以下のようにTerraformで必要なGoogle Cloudのリソースを作成しています。

Argo Workflows


# External IP for Argo Workflows
resource "google_compute_global_address" "argo_workflows_external_ip" {
  name         = "argo-workflows-server-ip"
  address_type = "EXTERNAL"
  ip_version   = "IPV4"
  project      = var.project
}

# DNS Zone
data "google_dns_managed_zone" "dns_zone" {
  name    = "dns-zone"
  project = var.project
}

# DNS Record for Argo Workflows
resource "google_dns_record_set" "argo_workflows" {
  project      = var.dns_management_project
  name         = "sample.${data.google_dns_managed_zone.dns_zone.dns_name}"
  type         = "A"
  ttl          = 300
  managed_zone = data.google_dns_managed_zone.dns_zone.name
  rrdatas      = [google_compute_global_address.argo_workflows_external_ip.address]
  lifecycle {
    ignore_changes = [rrdatas]
  }
}

# Access Policy for Argo Workflows
resource "google_compute_security_policy" "argo_workflows_policy" {
  name = "${var.env}-argo-workflows-policy"

  rule {
    action   = "deny(403)"
    priority = "2147483647"
    match {
      versioned_expr = "SRC_IPS_V1"
      config {
        src_ip_ranges = ["*"]
      }
    }
    description = "Default"
  }

  rule {
    action   = "allow"
    priority = "1001"
    match {
      versioned_expr = "SRC_IPS_V1"
      config {
        src_ip_ranges = [ "xxx" ]
      }
    }
    description = "Allow IP Ranges"
  }
}

# History DB for Argo Workflows
resource "google_sql_database" "argo_workflows_db" {
  name      = "$argo_workflows_db"
  instance  = var.db_instance_name
  charset   = "utf8mb4"
  collation = "utf8mb4_general_ci"
  project   = var.project
}


# Service Account for Argo Workflows
resource "google_service_account" "argo_workflows_sa" {
  account_id   = "argo-workflows-sa"
  display_name = "argo workflows service account"
}

# Attach secret manager accessor to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_secret_manager_iam" {
  project = var.project
  role    = "roles/secretmanager.secretAccessor"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach pub/sub publisher to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_pubsub_publisher_iam" {
  project = var.project
  role    = "roles/pubsub.publisher"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach storage object admin to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_storage_iam" {
  project = var.project
  role    = "roles/storage.objectAdmin"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach artifact registry writer to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_artifact_registry_iam" {
  project = var.project
  role    = "roles/artifactregistry.writer"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach service account token creator to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_service_account_token_creator_iam" {
  project = var.project
  role    = "roles/iam.serviceAccountTokenCreator"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach cloud sql client to Service Account for Argo Workflows
resource "google_project_iam_member" "workflows_cloudsql_client_iam" {
  project = var.project
  role    = "roles/cloudsql.client"
  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"
}

# Attach Workload Identity to Service Account for Argo Workflows
resource "google_service_account_iam_binding" "argo_workflows_workload_identity" {
  service_account_id = google_service_account.argo_workflows_sa.name
  role               = "roles/iam.workloadIdentityUser"
  members            = ["serviceAccount:${var.project}.svc.id.goog[argo-workflows/argo-workflows-sa]"]
}

# Bucket for Argo Workflows Log
resource "google_storage_bucket" "argo_workflows_logs_bucket" {
  name          = "argo-workflows-logs"
  location      = var.region
  project       = var.project
  force_destroy = true
}

Argo Events

# Argo Eventsのservice account
resource "google_service_account" "argo_events_sa" {
  account_id   = "argo-events-sa"
  display_name = "用のargo events用のservice account"
}

# Attach pub/sub subscriber to Service Account for Argo Events
resource "google_project_iam_member" "pubsub_subscriber_iam" {
  project = var.project
  role    = "roles/pubsub.subscriber"
  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"
}

# Attach pub/sub viewer to Service Account for Argo Events
resource "google_project_iam_member" "pubsub_viewer_iam" {
  project = var.project
  role    = "roles/pubsub.viewer"
  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"
}

# Attach service account token creator to Service Account for Argo Events
resource "google_project_iam_member" "workflows_service_account_token_creator_iam" {
  project = var.project
  role    = "roles/iam.serviceAccountTokenCreator"
  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"
}

# Attach Workload Identity to Service Account for Argo Events
resource "google_service_account_iam_binding" "argo_events_workload_identity" {
  service_account_id = google_service_account.argo_events_sa.id
  role               = "roles/iam.workloadIdentityUser"
  members            = ["serviceAccount:${var.project}.svc.id.goog[argo-events/argo-events-sa]"]
}

 

Argo Workflowsのインストール

Argo WorkflowsをGKEに構築していきます。(一部抜粋)

まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。

https://github.com/argoproj/argo-events/releases

ConfigMapを作成します。ここでは、ワークフローの履歴をMySQLに入れるための設定をしています。また、GCSにもログが上がるように設定しています。

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
  namespace: argo-workflows
data:
  parallelism: "10"
  namespace: argo-workflows
  resourceRateLimit: |
    limit: 10
    burst: 1
  persistence: |
    archive: true
    archiveTTL: 10d
    skipMigration: false
    connectionPool:
      maxIdleConns: 100
      maxOpenConns: 100
    nodeStatusOffLoad: true
    mysql:
      host: xxxxx
      port: 3306
      database: xxxx
      tableName: argo_workflows
      userNameSecret:
        name: argo-workflows-secret
        key: MYSQL_USER
      passwordSecret:
        name: argo-workflows-secret
        key: MYSQL_PASSWORD

  artifactRepository: |
    archiveLogs: true
    gcs:
      bucket: sample-log-bucket
      keyFormat: "argo\
        /{{workflow.creationTimestamp.Y}}\
        /{{workflow.creationTimestamp.m}}\
        /{{workflow.creationTimestamp.d}}\
        /{{workflow.name}}\
        /{{pod.name}}"

Service Account

ここで、Google Cloud側で作成済みのサービスアカウントとKubernetesのサービスアカウントを紐付けます。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-workflows-sa
  namespace: argo-workflows
  annotations:
    iam.gke.io/gcp-service-account: sample-gcp-service-account

Deployment

Deploymentの定義をします。ここでは--auth-modeはclient、server、ssoが対応しています。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: argo-server
  namespace: argo-workflows
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: argo-server
  template:
    metadata:
      labels:
        app: argo-server
    spec:
      serviceAccountName: argo-workflows-sa
      terminationGracePeriodSeconds: 60
      nodeSelector:
        cloud.google.com/gke-nodepool: sample-node-pool
      containers:
      - name: argo-server
        args: ["server", "--secure=false", "--auth-mode=sso"]
        ports:
        - containerPort: 2746
        livenessProbe:
          httpGet:
            path: /
            port: 2746
            scheme: HTTP
          initialDelaySeconds: 3
          periodSeconds: 30
          timeoutSeconds: 1
          successThreshold: 1
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /
            port: 2746
            scheme: HTTP
          initialDelaySeconds: 3
          periodSeconds: 30
          timeoutSeconds: 1
          successThreshold: 1
          failureThreshold: 3
        lifecycle:
          preStop:
            exec:
              command:
                - /bin/sh
                - -c
                - sleep 10

Service

apiVersion: v1
kind: Service
metadata:
  name: argo-server
  namespace: argo-workflows
  annotations:
    cloud.google.com/neg: '{"ingress":true}'
spec:
  type: ClusterIP
  selector:
    app: argo-server
  ports:
  - name: argo-workflows-ui
    port: 80
    targetPort: 2746
    protocol: TCP

Ingress

Ingressを作成します。ここでは先ほどTerraformで作成済みのExternal IPや、DNSレコードを設定します。

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: argo-server-ingress
  namespace: argo-workflows
  annotations:
    kubernetes.io/ingress.global-static-ip-name: sample-ip-name
    networking.gke.io/managed-certificates: sample-certificates
spec:
  defaultBackend:
    service:
      name: argo-server
      port:
        number: 80
  rules:
    - host: sample-domain
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: argo-server
                port:
                  number: 80

これらをapplyすることで、Argo Serverが起動し、Argo UIが表示されます。

Argo Events

Argo EventsをGKEに構築していきます。(一部抜粋)

まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。

https://github.com/argoproj/argo-events/releases

EventBus

EventBusを定義します。

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: argo-events-bus
  namespace: argo-events
spec:
  jetstream:
    version: 2.9.12
    replicas: 3
    nodeSelector:
      cloud.google.com/gke-nodepool: sample-node-pool
    streamConfig: |
      maxAge: 24h
    settings: |
      max_file_store: 1GB
    startArgs:
      - "-D"

EventSource

EventSourceを作成します。ここでは先ほどTerraformで作成した、ワークフローをキックする際に使用するPub/Subを指定します。

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: gcp-pubsub
  namespace: argo-events
spec:
  eventBusName: argo-events-bus
  type: pubsub
  template:
    serviceAccountName: argo-events-sa
    spec:
      nodeSelector:
        cloud.google.com/gke-nodepool: sample-node-pool
  pubSub:
    workflow-trigger:
      jsonBody: true
      projectID: sample-project
      topic: sample-topic
      subscriptionID: sample-topic-sub

Sensor

どのワークフローをキックするかを定義します。ここで、dependenciesはEventSourceと名前を合わせる必要があります。

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: pubsub-sensor
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  eventBusName: argo-events-bus
  dependencies:
    - name: sample-trigger-dependency
      eventSourceName: gcp-pubsub
      eventName: workflow-trigger
  triggers:
    - template:
        name: trigger-template
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: workflow-
                namespace: argo-workflows
              spec:
                serviceAccountName: argo-workflows-sa
                workflowTemplateRef:
                  name: sample-workflow
                arguments:
                  parameters:
                  - name: config
                    valueFrom:
                      path: /event
          parameters:
            - src:
                dependencyName: sample-trigger-dependency
                dataKey: body
              dest: spec.arguments.parameters.0.value

これらをapplyすることで、Argo Events用のpodが作成されます。

 

Workflowの本体のデプロイ

Argo Events経由でワークフローをキックしたいため、今回はWorkflowTemplateを用いて機械学習のワークフローを定義します。

大まかな処理は以下のようになります。

  1. 前処理
  2. 音声認識
  3. 後処理
  4. 要約
  5. 結果格納

はじめにentrypoint-jobを定義し、inputやファイルによってワークフローを切り替えるために判定用のjobを一つ入れています。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: sample-workflow
  namespace: argo-workflows
spec:
  templates:
  - name: main-job
    nodeSelector:
      cloud.google.com/gke-nodepool: sample-node-pool
    inputs:
      parameters:
      - name: config
    steps:
    - - name: entrypoint-job
        templateRef:
          name: entrypoint-job
          template: entrypoint
        arguments:
          parameters:
          - name: config
            value: "{{inputs.parameters.config}}"

    - - name: main-job
        templateRef:
          name: main-job
          template: main-audio

  - name: exit-handler-job
    steps:
    - - name: slack-notification
        templateRef:
          name: slack-notification-job
          template: slack-notification
        when: "{{workflow.status}} != Succeeded"
        arguments:
          parameters:
          - name: title
            value: "{{workflow.name}}が失敗しました"
                    :

実際の処理やロジックは割愛しますが、高速化のため複数ステップで並列化させています。また、処理の内容によってはGPUを使用したいため、nodeSelectorによってインスタンスを分けています。

構築したワークフロー

今回構築したワークフローの全体像は以下のようになっています。音声を扱うのでMLロジックが盛りだくさんですが、柔軟なワークフローを組むことができます。

Argo Workflows Result
ワークフローの実行画面

おわりに

本記事では新規プロダクトであるAI Messenger Summaryのプロジェクトの進め方と、機械学習パイプラインについて紹介しました。AI Messenger Summaryでは機械学習パイプラインにArgo Workflowsを導入しました。すでにKubernetesを運用している方にとっては選択肢になりうると思います。
最後までお読みいただきありがとうございました!

PICK UP

TAG