Merge pull request #2099 from akka/wip-3933-persistent-channel-patriknw

=per #3933 Correction of seq number logic for persistent channel
This commit is contained in:
Patrik Nordwall 2014-03-25 13:40:37 +01:00
commit d7757b90f6
5 changed files with 166 additions and 29 deletions

View file

@ -285,10 +285,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
val p = TestProbe()
within(5.seconds) {
awaitAssert {
println("#Ping")
system.actorSelection("/user/consumerProxy").tell(Ping, p.ref)
p.expectMsg(1.second, Pong)
println("#Pong")
}
}
// then send the real message

View file

@ -138,7 +138,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], channel
// Persist the Deliver request by sending reliableStorage a Persistent message
// with the Deliver request as payload. This persistent message is referred to
// as the wrapper message, whereas the persistent message contained in the Deliver
// request is referred to as wrapped message (see also class ReliableStorage).
// request is referred to as wrapped message.
if (!persistent.confirms.contains(id)) requestWriter forward Persistent(d)
case Reset requestReader ! Reset
}
@ -247,11 +247,13 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne
def receive = {
case p @ Persistent(Deliver(wrapped: PersistentRepr, _), _)
if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined)
if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined) {
// Write a delivery confirmation to the journal so that replayed Deliver
// requests from a sending processor are not persisted again. Replaying
// Deliver requests is now the responsibility of this processor.
// Deliver requests is now the responsibility of this processor
// and confirmation by destination is done to the wrapper p.sequenceNr.
cbJournal ! DeliveredByChannel(wrapped.processorId, channelId, wrapped.sequenceNr)
}
if (!recoveryRunning && replyPersistent)
sender ! wrapped
@ -374,16 +376,9 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne
* @param wrapper persistent message that contains a deliver request
*/
private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = {
// use the sequence number of the wrapper message if the channel is used standalone,
// otherwise, use sequence number of the wrapped message (that has been generated by
// the sending processor).
val sequenceNr = if (wrapped.sequenceNr == 0L) wrapper.sequenceNr else wrapped.sequenceNr
val updated = wrapped.update(sequenceNr = sequenceNr)
// include the wrapper sequence number in the Confirm message so that the wrapper can
// be deleted later when the confirmation arrives.
ConfirmablePersistentImpl(updated,
ConfirmablePersistentImpl(wrapped,
confirmTarget = dbJournal,
confirmMessage = DeliveredByPersistentChannel(channelId, sequenceNr, channel = self))
confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self))
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {

View file

@ -0,0 +1,91 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.language.postfixOps
import com.typesafe.config._
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.testkit._
object NumberProcessorSpec {
case class SetNumber(number: Int)
case class Add(number: Int)
case class Subtract(number: Int)
case object DecrementAndGet
case object GetNumber
class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) {
override def processorId = name
var num = 0
val channel = context.actorOf(PersistentChannel.props(channelId = "stable_id",
PersistentChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
name = "myPersistentChannel")
def receive = {
case Persistent(SetNumber(number), _) num = number
case Persistent(Add(number), _) num = num + number
case Persistent(Subtract(number), _) num = num - number
case GetNumber channel ! Deliver(Persistent(num), sender.path)
case p @ Persistent(DecrementAndGet, _)
num = num - 1
channel ! Deliver(p.withPayload(num), sender.path)
}
}
}
/*
* This test found the problem described in ticket #3933
*/
class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "NumberProcessorSpec"))
with PersistenceSpec {
import NumberProcessorSpec._
"A processor using a persistent channel" must {
"resurrect with the correct state, not replaying confirmed messages to clients" in {
val deliveredProbe = TestProbe()
system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel])
val probe = TestProbe()
val processor = namedProcessor[NumberProcessorWithPersistentChannel]
processor.tell(GetNumber, probe.testActor)
val zero = probe.expectMsgType[ConfirmablePersistent]
zero.confirm()
zero.payload should equal(0)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
processor.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent]
decrementFrom0.confirm()
decrementFrom0.payload should equal(-1)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessorWithPersistentChannel]
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFromMinus1 = probe.expectMsgType[ConfirmablePersistent]
decrementFromMinus1.confirm()
decrementFromMinus1.payload should equal(-2)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
}
}
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._

View file

@ -13,9 +13,9 @@ import akka.actor._
import akka.testkit._
object ProcessorChannelSpec {
class TestProcessor(name: String) extends NamedProcessor(name) {
class TestProcessor(name: String, channelProps: Props) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(Channel.props(s"${name}-channel"))
val channel = context.actorOf(channelProps)
def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a")
@ -25,6 +25,10 @@ object ProcessorChannelSpec {
case m @ Persistent(s: String, _) if s.startsWith("b")
// reply to sender via channel
channel ! Deliver(m.withPayload(s"re: ${s}"), sender.path)
case m @ Persistent(s: String, _) if s.startsWith("c")
// don't use channel
sender ! s"got: ${s}"
case "replay" throw new TestException("replay requested")
}
}
@ -34,8 +38,8 @@ object ProcessorChannelSpec {
}
}
class ResendingProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) {
val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)))
class ResendingProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) {
val channel = context.actorOf(channelProps)
def receive = {
case p: Persistent channel ! Deliver(p, destination.path)
@ -43,8 +47,8 @@ object ProcessorChannelSpec {
}
}
class ResendingEventsourcedProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor {
val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)))
class ResendingEventsourcedProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor {
val channel = context.actorOf(channelProps)
var events: List[String] = Nil
@ -87,12 +91,17 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit
probe.expectMsgType[Delivered]
def createTestProcessor(): ActorRef =
system.actorOf(Props(classOf[TestProcessor], name))
system.actorOf(Props(classOf[TestProcessor], name, testChannelProps))
def testChannelProps: Props
def testResendingChannelProps: Props
def setupTestProcessorData(): Unit = {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
val replyProbe = TestProbe()
val senderProbe = TestProbe()
val processor = createTestProcessor()
@ -100,9 +109,11 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit
processor tell (Persistent("a1"), forwardProbe.ref)
processor tell (Persistent("b1"), replyProbe.ref)
processor tell (Persistent("c1"), senderProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _, _) m.confirm() }
replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _, _) m.confirm() }
senderProbe.expectMsg("got: c1")
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
@ -119,9 +130,29 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit
processor ! Persistent("b2")
expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _, _) m.confirm() }
}
"de-duplicate confirmed messages on restart" in {
processor ! Persistent("c3")
expectMsg("got: c3")
processor ! Persistent("a3")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a3", _, _) m.confirm() }
processor ! "replay"
expectMsg("got: c3")
expectNoMsg(1.second)
}
"de-duplicate confirmed messages on starting new with same processor id" in {
processor ! Persistent("c4")
expectMsg("got: c4")
processor ! Persistent("a4")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a4", _, _) m.confirm() }
val p2 = createTestProcessor()
expectMsg("got: c4")
expectNoMsg(1.second)
}
"resend unconfirmed messages on restart" in {
val probe = TestProbe()
val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", probe.ref))
val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", testResendingChannelProps, probe.ref))
p ! Persistent("a")
@ -139,22 +170,41 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit
"An eventsourced processor that uses a channel" can {
"reliably deliver events" in {
val probe = TestProbe()
val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", probe.ref))
val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", testResendingChannelProps, probe.ref))
ep ! "cmd"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) }
probe.expectNoMsg(200 milliseconds)
ep ! "replay"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) cp.confirm() }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) cp.confirm() }
}
}
}
class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorChannelSpec"))
class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorChannelSpec"))
class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorChannelSpec")) {
def testChannelProps: Props = Channel.props(s"${name}-channel")
def testResendingChannelProps: Props =
Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorChannelSpec")) {
def testChannelProps: Props = Channel.props(s"${name}-channel")
def testResendingChannelProps: Props =
Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class LeveldbProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorPersistentChannelSpec")) {
def testChannelProps: Props = PersistentChannel.props(s"${name}-channel")
def testResendingChannelProps: Props =
PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class InmemProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorPersistentChannelSpec")) {
def testChannelProps: Props = PersistentChannel.props(s"${name}-channel")
def testResendingChannelProps: Props =
PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}