Merge pull request #16117 from jgordijn/16104-AtLeastOnceDelivery_and_crash_actor-jgordijn

=per #16104 fix no longer send messages when actor crashed on recovery
This commit is contained in:
Patrik Nordwall 2014-10-31 10:00:51 +01:00
commit 24f21292bc
2 changed files with 81 additions and 4 deletions

View file

@ -284,15 +284,16 @@ trait AtLeastOnceDelivery extends Processor {
super.aroundPostStop() super.aroundPostStop()
} }
override private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit = {
super.onReplaySuccess(receive, awaitReplay)
redeliverOverdue()
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
message match { message match {
case ReplayMessagesSuccess
redeliverOverdue()
super.aroundReceive(receive, message)
case RedeliveryTick case RedeliveryTick
redeliverOverdue() redeliverOverdue()

View file

@ -0,0 +1,76 @@
package akka.persistence
import akka.actor._
import akka.actor.SupervisorStrategy.{ Stop, Escalate }
import akka.testkit.{ AkkaSpec, TestProbe, ImplicitSender }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
object AtLeastOnceDeliveryCrashSpec {
class StoppingStrategySupervisor(testProbe: ActorRef) extends Actor {
import scala.concurrent.duration._
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
case _: IllegalStateException Stop
case t super.supervisorStrategy.decider.applyOrElse(t, (_: Any) Escalate)
}
val crashingActor = context.actorOf(Props(new CrashingActor(testProbe)), "CrashingActor")
def receive: Receive = { case msg crashingActor forward msg }
}
object CrashingActor {
case object Message
case object CrashMessage
case class SendingMessage(deliveryId: Long, recovering: Boolean)
}
class CrashingActor(testProbe: ActorRef) extends PersistentActor
with AtLeastOnceDelivery with ActorLogging {
import CrashingActor._
override def receiveRecover: Receive = {
case Message send()
case CrashMessage
log.debug("Crash it")
throw new IllegalStateException("Intentionally crashed") with NoStackTrace
case msg log.debug("Recover message: " + msg)
}
override def receiveCommand: Receive = {
case Message persist(Message)(_ send())
case CrashMessage persist(CrashMessage) { evt }
}
def send() = {
deliver(testProbe.path, { id SendingMessage(id, false) })
}
}
}
class AtLeastOnceDeliveryCrashSpec extends AkkaSpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliveryCrashSpec", serialization = "off")) with ImplicitSender {
import AtLeastOnceDeliveryCrashSpec._
"At least once delivery" should {
"not send when actor crashes" in {
val testProbe = TestProbe()
def createCrashActorUnderSupervisor() = system.actorOf(Props(new StoppingStrategySupervisor(testProbe.ref)), "supervisor")
val superVisor = createCrashActorUnderSupervisor()
superVisor ! CrashingActor.Message
testProbe.expectMsgType[CrashingActor.SendingMessage]
superVisor ! CrashingActor.CrashMessage
val deathProbe = TestProbe()
deathProbe.watch(superVisor)
system.stop(superVisor)
deathProbe.expectTerminated(superVisor)
testProbe.expectNoMsg(250 millis)
createCrashActorUnderSupervisor()
testProbe.expectNoMsg(1 second)
}
}
}