Event Sourcingやってみた(Go + EventHorizon)

event sourcing
目次

0. はじめに

最近Event Sourcingのデータモデリングをやる機会があったので初めてEvent Sourcing触ってみました。

正直に最初はかなり混乱でして

  • 状態を保存しない
  • イベントが真実
  • Read ModelとAggregateの違い

という概念らが出てきて割と頭の中が「???」でした…

特にわからなかったのがデータモデリングの考え方が普段触っているステートソーシングと根本から違ったことです。テーブル設計から考える癖があったのでそれが全く通用しないという感じでした泣

そういう時は実際にやってみよう!ということで「本」というシンプルなドメインを題材にして、

Event Sourcing + CQRSをGo + EventHorizonというossを使って実装してみました。

この記事で実装をしながら学んだことや実装を解説できればと思います!

実際にやってみたソースコードはこちら

※この記事は個人の見解ですので誤りを含む可能性がございます。

1. Event Sourcing / CQRSとは?

知らないですが、多分こういう考え方です。

Event Sourcing

「今の状態」ではなく「起きた事実」を全て保存する

CQRS

書き込み用Model(Commands)と読み込み用Model(Read)を分ける

CQRSは単体で使われることはよくみますね。Event Sourcingをする場合、ほぼほぼCQRSはセットなんだと理解しておりまする。Event Sourcingでは

  • 書き込みは「イベントを追加するだけ」
  • 読み込みは「イベントから作られた別のデータを見る」

と言う構造になります。

2. 用語

先に雑に用語の説明。

用語意味
Command命令
CommandBusCommand(命令)の振り分け係
Aggregateルールを守ってEventを決める存在
Event「起きた事実」
EventStoreEventを全て保存する唯一の真実(SSOT)
ProjectorEventをRead Modelに変換する
Read Model表示、閲覧用のデータ

3. AggregateとRead Modelの違いを考える

最初に混乱したところでしたが「Book」と言うドメインに対して、

  • BookAggregate
  • BookView(Read Model)

を用意しました。(全てのそうなのかはわからないですがEventHorizonではこんなやり方でした)

どちらもModelなのですがそれぞれ役割が異なっており

3-1. Aggregate

  • Commandsを受け取って
  • ドメインルールをチェックして
  • Eventを発生させる

ためのモデルです。

つまり「判断するためのモデル」です。ちなみにAggregate自体はDBに保存されません。イベントから毎回復元される「一時的な現在状態」です。

3-2. Read Model

こちらはイメージがつきやすかったのですが

  • 画面表示
  • 検索

などのためのモデルです。CQRSでよく出る概念ですね、こちらはDBに永続化されます。ちなみにこちらのモデルは正規化しなくてもいいし、冗長でもOKとされているみたいです。こやつがステートソーシングでいつも考えているモデルですね。

重要な点は

壊れても、イベントから作り直せる

と言う点です(壊れるって何事..?は置いておいて)

3-3. フィールドを持たせる境界

モデリングする際に重要だと感じたのですがドメインモデルに落とし込む際にAggregateとRead Modelのそれぞれのフィールドの持たせ方が違ったのですが

基本的に全てのフィールドはRead Modelには持たせておいて良さそうと考えています。ではAggregateに持たせる情報は何かというと

  • ルール判断に必要な情報

これらを持つべきみたいです。つまりドメインルールに必要な情報はAggregateに持たせます。

例えば本のReadModelがこんな感じだった場合

type BookView struct {
	ID            uuid.UUID
	Title         string
	Description   string
	CoverImageURL string
	Categories    []string
	LikeCount     int
	ViewCount     int
	Status        string
	LastViewedAt  time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

Aggregateはこんな感じでもいいのです

type BookAggregate struct {
	ID      uuid.UUID
	Title   string
	Created bool
}

ですが、例えば以下のようなルールが追加された場合

  • 本は下書き中Statusだけタイトル変更可能

AggregateはStatusを持つ必要が出てきます。

つまり、Read Modelは雑になんでも入れておこう、Aggregateには「操作して良いか?」の判断に必要なフィールドだけ入れておけば良いと思います。

4. 実装していく

ここから実装した順に見ていきます。

ソースコードはこちら

ディレクトリ構成はこうなっています。

cmd/app
└── main.go              # アプリのエントリーポイント

internal
├── domain
│   └── book
│       ├── aggregate.go # Aggregate(ルール判断)
│       ├── commands.go  # Command(命令)
│       ├── events.go    # Event(起きた事実)
│       ├── model.go     # Read Modelの構造
│       ├── projector.go # Event → Read Modelの反映するマン
│       └── setup.go     # ドメインの配線
│
└── infrastructure
    ├── book
    │   └── readrepo     # Read Modelの保存先(DAO)
    │       └── bookview.go
    │
    └── eventstore       # EventStoreの実装
        └── eventstore.go

4-1. commands.go

// Command Types
const (
	CommandCreateBook      eh.CommandType = "CreateBook"
)

// 本を作りたいという命令
type CreateBook struct {
	BookID uuid.UUID
	Title  string
}

func (c *CreateBook) AggregateID() uuid.UUID {
	return c.BookID
}

func (c *CreateBook) AggregateType() eh.AggregateType {
	return AggregateTypeBook
}

func (c *CreateBook) CommandType() eh.CommandType {
	return CommandCreateBook
}

Commandsは「こうしたい」と言う命令です。このコードでは

CreateBook = “本を作る”と言うCommandを定義しています。

CreateBook自体は何も処理しなくて、ただ「やりたいこと」と「必要な値」を持っているだけです。

usecaseに渡すDTOみたいな存在だなぁと思いつつこいつは

ユーザーの意図(命令)

を表すものです。

4-2. events.go

package book

import eh "github.com/looplab/eventhorizon"

// Event Type
const (
	EventBookCreated      eh.EventType = "BookCreated"
)

// Eventに対するData(Payload)
type BookCreated struct {
	Title string `json:"title"`
}

// Eventに対するData(Payload)をHolizonに登録する
func init() {
	eh.RegisterEventData(EventBookCreated, func() eh.EventData { return &BookCreated{} })
}

Eventは「起きた事実」を表しています

EventTypeは過去形で表すみたいです。(起きた事実なのでね)ここでは”BookCreated”を定義しています。

また、Eventにはpayload(Data)があり「その事実に付随する情報」を持ちます。

Eventは後から変更せず、履歴として永続化するものです。

後に出てきますが、EventをEventStoreに情報のSSOT(single source of truth)として保存します。

4-3. aggregate.go

const AggregateTypeBook eh.AggregateType = "book"

type BookAggregate struct {
	ID      uuid.UUID
	Title   string
	Created bool
}

このようにBookのAggregateを定義しました。

  • CQRSのコマンド側(C)の Aggregate
  • 過去のイベントから一時的に復元される判断用の現在状態
  • ビジネスルールを守る
  • DBに保存される前提はない

Aggregateでは

  • HandleCommand
  • ApplyEvent

の2つのメソッドを持ちます。

HandleCommand

// commands(命令)を受け取り、events(イベント)を返す
func (b *BookAggregate) HandleCommand(ctx context.Context, cmd eh.Command) ([]eh.Event, error) {
	switch c := cmd.(type) {
	case *CreateBook:
		if b.Created {
			return nil, errors.New("book already created")
		}
		if strings.TrimSpace(c.Title) == "" {
			return nil, errors.New("title is required")
		}

		ev := eh.NewEvent(
			EventBookCreated,
			&BookCreated{
				Title: c.Title,
			},
			time.Now().UTC(),
			eh.ForAggregate(AggregateTypeBook, c.BookID, 0),
		)
		return []eh.Event{ev}, nil
	case *ChangeBookTitle:
		if !b.Created {
			return nil, errors.New("book not created")
		}
		if strings.TrimSpace(c.NewTitle) == "" {
			return nil, errors.New("new title is required")
		}
		if b.Title == c.NewTitle {
			return nil, errors.New("new title is the same as the current title")
		}

		ev := eh.NewEvent(
			EventBookTitleChanged,
			&BookTitleChanged{
				NewTitle: c.NewTitle,
			},
			time.Now().UTC(),
			eh.ForAggregate(AggregateTypeBook, c.BookID, 0),
		)
		return []eh.Event{ev}, nil
	default:
		return nil, errors.New("unknown command")
	}
}

ここがEvent Sourcingの肝になるところかと思います!

HandleCommandではCommandを受け取り、ドメインルールをチェックして問題なければEventを作ります。

ここが「ビジネスルールの置き場」になります。HandleCommandの中で引数のcommandをswitchで判定して、commandごとのビジネスルールを書き問題なければ対応するEventを返していますね。

ApplyEvent

func (b *BookAggregate) ApplyEvent(ctx context.Context, event eh.Event) error {
	switch event.EventType() {
	case EventBookCreated:
		data := event.Data().(*BookCreated)
		id, err := uuid.Parse(event.AggregateID().String())
		if err != nil {
			return err
		}
		b.ID = id
		b.Title = data.Title
		b.Created = true

	case EventBookTitleChanged:
		data := event.Data().(*BookTitleChanged)
		b.Title = data.NewTitle

	default:
		return errors.New("unknown event")
	}

	return nil
}

ApplyEventはAggregateを復元するために使われます。

  1. Commandが来る
  2. EventStoreからそのAggregateの過去イベントをロード
  3. ApplyEventを順番に適応してAggregateの現在状態を復元
  4. HandleCommandで判断して新しいイベントを作成
  5. EventStoreにappend

ここの3番を実行するためにApplyEventが利用されます。

つまり

HandleCommand: 未来を決める

ApplyEvent: 過去から今を作る

と言う風に考えています。(今回の実装では簡単のためApplyEventはやっていないのですが)

4-4. model.go(Read Model)

type BookView struct {
	ID        uuid.UUID `json:"id" bson:"_id"`
	Version   int       `json:"version" bson:"version"`
	Title     string    `json:"title" bson:"title"`
	CreatedAt time.Time `json:"created_at" bson:"created_at"`
	UpdatedAt time.Time `json:"updated_at" bson:"updated_at"`
}

var _ = eh.Entity(&BookView{})
var _ = eh.Versionable(&BookView{})

func (b *BookView) EntityID() uuid.UUID {
	return b.ID
}

func (b *BookView) AggregateVersion() int {
	return b.Version
}

Read Modelは、CQRSでいうQuery側のデータです。役割はシンプルでステートソーシングでいつも考えているモデルですね。今回はフィールドもTitleくらいにしています。

また、EventHorizonに必要な登録やinterfaceを満たすためのお決まりのメソッドを書いています。

Event Sourcingの世界では、真実(Single Source of Truth)はEventStoreにあります。

Read Modelはそれをアプリが使いやすい形に加工して持っているだけなので、壊れてもイベントから作り直せます。

4-5. projector.go

ProjectorはEventをRead Modelに変換(投影)するロジックです。

映画とかをみるProjectorと意味は同じですね。

映画は映像をスクリーンに投影しますが、

Event SourcingではEventをRead Modelに投影します。

type BookProjector struct{}

func (p *BookProjector) ProjectorType() projector.Type {
	return projector.Type(AggregateTypeBook.String())
}

// Eventを受け取り、Read Modelを返す
func (p *BookProjector) Project(
	ctx context.Context,
	event eh.Event,
	entity eh.Entity,
) (eh.Entity, error) {
	now := time.Now().UTC()
	var model *BookView
	if entity == nil {
		model = &BookView{}
	} else {
		var ok bool
		model, ok = entity.(*BookView)
		if !ok {
			return nil, errors.New("entity is not a BookView")
		}
	}

	switch event.EventType() {
	case EventBookCreated:
		data, ok := event.Data().(*BookCreated)
		if !ok {
			return nil, errors.New("event data is not a BookCreated")
		}
		model.ID = event.AggregateID()
		model.Title = data.Title
		model.CreatedAt = now
	case EventBookTitleChanged:
		data, ok := event.Data().(*BookTitleChanged)
		if !ok {
			return nil, errors.New("event data is not a BookTitleChanged")
		}
		model.Title = data.NewTitle
	default:
		return model, errors.New("unknown event")
	}

	model.Version++
	model.UpdatedAt = now

	return model, nil
}

メソッドはシンプルでEventを受け取ってswitchで判定して対応するRead Modelを返却しているだけですね!ちなみにその時にversion++したりしてますね!

4-6. EventStore と Read Model Repo

Event Sourcing + CQRSでは同じBookのデータでもEvent履歴(真実)、Read Modelの2種類保存します。

eventStore

今回であればこんなテーブル設計をしました。

CREATE TABLE event_store_events (
  aggregate_id   uuid         NOT NULL,
  aggregate_type varchar(255) NOT NULL,
  version        int          NOT NULL,
  event_type     varchar(255) NOT NULL,
  occurred_at    timestamp    NOT NULL,
  payload        jsonb        NOT NULL,
  PRIMARY KEY (aggregate_id, version)
);

今回はシンプルにAppendEventのメソッドだけ用意しましたが、本来はLoadなど色々メソッドが必要になります。ちなみにeventはappend onlyで書き換えは禁止です。

Read Model

CREATE TABLE book_views (
  id         uuid PRIMARY KEY,
  version    int NOT NULL,
  title      varchar(255) NOT NULL,
  created_at timestamp NOT NULL,
  updated_at timestamp NOT NULL
);

今回はシンプルにこんな感じのbooks_viewテーブルを用意しました。こちらはupdateしてOK。クエリに最適化したテーブルですね!

4-7. main.goで全体の流れを追う

func main() {
	ctx := context.Background()

	eventStore := &eventstore.EventStore{}
	bookViewRepo := &readrepo.BookViewRepo{}

	bookID := uuid.New()
	// command作成
	create := &book.CreateBook{
		BookID: bookID,
		Title:  "Event Sourcing Book",
	}

	// aggregateにてcommand -> eventに変換
	aggregate := &book.BookAggregate{}
	events, err := aggregate.HandleCommand(ctx, create)
	if err != nil {
		panic(err)
	}

	// eventStoreにeventsを永続化
	eventStore.AppendEvents(
		bookID.String(),
		book.AggregateTypeBook.String(),
		events,
	)

	// Projectorを使ってEvent -> Read Modelに変換
	projector := &book.BookProjector{}
	var readModel *book.BookView
	for _, e := range events {
		entity, err := projector.Project(ctx, e, readModel)
		if err != nil {
			panic(err)
		}
		readModel = entity.(*book.BookView)
	}

	// Read ModelをDBに保存
	bookViewRepo.Upsert(
		readModel.ID,
		readModel.Version,
		readModel.Title,
		readModel.CreatedAt,
		readModel.UpdatedAt,
	)
}

最後にmain.goです。こちらを順に追うことでイベントソーシングの処理流れを追ってみます。

シンプルなケースとして「本を作成する」という流れを考えます。

まずcommand作成

bookID := uuid.New()
// command作成
create := &book.CreateBook{
	BookID: bookID,
	Title:  "Event Sourcing Book",
}

ここはまだ何もしていなくて「意思表明」の段階です。

次にAggregateを復元します。(過去->現在)

ここで過去のAggregate群をLoadしてきて順々にApplyしていき、現在の状態を作成します。が今回はやっていないです!(ごめんなさい)

次にcommand->Eventを作成します。先ほど作ったHandleCommandを呼びますね。

// aggregateにてcommand -> eventに変換
aggregate := &book.BookAggregate{}
events, err := aggregate.HandleCommand(ctx, create)
if err != nil {
	panic(err)
}

そしてEventStoreにEventを保存します。

// eventStoreにeventsを永続化
eventStore.AppendEvents(
	bookID.String(),
	book.AggregateTypeBook.String(),
	events,
)

次にRead Modelを更新します。

// Projectorを使ってEvent -> Read Modelに変換
projector := &book.BookProjector{}
var readModel *book.BookView
for _, e := range events {
	entity, err := projector.Project(ctx, e, readModel)
	if err != nil {
		panic(err)
	}
	readModel = entity.(*book.BookView)
}

// Read ModelをDBに保存
bookViewRepo.Upsert(
	readModel.ID,
	readModel.Version,
	readModel.Title,
	readModel.CreatedAt,
	readModel.UpdatedAt,
)

ここはもちろん先ほど作成したProjectorを使いますね。

5. まとめ

Event Sourcingでは1つの操作に対して次のような流れを取りました。(本来はAggregateの復元も必要)

Command
  ↓
Aggregate(ルールチェック)
  ↓
Event(起きた事実)
  ↓
EventStore(履歴として保存)
  ↓
Projector
  ↓
Read Model(表示・検索用データ)

正直結構回りくどいな、と学習コストたけぇと思いましたね。CRUDで1テーブルUPDATEするだけの話に見えます。ただもちろんEvent Sourcingで得られるメリットもあるんですよね。

「なぜこの状態になったか」を説明できます。

ステートソーシングでは、DBにあるのは今の状態だけです。

title = "Event Sourcing やってみた"
status = "published"

これがイベントソーシングだと

BookCreated(title="Event Sourcing 入門")
BookTitleChanged(new_title="Event Sourcing やってみた")
BookPublished

というデータに残ります。

ステートソーシングの時にわからなかった

  • 最初は仮のタイトルを入れていた
  • 公開前にタイトルをブラッシュアップした
  • 公開という明確な節目があった

というストーリーがわかります。

もう少し踏み込むと

BookCreated
BookTitleChanged
BookTitleChanged
BookTitleChanged
BookPublished

という履歴があると

  • 何度もタイトルを変えている
  • 公開前にタイトルにかなり悩んでいる

という文脈が見えてきますよね。これらは例えば

  • 公開前までに平均何回タイトルを変更しているのか
  • タイトル変更回数が多い本ほど、いいねが多いか
  • 公開までのリードタイムはどのくらいか

などさまざまな分析や改善に繋げられますよね。

初めてEvent Sourcingしてみて面白かったです!

目次