diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 1b1d995732..875cbab6f3 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -243,6 +243,34 @@ akka.persistence.journal.leveldb-shared { } } +akka.persistence.journal.proxy { + # Class name of the plugin. + class = "akka.persistence.journal.JournalProxy" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Set this to on in the configuration of the ActorSystem + # that will host the target journal + start-target-journal = off + # The journal plugin config path to use for the target journal + target-journal-plugin = "" + # Initialization timeout of target lookup + init-timeout = 10s +} + +akka.persistence.snapshot-store.proxy { + # Class name of the plugin. + class = "akka.persistence.journal.JournalProxy" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Set this to on in the configuration of the ActorSystem + # that will host the target snapshot-store + start-target-snapshot-store = off + # The journal plugin config path to use for the target snapshot-store + target-snapshot-store-plugin = "" + # Initialization timeout of target lookup + init-timeout = 10s +} + # LevelDB persistence requires the following dependency declarations: # # SBT: diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/JournalProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/JournalProxy.scala new file mode 100644 index 0000000000..02e2dc55ff --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/JournalProxy.scala @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.journal + +import akka.util.Helpers.Requiring +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.Stash +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRef +import akka.persistence.JournalProtocol +import akka.actor.ActorSystem +import akka.persistence.Persistence +import scala.util.control.NoStackTrace +import java.util.concurrent.TimeoutException +import akka.persistence.AtomicWrite +import akka.persistence.NonPersistentRepr +import akka.persistence.DeleteMessagesFailure +import akka.actor.ActorLogging +import com.typesafe.config.Config +import akka.actor.Address +import akka.actor.ActorIdentity +import akka.actor.RootActorPath +import akka.actor.Identify +import akka.actor.ReceiveTimeout +import akka.actor.ExtendedActorSystem +import akka.persistence.SaveSnapshotFailure +import akka.persistence.DeleteSnapshotFailure +import akka.persistence.DeleteSnapshotsFailure +import akka.persistence.SnapshotProtocol + +object JournalProxy { + final case class TargetLocation(address: Address) + private case object InitTimeout + + def setTargetLocation(system: ActorSystem, address: Address): Unit = { + Persistence(system).journalFor(null) ! TargetLocation(address) + if (system.settings.config.getString("akka.persistence.snapshot-store.plugin") != "") + Persistence(system).snapshotStoreFor(null) ! TargetLocation(address) + } + + private sealed trait PluginType { + def qualifier: String + } + private case object Journal extends PluginType { + override def qualifier: String = "journal" + } + private case object SnapshotStore extends PluginType { + override def qualifier: String = "snapshot-store" + } +} + +// FIXME document me +final class JournalProxy(config: Config) extends Actor with Stash with ActorLogging { + import JournalProxy._ + import JournalProtocol._ + import SnapshotProtocol._ + + private val pluginId = self.path.name + private val pluginType: PluginType = pluginId match { + case "akka.persistence.journal.proxy" ⇒ Journal + case "akka.persistence.snapshot-store.proxy" ⇒ SnapshotStore + case other ⇒ + throw new IllegalArgumentException("Unknown plugin type: " + other) + } + + private val timeout: FiniteDuration = config.getDuration("init-timeout", MILLISECONDS).millis + private val targetPluginId: String = { + val key = s"target-${pluginType.qualifier}-plugin" + config.getString(key).requiring(_ != "", s"$pluginId.$key must be defined") + } + private val startTarget: Boolean = config.getBoolean(s"start-target-${pluginType.qualifier}") + + override def preStart(): Unit = { + if (startTarget) { + val target = pluginType match { + case Journal ⇒ + log.info("Starting target journal [{}]", targetPluginId) + Persistence(context.system).journalFor(targetPluginId) + case SnapshotStore ⇒ + log.info("Starting target snapshot-store [{}]", targetPluginId) + Persistence(context.system).snapshotStoreFor(targetPluginId) + } + context.become(active(target, targetAtThisNode = true)) + } else { + context.system.scheduler.scheduleOnce(timeout, self, InitTimeout)(context.dispatcher) + } + } + + private val selfAddress: Address = + context.system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + + private def timeoutException() = new TimeoutException(s"Target ${pluginType.qualifier} not initialized. " + + "Use `JournalProxy.setTargetLocation`") + + def receive = init + + def init: Receive = { + case TargetLocation(address) ⇒ + context.setReceiveTimeout(1.second) // for retries + context.become(identifying(address)) + case InitTimeout ⇒ + log.info("Initialization timeout, Use `JournalProxy.setTargetLocation`") + context.become(initTimedOut) + unstashAll() // will trigger appropriate failures + case msg ⇒ + stash() + } + + def becomeIdentifying(address: Address): Unit = { + sendIdentify(address) + context.setReceiveTimeout(1.second) // for retries + context.become(identifying(address)) + } + + def sendIdentify(address: Address): Unit = { + val sel = context.actorSelection(RootActorPath(address) / "system" / targetPluginId) + log.info("Trying to identify target {} at {}", pluginType.qualifier, sel) + sel ! Identify(targetPluginId) + } + + def identifying(address: Address): Receive = ({ + case ActorIdentity(`targetPluginId`, Some(target)) ⇒ + log.info("Found target {} at [{}]", pluginType.qualifier, address) + context.setReceiveTimeout(Duration.Undefined) + unstashAll() + context.become(active(target, address == selfAddress)) + case _: ActorIdentity ⇒ // will retry after ReceiveTimeout + case ReceiveTimeout ⇒ + sendIdentify(address) + }: Receive).orElse(init) + + def active(targetJournal: ActorRef, targetAtThisNode: Boolean): Receive = { + case TargetLocation(address) ⇒ + if (targetAtThisNode && address != selfAddress) + becomeIdentifying(address) + case InitTimeout ⇒ + case msg ⇒ + targetJournal.forward(msg) + } + + def initTimedOut: Receive = { + + case req: JournalProtocol.Request ⇒ req match { // exhaustive match + case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ + persistentActor ! WriteMessagesFailed(timeoutException) + messages.foreach { + case a: AtomicWrite ⇒ + a.payload.foreach { p ⇒ + persistentActor ! WriteMessageFailure(p, timeoutException, actorInstanceId) + } + case r: NonPersistentRepr ⇒ + persistentActor ! LoopMessageSuccess(r.payload, actorInstanceId) + } + case ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ + persistentActor ! ReplayMessagesFailure(timeoutException) + case DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ + persistentActor ! DeleteMessagesFailure(timeoutException, toSequenceNr) + } + + case req: SnapshotProtocol.Request ⇒ req match { // exhaustive match + case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ + sender() ! LoadSnapshotResult(None, toSequenceNr) + case SaveSnapshot(metadata, snapshot) ⇒ + sender() ! SaveSnapshotFailure(metadata, timeoutException) + case DeleteSnapshot(metadata) ⇒ + sender() ! DeleteSnapshotFailure(metadata, timeoutException) + case DeleteSnapshots(persistenceId, criteria) ⇒ + sender() ! DeleteSnapshotsFailure(criteria, timeoutException) + } + + case TargetLocation(address) ⇒ + becomeIdentifying(address) + + case other ⇒ + val e = timeoutException() + log.error(e, "Failed JournalProxy request: {}", e.getMessage) + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index f6e7a24a74..3339af6052 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -5,28 +5,45 @@ package akka.persistence.journal.inmem import scala.collection.immutable +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.language.postfixOps -import akka.actor._ -import akka.persistence._ -import akka.persistence.journal.AsyncWriteJournal -import akka.persistence.journal.{ WriteJournalBase, AsyncWriteProxy, AsyncWriteTarget } -import akka.util.Timeout import scala.util.Try +import akka.persistence.journal.AsyncWriteJournal +import akka.persistence.PersistentRepr +import akka.persistence.AtomicWrite /** * INTERNAL API. * * In-memory journal for testing purposes only. */ -private[persistence] class InmemJournal extends AsyncWriteProxy { - import AsyncWriteProxy.SetStore +private[persistence] class InmemJournal extends AsyncWriteJournal with InmemMessages { + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { + for (w ← messages; p ← w.payload) + add(p) + Future.successful(Nil) // all good + } - val timeout = Timeout(5 seconds) + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + Future.successful(highestSequenceNr(persistenceId)) + } - override def preStart(): Unit = { - super.preStart() - self ! SetStore(context.actorOf(Props[InmemStore])) + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + recoveryCallback: PersistentRepr ⇒ Unit): Future[Unit] = { + val highest = highestSequenceNr(persistenceId) + if (highest != 0L && max != 0L) + read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach(recoveryCallback) + Future.successful(()) + } + + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + val toSeqNr = math.min(toSequenceNr, highestSequenceNr(persistenceId)) + var snr = 1L + while (snr <= toSeqNr) { + delete(persistenceId, snr) + snr += 1 + } + Future.successful(()) } } @@ -69,31 +86,3 @@ private[persistence] trait InmemMessages { if (Int.MaxValue < l) Int.MaxValue else l.toInt } -/** - * INTERNAL API. - */ -private[persistence] class InmemStore extends Actor with InmemMessages with WriteJournalBase { - import AsyncWriteTarget._ - - def receive = { - case WriteMessages(msgs) ⇒ - val results: immutable.Seq[Try[Unit]] = - for (a ← msgs) yield { - Try(a.payload.foreach(add)) - } - sender() ! results - case DeleteMessagesTo(pid, tsnr) ⇒ - val toSeqNr = math.min(tsnr, highestSequenceNr(pid)) - var snr = 1L - while (snr <= toSeqNr) { - delete(pid, snr) - snr += 1 - } - sender().tell((), self) - case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - val highest = highestSequenceNr(pid) - if (highest != 0L && max != 0L) - read(pid, fromSnr, math.min(toSnr, highest), max).foreach { sender() ! _ } - sender() ! ReplaySuccess(highest) - } -} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 98e93b1834..0411fb7436 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -6,16 +6,16 @@ package akka.persistence import akka.actor.{ OneForOneStrategy, _ } import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } -import akka.persistence.journal.inmem.InmemStore -import akka.persistence.journal.{ AsyncWriteJournal, AsyncWriteProxy } +import akka.persistence.journal.AsyncWriteJournal import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } import akka.util.Timeout - import scala.collection.immutable import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NoStackTrace import scala.util.{ Failure, Try } +import akka.persistence.journal.inmem.InmemJournal +import scala.concurrent.Future object PersistentActorFailureSpec { import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } @@ -23,46 +23,42 @@ object PersistentActorFailureSpec { class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace class SimulatedSerializationException(msg: String) extends RuntimeException(msg) with NoStackTrace - class FailingInmemJournal extends AsyncWriteProxy { - import AsyncWriteProxy.SetStore + class FailingInmemJournal extends InmemJournal { - val timeout = Timeout(3 seconds) - - override def preStart(): Unit = { - super.preStart() - self ! SetStore(context.actorOf(Props[FailingInmemStore]())) + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { + if (isWrong(messages)) throw new SimulatedException("Simulated Store failure") + else { + val ser = checkSerializable(messages) + if (ser.exists(_.isFailure)) + Future.successful(ser) + else + super.asyncWriteMessages(messages) + } } - } - - class FailingInmemStore extends InmemStore { - def failingReceive: Receive = { - case w: WriteMessages if isWrong(w) ⇒ - throw new SimulatedException("Simulated Store failure") - case w: WriteMessages if checkSerializable(w).exists(_.isFailure) ⇒ - sender() ! checkSerializable(w) - case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - val highest = highestSequenceNr(pid) - val readFromStore = read(pid, fromSnr, toSnr, max) - if (readFromStore.isEmpty) - sender() ! ReplaySuccess(highest) - else if (isCorrupt(readFromStore)) - sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) - else { - readFromStore.foreach(sender() ! _) - sender() ! ReplaySuccess(highest) - } + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + recoveryCallback: PersistentRepr ⇒ Unit): Future[Unit] = { + val highest = highestSequenceNr(persistenceId) + val readFromStore = read(persistenceId, fromSequenceNr, toSequenceNr, max) + if (readFromStore.isEmpty) + Future.successful(()) + else if (isCorrupt(readFromStore)) + Future.failed(new SimulatedException(s"blahonga $fromSequenceNr $toSequenceNr")) + else { + readFromStore.foreach(recoveryCallback) + Future.successful(()) + } } - def isWrong(w: WriteMessages): Boolean = - w.messages.exists { + def isWrong(messages: immutable.Seq[AtomicWrite]): Boolean = + messages.exists { case a: AtomicWrite ⇒ a.payload.exists { case PersistentRepr(Evt(s: String), _) ⇒ s.contains("wrong") } case _ ⇒ false } - def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] = - w.messages.collect { + def checkSerializable(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = + messages.collect { case a: AtomicWrite ⇒ a.payload.collectFirst { case PersistentRepr(Evt(s: String), _: Long) if s.contains("not serializable") ⇒ s @@ -75,7 +71,6 @@ object PersistentActorFailureSpec { def isCorrupt(events: Seq[PersistentRepr]): Boolean = events.exists { case PersistentRepr(Evt(s: String), _) ⇒ s.contains("corrupt") } - override def receive = failingReceive.orElse(super.receive) } class OnRecoveryFailurePersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/SteppingInmemJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/SteppingInmemJournal.scala index 261bc37286..09c3e4fb9d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/SteppingInmemJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/SteppingInmemJournal.scala @@ -11,11 +11,11 @@ import akka.persistence.{ AtomicWrite, PersistentRepr } import akka.util.Timeout import akka.testkit._ import com.typesafe.config.{ ConfigFactory, Config } - import scala.collection.immutable.Seq import scala.concurrent.duration._ import scala.concurrent.{ Await, Future, Promise } import scala.util.Try +import scala.util.Success object SteppingInmemJournal { @@ -97,7 +97,10 @@ final class SteppingInmemJournal extends InmemJournal { val promise = Promise[Try[Unit]]() val future = promise.future doOrEnqueue { () ⇒ - promise.completeWith(super.asyncWriteMessages(Seq(message)).map(_.head)) + promise.completeWith(super.asyncWriteMessages(Seq(message)).map { + case Nil ⇒ AsyncWriteJournal.successUnit + case head :: _ ⇒ head + }) future.map(_ ⇒ ()) } future