enforce right order of Timers and PersistentActor trait, #24076 (#24081)

* the order was also wrong in the AbstractPersistentActorWithTimers
* mima complains about this change for AbstractPersistentActor and
  AbstractPersistentActorWithAtLeastOnceDelivery, but I think it is ok
This commit is contained in:
Patrik Nordwall 2017-12-05 06:24:56 +01:00 committed by Konrad `ktoso` Malawski
parent 7452d361cd
commit 11d628d27f
4 changed files with 163 additions and 5 deletions

View file

@ -0,0 +1,2 @@
# #24076 PersistentActor with Timers
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.AbstractPersistentActorLike.createReceiveRecover")

View file

@ -42,12 +42,21 @@ private[persistence] object Eventsourced {
*
* Scala API and implementation details of [[PersistentActor]] and [[AbstractPersistentActor]].
*/
private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery {
private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash
with PersistenceIdentity with PersistenceRecovery {
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import SnapshotProtocol.LoadSnapshotFailed
import Eventsourced._
{
val interfaces = getClass.getInterfaces
val i = interfaces.indexOf(classOf[PersistentActor])
val j = interfaces.indexOf(classOf[akka.actor.Timers])
if (i != -1 && j != -1 && i < j)
throw new IllegalStateException("use Timers with PersistentActor, instead of PersistentActor with Timers")
}
private val extension = Persistence(context.system)
private[persistence] lazy val journal = extension.journalFor(journalPluginId)

View file

@ -10,10 +10,11 @@ import akka.japi.Procedure
import akka.japi.Util
import akka.persistence.Eventsourced.{ AsyncHandlerInvocation, StashingHandlerInvocation }
import com.typesafe.config.Config
import scala.collection.immutable
import scala.util.control.NoStackTrace
import akka.annotation.InternalApi
abstract class RecoveryCompleted
/**
* Sent to a [[PersistentActor]] when the journal replay has been finished.
@ -406,7 +407,40 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class AbstractPersistentActor extends AbstractActor with Eventsourced {
abstract class AbstractPersistentActor extends AbstractActor with AbstractPersistentActorLike {
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recovery]]
*/
def createReceiveRecover(): AbstractActor.Receive
/**
* An persistent actor has to define its initial receive behavior by implementing
* the `createReceive` method, also known as the command handler. Typically
* validates commands against current state (and/or by communication with other actors).
* On successful validation, one or more events are derived from a command and
* these events are then persisted by calling `persist`.
*/
def createReceive(): AbstractActor.Receive
// Note that abstract methods createReceiveRecover and createReceive are also defined in
// AbstractPersistentActorLike. They were included here also for binary compatibility reasons.
}
/**
* INTERNAL API
*/
@InternalApi private[akka] trait AbstractPersistentActorLike extends Eventsourced {
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
@ -433,7 +467,7 @@ abstract class AbstractPersistentActor extends AbstractActor with Eventsourced {
* On successful validation, one or more events are derived from a command and
* these events are then persisted by calling `persist`.
*/
override def createReceive(): AbstractActor.Receive
def createReceive(): AbstractActor.Receive
override final def receiveCommand: Receive = createReceive().onMessage.asInstanceOf[Receive]
@ -533,4 +567,4 @@ abstract class AbstractPersistentActor extends AbstractActor with Eventsourced {
/**
* Java API: Combination of [[AbstractPersistentActor]] and [[akka.actor.AbstractActorWithTimers]].
*/
abstract class AbstractPersistentActorWithTimers extends AbstractPersistentActor with Timers
abstract class AbstractPersistentActorWithTimers extends AbstractActor with Timers with AbstractPersistentActorLike

View file

@ -0,0 +1,113 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.runtime.BoxedUnit
import scala.runtime.BoxedUnit
import scala.util.control.NoStackTrace
import akka.actor
import akka.actor._
import akka.event.Logging
import akka.event.Logging.Warning
import akka.japi.Procedure
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import com.typesafe.config.ConfigFactory
import akka.testkit.TestProbe
import akka.testkit.TestActors
import akka.testkit.TestEvent.Mute
object TimerPersistentActorSpec {
def testProps(name: String): Props =
Props(new TestPersistentActor(name))
final case class Scheduled(msg: Any, replyTo: ActorRef)
class TestPersistentActor(name: String) extends Timers with PersistentActor {
override def persistenceId = name
override def receiveRecover: Receive = {
case _
}
override def receiveCommand: Receive = {
case Scheduled(msg, replyTo)
replyTo ! msg
case msg
timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero)
persist(msg)(_ ())
}
}
// this should fail in constructor
class WrongOrder extends PersistentActor with Timers {
override def persistenceId = "notused"
override def receiveRecover: Receive = {
case _
}
override def receiveCommand: Receive = {
case _ ()
}
}
def testJavaProps(name: String): Props =
Props(new JavaTestPersistentActor(name))
class JavaTestPersistentActor(name: String) extends AbstractPersistentActorWithTimers {
override def persistenceId: String = name
override def createReceiveRecover(): AbstractActor.Receive =
AbstractActor.emptyBehavior
override def createReceive(): AbstractActor.Receive =
new AbstractActor.Receive({
case Scheduled(msg, replyTo)
replyTo ! msg
BoxedUnit.UNIT
case msg
timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero)
persist(msg, new Procedure[Any] {
override def apply(evt: Any): Unit = ()
})
BoxedUnit.UNIT
})
}
}
class TimerPersistentActorSpec extends PersistenceSpec(ConfigFactory.parseString(
s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.actor.warn-about-java-serializer-usage = off
""")) with ImplicitSender {
import TimerPersistentActorSpec._
system.eventStream.publish(Mute(EventFilter[ActorInitializationException]()))
"PersistentActor with Timer" must {
"not discard timer msg due to stashing" in {
val pa = system.actorOf(testProps("p1"))
pa ! "msg1"
expectMsg("msg1")
}
"not discard timer msg due to stashing for AbstractPersistentActorWithTimers" in {
val pa = system.actorOf(testJavaProps("p2"))
pa ! "msg2"
expectMsg("msg2")
}
"reject wrong order of traits, PersistentActor with Timer" in {
val pa = system.actorOf(Props[WrongOrder])
watch(pa)
expectTerminated(pa)
}
}
}