diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index fe8a1f0024..8e9c8c1efa 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -24,7 +24,7 @@ Akka persistence is a separate jar file. Make sure that you have the following d com.typesafe.akka - akka-persistence_@binVersion@ + akka-persistence-experimental_@binVersion@ @version@ diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 078408a3d2..899dfa30ef 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -22,7 +22,7 @@ Dependencies Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:: - "com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@ + "com.typesafe.akka" %% "akka-persistence-experimental" % "@version@" @crossString@ Architecture ============ diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala index b4d20176f5..e7b5cb9a6e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala @@ -11,9 +11,9 @@ object ChannelSpec { |akka.persistence.journal.leveldb.dir = "target/journal-channel-spec" """.stripMargin - class TestProcessor extends Processor { + class TestProcessor(name: String) extends NamedProcessor(name) { val destination = context.actorOf(Props[TestDestination]) - val channel = context.actorOf(Channel.props(), "channel") + val channel = context.actorOf(Channel.props("channel")) def receive = { case m @ Persistent(s: String, _) if s.startsWith("a") ⇒ { @@ -51,7 +51,7 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with val forwardProbe = TestProbe() val replyProbe = TestProbe() - val processor = system.actorOf(Props[TestProcessor], name) + val processor = system.actorOf(Props(classOf[TestProcessor], name)) system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm]) @@ -65,61 +65,53 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with // for replay so that channels can drop confirmed messages) confirmProbe.expectMsgType[Confirm] confirmProbe.expectMsgType[Confirm] + } - stopAndAwaitTermination(processor) + def actorRefFor(topLevelName: String) = { + extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName) } "A channel" must { "forward un-confirmed messages to destination" in { - val processor = system.actorOf(Props[TestProcessor], name) + val processor = system.actorOf(Props(classOf[TestProcessor], name)) processor ! Persistent("a2") expectMsgPF() { case m @ Persistent("fw: a2", _) ⇒ m.confirm() } } "reply un-confirmed messages to senders" in { - val processor = system.actorOf(Props[TestProcessor], name) + val processor = system.actorOf(Props(classOf[TestProcessor], name)) processor ! Persistent("b2") expectMsgPF() { case m @ Persistent("re: b2", _) ⇒ m.confirm() } } "must resolve sender references and preserve message order" in { - val channel = system.actorOf(Channel.props(), "testChannel1") + val channel = system.actorOf(Channel.props()) val destination = system.actorOf(Props[TestDestination]) - val sender1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") - channel tell (Deliver(Persistent("a"), destination), sender1) - expectMsg("a") - stopAndAwaitTermination(sender1) - - // create new incarnation of sender (with same actor path) - val sender2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") + val empty = actorRefFor("testSender") // will be an EmptyLocalActorRef + val sender = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") // replayed message (resolved = false) and invalid sender reference - channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), sender1) + channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), empty) // new messages (resolved = true) and valid sender references - channel tell (Deliver(Persistent("b"), destination), sender2) - channel tell (Deliver(Persistent("c"), destination), sender2) + channel tell (Deliver(Persistent("b"), destination), sender) + channel tell (Deliver(Persistent("c"), destination), sender) expectMsg("a") expectMsg("b") expectMsg("c") } "must resolve destination references and preserve message order" in { - val channel = system.actorOf(Channel.props(), "testChannel2") - val destination1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") + val channel = system.actorOf(Channel.props()) - channel ! Deliver(Persistent("a"), destination1) - expectMsg("a") - stopAndAwaitTermination(destination1) - - // create new incarnation of destination (with same actor path) - val destination2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") + val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef + val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") // replayed message (resolved = false) and invalid destination reference - channel ! Deliver(PersistentImpl("a", resolved = false), destination1, Resolve.Destination) + channel ! Deliver(PersistentImpl("a", resolved = false), empty, Resolve.Destination) // new messages (resolved = true) and valid destination references - channel ! Deliver(Persistent("b"), destination2) - channel ! Deliver(Persistent("c"), destination2) + channel ! Deliver(Persistent("b"), destination) + channel ! Deliver(Persistent("c"), destination) expectMsg("a") expectMsg("b") diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 274aca85d4..5bb808a107 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -3,12 +3,13 @@ package akka.persistence import java.io.File import java.util.concurrent.atomic.AtomicInteger +import scala.reflect.ClassTag + import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfterEach -import akka.actor.ActorRef +import akka.actor.Props import akka.testkit.AkkaSpec -import akka.testkit.TestActor.Watch trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ private var _name: String = _ @@ -26,11 +27,11 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ */ def namePrefix: String = "processor" - protected def stopAndAwaitTermination(ref: ActorRef) { - testActor ! Watch(ref) - system.stop(ref) - expectTerminated(ref) - } + /** + * Creates a processor with current name as constructor argument. + */ + def namedProcessor[T <: NamedProcessor: ClassTag] = + system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name)) override protected def beforeEach() { _name = namePrefix + counter.incrementAndGet() @@ -41,6 +42,10 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ } } +abstract class NamedProcessor(name: String) extends Processor { + override def processorId: String = name +} + trait TurnOffRecoverOnStart { this: Processor ⇒ override def preStartProcessor(): Unit = () } diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index 7f88277343..b009812fc8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -13,7 +13,7 @@ object ProcessorSpec { case object GetState - class RecoverTestProcessor extends Processor { + class RecoverTestProcessor(name: String) extends NamedProcessor(name) { var state = List.empty[String] def receive = { case "boom" ⇒ throw new Exception("boom") @@ -31,22 +31,22 @@ object ProcessorSpec { } } - class RecoverOffTestProcessor extends RecoverTestProcessor with TurnOffRecoverOnStart + class RecoverOffTestProcessor(name: String) extends RecoverTestProcessor(name) with TurnOffRecoverOnStart - class StoredSenderTestProcessor extends Processor { + class StoredSenderTestProcessor(name: String) extends NamedProcessor(name) { def receive = { case Persistent(payload, _) ⇒ sender ! payload } } - class RecoveryStatusTestProcessor extends Processor { + class RecoveryStatusTestProcessor(name: String) extends NamedProcessor(name) { def receive = { case Persistent("c", _) if !recoveryRunning ⇒ sender ! "c" case Persistent(payload, _) if recoveryRunning ⇒ sender ! payload } } - class BehaviorChangeTestProcessor extends Processor { + class BehaviorChangeTestProcessor(name: String) extends NamedProcessor(name) { val acceptA: Actor.Receive = { case Persistent("a", _) ⇒ { sender ! "a" @@ -64,7 +64,7 @@ object ProcessorSpec { def receive = acceptA } - class FsmTestProcessor extends Processor with FSM[String, Int] { + class FsmTestProcessor(name: String) extends NamedProcessor(name) with FSM[String, Int] { startWith("closed", 0) when("closed") { @@ -80,7 +80,7 @@ object ProcessorSpec { } } - class OutboundMessageTestProcessor extends Processor { + class OutboundMessageTestProcessor(name: String) extends NamedProcessor(name) { def receive = { case Persistent(payload, snr) ⇒ sender ! Persistent(snr) } @@ -88,8 +88,8 @@ object ProcessorSpec { class ResumeTestException extends Exception("test") - class ResumeTestSupervisor extends Actor { - val processor = context.actorOf(Props[ResumeTestProcessor], "processor") + class ResumeTestSupervisor(name: String) extends Actor { + val processor = context.actorOf(Props(classOf[ResumeTestProcessor], name)) override val supervisorStrategy = OneForOneStrategy() { @@ -101,7 +101,7 @@ object ProcessorSpec { } } - class ResumeTestProcessor extends Processor { + class ResumeTestProcessor(name: String) extends NamedProcessor(name) { var state: List[String] = Nil def receive = { case "boom" ⇒ throw new ResumeTestException @@ -110,7 +110,7 @@ object ProcessorSpec { } } - class LastReplayedMsgFailsTestProcessor extends RecoverTestProcessor { + class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) @@ -120,7 +120,7 @@ object ProcessorSpec { } } - class AnyReplayedMsgFailsTestProcessor extends RecoverTestProcessor { + class AnyReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { val failOnReplayedA: Actor.Receive = { case Persistent("a", _) if recoveryRunning ⇒ throw new Exception("boom") } @@ -135,34 +135,33 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec override protected def beforeEach() { super.beforeEach() - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! Persistent("a") processor ! Persistent("b") processor ! GetState expectMsg(List("a-1", "b-2")) - stopAndAwaitTermination(processor) } "A processor" must { "recover state on explicit request" in { - val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor = namedProcessor[RecoverOffTestProcessor] processor ! Recover() processor ! GetState expectMsg(List("a-1", "b-2")) } "recover state automatically" in { - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! GetState expectMsg(List("a-1", "b-2")) } "recover state automatically on restart" in { - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! "boom" processor ! GetState expectMsg(List("a-1", "b-2")) } "buffer new messages until recovery completed" in { - val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor = namedProcessor[RecoverOffTestProcessor] processor ! Persistent("c") processor ! Recover() processor ! Persistent("d") @@ -170,7 +169,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec expectMsg(List("a-1", "b-2", "c-3", "d-4")) } "ignore redundant recovery requests" in { - val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor = namedProcessor[RecoverOffTestProcessor] processor ! Persistent("c") processor ! Recover() processor ! Persistent("d") @@ -180,7 +179,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5")) } "buffer new messages until restart-recovery completed" in { - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! "boom" processor ! Persistent("c") processor ! Persistent("d") @@ -188,13 +187,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec expectMsg(List("a-1", "b-2", "c-3", "d-4")) } "allow deletion of journaled messages on failure" in { - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! Persistent("boom") // journaled message causes failure and will be deleted processor ! GetState expectMsg(List("a-1", "b-2")) } "allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in { - val processor = system.actorOf(Props[RecoverTestProcessor], name) + val processor = namedProcessor[RecoverTestProcessor] processor ! Persistent("boom") // journaled message causes failure and will be deleted processor ! Persistent("c") processor ! Persistent("d") @@ -202,66 +201,63 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence } "store sender references and restore them for replayed messages" in { - system.actorOf(Props[StoredSenderTestProcessor], name) + namedProcessor[StoredSenderTestProcessor] List("a", "b") foreach (expectMsg(_)) } "properly indicate its recovery status" in { - val processor = system.actorOf(Props[RecoveryStatusTestProcessor], name) + val processor = namedProcessor[RecoveryStatusTestProcessor] processor ! Persistent("c") List("a", "b", "c") foreach (expectMsg(_)) } "continue journaling when changing behavior" in { - val processor = system.actorOf(Props[BehaviorChangeTestProcessor], name) + val processor = namedProcessor[BehaviorChangeTestProcessor] processor ! Persistent("a") processor ! Persistent("b") List("a", "b", "a", "b") foreach (expectMsg(_)) } "derive outbound messages from the current message" in { - val processor = system.actorOf(Props[OutboundMessageTestProcessor], name) + val processor = namedProcessor[OutboundMessageTestProcessor] processor ! Persistent("c") 1 to 3 foreach { _ ⇒ expectMsgPF() { case Persistent(payload, snr) ⇒ payload must be(snr) } } } "support recovery with upper sequence number bound" in { - val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor = namedProcessor[RecoverOffTestProcessor] processor ! Recover(1L) processor ! GetState expectMsg(List("a-1")) } "never replace journaled messages" in { - val processor1 = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor1 = namedProcessor[RecoverOffTestProcessor] processor1 ! Recover(1L) processor1 ! Persistent("c") processor1 ! GetState expectMsg(List("a-1", "c-3")) - stopAndAwaitTermination(processor1) - val processor2 = system.actorOf(Props[RecoverOffTestProcessor], name) + val processor2 = namedProcessor[RecoverOffTestProcessor] processor2 ! Recover() processor2 ! GetState expectMsg(List("a-1", "b-2", "c-3")) } "be able to skip restart recovery when being resumed" in { - val supervisor1 = system.actorOf(Props[ResumeTestSupervisor], name) + val supervisor1 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor")) supervisor1 ! Persistent("a") supervisor1 ! Persistent("b") supervisor1 ! GetState expectMsg(List("a-1", "b-2")) - stopAndAwaitTermination(supervisor1) - val supervisor2 = system.actorOf(Props[ResumeTestSupervisor], name) + val supervisor2 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor")) supervisor2 ! Persistent("c") supervisor2 ! "boom" supervisor2 ! Persistent("d") supervisor2 ! GetState expectMsg(List("a-1", "b-2", "c-3", "d-4")) - stopAndAwaitTermination(supervisor2) - val supervisor3 = system.actorOf(Props[ResumeTestSupervisor], name) + val supervisor3 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor")) supervisor3 ! GetState expectMsg(List("a-1", "b-2", "c-3", "d-4")) } "be able to re-run restart recovery when it fails with last replayed message" in { - val processor = system.actorOf(Props[LastReplayedMsgFailsTestProcessor], name) + val processor = namedProcessor[LastReplayedMsgFailsTestProcessor] processor ! Persistent("c") processor ! Persistent("boom") processor ! Persistent("d") @@ -269,13 +265,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec expectMsg(List("a-1", "b-2", "c-3", "d-5")) } "be able to re-run initial recovery when it fails with a message that is not the last replayed message" in { - val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], name) + val processor = namedProcessor[AnyReplayedMsgFailsTestProcessor] processor ! Persistent("c") processor ! GetState expectMsg(List("b-2", "c-3")) } "be able to re-run restart recovery when it fails with a message that is not the last replayed message" in { - val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], "other") // new processor, no initial replay + val processor = system.actorOf(Props(classOf[AnyReplayedMsgFailsTestProcessor], "other")) // new processor, no initial replay processor ! Persistent("b") processor ! Persistent("a") processor ! Persistent("c") @@ -294,7 +290,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec "A processor" can { "be a finite state machine" in { - val processor = system.actorOf(Props[FsmTestProcessor], name) + val processor = namedProcessor[FsmTestProcessor] processor ! Persistent("a") processor ! Persistent("b") List(0, 1, 2, 3) foreach (expectMsg(_)) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index 1260a6c60e..6ff69569a5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -13,7 +13,7 @@ object ProcessorStashSpec { case object GetState - class StashingProcessor extends Processor { + class StashingProcessor(name: String) extends NamedProcessor(name) { var state: List[String] = Nil val behaviorA: Actor.Receive = { @@ -38,7 +38,7 @@ object ProcessorStashSpec { } } - class RecoveryFailureStashingProcessor extends StashingProcessor { + class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) @@ -54,15 +54,14 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis "A processor" must { "support user stash and unstash operations for persistent messages" in { - val p1 = system.actorOf(Props[StashingProcessor], name) + val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "b-2")) - stopAndAwaitTermination(p1) - val p2 = system.actorOf(Props[StashingProcessor], name) + val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! Persistent("c") @@ -70,16 +69,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) } "support user stash and unstash operations for persistent and transient messages" in { - val p1 = system.actorOf(Props[StashingProcessor], name) + val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! "x" p1 ! Persistent("b") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "x-0", "b-2")) - stopAndAwaitTermination(p1) - val p2 = system.actorOf(Props[StashingProcessor], name) + val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! "x" p2 ! Persistent("b") @@ -88,16 +86,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5")) } "support restarts between user stash and unstash operations" in { - val p1 = system.actorOf(Props[StashingProcessor], name) + val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! "boom" p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "b-2")) - stopAndAwaitTermination(p1) - val p2 = system.actorOf(Props[StashingProcessor], name) + val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! "boom" @@ -106,16 +103,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) } "support multiple restarts between user stash and unstash operations" in { - val p1 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) + val p1 = namedProcessor[RecoveryFailureStashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! Persistent("boom") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-4", "b-2")) - stopAndAwaitTermination(p1) - val p2 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) + val p2 = namedProcessor[RecoveryFailureStashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! Persistent("boom")