diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 50f4467535..ee19e63d24 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -71,6 +71,8 @@ private[persistence] trait Eventsourced extends Processor { override def toString: String = "persisting events" def aroundReceive(receive: Receive, message: Any) = message match { + case c: ConfirmablePersistent ⇒ + processorStash.stash() case PersistentBatch(b) ⇒ b.foreach(p ⇒ deleteMessage(p.sequenceNr, true)) throw new UnsupportedOperationException("Persistent command batches not supported") diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala index e844f565f2..90f6d541a6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala @@ -129,11 +129,15 @@ object EventsourcedSpec { this.events = events } + private def handleCmd(cmd: Cmd): Unit = { + persist(Seq(Evt(s"${cmd.data}-41"), Evt(s"${cmd.data}-42")))(updateState) + } + val receiveCommand: Receive = commonBehavior orElse { - case Cmd(data) ⇒ - persist(Seq(Evt(s"${data}-41"), Evt(s"${data}-42")))(updateState) - case SaveSnapshotSuccess(_) ⇒ probe ! "saved" - case "snap" ⇒ saveSnapshot(events) + case c: Cmd ⇒ handleCmd(c) + case SaveSnapshotSuccess(_) ⇒ probe ! "saved" + case "snap" ⇒ saveSnapshot(events) + case ConfirmablePersistent(c: Cmd, _, _) ⇒ handleCmd(c) } } @@ -329,6 +333,20 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } + "support confirmable persistent" in { + val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + processor1 ! Cmd("b") + processor1 ! "snap" + processor1 ! ConfirmablePersistentImpl(Cmd("c"), 4711, "some-id", false, 0, Seq.empty, null, null, null) + expectMsg("saved") + processor1 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + + val processor2 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + expectMsg("offered") + processor2 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + } "be able to reply within an event handler" in { val processor = namedProcessor[ReplyInEventHandlerProcessor] processor ! Cmd("a")