Azure Cosmos DB ChangeFeed + Functions

Azure

これは知っている、理解している人には別に大した話ではないのだけれど、ひょっとしてということがあるので。

例えば、Pythonで

  1. とあるデータをhttp/httpsで取得して
  2. そのデータをJSONに加工して保存して
  3. もしJSONの中に条件に該当するデータがあれば、さらに後続の処理を行う

というプログラムを書くとする。おそらく以下のように関数が密に結合した処理を思い浮かべるのが一般的だろう。

  1. main()からデータを取得する、例えば、urllib.requestで取得する関数を呼ぶ
  2. 1の関数の返値、ファイルパスからも知れないし、変数に格納したファイルの中身をJSONに加工する、例えば、xmltodictを使うとか
  3. 2の関数の返値、JSONのパスなり格納した変数をイテレータで回してチェック、条件に該当するデータを、後続の処理をする関数に渡す

これをAzure Cosmos DB ChangeFeedとAzure Functionsで実装すると、以下のようになる。

  1. Converter Functionsがデータを取得して、Cosmos for NoSQLで格納する
  2. Cosmos ChangeFeedがDetector Functionsをトリガーする。Detector Functionsは条件に該当するデータを検出して、それだけをCosmos DBに格納する
  3. Cosmos ChangeFeedがDispatcher Functionsをトリガーする。Dispatcherはまた別のCosmosやStorage Queueにデータを格納し、ChangeFeedやQueueトリガーによって後続の処理がキックされる。

それぞれのFunctionsはデータを1件だけ、といってもドキュメントなので「1件」の範囲をどう考えるのかは難しいけれど、それを処理してCosmosに格納したら完了。後続の処理については面倒を見ない。いわば「手離れの良い」処理が実装でき、もしいずれかの処理がコケても一連の処理全体がコケることがない。各Functionsは非常に小さいそれこそ「関数」でしかなく、処理の責任範囲が小さく、メンテナンスが簡単。

この例では、ChageFeedは1つのFunctions、つまりコンシューマーしかキックしていないけれど、複数のコンシューマーをキックすることも可能なので、例えば、DetectorをキックしているChangeFeedが、最初のConverterが格納したRAW dataをStorage Accountに保存するといったことが可能になる。そうすると、RAW dataは3600秒でCosmosから削除されるが、Storage Accountにファイルとして格納され続ける、といった処理が可能になる、と。