diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 359a4244d1..159474585e 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -285,10 +285,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS val p = TestProbe() within(5.seconds) { awaitAssert { - println("#Ping") system.actorSelection("/user/consumerProxy").tell(Ping, p.ref) p.expectMsg(1.second, Pong) - println("#Pong") } } // then send the real message diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index e38e0554f2..e647f7473b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -138,7 +138,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], channel // Persist the Deliver request by sending reliableStorage a Persistent message // with the Deliver request as payload. This persistent message is referred to // as the wrapper message, whereas the persistent message contained in the Deliver - // request is referred to as wrapped message (see also class ReliableStorage). + // request is referred to as wrapped message. if (!persistent.confirms.contains(id)) requestWriter forward Persistent(d) case Reset ⇒ requestReader ! Reset } @@ -247,11 +247,13 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne def receive = { case p @ Persistent(Deliver(wrapped: PersistentRepr, _), _) ⇒ - if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined) + if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined) { // Write a delivery confirmation to the journal so that replayed Deliver // requests from a sending processor are not persisted again. Replaying - // Deliver requests is now the responsibility of this processor. + // Deliver requests is now the responsibility of this processor + // and confirmation by destination is done to the wrapper p.sequenceNr. cbJournal ! DeliveredByChannel(wrapped.processorId, channelId, wrapped.sequenceNr) + } if (!recoveryRunning && replyPersistent) sender ! wrapped @@ -374,16 +376,9 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne * @param wrapper persistent message that contains a deliver request */ private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = { - // use the sequence number of the wrapper message if the channel is used standalone, - // otherwise, use sequence number of the wrapped message (that has been generated by - // the sending processor). - val sequenceNr = if (wrapped.sequenceNr == 0L) wrapper.sequenceNr else wrapped.sequenceNr - val updated = wrapped.update(sequenceNr = sequenceNr) - // include the wrapper sequence number in the Confirm message so that the wrapper can - // be deleted later when the confirmation arrives. - ConfirmablePersistentImpl(updated, + ConfirmablePersistentImpl(wrapped, confirmTarget = dbJournal, - confirmMessage = DeliveredByPersistentChannel(channelId, sequenceNr, channel = self)) + confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self)) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { diff --git a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala new file mode 100644 index 0000000000..8ea76ed6ff --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.persistence + +import scala.language.postfixOps + +import com.typesafe.config._ + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.persistence._ + +import akka.testkit._ + +object NumberProcessorSpec { + case class SetNumber(number: Int) + case class Add(number: Int) + case class Subtract(number: Int) + case object DecrementAndGet + case object GetNumber + + class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) { + override def processorId = name + var num = 0 + + val channel = context.actorOf(PersistentChannel.props(channelId = "stable_id", + PersistentChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)), + name = "myPersistentChannel") + + def receive = { + case Persistent(SetNumber(number), _) ⇒ num = number + case Persistent(Add(number), _) ⇒ num = num + number + case Persistent(Subtract(number), _) ⇒ num = num - number + case GetNumber ⇒ channel ! Deliver(Persistent(num), sender.path) + case p @ Persistent(DecrementAndGet, _) ⇒ + num = num - 1 + channel ! Deliver(p.withPayload(num), sender.path) + } + } +} + +/* + * This test found the problem described in ticket #3933 + */ +class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "NumberProcessorSpec")) + with PersistenceSpec { + import NumberProcessorSpec._ + + "A processor using a persistent channel" must { + + "resurrect with the correct state, not replaying confirmed messages to clients" in { + val deliveredProbe = TestProbe() + system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel]) + + val probe = TestProbe() + + val processor = namedProcessor[NumberProcessorWithPersistentChannel] + processor.tell(GetNumber, probe.testActor) + + val zero = probe.expectMsgType[ConfirmablePersistent] + zero.confirm() + zero.payload should equal(0) + + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + + processor.tell(Persistent(DecrementAndGet), probe.testActor) + + val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent] + decrementFrom0.confirm() + decrementFrom0.payload should equal(-1) + + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + + watch(processor) + system.stop(processor) + expectMsgType[Terminated] + + val processorResurrected = namedProcessor[NumberProcessorWithPersistentChannel] + processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor) + + val decrementFromMinus1 = probe.expectMsgType[ConfirmablePersistent] + decrementFromMinus1.confirm() + decrementFromMinus1.payload should equal(-2) + + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + } + } +} + diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 5708ab46a3..ed9491d200 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ package akka.persistence import scala.concurrent.duration._ diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala index face64a6fb..0dcc668992 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala @@ -13,9 +13,9 @@ import akka.actor._ import akka.testkit._ object ProcessorChannelSpec { - class TestProcessor(name: String) extends NamedProcessor(name) { + class TestProcessor(name: String, channelProps: Props) extends NamedProcessor(name) { val destination = context.actorOf(Props[TestDestination]) - val channel = context.actorOf(Channel.props(s"${name}-channel")) + val channel = context.actorOf(channelProps) def receive = { case m @ Persistent(s: String, _) if s.startsWith("a") ⇒ @@ -25,6 +25,10 @@ object ProcessorChannelSpec { case m @ Persistent(s: String, _) if s.startsWith("b") ⇒ // reply to sender via channel channel ! Deliver(m.withPayload(s"re: ${s}"), sender.path) + case m @ Persistent(s: String, _) if s.startsWith("c") ⇒ + // don't use channel + sender ! s"got: ${s}" + case "replay" ⇒ throw new TestException("replay requested") } } @@ -34,8 +38,8 @@ object ProcessorChannelSpec { } } - class ResendingProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) { - val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))) + class ResendingProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) { + val channel = context.actorOf(channelProps) def receive = { case p: Persistent ⇒ channel ! Deliver(p, destination.path) @@ -43,8 +47,8 @@ object ProcessorChannelSpec { } } - class ResendingEventsourcedProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor { - val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))) + class ResendingEventsourcedProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor { + val channel = context.actorOf(channelProps) var events: List[String] = Nil @@ -87,12 +91,17 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit probe.expectMsgType[Delivered] def createTestProcessor(): ActorRef = - system.actorOf(Props(classOf[TestProcessor], name)) + system.actorOf(Props(classOf[TestProcessor], name, testChannelProps)) + + def testChannelProps: Props + + def testResendingChannelProps: Props def setupTestProcessorData(): Unit = { val confirmProbe = TestProbe() val forwardProbe = TestProbe() val replyProbe = TestProbe() + val senderProbe = TestProbe() val processor = createTestProcessor() @@ -100,9 +109,11 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit processor tell (Persistent("a1"), forwardProbe.ref) processor tell (Persistent("b1"), replyProbe.ref) + processor tell (Persistent("c1"), senderProbe.ref) forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _, _) ⇒ m.confirm() } replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _, _) ⇒ m.confirm() } + senderProbe.expectMsg("got: c1") awaitConfirmation(confirmProbe) awaitConfirmation(confirmProbe) @@ -119,9 +130,29 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit processor ! Persistent("b2") expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _, _) ⇒ m.confirm() } } + "de-duplicate confirmed messages on restart" in { + processor ! Persistent("c3") + expectMsg("got: c3") + processor ! Persistent("a3") + expectMsgPF() { case m @ ConfirmablePersistent("fw: a3", _, _) ⇒ m.confirm() } + + processor ! "replay" + expectMsg("got: c3") + expectNoMsg(1.second) + } + "de-duplicate confirmed messages on starting new with same processor id" in { + processor ! Persistent("c4") + expectMsg("got: c4") + processor ! Persistent("a4") + expectMsgPF() { case m @ ConfirmablePersistent("fw: a4", _, _) ⇒ m.confirm() } + + val p2 = createTestProcessor() + expectMsg("got: c4") + expectNoMsg(1.second) + } "resend unconfirmed messages on restart" in { val probe = TestProbe() - val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", probe.ref)) + val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", testResendingChannelProps, probe.ref)) p ! Persistent("a") @@ -139,22 +170,41 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit "An eventsourced processor that uses a channel" can { "reliably deliver events" in { val probe = TestProbe() - val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", probe.ref)) + val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", testResendingChannelProps, probe.ref)) ep ! "cmd" - probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) ⇒ } - probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) ⇒ } + probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) ⇒ } + probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) ⇒ } probe.expectNoMsg(200 milliseconds) ep ! "replay" - probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) ⇒ } - probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) ⇒ cp.confirm() } + probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) ⇒ } + probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) ⇒ cp.confirm() } } } } -class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorChannelSpec")) -class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorChannelSpec")) +class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorChannelSpec")) { + def testChannelProps: Props = Channel.props(s"${name}-channel") + def testResendingChannelProps: Props = + Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)) +} +class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorChannelSpec")) { + def testChannelProps: Props = Channel.props(s"${name}-channel") + def testResendingChannelProps: Props = + Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)) +} + +class LeveldbProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorPersistentChannelSpec")) { + def testChannelProps: Props = PersistentChannel.props(s"${name}-channel") + def testResendingChannelProps: Props = + PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)) +} +class InmemProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorPersistentChannelSpec")) { + def testChannelProps: Props = PersistentChannel.props(s"${name}-channel") + def testResendingChannelProps: Props = + PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)) +}