From 11d628d27fde5545b3d65a63e2efce97e6ef54bb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 5 Dec 2017 06:24:56 +0100 Subject: [PATCH] enforce right order of Timers and PersistentActor trait, #24076 (#24081) * the order was also wrong in the AbstractPersistentActorWithTimers * mima complains about this change for AbstractPersistentActor and AbstractPersistentActorWithAtLeastOnceDelivery, but I think it is ok --- .../mima-filters/2.5.7.backwards.excludes | 2 + .../scala/akka/persistence/Eventsourced.scala | 11 +- .../akka/persistence/PersistentActor.scala | 42 ++++++- .../TimerPersistentActorSpec.scala | 113 ++++++++++++++++++ 4 files changed, 163 insertions(+), 5 deletions(-) create mode 100644 akka-persistence/src/main/mima-filters/2.5.7.backwards.excludes create mode 100644 akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala diff --git a/akka-persistence/src/main/mima-filters/2.5.7.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.7.backwards.excludes new file mode 100644 index 0000000000..9e127fed22 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.5.7.backwards.excludes @@ -0,0 +1,2 @@ +# #24076 PersistentActor with Timers +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.AbstractPersistentActorLike.createReceiveRecover") diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index efdc1db2e6..e5b83b9fcf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -42,12 +42,21 @@ private[persistence] object Eventsourced { * * Scala API and implementation details of [[PersistentActor]] and [[AbstractPersistentActor]]. */ -private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash with PersistenceIdentity with PersistenceRecovery { +private[persistence] trait Eventsourced extends Snapshotter with PersistenceStash + with PersistenceIdentity with PersistenceRecovery { import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import SnapshotProtocol.LoadSnapshotFailed import Eventsourced._ + { + val interfaces = getClass.getInterfaces + val i = interfaces.indexOf(classOf[PersistentActor]) + val j = interfaces.indexOf(classOf[akka.actor.Timers]) + if (i != -1 && j != -1 && i < j) + throw new IllegalStateException("use Timers with PersistentActor, instead of PersistentActor with Timers") + } + private val extension = Persistence(context.system) private[persistence] lazy val journal = extension.journalFor(journalPluginId) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index e2614d3a0d..e170fcb9ec 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -10,10 +10,11 @@ import akka.japi.Procedure import akka.japi.Util import akka.persistence.Eventsourced.{ AsyncHandlerInvocation, StashingHandlerInvocation } import com.typesafe.config.Config - import scala.collection.immutable import scala.util.control.NoStackTrace +import akka.annotation.InternalApi + abstract class RecoveryCompleted /** * Sent to a [[PersistentActor]] when the journal replay has been finished. @@ -406,7 +407,40 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit /** * Java API: an persistent actor - can be used to implement command or event sourcing. */ -abstract class AbstractPersistentActor extends AbstractActor with Eventsourced { +abstract class AbstractPersistentActor extends AbstractActor with AbstractPersistentActorLike { + /** + * Recovery handler that receives persisted events during recovery. If a state snapshot + * has been captured and saved, this handler will receive a [[SnapshotOffer]] message + * followed by events that are younger than the offered snapshot. + * + * This handler must not have side-effects other than changing persistent actor state i.e. it + * should not perform actions that may fail, such as interacting with external services, + * for example. + * + * If there is a problem with recovering the state of the actor from the journal, the error + * will be logged and the actor will be stopped. + * + * @see [[Recovery]] + */ + def createReceiveRecover(): AbstractActor.Receive + + /** + * An persistent actor has to define its initial receive behavior by implementing + * the `createReceive` method, also known as the command handler. Typically + * validates commands against current state (and/or by communication with other actors). + * On successful validation, one or more events are derived from a command and + * these events are then persisted by calling `persist`. + */ + def createReceive(): AbstractActor.Receive + + // Note that abstract methods createReceiveRecover and createReceive are also defined in + // AbstractPersistentActorLike. They were included here also for binary compatibility reasons. +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] trait AbstractPersistentActorLike extends Eventsourced { /** * Recovery handler that receives persisted events during recovery. If a state snapshot @@ -433,7 +467,7 @@ abstract class AbstractPersistentActor extends AbstractActor with Eventsourced { * On successful validation, one or more events are derived from a command and * these events are then persisted by calling `persist`. */ - override def createReceive(): AbstractActor.Receive + def createReceive(): AbstractActor.Receive override final def receiveCommand: Receive = createReceive().onMessage.asInstanceOf[Receive] @@ -533,4 +567,4 @@ abstract class AbstractPersistentActor extends AbstractActor with Eventsourced { /** * Java API: Combination of [[AbstractPersistentActor]] and [[akka.actor.AbstractActorWithTimers]]. */ -abstract class AbstractPersistentActorWithTimers extends AbstractPersistentActor with Timers +abstract class AbstractPersistentActorWithTimers extends AbstractActor with Timers with AbstractPersistentActorLike diff --git a/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala new file mode 100644 index 0000000000..883cbd6c6b --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.persistence + +import scala.concurrent.duration._ +import scala.runtime.BoxedUnit +import scala.runtime.BoxedUnit +import scala.util.control.NoStackTrace + +import akka.actor +import akka.actor._ +import akka.event.Logging +import akka.event.Logging.Warning +import akka.japi.Procedure +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } +import com.typesafe.config.ConfigFactory +import akka.testkit.TestProbe +import akka.testkit.TestActors +import akka.testkit.TestEvent.Mute + +object TimerPersistentActorSpec { + + def testProps(name: String): Props = + Props(new TestPersistentActor(name)) + + final case class Scheduled(msg: Any, replyTo: ActorRef) + + class TestPersistentActor(name: String) extends Timers with PersistentActor { + + override def persistenceId = name + + override def receiveRecover: Receive = { + case _ ⇒ + } + + override def receiveCommand: Receive = { + case Scheduled(msg, replyTo) ⇒ + replyTo ! msg + case msg ⇒ + timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero) + persist(msg)(_ ⇒ ()) + } + } + + // this should fail in constructor + class WrongOrder extends PersistentActor with Timers { + override def persistenceId = "notused" + override def receiveRecover: Receive = { + case _ ⇒ + } + override def receiveCommand: Receive = { + case _ ⇒ () + } + } + + def testJavaProps(name: String): Props = + Props(new JavaTestPersistentActor(name)) + + class JavaTestPersistentActor(name: String) extends AbstractPersistentActorWithTimers { + + override def persistenceId: String = name + + override def createReceiveRecover(): AbstractActor.Receive = + AbstractActor.emptyBehavior + + override def createReceive(): AbstractActor.Receive = + new AbstractActor.Receive({ + case Scheduled(msg, replyTo) ⇒ + replyTo ! msg + BoxedUnit.UNIT + case msg ⇒ + timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero) + persist(msg, new Procedure[Any] { + override def apply(evt: Any): Unit = () + }) + BoxedUnit.UNIT + }) + } + +} + +class TimerPersistentActorSpec extends PersistenceSpec(ConfigFactory.parseString( + s""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.actor.warn-about-java-serializer-usage = off + """)) with ImplicitSender { + import TimerPersistentActorSpec._ + + system.eventStream.publish(Mute(EventFilter[ActorInitializationException]())) + + "PersistentActor with Timer" must { + "not discard timer msg due to stashing" in { + val pa = system.actorOf(testProps("p1")) + pa ! "msg1" + expectMsg("msg1") + } + + "not discard timer msg due to stashing for AbstractPersistentActorWithTimers" in { + val pa = system.actorOf(testJavaProps("p2")) + pa ! "msg2" + expectMsg("msg2") + } + + "reject wrong order of traits, PersistentActor with Timer" in { + val pa = system.actorOf(Props[WrongOrder]) + watch(pa) + expectTerminated(pa) + } + } + +} +