From 80d48ead5a99fb22e0c53bf364b7f2b5054eeb1e Mon Sep 17 00:00:00 2001 From: Nafer Sanabria Date: Sun, 13 Nov 2016 20:30:33 -0500 Subject: [PATCH] per bugfix #21824 pass the message which cause restart in PersistentActor --- .../scala/akka/actor/ActorSystemSpec.scala | 2 -- .../scala/akka/persistence/Eventsourced.scala | 8 ++--- .../persistence/PersistentActorSpec.scala | 31 +++++++++++++++++-- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index fa592e5e89..0fe9f27bb0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps import akka.testkit._ -import org.scalatest.junit.JUnitSuiteLike import com.typesafe.config.ConfigFactory import scala.concurrent.{ ExecutionContext, Await, Future } import scala.concurrent.duration._ @@ -17,7 +16,6 @@ import akka.dispatch._ import com.typesafe.config.Config import akka.util.Switch import akka.util.Helpers.ConfigOps -import scala.util.control.NoStackTrace object ActorSystemSpec { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 98c79950c0..1f5794a1c2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -9,12 +9,12 @@ import java.util.UUID import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.{ DeadLetter, ReceiveTimeout, StashOverflowException } +import akka.actor.{ DeadLetter, StashOverflowException } import akka.util.Helpers.ConfigOps import akka.event.Logging import akka.event.LoggingAdapter -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -205,9 +205,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case Some(ReplayedMessage(m)) ⇒ flushJournalBatch() super.aroundPreRestart(reason, Some(m)) - case mo ⇒ + case mo: Option[Any] ⇒ flushJournalBatch() - super.aroundPreRestart(reason, None) + super.aroundPreRestart(reason, mo) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 6dd4cf8bce..13750aee3a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -411,6 +411,28 @@ object PersistentActorSpec { } } + class RecoverMessageCausedRestart(name: String) extends NamedPersistentActor(name) { + var master: ActorRef = _ + + val receiveCommand: Receive = { + case "Boom" ⇒ + master = sender() + throw new TestException("boom") + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + if (master ne null) { + master ! "failed with " + reason.getClass.getSimpleName + " while processing " + message.getOrElse("") + } + context stop self + } + + override def receiveRecover = { + case _ ⇒ () + } + + } + class MultipleAndNestedPersists(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case s: String ⇒ @@ -911,7 +933,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("d-3") expectNoMsg(100.millis) } - "invoke deferred handlers, perserving the original sender references" in { + "invoke deferred handlers, preserving the original sender references" in { val persistentActor = namedPersistentActor[DeferringWithAsyncPersistActor] val p1, p2 = TestProbe() @@ -945,7 +967,7 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted)) } - "preserv order of incoming messages" in { + "preserve order of incoming messages" in { val persistentActor = namedPersistentActor[StressOrdering] persistentActor ! Cmd("a") val latch = TestLatch(1) @@ -1092,6 +1114,11 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(Nil) } + "recover the message which caused the restart" in { + val persistentActor = namedPersistentActor[RecoverMessageCausedRestart] + persistentActor ! "Boom" + expectMsg("failed with TestException while processing Boom") + } } }