遡及イベント

既に処理された不正なイベントの結果を自動的に修正します。

これは、私が2000年代半ばに行っていたエンタープライズアプリケーションアーキテクチャ開発のさらなる発展に関する記述の一部です。残念ながら、それ以来、多くの他のことに気を取られてしまい、これ以上取り組む時間がありませんでしたし、近い将来、時間を見つけることも難しいでしょう。そのため、この資料は草案段階のものであり、再び取り組むことができるようになるまで、修正や更新を行うことはありません。

エンタープライズアプリケーションは、世界から情報を取得し、その情報に対して様々な計算を行い、その世界でさらなるアクションを開始することです。それらが実行する計算と開始するアクションは、受信する情報の正確性と同じくらいしか正確ではありません。入力にエラーがあると、出力にもエラーが発生します。

私たちはすでにこの現象に慣れていますが、入力エラーを発見した後、修正するのが難しいことがよくあります。ほとんどの場合、人間が不正な入力に応答してシステムが何をしたのか、システムがどのように反応したのか、システムが正しい入力を受け取った場合にどのように反応すべきだったのか、そしてどのように修正すべきかを把握する必要があります。

イベントソーシングの魅力の1つは、この厄介な作業をかなり容易にする基盤を提供することです。実際、イベントとその結果の注意深いログは、人間が修正を行うことをはるかに容易にします。遡及イベントはさらに一歩進み、システムが多くの不正なイベントの結果を自動的に修正できるようにします。

遡及イベントイベントソーシングの基盤の上に構築されているため、遡及イベントを理解するには、イベントソーシングに精通している必要があります。特に、イベントソーシングで定義した用語を多く使用します。

動作方法

遡及イベントを処理することは、現在のアプリケーションの状態が何らかの形で不正であることを意味します。遡及イベントを受信していた場合、現在とは異なるアプリケーションの状態になっていたでしょう。これは3つの並列モデルとして考えることができます。

  • 不正な現実:遡及イベントを考慮しなかった現在のライブアプリケーションの状態。
  • 正しいブランチ:遡及イベントが処理されていた場合に取得するはずだったアプリケーションの状態。
  • 修正された現実:最終的に到達したいライブアプリケーションの最終状態。

多くの場合、修正された現実は正しいブランチと同じになります。つまり、遡及イベントを考慮してイベントログを再処理します。しかし、それができない場合もあります。

最初のステップは、不正な現実と正しいブランチがどこで分岐したかを突き止めることです。これは基本的に、遡及イベントを処理するべきだったイベントログの時点です。ソースコード管理の世界からの用語を使用して、これを分岐点と呼びます。分岐点は、遡及イベントを挿入する直前のイベントログの時点です。

分岐点を構築するには、再構築と巻き戻しの2つの方法があります。

  • 再構築では、遡及イベント前の最後のスナップショットにアプリケーションの状態を戻します。次に、分岐点に達するまで、すべてのイベントを順に処理します。
  • 巻き戻し/再生では、最新のイベントから分岐点までイベントを逆に辿ります。

並列モデルでこの分岐点を構築することも、ライブアプリケーションの状態を分岐点に戻すこともできます。ライブアプリケーションの状態を戻した場合、先に進むにつれて自動的に正しいブランチが作成され、修正された現実は同じになります。並列モデルを使用する場合、並列モデルに正しいブランチがあり、ライブアプリケーションの状態に変更を加えて修正された現実にする必要があります。本質的には、違いをライブ状態にマージします。

遡及イベントには、順序が間違ったイベント、拒否されたイベント、不正なイベントの3つの主な種類があります。順序が間違ったイベントとは、遅れて受信されたイベントで、順序が間違ったイベントを受信した後に処理されるべきだったイベントを既に処理しているほど遅れているものです。拒否されたイベントとは、現在偽であり、処理されるべきではなかったと認識されたイベントです。不正なイベントとは、イベントに関する情報が間違っていたイベントです。

この3つのそれぞれについて、分岐点以降のイベントストリームに対して異なる操作を行う必要があります。

  • 順序が間違ったイベントの場合、分岐点に遡及イベントを挿入して処理します。
  • 拒否されたイベントの場合、イベントを元に戻して拒否済みとしてマークします。イベントログへの削除と効果的に同じです。
  • 不正なイベントの場合、元のイベントを元に戻して拒否済みとしてマークします。遡及イベントを挿入して処理します。これは、拒否されたイベントと順序が間違ったイベントが1つとして処理されると考えることができます。

拒否済みとしてマークされたイベントは、それ以降の処理では無視されるため、再生中に再処理されたり、将来の巻き戻しで元に戻されたりすることはありません。履歴を維持するためにイベントログに残りますが、それ以外は無視されます。これを行う必要はありません。代替として、元のイベントの直後に元に戻す処理を置くことができますが、このようにしてイベントを処理し、直後に元に戻すのは明らかに非効率です。

不正な情報が処理順序を変更する場合、不正なイベントはさらに複雑な問題を引き起こす可能性があります。そのため、4月1日に到着する船があったとしましょう。修正により、3月1日に到着することになります。これらの場合、分岐点は拒否されたイベントと正しいイベントの早い方です。次の処理は、どちらが早いかによって異なります。この例のように、正しいイベントの方が早い場合は、それを処理して古いイベントを拒否済みとしてマークします。その後、順方向に再生すると、古いイベントはスキップされます。古いイベントの方が早い場合は、それを元に戻し、新しいイベントまで再生し、それを処理して処理を続けます。

これを議論している間、遡及イベントは常にそれ自体がイベントであることを覚えておいてください。これは順序が間違ったイベントには影響しませんが、他の2つのケースには影響します。拒否されたイベントのケースを考えてみましょう。これは効果的にイベントログからのイベントの削除です。しかし、イベントログのポイントは、イベントを削除しないことです。そのため、拒否イベントをログに挿入することができます。その拒否を処理することで、説明した変更が行われます。拒否イベントは常に拒否されるため、再構築時には処理されません。不正なイベントは、古いイベントと新しいイベントの両方を含む置換イベントを使用して同じように処理できます。実際、拒否イベントは修正されたイベントを持たない置換イベントと考えることができます。

拒否されたイベントと置換イベントは常に拒否されるため、再生されないと言いました。これは、並列モデルを常に最新の最適な知識で構築したい場合に当てはまります。しかし、過去の時点での知識という観点から、つまりバイテンポラルな並列モデルを構築している場合は、拒否されたイベントの一部を考慮に入れるより複雑な処理を行います。

並列モデルからのマージ

並列モデルで正しいブランチを構築する方法に従うと、正しいブランチと現在の現実の違いを把握し、それらの違いをマージして修正された現実を作成する必要があります。

変更を検出するには、両方のモデルのすべてのオブジェクトを調べて、変更されたオブジェクトを確認し、1つのモデルのみに存在するオブジェクトを確認する必要があります。明らかにこれは膨大な作業になる可能性がありますが、分岐点以降に処理されたイベントによって影響を受けるオブジェクトを追跡することで、この作業を減らすことができます。遡及イベントによって変更されるオブジェクトのサブセットを決定するために、選択的再生のような分析を行うこともできます。

変更を見つけた後、それらの変更をライブモデルに適用する必要があります。これらの変更をイベント自体として適用するかどうかをここで検討する価値があります。初期イベントの分析からすべての変更を完全に計算できるため、厳密に必要ではありませんが、この分析は非常に複雑であるため、マージ変更を独自のイベントにキャプチャすることが役立つ場合があります。

明らかにこのマージ処理は非常に複雑になる可能性があり、マージ処理の複雑さは、遡及イベントの処理方法を決定する際に考慮すべき重要な要素です。

最適化

遡及イベントの処理は並列モデルの作成と非常に似ているため、そこで議論されている最適化の多くもここで適用されます。反転、スナップショット、選択的再生の手法はすべて、少ないイベント処理で分岐点に到達するのに役立ちます。

選択的再生を使用して分岐点に到達した場合、同じ選択的再生を使用して、分岐点以降のイベントを順に処理できます。

テストに関する考察

反転で特に役立つテスト手法は、追加する動作が再生下で正常に機能することを常に確認することです。テストケースのイベントに遡及イベントを追加することで、テスト対象のイベントシーケンス全体が再生されます。

別のテストアイデア(ただし、まだ直接見たケースはありません)は、イベントシーケンスをランダムに生成し、それらをランダムに異なる順序で処理して、常に同じアプリケーション状態になることを確認することです。

外部システムの更新

イベントソーシングは、同じ方法で構築されていない外部システムとの不一致につながります。通常のイベントソーシングでは、再構築中に外部システムに更新メッセージが送信されないようにする必要があります。これは比較的簡単で、ゲートウェイをオフにするだけで済みます。しかし、遡及イベントでは、さらに進める必要があります。遡及イベントが、行うべきだった更新にどのような変更をもたらすかを判断し、これらの変更を通知で処理する必要があります。

この問題には、検出と修正の2つの部分があります。

基本的な検出スキームは、不正な現実で送信された更新、正しいブランチで送信されるべきだった更新を判断し、その2つの違いを見つけることです。

これを行う良い方法は、すべての外部更新をイベントに変換することで、ゲートウェイ自体でイベントソーシングを使用することです。これを行うことで、これらのイベントを異なるモデルでキャプチャし、比較できます。これを行うには、すべての外部更新イベントをキャプチャし、2つのリストを作成するメカニズムがあることを確認します。分岐点以降に送信されたすべての更新イベントに関心があります。巻き戻しを使用している場合、イベントの反転がキャプチャできる更新イベントを送信することを確認すれば、不正な現実の更新をキャプチャできます。

不正確な現実と正しいブランチの2つの更新イベントリストを取得したら、それらを比較して不一致を探します。両方のリストで同じ更新は無視できます。重要なのは、一方のリストにイベントがあり、もう一方のリストにない場合です。次に、異なるイベントの2つの新しいリストを作成できます。これらの更新イベントは、修正する必要があるものです。

上記で定義したすべてのことは、汎用的に行うことができます。汎用コンポーネントはイベントを追跡し、不一致イベントの2つのリストを見つけることができます。ただし、不一致の処理は、外部システムごとに個別にプログラムする必要があります。外部システムが取る必要がある補償アクションによって異なります。修正を自動化できない可能性があり、代わりに人間が介入して混乱を解消する必要があるかもしれません。しかし、少なくともシステムは、何が起こったのか、何が起こるべきだったのかについて、人間に非常に良い説明を与えることができます。

いつ使用するのか

「遡及イベント」を使用する主な理由は、過去の入力に対する修正を自動的に実行する場合に役立つためです。他の自動化と同様に、修正の頻度と難易度を確認する必要があります。人間の時間が修正に多く費やされている場合、「遡及イベント」のようなものを使用して自動化することを検討する価値があります。「遡及イベント」を使用して自動化することの大きな利点は、完全に汎用的なプロセスであることです。「遡及イベント」が1つのイベントで機能するようになれば、他のイベントに拡張するのは比較的簡単です。

私は「遡及イベント」をあまり見たことがなく、それには正当な理由があります。「遡及イベント」を実装するには、いくつかの重要な準備が必要です。「イベントソーシング」が必要であり、それでも足りない場合は、可逆性または「パラレルモデル」を追加する必要があります。これらは小さな前提条件ではありません。その結果、「遡及イベント」をサポートするアプリケーションの構築は、システム全体に影響を与える重要な決定です。既存のシステムに必要なリファクタリングを追加することも容易ではありません。この種の自動エラー修正の必要性は、通常、初期の要件ではないため、設計に多くの作業が必要になる可能性があります。「遡及イベント」を可能にするためには。

システムが外部システムに多くのリンクを持っている場合、それは「遡及イベント」の使用に相当な複雑さを加える可能性があります。完全な遡及処理には、実行するために完全な情報へのアクセスが必要であり、外部更新のあらゆる小さな変更が発生します。多くの外部システムとの統合がある場合(これは一般的な状況です)、それらとの遡及性をどのように処理するかを慎重に検討して、「遡及イベント」が使用できるアプローチであるかどうかを確認する必要があります。

「遡及イベント」は、すべてか無かの選択ではないことを忘れないでください。範囲を狭めるためにいくつかの方法で「遡及イベント」を制限できますが、それでもいくつかの有用性を維持できます。1つの削減は、システムのサブセット(特に外部の影響が少ない領域)にのみ「遡及イベント」を適用することです。実際、これが私が見たところです。アカウントで使用されています。もう1つの範囲の削減は時間です。多くのビジネスオペレーションは、毎週、毎月、毎年など、固定されたサイクルに収まります。この状況では、処理サイクル内(例:過去1週間)のイベントに対してのみ、「遡及イベント」を使用します。サイクル内ではよりアクティブな形式の「遡及イベント」、クローズされたサイクルではよりパッシブな形式を使用することもできます。

私が「遡及イベント」がまれであると考える最後の理由は、それを実現するために何をする必要があるかを理解していないと思うからです。このパターンが、この障害を取り除くのに役立ち、ひいては私たちが他の人を見つけるのに役立つことを願っています。

例:巻き戻しによる遡及貨物(C#)

私は「イベントソーシング」に関するほとんどの例を、船舶による港間での貨物の移動に基づいた例で説明してきました。今回は、「遡及イベント」の使用方法について、その例を続けます。

まず、「遡及イベント」の実施方法に関するいくつかの戦略的決定を行う必要があります。この例では、ライブモデルを巻き戻すことで分岐点に到達するというアプローチを使用します。この例は単純であるため、選択的な再生は使用せず、イベントリスト全体を巻き戻します。

巻き戻しを使用しているため、すべてのイベントが可逆的であることが不可欠です。イベント処理または逆転の詳細には触れません。「イベントソーシング」の例を参照して、それがどのように機能するかを確認してください。

「遡及イベント」の動作は、特別な種類のイベントによってトリガーされます。

class ReplacementEvent...

  private DomainEvent original, replacement;
  public DomainEvent Original {get { return original; }}
  public DomainEvent Replacement {get { return replacement; }}

  public ReplacementEvent(DomainEvent oldEvent, DomainEvent replacement)
    : base(oldEvent.Occurred) {
    this.original = oldEvent;
    this.replacement = replacement;
  }
  internal override void Process() {
    throw new Exception("Replacements should not be processed directly");
  }
  internal override void Reverse() {
    throw new Exception("Cannot reverse replacements");
  }

これらの場合、基本的なプロセスと逆転メソッドをブロックしていることに気付くでしょう。これらの例では、ほとんどの場合、イベントが独自の処理ロジックを処理することを好んでいました。置換は異なります。ドメインロジックを含まず、その処理にはイベントキューに関する知識と、そのイベントキューの操作が含まれます。イベントキューとの親密なやり取りは、イベントプロセッサだけに任せておく方が良いと思うので、実際にはすべての置換動作をイベントキューに組み込みました。

class EventProcessor...

  public void Process(DomainEvent ev) {
    try {
      if (ev is ReplacementEvent)
        ProcessReplacement((ReplacementEvent) ev);
      else if (OutOfOrder(ev))
        ProcessOutOfOrder(ev);
      else BasicProcessEvent(ev);
    } catch (Exception ex) {
      ev.ProcessingError = ex;
      if (ShouldRethrowExceptions) throw ex;
    }
    InsertToLog(ev);
  }

ここで、オブジェクト指向プログラミングの重大な罪を犯していることに気付くでしょう。メソッドの引数の型に基づいた明示的な条件付き動作です。私はこれを頻繁に行いませんが、ここではイベントプロセッサがキューに関する知識を自分自身に保持させたいので行います。(ダブルディスパッチを使用することもできますが、ここでは状況が複雑すぎてそれを正当化するには十分ではありません。)

拒否されたイベントについては、ここではケースがありません。後で見るように、それらを置換イベントがnullである置換として処理します。

最も単純なケースから始めます。基本的なProcess(およびreverse)メソッドは、必要な場合にトレース動作を追加できるようにする単純なラッパーです。

class EventProcessor...

  private void BasicProcessEvent(DomainEvent e) {
    traceProcess(e);
    e.Process();
  }
  private void BasicReverseEvent(DomainEvent e) {
    traceReverse(e);
    e.Reverse();
  }

シーケンス外のイベントは、最初に説明するのが最も簡単です。これらは、シーケンス外で受信した通常のイベントであり、置換イベントを使用してモデル化しません。本質的にこれらはストリームに新しいイベントを挿入します。プロセッサは、それを最後の一つと比較するだけで、シーケンス外のイベントをテストします。

class EventProcessor...

  private bool OutOfOrder(DomainEvent e) {
    if (LogIsEmpty()) return false;
    return (LastEvent().after(e));
  }
  private DomainEvent LastEvent() {
    if (LogIsEmpty()) return null;
    return (DomainEvent) log[log.Count - 1];
  }
  private bool LogIsEmpty() {
    return 0 == log.Count;
  }

この例を非常に単純にするために、発生日によってイベントの順序付けを行います。これを実際に行うには、時間ポイントの解像度を向上させるだけで十分な場合があります。場合によっては、他の要因が順序付けに影響を与える可能性があります。

シーケンス外ケースの処理の概要は簡単です。

class EventProcessor...

  private void ProcessOutOfOrder(DomainEvent e) {
    RewindTo(e);
    BasicProcessEvent(e);
    ReplayAfter(e);
  }

巻き戻すには、シーケンス外のイベントより後のログからすべてのイベントを選択します。簡略化のために、ログ(または少なくともそのキャッシュ)をメモリに保持しています。

class EventProcessor...

  private void RewindTo(DomainEvent priorEvent) {
    IList consequences = Consequences(priorEvent);
    for (int i = consequences.Count - 1; i >= 0; i--)
      BasicReverseEvent(((DomainEvent) consequences[i]));
  }
  private IList Consequences(DomainEvent baseEvent) {
    IList result = new ArrayList();
    foreach (DomainEvent candidate in log)
      if (candidate.IsConsequenceOf(baseEvent)) result.Add(candidate);
    return result;
  }

class DomainEvent...

  public bool IsConsequenceOf(DomainEvent other) {
    return (!ShouldIgnoreOnReplay && this.after(other));
  }

ご覧のとおり、選択的再生を使用していないにもかかわらず、すべてのイベントが巻き戻し中に再処理のために選択されるわけではありません。本質的に、エラーイベントまたは拒否されたイベントのいずれも再処理しません。

イベントを再び前方へ再生するのは簡単です。

class EventProcessor...

  private void ReplayAfter(DomainEvent ev) {
    foreach (DomainEvent e in Consequences(ev)) BasicProcessEvent(e);
  }

では、置換に進みましょう。

class EventProcessor...

  private void ProcessReplacement(ReplacementEvent e) {
    if (e.Original.ShouldIgnoreOnReplay)
      throw new ProcessingException("Cannot replace event twice");
    else if (null == e.Replacement) 
      ProcessRejection(e);
    else if (e.HasPriorReplacement) 
      ProcessPriorReplacement(e);
    else 
      ProcessPriorOriginal(e);
    
  }

class ReplacementEvent...

  public bool HasPriorReplacement {
    get {
      if (null == replacement) return false;
      else return original.after(replacement);
    }
  }

ここで処理するケースはいくつかあります。元のイベントがすでに無視するようにマークされている場合、拒否済みのイベントを拒否すべきではないため、処理エラーが発生しています。それを終えたら、3つの主なケースがあります。置換は拒否であり、置換イベントは元よりも早く、元は置換よりも早いです。

拒否のケースから始めましょう。これは、置換イベントがnullであることで示されます。ここでは、拒否されたイベントに巻き戻し、拒否し、反転し、前方へ再生します。

class EventProcessor...

  private void ProcessRejection(ReplacementEvent e) {
    RewindTo(e.Original);
    BasicReverseEvent(e.Original);
    e.Original.Reject();
    ReplayAfter(e.Original);
  }

class DomainEvent...

  public bool after (DomainEvent other) {
    return this.CompareTo(other) > 0;
  }
  public void Reject() {
    _isRejected = true;
  }
  private bool _isRejected;
  public virtual bool ShouldIgnoreOnReplay {
    get {
      if (WasProcessingError) return true;
      return _isRejected;
    }
  }

拒否すると、イベントがマークされ、再度処理または巻き戻されなくなります。

以前の置換がある場合、置換に巻き戻し、元を拒否し、置換を処理し、前方へ再生します。

class EventProcessor...

  private void ProcessPriorReplacement(ReplacementEvent e) {
    RewindTo(e.Replacement);
    e.Original.Reject();
    BasicProcessEvent(e.Replacement);
    ReplayAfter(e.Replacement);
  }

元のイベントが以前にある場合、元に戻し、反転して拒否し、置換まで再生し、処理し、前方への再生を続けます。

class EventProcessor...

  private void ProcessPriorOriginal(ReplacementEvent e) {
    RewindTo(e.Original);
    BasicReverseEvent(e.Original);
    e.Original.Reject();
    ReplayBetween(e.Original, e.Replacement);
    BasicProcessEvent(e.Replacement);
    ReplayAfter(e.Replacement);
  }
  private void ReplayBetween(DomainEvent first, DomainEvent last) {
    IList eventsToReplay = new ArrayList();
    foreach (DomainEvent e in log) {
      if (e.IsConsequenceOf(first) && last.after(e))
        eventsToReplay.Add(e);
    }
    foreach (DomainEvent e in eventsToReplay)
      BasicProcessEvent(e);
  }

ここでは、イベントが正常に処理されるまでログにイベントを追加しないという規則に従っています。これは、ログがアプリケーション状態と同じトランザクションにない場合に役立ちます。すべてが1つのトランザクションにある場合、分岐点に到達して前方へ再生した後に(新しいイベントを取得する)イベントをログに追加するだけです。

例:外部システムの更新(C#)

単純な「イベントソーシング」の場合、再生中に外部通知をオフにするだけで十分です。ただし、「遡及イベント」の場合、さらに進んで検出と修正を行う必要があります。巻き戻し/再生中にこれを行うには、生成されたすべてのイベントを保存して比較し、修正する必要があるものを決定します。次に、キャンセルメッセージと変更の新しいメッセージを送信するという単純な自動修正ケースを想定します。

ここでは、カナダを通過した貨物が米国の港に入港した場合、米国の税関当局に通知する必要があると述べることで、出荷の例を続けます。

外部通知は、到着イベントを処理しているときにCargoオブジェクトによって行われます。

class Cargo...

  public void HandleArrival(ArrivalEvent ev) {
    ev.priorCargoInCanada[this] = _hasBeenInCanada;
    if ("CA" == ev.Port.Country) 
      _hasBeenInCanada = true;
    if (HasBeenInCanada && "US" == ev.Port.Country) {
      Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port);
      ev.WasNotificationSent = true;
    }
  }
  private bool _hasBeenInCanada = false;
  public bool HasBeenInCanada {get { return _hasBeenInCanada;}}

このイベントを反転するために、前方再生で通知を送信したかどうかをメモし、行った場合に再度送信します。

class Cargo...

  public void ReverseArrival(ArrivalEvent ev) {
    _hasBeenInCanada = (bool) ev.priorCargoInCanada[this];
    if (ev.WasNotificationSent) 
      Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port);
  }

ここでは、ドメインモデルはイベント処理の再生ロジックを認識する必要がないという原則に従っています。各イベントに対して独自のステートを反転する方法を知っていますが、外部システムとの通信の複雑さを気にする必要はありません。

外部システムとの通信のロジックは、この場合はオブジェクトのクラスタであるゲートウェイによって処理されます。

図1:オブジェクトのクラスタは、遡及性を処理するためのゲートウェイを構成します。

この例では、CustomsGatewayFrontは、ドメイン指向インターフェース(ICustomsGatewayによって定義)を実際のメッセージングインフラストラクチャに変換する通常のゲートウェイです。それが何をしているかは安全に無視できます。メソッドを呼び出すと、税関にメッセージが届くことを確認すると仮定します。

興味深いことは、実際のCustomsGatewayFrontをラップするCustomsGatewayBufferによって駆動される前に発生します。同じインターフェースを実装しますが、遡及的な巻き戻しに対処する機能を追加します。

遡及的な巻き戻しの多くは汎用的であるため、汎用的な動作を異なるクラスReplayBufferに配置できます。これにより、Customs Gateway Bufferは税関のケースに固有のものだけを処理します。再生バッファは、巻き戻し/再生プロセスのさまざまな段階に対処するために、イベントプロセッサとの通信を必要とします。また、最終的に不一致イベントを調整する方法も必要であり、この場合は税関ゲートウェイバッファに割り当てます。これら両方のケースで、巻き戻しバッファは、その汎用性の純粋性を維持するために、コラボレーションインターフェースを介して通信します。

それが登場人物なので、アクションに入りましょう。ライブ実行時の通常のケースから始めます。税関ゲートウェイバッファは、更新イベントを作成して再生バッファに送信することで、通知操作を実装します。

class CustomsGatewayBuffer...

  public void Notify (DateTime arrivalDate, Ship ship, Port port) {
    CustomsNotificationEvent ev =
        new CustomsNotificationEvent(gateway, arrivalDate, ship, port);
    buffer.Send(ev);
  }
  ICustomsGateway gateway;
  ReplayBuffer buffer;

class CustomsNotificationEvent...

  public CustomsNotificationEvent(
    ICustomsGateway gateway, DateTime arrivalDate, 
    Ship ship, Port port) 
  {
    this.gateway = gateway;
    this.arrivalDate = arrivalDate;
    this.ship = ship;
    this.port = port;
  }

再生バッファは、アクティブな場合にイベントを処理し、実際のゲートウェイフロントへの呼び出しにつながります。

ReplayBufferクラス…

  internal void Send(IGatewayEvent ev) {
    current.Add(ev);
    if (isActive) 
      ev.Process();
  }

class CustomsNotificationEvent...

  public virtual void Process() {
    gateway.Notify(arrivalDate, ship, port) ;
  }

(Replayバッファがどのようにアクティブになり、現在のリストがどうなるかについては、後で説明します。)

ご覧のように、ドメインイベントの積極的な処理では、ゲートウェイバッファがゲートウェイイベントを作成し、それをReplayバッファに送信して処理することで、元の呼び出しが実際のゲートウェイに適用されます。これは非常に複雑な単純な委譲です。

この複雑さの利点は、遡及的なイベントの処理にあります。Replayバッファは、巻き戻し可能なインターフェースを介してイベントプロセッサに接続されています。

ReplayBufferクラス…

  IRewindable eventProcessor;
internal interface IRewindable  {
  event EventProcessor.EventHandler RewindStarted, RewindFinished, ReplayFinished;    
}

このインターフェースは、重要な3つのイベントを定義します。イベントプロセッサは、様々なケースの処理中にこれらのイベントをシグナルします。これが順序外れのケースです。

class EventProcessor...

  private void ProcessOutOfOrder(DomainEvent e) {
    RewindStarted();
    RewindTo(e);
    RewindFinished();
    BasicProcessEvent(e);
    ReplayAfter(e);
    ReplayFinished();
  }

置換の場合、全体的な開始と終了は共通ですが、各ケースでは巻き戻しが終了する時点を示すために、具体的な配置が必要です。

class EventProcessor...

  private void ProcessReplacement(ReplacementEvent e) {
    RewindStarted();
    if (e.Original.ShouldIgnoreOnReplay)
      throw new ProcessingException("Cannot replace event twice");
    else if (null == e.Replacement) 
      ProcessRejection(e);
    else if (e.HasPriorReplacement) 
      ProcessPriorReplacement(e);
    else 
      ProcessPriorOriginal(e);
    ReplayFinished();
  }
private void ProcessRejection(ReplacementEvent e) {
  RewindTo(e.Original);
  BasicReverseEvent(e.Original);
  RewindFinished();
  e.Original.Reject();
  ReplayAfter(e.Original);
}
private void ProcessPriorOriginal(ReplacementEvent e) {
  RewindTo(e.Original);
  BasicReverseEvent(e.Original);
  RewindFinished();
  e.Original.Reject();
  ReplayBetween(e.Original, e.Replacement);
  BasicProcessEvent(e.Replacement);
  ReplayAfter(e.Replacement);
}
private void ProcessPriorReplacement(ReplacementEvent e) {
  RewindTo(e.Replacement);
  RewindFinished();
  e.Original.Reject();
  BasicProcessEvent(e.Replacement);
  ReplayAfter(e.Replacement);
}

これらのイベントにより、Replayバッファは遡及的な処理を正しく処理するための正しいタイミング情報を取得します。ここではイベントを使用していますが、どのイベントプロセッサにも任意の数のReplayバッファが存在し、これらのオブジェクトについて、これらの3つのイベントを理解できること以外何も知る必要はありません。

それでは、Replayバッファに移り、それがどのようにこれらのイベントを処理するかを見てみましょう。Replayバッファは、複数のリストで作成されます。

ReplayBufferクラス…

  internal ReplayBuffer (IRewindable processor, IAdjustable adjuster) {
    this.eventProcessor = processor;
    this.adjuster = adjuster;
    SubscribeToProcessorEvents();
    sent = new ArrayList();
    isActive = true;
    current = sent;
  }
  IList sent,rewound, replayed, current;
  IAdjustable adjuster;
  private bool isActive;
  private void SubscribeToProcessorEvents() {
    eventProcessor.RewindStarted += 
      new EventProcessor.EventHandler(processor_RewindStarted);
    eventProcessor.RewindFinished += 
      new EventProcessor.EventHandler(processor_RewindFinished);
    eventProcessor.ReplayFinished += 
      new EventProcessor.EventHandler(processor_ReplayFinished);
  }

作成時にバッファはアクティブになり、送信されたイベントを処理します。`current list`変数は、巻き戻しの位置に応じて3つのコアリストのいずれかに設定されます。アクティブな場合、`sent list`を指します。これにより、実際に送信されたすべてのイベントの記録が得られます。また、イベントプロセッサの関連するすべてのイベントを購読します。(調整については今は気にしないでください。修正時に説明します。)

巻き戻しの開始時に、バッファは自身を非アクティブにして、イベントを実際のゲートウェイに渡さないようにします。また、`current list`変数を新しいリストを指すように切り替えて、巻き戻されたすべてのイベントをキャプチャできるようにします。

ReplayBufferクラス…

  private void processor_RewindStarted() {
    isActive = false;
    rewound = new ArrayList();
    current = rewound;
  }

この例から、巻き戻しが停止したときに何が起こるかはおそらく推測できると思いますが、念のため…

ReplayBufferクラス…

  private void processor_RewindFinished() {
    replayed = new ArrayList();
    current = replayed;
  }

ここまでで、最後のイベントが最も興味深いものであることは明らかです。ここで不一致リストを計算する必要があるからです。

ReplayBufferクラス…

  private void processor_ReplayFinished() {
    current = sent;
    DetermineChange();
    isActive = true;
    rewound = null;
    replayed = null;
  }
  private void DetermineChange() {
    IList matchingEvents = new ArrayList();
    foreach (IGatewayEvent ev in replayed) {
      if (rewound.Contains(ev)) matchingEvents.Add(ev);
    }
    foreach (IGatewayEvent ev in matchingEvents) {
      replayed.Remove(ev);
      rewound.Remove(ev);
    }
    adjuster.Adjust(rewound, replayed);
  }

アルゴリズムは非常に単純で、一致するイベントを見つけ、両方のリストから削除するだけです。完了すると、不一致イベントの2つのリストが得られ、調整器に渡して処理します。

CustomsGatewayBufferクラス…

  public void Adjust(IList oldEvents, IList newEvents) {
    foreach (CustomsNotificationEvent e in oldEvents)
      new CustomsCancellationEvent(e).Process();
    foreach (CustomsNotificationEvent e in newEvents)
      e.Process();
  }
class CustomsCancellationEvent {
  private CustomsNotificationEvent original;
  public CustomsCancellationEvent(CustomsNotificationEvent original) {
    this.original = original;
  }
  public void Process() {
    original.Gateway.Cancel(original.ArrivalDate, original.Ship, original.Port);
  }
}

この場合、巻き戻されたすべてのイベントのキャンセル通知と新しいイベントの新しい通知を送信するだけで、CustomsGatewayBufferに調整を処理させます。私の理想の世界では、政府機関は非常に協力的です。実際には、各調整を個別に検討する必要があり、非常に複雑になり、人的介入が必要になる可能性があります。しかし、イベントのリストは、外部システムに対して何を行う必要があるかを整理するのに大いに役立つはずです。