アクターの諸刃の剣:非同期のメッセージパッシングと状態隔離対障害モデル、そしてそれを克服するAkka Streamの可能性

ScalaのアクターライブラリであるAkkaでは今までHTTPがIOパッケージでサポートされてきませんでした。HTTPのサポートはAkka Streamの開発とともになされたのです。なぜアクターモデルとHTTPをすぐ統合せずにStreamの登場を待つことになったのか?この疑問に答えることはアクターの問題を考えるよい題材です。

Sprayの革新とその限界

SprayはAkkaのアクターモデルとHTTPを統合して成功したフレームワークです。SprayはJVMベースのフレームワークの中で最高のパフォーマンスを誇りました。SprayはServletがもつがスレッドの高速性と、NodeJSがもつイベントループによる大量のリクエストを効率的に捌くことの特性の両立を、アクターによって可能にしたのです。

ところがSprayはアクターモデルならではの問題とも戦うことになります。Reactive Manifestoが定義する特性に対障害性とメッセージ駆動があります。アクターでは対障害モデルにより、互いアクターの状態は完全に隔離され、その状態を知る唯一の手段は非同期のメッセージパッシングしかないという特性です。これが諸刃の剣となったのです。

すべてを非同期なメッセージパッシングにすることの弊害:メッセージやリソース消費速度調整の困難さ

問題を理解するためにメッセージを送ったらHTTPリクエストを送るアクターを作ります。リクエストを受けるサーバーは少し待ってからレスポンスを返すようにしておきます。このアクターにメッセージを送り続けると、後半のリクエストがタイムアウトし続ける現象がおきます。タイムアウトを起こしている箇所はここです

import akka.io.IO
import akka.util.Timeout
import spray.can.Http
import spray.client.pipelining._

trait RequestProvider { this: Actor =>
  import context.system
  import context.dispatcher

  implicit val timeout = Timeout(60 seconds)

  val requestUrl = "http://some.slow.server"

  lazy val pipeline = {
    sendReceive(IO(Http)(system)) ~> unmarshal[String]
  }

  def request(): Future[String] = pipeline(Get(requestUrl))
}

class SprayClientActor extends Actor with RequestProvider {
  import context.dispatcher

  def receive: Receive = {
    case "request" => request().onComplete {
      case Success(s) => println("response", s)
      case Failure(e) => println("error", e.getMessage)
    }
  }
}
[ERROR] [09/10/2015 19:21:44.934] [SprayClient-akka.actor.default-dispatcher-25] [akka://SprayClient/user/$a] failed
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://SprayClient/user/IO-HTTP#-302794656]] after [60000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)

まずこの現象をアクターの非同期性から理解します。アクターのメールボックスにメッセージを送るのは非同期に行われます。なのでメッセージを送ると制御はすぐに返ります。さらにSprayのHTTPクライアントはノンブロッキングIOです。なのでリクエストを送ると制御はすぐに返ります。これらはスレッドプールからスレッドをつかめたときに行われ、処理時間は一瞬です。 一方HTTPリクエストが返ってくるのには時間がかかります。またHTTPの仕様によりリクエストが返ってくるまで次のリクエストは送れません。その結果HTTPコネクションが開放されて利用可能になる回転率はスレッドよりもはるかに悪くなります。

Sprayはこの問題を軽減するためにホストごとに複数のHTTPコネクションをプールしたり、HTTP pipeliningでリクエストが返る前に次のリクエストを送れるようにしています。しかしHTTPコネクションは外部のサービスだと張れる数に限界がありますし、HTTP pipeliningもサーバーはリクエストを受けた順にレスポンスを返さなければならないという制約がある以上、1つでもレスポンスが遅くなるとすべてレスポンスが遅れてしまいます。

ところでHTTPリクエストを行うスレッドをレスポンスが返るまでブロックすれば、たしかにスレッドとコネクション両者の消費速度は一致します。これはブロッキングIOを使ったレガシーなクライアントが行ってきたことです。しかしこれではアクターの性能を発揮できないばかりでなく、デッドロックの危険性を高めます。

根本的に問題を解決するには、スレッドとコネクションという特性の異なる2つのリソース、これらの消費速度を非同期に調整する必要があります。

状態隔離が引き起こす最悪の事態:メモリーの枯渇によるVMのクラッシュ

アクターの対障害性を支える状態隔離も問題を難しくしています。

上記の問題はIOにかかわらず速い生産者と遅い消費者が混在する以上起こりえます。その場合遅い消費者のメールボックスがメッセージで溢れかえることになります。

これは時間が経過するごとにメールボックス中にメッセージが貯まり続けることを引き起こし、いずれメモリーの枯渇を招きます。OutOfMemoryErrorは致命的なエラーなので、アクターシステムは対処を諦め、VM自体のクラッシュに至ります。

またこれは悪意ある過剰なリクエストでVMを落とせる可能性があることを意味します。

これはとても皮肉なことです。状態を隔離しそれを知ることをできなくすることで高めていた対障害性が、最悪の事態であるVMのクラッシュに導く危険性を高めているのですから。

状態が隔離されたアクターはどうやって相手に自分の状況を伝えるといいのでしょうか?それはメッセージパッシングで伝えるしかありません。消費が追いついていないので、相手にメッセージを送るスピードを下げてほしいというメッセージを送るのです。

Akka Streamという解決策

Akka Streamは目標の1つとして、"No more OutOfMemoryErrors"をかかげ、この問題に対処してきました。 Akka Streamはメッセージの消費速度をアクター間で調整する仕組みをもつことで、これらの問題を解決しています。生産者であるActorPublisherと消費者であるActorSubscriberのコードを以下に示します。receive関数の中に注目してください。ActorPublisherは Request(n: Int)メッセージを受け取ることで下流のアクターがいくつメッセージを要求するのか教えてもらいます。ActorSubscriberはOnNext(element: Any)メッセージで処理するメッセージを受け取り、OnCompleteメッセージで上流のアクターのメッセージの生成が完了したことを受信し、OnError(cause: Throwable)メッセージで上流でエラーがあったことを知ることができます。この仕組によって互いの消費スピードを自動調整するのです。Reactive Extensionに似たAPIですね。

ActorPublisher

class TweetPublisher extends ActorPublisher[Tweet] {
  import akka.stream.actor.ActorPublisherMessage._
  import TweetPublisher._

  var buffer = Vector.empty[Tweet]

  def receive: Receive = {
    case Request(_) => deliverBuffers()
    case Cancel => context.stop(self)
    case Post(tweet) => {
      buffer = buffer :+ tweet
      deliverBuffers()
    }
  }

  @annotation.tailrec
  final def deliverBuffers(): Unit = {
    if (totalDemand > 0 && !buffer.isEmpty) {
      val (head, tail) = (buffer.head, buffer.tail)
      buffer = tail
      onNext(head)
      deliverBuffers()
    }
  }
}

object TweetPublisher {
  case class Post(tweet: Tweet)
  def props = Props(new TweetPublisher)
}

ActorSubscriber

class TweetSubscriber extends ActorSubscriber with ActorLogging {
  import ActorSubscriberMessage._

  val requestStrategy = ZeroRequestStrategy

  request(1)

  def receive: Receive = {
    case OnNext(Tweet(message, hashTags)) => {
      log.info(s"tweet `$message` with tags ${hashTags.mkString(", ")}")
      request(1)
    }
    case OnComplete => log.info("OnComplete: This actor will be stopped")
    case OnError(e) => log.error(e, "OnError: This actor will be stopped")
  }
}

object TweetSubscriber {
  def props = Props(new TweetSubscriber)
}

Akka Streamのこのメッセージの消費速度の自動調整、背圧制御はAkka HTTPにも応用され、スレッドとコネクションという2つのリソース消費速度の自動調整を可能にしています。

Akka Streamを使わずに問題に対処するには

地道に対処するしかなさそうです。

Akka Streamのアクターのコードで見たようにメッセージパッシングを使って背圧制御を実現できます。これを自分で実装するとよいでしょう。

またアクターの最適化では遅い消費者に対して別のスレッドプールを割り与えます。こうして遅い消費者に必要な分のスレッドを与えかつ他の部分と独立に処理を進められるようにします。これにより各アクターのメールボックスの量のバランスをとり、リソースが許す限り滞留メッセージが均等に0に近くなるように調整することができます。

もはやメモリーの制約をも超えるAkka Persistence Query

少し未来の話をします。

Akka Persistenceはメッセージを永続化することによりVMごとクラッシュしてもメッセージをイベントソーシング経由で復元することを可能にします。

Akka Persistenceではメッセージの書き込みはできたのですが、書き込んだメッセージを検索して取得することはできませんでした。これはコマンドクエリー責務分離に従っているからです。

Akka Persistence QueryはAkka Persistenceの読み込み専用のAPIです。Akka Persistence Queryが革命的な点は、それがAkka Streamベースに作られていることです。これがどういうことを意味するかというと、背圧制御により下流から要求されるまでデータをディスクからメモリーに読み込まないということです。これは実質的にメモリーの制約を取り払います。もはやOutOfMemoryErrorとはお別れです。Akka Persistence Queryの登場によりSQSのようなキューサーバーとアクターのユースケースの差異が無くなる未来が来るかもしれません。

まとめ

リアクティブシステムを実現するための最適解であるアクターは、非同期メッセージパッシングと状態隔離によりメッセージやリソースの消費速度の調整が困難でありメモリーの枯渇の危険があるという問題がありました。これはアクターに背圧制御を取り入れたAkka Streamが解決します。またAkka StreamはAkka Persistence Queryなどの新たな技術の核になり、次世代のアクタープログラミングを牽引していくでしょう。