diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala index a7920cbf7e..de7e55d400 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -91,8 +91,7 @@ class NoPersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upS override def receiveCommand = { case n: Int => - deliver(downStream, deliveryId => - Msg(deliveryId, n)) + deliver(downStream)(deliveryId => Msg(deliveryId, n)) if (n == respondAfter) //switch to wait all message confirmed context.become(waitConfirm) @@ -125,8 +124,7 @@ class PersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStr override def receiveCommand = { case n: Int => persist(MsgSent(n)) { e => - deliver(downStream, deliveryId => - Msg(deliveryId, n)) + deliver(downStream)(deliveryId => Msg(deliveryId, n)) if (n == respondAfter) //switch to wait all message confirmed context.become(waitConfirm) @@ -160,8 +158,7 @@ class PersistAsyncPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val override def receiveCommand = { case n: Int => persistAsync(MsgSent(n)) { e => - deliver(downStream, deliveryId => - Msg(deliveryId, n)) + deliver(downStream)(deliveryId => Msg(deliveryId, n)) if (n == respondAfter) //switch to wait all message confirmed context.become(waitConfirm) diff --git a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java index b57cc0e92a..48e746c22a 100644 --- a/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/LambdaPersistenceDocTest.java @@ -3,11 +3,7 @@ */ package docs.persistence; -import akka.actor.AbstractActor; -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; +import akka.actor.*; import akka.japi.Procedure; import akka.japi.pf.ReceiveBuilder; import akka.pattern.BackoffSupervisor; @@ -170,9 +166,9 @@ public class LambdaPersistenceDocTest { } class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { - private final ActorPath destination; + private final ActorSelection destination; - public MyPersistentActor(ActorPath destination) { + public MyPersistentActor(ActorSelection destination) { this.destination = destination; } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 19175c1574..eedf1d346b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -6,13 +6,9 @@ package docs.persistence; import java.util.concurrent.TimeUnit; +import akka.actor.*; import akka.pattern.BackoffSupervisor; import scala.concurrent.duration.Duration; -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.*; @@ -147,12 +143,12 @@ public class PersistenceDocTest { } class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery { - private final ActorPath destination; + private final ActorSelection destination; @Override public String persistenceId() { return "persistence-id"; } - public MyPersistentActor(ActorPath destination) { + public MyPersistentActor(ActorSelection destination) { this.destination = destination; } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index e04211f0ff..0abc5a5a0e 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -497,4 +497,21 @@ signal for a completed recovery was named ``ReplayMessagesSuccess``. This is now fixed, and all methods use the same "recovery" wording consistently across the entire API. The old ``ReplayMessagesSuccess`` is now called ``RecoverySuccess``, and an additional method called ``onRecoveryFailure`` -has been introduced. \ No newline at end of file +has been introduced. + +AtLeastOnceDelivery deliver signature +------------------------------------- +The signature of ``deliver`` changed slightly in order to allow both ``ActorSelection`` and ``ActorPath`` to be +used with it. + +Previously: + + def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit + +Now: + + def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit + def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit + +The Java API remains unchanged and has simply gained the 2nd overload which allows ``ActorSelection`` to be +passed in directly (without converting to ``ActorPath``). \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala b/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala index ca7666191b..ec70d37ada 100644 --- a/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/pattern/BackoffSupervisorDocSpec.scala @@ -4,7 +4,7 @@ package docs.pattern -import akka.actor.{ActorSystem, Props} +import akka.actor.{ ActorSystem, Props } import akka.pattern.BackoffSupervisor import akka.testkit.TestActors.EchoActor @@ -16,7 +16,7 @@ class BackoffSupervisorDocSpec { //#backoff val childProps = Props(classOf[EchoActor]) - + val supervisor = BackoffSupervisor.props( childProps, childName = "myEcho", diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 68878fdaef..423cccbbc2 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -102,7 +102,7 @@ object PersistenceDocSpec { object AtLeastOnce { //#at-least-once-example - import akka.actor.{ Actor, ActorPath } + import akka.actor.{ Actor, ActorPath, ActorSelection } import akka.persistence.AtLeastOnceDelivery case class Msg(deliveryId: Long, s: String) @@ -112,7 +112,7 @@ object PersistenceDocSpec { case class MsgSent(s: String) extends Evt case class MsgConfirmed(deliveryId: Long) extends Evt - class MyPersistentActor(destination: ActorPath) + class MyPersistentActor(destination: ActorSelection) extends PersistentActor with AtLeastOnceDelivery { override def persistenceId: String = "persistence-id" @@ -128,7 +128,7 @@ object PersistenceDocSpec { def updateState(evt: Evt): Unit = evt match { case MsgSent(s) => - deliver(destination, deliveryId => Msg(deliveryId, s)) + deliver(destination)(deliveryId => Msg(deliveryId, s)) case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId) } diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index dd3f09dce7..0575ce9667 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -7,8 +7,7 @@ import scala.annotation.tailrec import scala.collection.breakOut import scala.collection.immutable import scala.concurrent.duration.FiniteDuration -import akka.actor.Actor -import akka.actor.ActorPath +import akka.actor.{ ActorSelection, Actor, ActorPath } import akka.persistence.serialization.Message object AtLeastOnceDelivery { @@ -197,7 +196,7 @@ trait AtLeastOnceDelivery extends Eventsourced { * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ - def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit = { + def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = { if (unconfirmed.size >= maxUnconfirmedMessages) throw new MaxUnconfirmedMessagesExceededException( s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]") @@ -212,6 +211,34 @@ trait AtLeastOnceDelivery extends Eventsourced { send(deliveryId, d, now) } + /** + * Scala API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations of the actor, i.e. when sending + * to multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = { + val isWildcardSelection = destination.pathString.contains("*") + require(!isWildcardSelection, "Delivering to wildcard actor selections is not supported by AtLeastOnceDelivery. " + + "Introduce an mediator Actor which this AtLeastOnceDelivery Actor will deliver the messages to," + + "and will handle the logic of fan-out and collecting individual confirmations, until it can signal confirmation back to this Actor.") + deliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage) + } + /** * Call this method when a message has been confirmed by the destination, * or to abort re-sending. @@ -345,7 +372,30 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) + super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + + /** + * Java API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations, i.e. when sending to + * multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = + super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) } /** @@ -379,5 +429,28 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) + super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + + /** + * Java API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations, i.e. when sending to + * multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = + super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala index acb4a31911..0cda0df5ab 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala @@ -48,7 +48,7 @@ object AtLeastOnceDeliveryCrashSpec { } def send() = { - deliver(testProbe.path, { id ⇒ SendingMessage(id, false) }) + deliver(testProbe.path) { id ⇒ SendingMessage(id, false) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index b1b2693ab7..e614556fe4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -104,7 +104,7 @@ object AtLeastOnceDeliveryFailureSpec { def updateState(evt: Evt): Unit = evt match { case MsgSent(i) ⇒ add(i) - deliver(destination.path, deliveryId ⇒ Msg(deliveryId, i)) + deliver(destination.path)(deliveryId ⇒ Msg(deliveryId, i)) case MsgConfirmed(deliveryId, i) ⇒ confirmDelivery(deliveryId) diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index de74ba62c2..6c66b733e3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -9,6 +9,7 @@ import akka.testkit._ import com.typesafe.config._ import scala.concurrent.duration._ +import scala.util.Failure import scala.util.control.NoStackTrace object AtLeastOnceDeliverySpec { @@ -29,17 +30,20 @@ object AtLeastOnceDeliverySpec { def senderProps(testActor: ActorRef, name: String, redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int, - redeliveryBurstLimit: Int, async: Boolean, destinations: Map[String, ActorPath]): Props = + redeliveryBurstLimit: Int, + destinations: Map[String, ActorPath], + async: Boolean, actorSelectionDelivery: Boolean = false): Props = Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, - redeliveryBurstLimit, async, destinations)) + redeliveryBurstLimit, destinations, async, actorSelectionDelivery)) class Sender(testActor: ActorRef, name: String, override val redeliverInterval: FiniteDuration, override val warnAfterNumberOfUnconfirmedAttempts: Int, override val redeliveryBurstLimit: Int, + destinations: Map[String, ActorPath], async: Boolean, - destinations: Map[String, ActorPath]) + actorSelectionDelivery: Boolean) extends PersistentActor with AtLeastOnceDelivery with ActorLogging { override def persistenceId: String = name @@ -48,9 +52,13 @@ object AtLeastOnceDeliverySpec { var lastSnapshotAskedForBy: Option[ActorRef] = None def updateState(evt: Evt): Unit = evt match { + case AcceptedReq(payload, destination) if actorSelectionDelivery ⇒ + log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning) + deliver(context.actorSelection(destination))(deliveryId ⇒ Action(deliveryId, payload)) + case AcceptedReq(payload, destination) ⇒ log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning) - deliver(destination, deliveryId ⇒ Action(deliveryId, payload)) + deliver(destination)(deliveryId ⇒ Action(deliveryId, payload)) case ReqDone(id) ⇒ log.debug(s"confirmDelivery($id), recovery: " + recoveryRunning) @@ -147,46 +155,67 @@ object AtLeastOnceDeliverySpec { } } + class DeliverToStarSelection(name: String) extends PersistentActor with AtLeastOnceDelivery { + override def persistenceId = name + + override def receiveCommand = { + case any ⇒ + // this is not supported currently, so expecting exception + try deliver(context.actorSelection("*"))(id ⇒ s"$any$id") + catch { case ex: Exception ⇒ sender() ! Failure(ex) } + } + + override def receiveRecover = Actor.emptyBehavior + } + } abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { import akka.persistence.AtLeastOnceDeliverySpec._ "AtLeastOnceDelivery" must { - "deliver messages in order when nothing is lost" taggedAs (TimingTest) in { - val probe = TestProbe() - val probeA = TestProbe() - val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) - snd.tell(Req("a"), probe.ref) - probe.expectMsg(ReqAck) - probeA.expectMsg(Action(1, "a")) - probeA.expectNoMsg(1.second) + List(true, false).foreach { deliverUsingActorSelection ⇒ + + s"deliver messages in order when nothing is lost (using actorSelection: $deliverUsingActorSelection)" taggedAs (TimingTest) in { + val probe = TestProbe() + val probeA = TestProbe() + val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name) + snd.tell(Req("a"), probe.ref) + probe.expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a")) + probeA.expectNoMsg(1.second) + } + + s"re-deliver lost messages (using actorSelection: $deliverUsingActorSelection)" taggedAs (TimingTest) in { + val probe = TestProbe() + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false, actorSelectionDelivery = deliverUsingActorSelection), name) + snd.tell(Req("a-1"), probe.ref) + probe.expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a-1")) + + snd.tell(Req("a-2"), probe.ref) + probe.expectMsg(ReqAck) + probeA.expectMsg(Action(2, "a-2")) + + snd.tell(Req("a-3"), probe.ref) + snd.tell(Req("a-4"), probe.ref) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) + // a-3 was lost + probeA.expectMsg(Action(4, "a-4")) + // and then re-delivered + probeA.expectMsg(Action(3, "a-3")) + probeA.expectNoMsg(1.second) + } } - "re-deliver lost messages" taggedAs (TimingTest) in { - val probe = TestProbe() - val probeA = TestProbe() - val dst = system.actorOf(destinationProps(probeA.ref)) - val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) - snd.tell(Req("a-1"), probe.ref) - probe.expectMsg(ReqAck) - probeA.expectMsg(Action(1, "a-1")) - - snd.tell(Req("a-2"), probe.ref) - probe.expectMsg(ReqAck) - probeA.expectMsg(Action(2, "a-2")) - - snd.tell(Req("a-3"), probe.ref) - snd.tell(Req("a-4"), probe.ref) - probe.expectMsg(ReqAck) - probe.expectMsg(ReqAck) - // a-3 was lost - probeA.expectMsg(Action(4, "a-4")) - // and then re-delivered - probeA.expectMsg(Action(3, "a-3")) - probeA.expectNoMsg(1.second) + "not allow using actorSelection with wildcards" in { + system.actorOf(Props(classOf[DeliverToStarSelection], name)) ! "anything, really." + expectMsgType[Failure[_]].toString should include("not supported") } "re-deliver lost messages after restart" taggedAs (TimingTest) in { @@ -194,7 +223,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name) snd.tell(Req("a-1"), probe.ref) probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -228,7 +257,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name) snd.tell(Req("a-1"), probe.ref) probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -265,7 +294,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = false), name) snd.tell(Req("a-1"), probe.ref) probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -303,7 +332,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c val probeA = TestProbe() val probeB = TestProbe() val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, async = false, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, destinations, async = false), name) snd.tell(Req("a-1"), probe.ref) snd.tell(Req("b-1"), probe.ref) snd.tell(Req("b-2"), probe.ref) @@ -330,7 +359,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c "A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path, "B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path, "C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = true, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, destinations, async = true), name) val N = 100 for (n ← 1 to N) { snd.tell(Req("a-" + n), probe.ref) @@ -353,7 +382,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) - val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, async = true, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, destinations, async = true), name) val N = 10 for (n ← 1 to N) {