Temporal Platformとは何か

Temporal Platformというものを触る機会があったのでTutorialを通して自分なりに要約してみました!

https://temporal.io/cloud

また、要約内容はtutorialから得たものなのでtutorialもぜひ挑戦してみてください

https://docs.temporal.io/cloud/get-started

目次

Temporal Platformとは

アプリケーションのDurable Executionを保証するプラットフォーム

「Temporal Workflow Executions」と呼ばれるスケーラブルで信頼性の高いランタイム

Durable Execution

Durable Exectionとはワークフロー実行が障害やクラッシュに直面しても、その進捗と状態を維持し、途中から再開できること

  • temporalがEvent Historyを使用し、各stepでのワークフロー実行の状態を記録している
  • 障害が発生した場合ワークフローは最後に記録されたイベントから再開され、進捗が失われることはない
  • 一度実行すれば何年かかるかに関係なく最後まで効率的に実行

特徴

障害が存在しないかのように開発可能。ネットワーク障害やサーバークラッシュなどの問題が発生してもアプリケーションは確実に実行される

→ 障害の検知、回復のためのコードの代わりにビジネスロジックに集中できる

Temporal Workflow

  • Temporal Workflow(以下Workflow)はアプリケーション全体のフローを定義する
  • Workflowは汎用的なプログラミング言語で記述されている

例えばサブスクの購読、ビザの注文、チケットの購入などあらゆるものがワークフローになる

Workflowの中でも以下の3つを区別する

  1. Workflow Definition
  2. Workflow Type
  3. Workflow Execution

1. Workflow Definition

  • Workflowを定義するコード
  • temporal SDKを使用してプログラムを組む

2. Workflow Type

Workflowの「種類」を識別するラベルのようなもの、名前

Workflow Execution

definitionを実行することで生成される

オンライン注文処理を例にしたイメージ
  1. Workflow Definition:
    1. 「どう動くか」を定義したコードそのもの(レシピ)
    2. 例「オンライン注文処理をするワークフロー」のコード
  2. Workflow Type
    1. definitionに付けられた「名前」や「ラベル」のようなもの
    2. 例: "OrderProcessingWorkflow"
  3. Workflow Execution
    1. 実際に実行されるインスタンス
    2. 例: 注文Aに対するワークフローと注文Bに対するワークフローは別物

goで書いたWorkflowサンプル

func GreetSomeone(ctx workflow.Context, name string) (string, error) {
	options := workflow.ActivityOptions{
		StartToCloseTimeout: time.Second * 5,
	}
	ctx = workflow.WithActivityOptions(ctx, options)

	var spanishGreeting string
	err := workflow.ExecuteActivity(ctx, GreetInSpanish, name).Get(ctx, &spanishGreeting)
	if err != nil {
		return "", err
	}

	var spanishFarewell string
	err = workflow.ExecuteActivity(ctx, FarewellInSpanish, name).Get(ctx, &spanishFarewell)
	if err != nil {
		return "", err
	}

	var helloGoodbye = "\n" + spanishGreeting + "\n" + spanishFarewell
	return helloGoodbye, nil
}

このサンプルではgoでWorkflowを定義しています。ExecuteActivity()で2回Activityを呼び出ししています。Activity実行の際にはClusterへcommandを送信してイベント履歴の保存をします。また、optionsでActivityの実行の際の条件を書いています。

  • WorkflowはActivityを実行しない
  • 代わりにWorkflowはTemporal ClusterにActivity実行のスケジューリングを依頼する
  • ExecuteActivityはFutureを返却し、これはActivityが完了するまでアクセスできない。

また、以下のようにActivityの試行回数などのretry policyが設定できる

func GreetSomesone(ctx workflow.Context, name string) (string, error) {
	retlyPolicy := &temporal.RetryPolicy{
		InitialInterval:    15 * time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    time.Minute,
		MaximumAttempts:    100,
	}
	option := workflow.ActivityOptions{
		RetryPolicy:         retlyPolicy,
		StartToCloseTimeout: time.Second * 5,
	}
	ctx = workflow.WithActivityOptions(ctx, option)
	var spanishGreeting string
	err := workflow.ExecuteActivity(ctx, GreetInSpanish, name).Get(ctx, &spanishGreeting)
	if err != nil {
		return "", err
	}
	return spanishGreeting, nil
}

Temporal Activity

決定論的であるWorkflowに対してActivityを用いて失敗する可能性のあるビジネスロジックをカプセル化しておくことができる

  • ActivityはWorkflowの一部として実行されるが、失敗すると再試行される
  • ActivityはDBやマイクロサービスなどの外部サービスへアクセスすることが許される
  • Activity関数の第一引数はgolangの場合context.Contextにすることがおすすめ→ハードビートなどの追加機能が利用できる

goで書いたActivityサンプル

func GreetInSpanish(ctx context.Context, name string) (string, error) {
        base := "http://localhost:9999/get-spanish-greeting?name=%s"
        url := fmt.Sprintf(base, url.QueryEscape(name))

        resp, err := http.Get(url)
        if err != nil {
                return "", err
        }
        defer resp.Body.Close()

        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
                return "", err
        }

        translation := string(body)
        status := resp.StatusCode
        if status >= 400 {
                message := fmt.Sprintf("HTTP Error %d: %s", status, translation)
                return "", errors.New(message)
        }

        return translation, nil
}

このコードではGreetInSpanishというActivity関数を定義している。関数の中でhttp.Getをしており、非決定論的な外部への呼び出しを行なっている

「決定論的」と「冪等性」の違い

Deterministic(決定論的):

  • 同じ入力に対して、常に同じ結果を返す。入力に基づいて結果が完全にきまる。外部要因に影響されない

Idempotent(冪等性):

  • 同じ操作を何度繰り返しても結果が変わらない性質。

似た意味だが着目ポイントが少し違う

Temporal Architecture

Temporal Platformは大きく分けてTemporal ServerとTemporal Clientsの2つで構成されている

Temporal Server

フロントエンドサービスとアプリケーションコードの実行を管理する複数のバックエンドサービスで構成されている

Temporal Clients

Temporal Serverと通信するクライアント。以下の3種類がある

  1. CLI
  2. Web UI
  3. アプリケーションコードに組み込まれたクライアント

通信の流れ

クライアントはフロントエンドサーバーにリクエストを発行し、フロントエンドサーバーはgRPC通信を用いて必要なバックエンドサーバーに通信する

Temporal Cluster

ざっくり言うとTemporal ServerにDBやエラサーなどのコンポーネントを追加し、デプロイもしくはマシンに配置したものを総称してTemporal Clusterという

  • Temporal Clusterはワークフローのすべての実行の現在の状態を追跡する
  • 履歴やタイマー、キューに関する詳細な情報をDBに保存する

Worker

コードの実行を担当するentity。WorkerはWorkflowとActivityを実行し、クライアントを用いてTemporal Clusterと通信するもの。

  • Workerは継続的にcluster上のtask queueをポーリングし続ける
  • Workerはactivityの実行などの関数(ExecuteActivity)などを行う際に、Clusterに対してcommandを送信する。Clusterはコマンドを保存したりしてWorkerのクラッシュなどに対応する

Workerの初期化の際にはWorkflow及びActivityの登録が必要になる。

func main() {
        c, err := client.Dial(client.Options{})
        if err != nil {
                log.Fatalln("Unable to create client", err)
        }
        defer c.Close()

        w := worker.New(c, "greeting-tasks", worker.Options{})

        w.RegisterWorkflow(app.GreetSomeone)
        w.RegisterActivity(app.GreetInSpanish)

        err = w.Run(worker.InterruptCh())
        if err != nil {
                log.Fatalln("Unable to start worker", err)
        }
}

上記の例ではまずクライアントを作成し、workerをインスタンス化している。RegisterWorkflow ,ResisterActivity 関数でそれぞれを登録している。

参考

https://docs.temporal.io/cloud/get-started

目次