Temporal Platformというものを触る機会があったのでTutorialを通して自分なりに要約してみました!
また、要約内容はtutorialから得たものなのでtutorialもぜひ挑戦してみてください
https://docs.temporal.io/cloud/get-started
Temporal Platformとは
アプリケーションのDurable Executionを保証するプラットフォーム
「Temporal Workflow Executions」と呼ばれるスケーラブルで信頼性の高いランタイム
Durable Execution
Durable Exectionとはワークフロー実行が障害やクラッシュに直面しても、その進捗と状態を維持し、途中から再開できること
- temporalがEvent Historyを使用し、各stepでのワークフロー実行の状態を記録している
- 障害が発生した場合ワークフローは最後に記録されたイベントから再開され、進捗が失われることはない
- 一度実行すれば何年かかるかに関係なく最後まで効率的に実行
特徴
障害が存在しないかのように開発可能。ネットワーク障害やサーバークラッシュなどの問題が発生してもアプリケーションは確実に実行される
→ 障害の検知、回復のためのコードの代わりにビジネスロジックに集中できる
Temporal Workflow
- Temporal Workflow(以下Workflow)はアプリケーション全体のフローを定義する
- Workflowは汎用的なプログラミング言語で記述されている
例えばサブスクの購読、ビザの注文、チケットの購入などあらゆるものがワークフローになる
Workflowの中でも以下の3つを区別する
- Workflow Definition
- Workflow Type
- Workflow Execution
1. Workflow Definition
- Workflowを定義するコード
- temporal SDKを使用してプログラムを組む
2. Workflow Type
Workflowの「種類」を識別するラベルのようなもの、名前
Workflow Execution
definitionを実行することで生成される
- Workflow Definition:
- 「どう動くか」を定義したコードそのもの(レシピ)
- 例「オンライン注文処理をするワークフロー」のコード
- Workflow Type
- definitionに付けられた「名前」や「ラベル」のようなもの
- 例:
"OrderProcessingWorkflow"
- Workflow Execution
- 実際に実行されるインスタンス
- 例: 注文Aに対するワークフローと注文Bに対するワークフローは別物
goで書いたWorkflowサンプル
func GreetSomeone(ctx workflow.Context, name string) (string, error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Second * 5,
}
ctx = workflow.WithActivityOptions(ctx, options)
var spanishGreeting string
err := workflow.ExecuteActivity(ctx, GreetInSpanish, name).Get(ctx, &spanishGreeting)
if err != nil {
return "", err
}
var spanishFarewell string
err = workflow.ExecuteActivity(ctx, FarewellInSpanish, name).Get(ctx, &spanishFarewell)
if err != nil {
return "", err
}
var helloGoodbye = "\n" + spanishGreeting + "\n" + spanishFarewell
return helloGoodbye, nil
}
このサンプルではgoでWorkflowを定義しています。ExecuteActivity()で2回Activityを呼び出ししています。Activity実行の際にはClusterへcommandを送信してイベント履歴の保存をします。また、optionsでActivityの実行の際の条件を書いています。
- WorkflowはActivityを実行しない
- 代わりにWorkflowはTemporal ClusterにActivity実行のスケジューリングを依頼する
- ExecuteActivityはFutureを返却し、これはActivityが完了するまでアクセスできない。
また、以下のようにActivityの試行回数などのretry policyが設定できる
func GreetSomesone(ctx workflow.Context, name string) (string, error) {
retlyPolicy := &temporal.RetryPolicy{
InitialInterval: 15 * time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 100,
}
option := workflow.ActivityOptions{
RetryPolicy: retlyPolicy,
StartToCloseTimeout: time.Second * 5,
}
ctx = workflow.WithActivityOptions(ctx, option)
var spanishGreeting string
err := workflow.ExecuteActivity(ctx, GreetInSpanish, name).Get(ctx, &spanishGreeting)
if err != nil {
return "", err
}
return spanishGreeting, nil
}
Temporal Activity
決定論的であるWorkflowに対してActivityを用いて失敗する可能性のあるビジネスロジックをカプセル化しておくことができる
- ActivityはWorkflowの一部として実行されるが、失敗すると再試行される
- ActivityはDBやマイクロサービスなどの外部サービスへアクセスすることが許される
- Activity関数の第一引数はgolangの場合context.Contextにすることがおすすめ→ハードビートなどの追加機能が利用できる
goで書いたActivityサンプル
func GreetInSpanish(ctx context.Context, name string) (string, error) {
base := "http://localhost:9999/get-spanish-greeting?name=%s"
url := fmt.Sprintf(base, url.QueryEscape(name))
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
translation := string(body)
status := resp.StatusCode
if status >= 400 {
message := fmt.Sprintf("HTTP Error %d: %s", status, translation)
return "", errors.New(message)
}
return translation, nil
}
このコードではGreetInSpanishというActivity関数を定義している。関数の中でhttp.Getをしており、非決定論的な外部への呼び出しを行なっている
Deterministic(決定論的):
- 同じ入力に対して、常に同じ結果を返す。入力に基づいて結果が完全にきまる。外部要因に影響されない
Idempotent(冪等性):
- 同じ操作を何度繰り返しても結果が変わらない性質。
似た意味だが着目ポイントが少し違う
Temporal Architecture
Temporal Platformは大きく分けてTemporal ServerとTemporal Clientsの2つで構成されている
Temporal Server
フロントエンドサービスとアプリケーションコードの実行を管理する複数のバックエンドサービスで構成されている
Temporal Clients
Temporal Serverと通信するクライアント。以下の3種類がある
- CLI
- Web UI
- アプリケーションコードに組み込まれたクライアント
通信の流れ
クライアントはフロントエンドサーバーにリクエストを発行し、フロントエンドサーバーはgRPC通信を用いて必要なバックエンドサーバーに通信する
Temporal Cluster
ざっくり言うとTemporal ServerにDBやエラサーなどのコンポーネントを追加し、デプロイもしくはマシンに配置したものを総称してTemporal Clusterという
- Temporal Clusterはワークフローのすべての実行の現在の状態を追跡する
- 履歴やタイマー、キューに関する詳細な情報をDBに保存する
Worker
コードの実行を担当するentity。WorkerはWorkflowとActivityを実行し、クライアントを用いてTemporal Clusterと通信するもの。
- Workerは継続的にcluster上のtask queueをポーリングし続ける
- Workerはactivityの実行などの関数(ExecuteActivity)などを行う際に、Clusterに対してcommandを送信する。Clusterはコマンドを保存したりしてWorkerのクラッシュなどに対応する
Workerの初期化の際にはWorkflow及びActivityの登録が必要になる。
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "greeting-tasks", worker.Options{})
w.RegisterWorkflow(app.GreetSomeone)
w.RegisterActivity(app.GreetInSpanish)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
上記の例ではまずクライアントを作成し、workerをインスタンス化している。RegisterWorkflow ,ResisterActivity 関数でそれぞれを登録している。
参考
https://docs.temporal.io/cloud/get-started