+per #16339 adds actorSelection support to AtLeastOnceDelivery

TODO: need to add docs updates
This commit is contained in:
Konrad Malawski 2015-07-09 18:01:27 +02:00
parent 235df6ce09
commit b335b6ae9b
10 changed files with 183 additions and 75 deletions

View file

@ -91,8 +91,7 @@ class NoPersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upS
override def receiveCommand = {
case n: Int =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
deliver(downStream)(deliveryId => Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)
@ -125,8 +124,7 @@ class PersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStr
override def receiveCommand = {
case n: Int =>
persist(MsgSent(n)) { e =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
deliver(downStream)(deliveryId => Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)
@ -160,8 +158,7 @@ class PersistAsyncPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val
override def receiveCommand = {
case n: Int =>
persistAsync(MsgSent(n)) { e =>
deliver(downStream, deliveryId =>
Msg(deliveryId, n))
deliver(downStream)(deliveryId => Msg(deliveryId, n))
if (n == respondAfter)
//switch to wait all message confirmed
context.become(waitConfirm)

View file

@ -3,11 +3,7 @@
*/
package docs.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.*;
import akka.japi.Procedure;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.BackoffSupervisor;
@ -170,9 +166,9 @@ public class LambdaPersistenceDocTest {
}
class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
private final ActorPath destination;
private final ActorSelection destination;
public MyPersistentActor(ActorPath destination) {
public MyPersistentActor(ActorSelection destination) {
this.destination = destination;
}

View file

@ -6,13 +6,9 @@ package docs.persistence;
import java.util.concurrent.TimeUnit;
import akka.actor.*;
import akka.pattern.BackoffSupervisor;
import scala.concurrent.duration.Duration;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.persistence.*;
@ -147,12 +143,12 @@ public class PersistenceDocTest {
}
class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
private final ActorPath destination;
private final ActorSelection destination;
@Override
public String persistenceId() { return "persistence-id"; }
public MyPersistentActor(ActorPath destination) {
public MyPersistentActor(ActorSelection destination) {
this.destination = destination;
}

View file

@ -497,4 +497,21 @@ signal for a completed recovery was named ``ReplayMessagesSuccess``.
This is now fixed, and all methods use the same "recovery" wording consistently across the entire API.
The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure``
has been introduced.
has been introduced.
AtLeastOnceDelivery deliver signature
-------------------------------------
The signature of ``deliver`` changed slightly in order to allow both ``ActorSelection`` and ``ActorPath`` to be
used with it.
Previously:
def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit
Now:
def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit
def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit
The Java API remains unchanged and has simply gained the 2nd overload which allows ``ActorSelection`` to be
passed in directly (without converting to ``ActorPath``).

View file

@ -4,7 +4,7 @@
package docs.pattern
import akka.actor.{ActorSystem, Props}
import akka.actor.{ ActorSystem, Props }
import akka.pattern.BackoffSupervisor
import akka.testkit.TestActors.EchoActor
@ -16,7 +16,7 @@ class BackoffSupervisorDocSpec {
//#backoff
val childProps = Props(classOf[EchoActor])
val supervisor = BackoffSupervisor.props(
childProps,
childName = "myEcho",

View file

@ -102,7 +102,7 @@ object PersistenceDocSpec {
object AtLeastOnce {
//#at-least-once-example
import akka.actor.{ Actor, ActorPath }
import akka.actor.{ Actor, ActorPath, ActorSelection }
import akka.persistence.AtLeastOnceDelivery
case class Msg(deliveryId: Long, s: String)
@ -112,7 +112,7 @@ object PersistenceDocSpec {
case class MsgSent(s: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt
class MyPersistentActor(destination: ActorPath)
class MyPersistentActor(destination: ActorSelection)
extends PersistentActor with AtLeastOnceDelivery {
override def persistenceId: String = "persistence-id"
@ -128,7 +128,7 @@ object PersistenceDocSpec {
def updateState(evt: Evt): Unit = evt match {
case MsgSent(s) =>
deliver(destination, deliveryId => Msg(deliveryId, s))
deliver(destination)(deliveryId => Msg(deliveryId, s))
case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId)
}

View file

@ -7,8 +7,7 @@ import scala.annotation.tailrec
import scala.collection.breakOut
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.{ ActorSelection, Actor, ActorPath }
import akka.persistence.serialization.Message
object AtLeastOnceDelivery {
@ -197,7 +196,7 @@ trait AtLeastOnceDelivery extends Eventsourced {
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: Long Any): Unit = {
def deliver(destination: ActorPath)(deliveryIdToMessage: Long Any): Unit = {
if (unconfirmed.size >= maxUnconfirmedMessages)
throw new MaxUnconfirmedMessagesExceededException(
s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]")
@ -212,6 +211,34 @@ trait AtLeastOnceDelivery extends Eventsourced {
send(deliveryId, d, now)
}
/**
* Scala API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
* to multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorSelection)(deliveryIdToMessage: Long Any): Unit = {
val isWildcardSelection = destination.pathString.contains("*")
require(!isWildcardSelection, "Delivering to wildcard actor selections is not supported by AtLeastOnceDelivery. " +
"Introduce an mediator Actor which this AtLeastOnceDelivery Actor will deliver the messages to," +
"and will handle the logic of fan-out and collecting individual confirmations, until it can signal confirmation back to this Actor.")
deliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage)
}
/**
* Call this method when a message has been confirmed by the destination,
* or to abort re-sending.
@ -345,7 +372,30 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination, id deliveryIdToMessage.apply(id))
super.deliver(destination)(id deliveryIdToMessage.apply(id))
/**
* Java API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations, i.e. when sending to
* multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination)(id deliveryIdToMessage.apply(id))
}
/**
@ -379,5 +429,28 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination, id deliveryIdToMessage.apply(id))
super.deliver(destination)(id deliveryIdToMessage.apply(id))
/**
* Java API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations, i.e. when sending to
* multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination)(id deliveryIdToMessage.apply(id))
}

View file

@ -48,7 +48,7 @@ object AtLeastOnceDeliveryCrashSpec {
}
def send() = {
deliver(testProbe.path, { id SendingMessage(id, false) })
deliver(testProbe.path) { id SendingMessage(id, false) }
}
}

View file

@ -104,7 +104,7 @@ object AtLeastOnceDeliveryFailureSpec {
def updateState(evt: Evt): Unit = evt match {
case MsgSent(i)
add(i)
deliver(destination.path, deliveryId Msg(deliveryId, i))
deliver(destination.path)(deliveryId Msg(deliveryId, i))
case MsgConfirmed(deliveryId, i)
confirmDelivery(deliveryId)

View file

@ -9,6 +9,7 @@ import akka.testkit._
import com.typesafe.config._
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.control.NoStackTrace
object AtLeastOnceDeliverySpec {
@ -29,17 +30,20 @@ object AtLeastOnceDeliverySpec {
def senderProps(testActor: ActorRef, name: String,
redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int,
redeliveryBurstLimit: Int, async: Boolean, destinations: Map[String, ActorPath]): Props =
redeliveryBurstLimit: Int,
destinations: Map[String, ActorPath],
async: Boolean, actorSelectionDelivery: Boolean = false): Props =
Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts,
redeliveryBurstLimit, async, destinations))
redeliveryBurstLimit, destinations, async, actorSelectionDelivery))
class Sender(testActor: ActorRef,
name: String,
override val redeliverInterval: FiniteDuration,
override val warnAfterNumberOfUnconfirmedAttempts: Int,
override val redeliveryBurstLimit: Int,
destinations: Map[String, ActorPath],
async: Boolean,
destinations: Map[String, ActorPath])
actorSelectionDelivery: Boolean)
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
override def persistenceId: String = name
@ -48,9 +52,13 @@ object AtLeastOnceDeliverySpec {
var lastSnapshotAskedForBy: Option[ActorRef] = None
def updateState(evt: Evt): Unit = evt match {
case AcceptedReq(payload, destination) if actorSelectionDelivery
log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning)
deliver(context.actorSelection(destination))(deliveryId Action(deliveryId, payload))
case AcceptedReq(payload, destination)
log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning)
deliver(destination, deliveryId Action(deliveryId, payload))
deliver(destination)(deliveryId Action(deliveryId, payload))
case ReqDone(id)
log.debug(s"confirmDelivery($id), recovery: " + recoveryRunning)
@ -147,46 +155,67 @@ object AtLeastOnceDeliverySpec {
}
}
class DeliverToStarSelection(name: String) extends PersistentActor with AtLeastOnceDelivery {
override def persistenceId = name
override def receiveCommand = {
case any
// this is not supported currently, so expecting exception
try deliver(context.actorSelection("*"))(id s"$any$id")
catch { case ex: Exception sender() ! Failure(ex) }
}
override def receiveRecover = Actor.emptyBehavior
}
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
import akka.persistence.AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {
"deliver messages in order when nothing is lost" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a"))
probeA.expectNoMsg(1.second)
List(true, false).foreach { deliverUsingActorSelection
s"deliver messages in order when nothing is lost (using actorSelection: $deliverUsingActorSelection)" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name)
snd.tell(Req("a"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a"))
probeA.expectNoMsg(1.second)
}
s"re-deliver lost messages (using actorSelection: $deliverUsingActorSelection)" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false, actorSelectionDelivery = deliverUsingActorSelection), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd.tell(Req("a-3"), probe.ref)
snd.tell(Req("a-4"), probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
probeA.expectNoMsg(1.second)
}
}
"re-deliver lost messages" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd.tell(Req("a-3"), probe.ref)
snd.tell(Req("a-4"), probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
probeA.expectNoMsg(1.second)
"not allow using actorSelection with wildcards" in {
system.actorOf(Props(classOf[DeliverToStarSelection], name)) ! "anything, really."
expectMsgType[Failure[_]].toString should include("not supported")
}
"re-deliver lost messages after restart" taggedAs (TimingTest) in {
@ -194,7 +223,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
@ -228,7 +257,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
@ -265,7 +294,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
@ -303,7 +332,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
val probeA = TestProbe()
val probeB = TestProbe()
val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, async = false, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, destinations, async = false), name)
snd.tell(Req("a-1"), probe.ref)
snd.tell(Req("b-1"), probe.ref)
snd.tell(Req("b-2"), probe.ref)
@ -330,7 +359,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
"A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
"B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
"C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = true, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = true), name)
val N = 100
for (n 1 to N) {
snd.tell(Req("a-" + n), probe.ref)
@ -353,7 +382,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, async = true, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, destinations, async = true), name)
val N = 10
for (n 1 to N) {