Merge branch 'master' into unify-operator-signature-apidoc
This commit is contained in:
commit
a25c6091b8
13 changed files with 395 additions and 70 deletions
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.actor.typed.delivery
|
package akka.actor.typed.delivery
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import org.scalatest.wordspec.AnyWordSpecLike
|
import org.scalatest.wordspec.AnyWordSpecLike
|
||||||
|
|
||||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||||
|
|
@ -12,12 +14,12 @@ import akka.actor.typed.delivery.ConsumerController.DeliverThenStop
|
||||||
import akka.actor.typed.delivery.internal.ConsumerControllerImpl
|
import akka.actor.typed.delivery.internal.ConsumerControllerImpl
|
||||||
import akka.actor.typed.delivery.internal.ProducerControllerImpl
|
import akka.actor.typed.delivery.internal.ProducerControllerImpl
|
||||||
|
|
||||||
class ConsumerControllerSpec
|
class ConsumerControllerSpec extends ScalaTestWithActorTestKit("""
|
||||||
extends ScalaTestWithActorTestKit("""
|
akka.reliable-delivery.consumer-controller {
|
||||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
flow-control-window = 20
|
||||||
""")
|
resend-interval-min = 1s
|
||||||
with AnyWordSpecLike
|
}
|
||||||
with LogCapturing {
|
""") with AnyWordSpecLike with LogCapturing {
|
||||||
import TestConsumer.sequencedMessage
|
import TestConsumer.sequencedMessage
|
||||||
|
|
||||||
private var idCount = 0
|
private var idCount = 0
|
||||||
|
|
@ -62,6 +64,7 @@ class ConsumerControllerSpec
|
||||||
val producerControllerProbe2 = createTestProbe[ProducerControllerImpl.InternalCommand]()
|
val producerControllerProbe2 = createTestProbe[ProducerControllerImpl.InternalCommand]()
|
||||||
consumerController ! ConsumerController.RegisterToProducerController(producerControllerProbe2.ref)
|
consumerController ! ConsumerController.RegisterToProducerController(producerControllerProbe2.ref)
|
||||||
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
||||||
|
consumerProbe.receiveMessage().confirmTo ! ConsumerController.Confirmed
|
||||||
// expected resend
|
// expected resend
|
||||||
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
||||||
|
|
||||||
|
|
@ -80,14 +83,12 @@ class ConsumerControllerSpec
|
||||||
val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]()
|
val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]()
|
||||||
consumerController ! ConsumerController.Start(consumerProbe.ref)
|
consumerController ! ConsumerController.Start(consumerProbe.ref)
|
||||||
|
|
||||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]]
|
|
||||||
|
|
||||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false))
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false))
|
||||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, true))
|
|
||||||
|
|
||||||
consumerController ! ConsumerController.Confirmed
|
consumerController ! ConsumerController.Confirmed
|
||||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, false))
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, false))
|
||||||
|
|
||||||
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, true))
|
||||||
|
|
||||||
testKit.stop(consumerController)
|
testKit.stop(consumerController)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -185,15 +186,17 @@ class ConsumerControllerSpec
|
||||||
consumerController ! sequencedMessage(producerId, 2, producerControllerProbe.ref)
|
consumerController ! sequencedMessage(producerId, 2, producerControllerProbe.ref)
|
||||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]]
|
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]]
|
||||||
consumerController ! ConsumerController.Confirmed
|
consumerController ! ConsumerController.Confirmed
|
||||||
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true))
|
||||||
|
|
||||||
consumerController ! sequencedMessage(producerId, 3, producerControllerProbe.ref)
|
consumerController ! sequencedMessage(producerId, 3, producerControllerProbe.ref)
|
||||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].seqNr should ===(3)
|
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].seqNr should ===(3)
|
||||||
|
|
||||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true))
|
|
||||||
|
|
||||||
consumerController ! ConsumerController.Confirmed
|
consumerController ! ConsumerController.Confirmed
|
||||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true))
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true))
|
||||||
|
|
||||||
|
// exponential back, so now it should be more than 1 sec
|
||||||
|
producerControllerProbe.expectNoMessage(1.1.second)
|
||||||
|
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true))
|
||||||
|
|
||||||
testKit.stop(consumerController)
|
testKit.stop(consumerController)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl
|
||||||
class DurableProducerControllerSpec
|
class DurableProducerControllerSpec
|
||||||
extends ScalaTestWithActorTestKit("""
|
extends ScalaTestWithActorTestKit("""
|
||||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
||||||
|
akka.reliable-delivery.consumer-controller.resend-interval-min = 1s
|
||||||
""")
|
""")
|
||||||
with AnyWordSpecLike
|
with AnyWordSpecLike
|
||||||
with LogCapturing {
|
with LogCapturing {
|
||||||
|
|
|
||||||
|
|
@ -42,12 +42,13 @@ object ReliableDeliveryRandomSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ReliableDeliveryRandomSpec
|
class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
|
||||||
extends ScalaTestWithActorTestKit("""
|
akka.reliable-delivery.consumer-controller {
|
||||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
flow-control-window = 20
|
||||||
""")
|
resend-interval-min = 500 ms
|
||||||
with AnyWordSpecLike
|
resend-interval-max = 2 s
|
||||||
with LogCapturing {
|
}
|
||||||
|
""") with AnyWordSpecLike with LogCapturing {
|
||||||
import ReliableDeliveryRandomSpec._
|
import ReliableDeliveryRandomSpec._
|
||||||
|
|
||||||
private var idCount = 0
|
private var idCount = 0
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,9 @@
|
||||||
# Changes to internals of reliable delivery
|
# Changes to internals of reliable delivery
|
||||||
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*")
|
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*")
|
||||||
|
|
||||||
|
# #28720 Dynamically adjust the ConsumerController's resend interval
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.resendInterval")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.getResendInterval")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.this")
|
||||||
|
|
|
||||||
|
|
@ -90,8 +90,10 @@ akka.reliable-delivery {
|
||||||
flow-control-window = 50
|
flow-control-window = 50
|
||||||
|
|
||||||
# The ConsumerController resends flow control messages to the
|
# The ConsumerController resends flow control messages to the
|
||||||
# ProducerController with this interval.
|
# ProducerController with the resend-interval-min, and increasing
|
||||||
resend-interval = 1s
|
# it gradually to resend-interval-max when idle.
|
||||||
|
resend-interval-min = 2s
|
||||||
|
resend-interval-max = 30s
|
||||||
|
|
||||||
# If this is enabled lost messages will not be resent, but flow control is used.
|
# If this is enabled lost messages will not be resent, but flow control is used.
|
||||||
# This can be more efficient since messages don't have to be
|
# This can be more efficient since messages don't have to be
|
||||||
|
|
|
||||||
|
|
@ -162,7 +162,8 @@ object ConsumerController {
|
||||||
def apply(config: Config): Settings = {
|
def apply(config: Config): Settings = {
|
||||||
new Settings(
|
new Settings(
|
||||||
flowControlWindow = config.getInt("flow-control-window"),
|
flowControlWindow = config.getInt("flow-control-window"),
|
||||||
resendInterval = config.getDuration("resend-interval").asScala,
|
resendIntervalMin = config.getDuration("resend-interval-min").asScala,
|
||||||
|
resendIntervalMax = config.getDuration("resend-interval-max").asScala,
|
||||||
onlyFlowControl = config.getBoolean("only-flow-control"))
|
onlyFlowControl = config.getBoolean("only-flow-control"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -183,7 +184,8 @@ object ConsumerController {
|
||||||
|
|
||||||
final class Settings private (
|
final class Settings private (
|
||||||
val flowControlWindow: Int,
|
val flowControlWindow: Int,
|
||||||
val resendInterval: FiniteDuration,
|
val resendIntervalMin: FiniteDuration,
|
||||||
|
val resendIntervalMax: FiniteDuration,
|
||||||
val onlyFlowControl: Boolean) {
|
val onlyFlowControl: Boolean) {
|
||||||
|
|
||||||
def withFlowControlWindow(newFlowControlWindow: Int): Settings =
|
def withFlowControlWindow(newFlowControlWindow: Int): Settings =
|
||||||
|
|
@ -192,20 +194,32 @@ object ConsumerController {
|
||||||
/**
|
/**
|
||||||
* Scala API
|
* Scala API
|
||||||
*/
|
*/
|
||||||
def withResendInterval(newResendInterval: FiniteDuration): Settings =
|
def withResendIntervalMin(newResendIntervalMin: FiniteDuration): Settings =
|
||||||
copy(resendInterval = newResendInterval)
|
copy(resendIntervalMin = newResendIntervalMin)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API
|
||||||
|
*/
|
||||||
|
def withResendIntervalMax(newResendIntervalMax: FiniteDuration): Settings =
|
||||||
|
copy(resendIntervalMax = newResendIntervalMax)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
def withResendInterval(newResendInterval: JavaDuration): Settings =
|
def withResendIntervalMin(newResendIntervalMin: JavaDuration): Settings =
|
||||||
copy(resendInterval = newResendInterval.asScala)
|
copy(resendIntervalMin = newResendIntervalMin.asScala)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
def getResendInterval(): JavaDuration =
|
def withResendIntervalMax(newResendIntervalMax: JavaDuration): Settings =
|
||||||
resendInterval.asJava
|
copy(resendIntervalMax = newResendIntervalMax.asScala)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def getResendIntervalMax(): JavaDuration =
|
||||||
|
resendIntervalMax.asJava
|
||||||
|
|
||||||
def withOnlyFlowControl(newOnlyFlowControl: Boolean): Settings =
|
def withOnlyFlowControl(newOnlyFlowControl: Boolean): Settings =
|
||||||
copy(onlyFlowControl = newOnlyFlowControl)
|
copy(onlyFlowControl = newOnlyFlowControl)
|
||||||
|
|
@ -215,12 +229,13 @@ object ConsumerController {
|
||||||
*/
|
*/
|
||||||
private def copy(
|
private def copy(
|
||||||
flowControlWindow: Int = flowControlWindow,
|
flowControlWindow: Int = flowControlWindow,
|
||||||
resendInterval: FiniteDuration = resendInterval,
|
resendIntervalMin: FiniteDuration = resendIntervalMin,
|
||||||
|
resendIntervalMax: FiniteDuration = resendIntervalMax,
|
||||||
onlyFlowControl: Boolean = onlyFlowControl) =
|
onlyFlowControl: Boolean = onlyFlowControl) =
|
||||||
new Settings(flowControlWindow, resendInterval, onlyFlowControl)
|
new Settings(flowControlWindow, resendIntervalMin, resendIntervalMax, onlyFlowControl)
|
||||||
|
|
||||||
override def toString: String =
|
override def toString: String =
|
||||||
s"Settings($flowControlWindow, $resendInterval, $onlyFlowControl)"
|
s"Settings($flowControlWindow, $resendIntervalMin, $onlyFlowControl)"
|
||||||
}
|
}
|
||||||
|
|
||||||
def apply[A](): Behavior[Command[A]] =
|
def apply[A](): Behavior[Command[A]] =
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.actor.typed.delivery.internal
|
package akka.actor.typed.delivery.internal
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
import akka.actor.typed.PostStop
|
import akka.actor.typed.PostStop
|
||||||
|
|
@ -128,8 +130,9 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
||||||
context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo))
|
context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo))
|
||||||
|
|
||||||
flightRecorder.consumerStarted(context.self.path)
|
flightRecorder.consumerStarted(context.self.path)
|
||||||
|
val retryTimer = new RetryTimer(timers, settings.resendIntervalMin, settings.resendIntervalMax)
|
||||||
val activeBehavior =
|
val activeBehavior =
|
||||||
new ConsumerControllerImpl[A](context, timers, stashBuffer, settings)
|
new ConsumerControllerImpl[A](context, retryTimer, stashBuffer, settings)
|
||||||
.active(initialState(context, s, registering, stopping))
|
.active(initialState(context, s, registering, stopping))
|
||||||
context.log.debug("Received Start, unstash [{}] messages.", stashBuffer.size)
|
context.log.debug("Received Start, unstash [{}] messages.", stashBuffer.size)
|
||||||
stashBuffer.unstash(activeBehavior, 1, scalaIdentityFunction)
|
stashBuffer.unstash(activeBehavior, 1, scalaIdentityFunction)
|
||||||
|
|
@ -171,7 +174,7 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval)
|
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendIntervalMin)
|
||||||
waitForStart(None, stopping = false)
|
waitForStart(None, stopping = false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -207,11 +210,47 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
||||||
if (ref.path.address.hasGlobalScope)
|
if (ref.path.address.hasGlobalScope)
|
||||||
throw new IllegalArgumentException(s"Consumer [$ref] should be local.")
|
throw new IllegalArgumentException(s"Consumer [$ref] should be local.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class RetryTimer(
|
||||||
|
timers: TimerScheduler[ConsumerControllerImpl.InternalCommand],
|
||||||
|
val minBackoff: FiniteDuration,
|
||||||
|
maxBackoff: FiniteDuration) {
|
||||||
|
private var _interval = minBackoff
|
||||||
|
|
||||||
|
def interval(): FiniteDuration =
|
||||||
|
_interval
|
||||||
|
|
||||||
|
def start(): Unit = {
|
||||||
|
_interval = minBackoff
|
||||||
|
timers.startTimerWithFixedDelay(Retry, _interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
def scheduleNext(): Unit = {
|
||||||
|
val newInterval =
|
||||||
|
if (_interval eq maxBackoff)
|
||||||
|
maxBackoff
|
||||||
|
else
|
||||||
|
maxBackoff.min(_interval * 1.5) match {
|
||||||
|
case f: FiniteDuration => f
|
||||||
|
case _ => maxBackoff
|
||||||
|
}
|
||||||
|
if (newInterval != _interval) {
|
||||||
|
_interval = newInterval
|
||||||
|
timers.startTimerWithFixedDelay(Retry, _interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def reset(): Unit = {
|
||||||
|
if (_interval ne minBackoff)
|
||||||
|
start()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ConsumerControllerImpl[A](
|
private class ConsumerControllerImpl[A](
|
||||||
context: ActorContext[ConsumerControllerImpl.InternalCommand],
|
context: ActorContext[ConsumerControllerImpl.InternalCommand],
|
||||||
timers: TimerScheduler[ConsumerControllerImpl.InternalCommand],
|
retryTimer: ConsumerControllerImpl.RetryTimer,
|
||||||
stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand],
|
stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand],
|
||||||
settings: ConsumerController.Settings) {
|
settings: ConsumerController.Settings) {
|
||||||
|
|
||||||
|
|
@ -230,7 +269,7 @@ private class ConsumerControllerImpl[A](
|
||||||
|
|
||||||
private val traceEnabled = context.log.isTraceEnabled
|
private val traceEnabled = context.log.isTraceEnabled
|
||||||
|
|
||||||
startRetryTimer()
|
retryTimer.start()
|
||||||
|
|
||||||
private def resendLost = !settings.onlyFlowControl
|
private def resendLost = !settings.onlyFlowControl
|
||||||
|
|
||||||
|
|
@ -245,6 +284,7 @@ private class ConsumerControllerImpl[A](
|
||||||
val expectedSeqNr = s.receivedSeqNr + 1
|
val expectedSeqNr = s.receivedSeqNr + 1
|
||||||
|
|
||||||
flightRecorder.consumerReceived(pid, seqNr)
|
flightRecorder.consumerReceived(pid, seqNr)
|
||||||
|
retryTimer.reset()
|
||||||
|
|
||||||
if (s.isProducerChanged(seqMsg)) {
|
if (s.isProducerChanged(seqMsg)) {
|
||||||
if (seqMsg.first && traceEnabled)
|
if (seqMsg.first && traceEnabled)
|
||||||
|
|
@ -269,6 +309,7 @@ private class ConsumerControllerImpl[A](
|
||||||
if (resendLost) {
|
if (resendLost) {
|
||||||
seqMsg.producerController ! Resend(fromSeqNr = expectedSeqNr)
|
seqMsg.producerController ! Resend(fromSeqNr = expectedSeqNr)
|
||||||
stashBuffer.clear()
|
stashBuffer.clear()
|
||||||
|
retryTimer.start()
|
||||||
resending(s)
|
resending(s)
|
||||||
} else {
|
} else {
|
||||||
deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
|
deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
|
||||||
|
|
@ -336,6 +377,7 @@ private class ConsumerControllerImpl[A](
|
||||||
// request resend of all unconfirmed, and mark first
|
// request resend of all unconfirmed, and mark first
|
||||||
seqMsg.producerController ! Resend(0)
|
seqMsg.producerController ! Resend(0)
|
||||||
stashBuffer.clear()
|
stashBuffer.clear()
|
||||||
|
retryTimer.start()
|
||||||
resending(s)
|
resending(s)
|
||||||
} else {
|
} else {
|
||||||
context.log.warnN(
|
context.log.warnN(
|
||||||
|
|
@ -469,7 +511,7 @@ private class ConsumerControllerImpl[A](
|
||||||
seqNr,
|
seqNr,
|
||||||
newRequestedSeqNr)
|
newRequestedSeqNr)
|
||||||
s.producerController ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaTimeout = false)
|
s.producerController ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaTimeout = false)
|
||||||
startRetryTimer() // reset interval since Request was just sent
|
retryTimer.start() // reset interval since Request was just sent
|
||||||
newRequestedSeqNr
|
newRequestedSeqNr
|
||||||
} else {
|
} else {
|
||||||
if (seqMsg.ack) {
|
if (seqMsg.ack) {
|
||||||
|
|
@ -518,7 +560,8 @@ private class ConsumerControllerImpl[A](
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
|
|
||||||
case Retry =>
|
case Retry =>
|
||||||
receiveRetry(s, () => waitingForConfirmation(retryRequest(s), seqMsg))
|
// no retries when waitingForConfirmation, will be performed from (idle) active
|
||||||
|
Behaviors.same
|
||||||
|
|
||||||
case start: Start[A] =>
|
case start: Start[A] =>
|
||||||
start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr)
|
start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr)
|
||||||
|
|
@ -542,6 +585,9 @@ private class ConsumerControllerImpl[A](
|
||||||
}
|
}
|
||||||
|
|
||||||
private def receiveRetry(s: State[A], nextBehavior: () => Behavior[InternalCommand]): Behavior[InternalCommand] = {
|
private def receiveRetry(s: State[A], nextBehavior: () => Behavior[InternalCommand]): Behavior[InternalCommand] = {
|
||||||
|
retryTimer.scheduleNext()
|
||||||
|
if (retryTimer.interval() != retryTimer.minBackoff)
|
||||||
|
context.log.debug("Schedule next retry in [{} ms]", retryTimer.interval().toMillis)
|
||||||
s.registering match {
|
s.registering match {
|
||||||
case None => nextBehavior()
|
case None => nextBehavior()
|
||||||
case Some(reg) =>
|
case Some(reg) =>
|
||||||
|
|
@ -574,6 +620,7 @@ private class ConsumerControllerImpl[A](
|
||||||
"Register to new ProducerController [{}], previous was [{}].",
|
"Register to new ProducerController [{}], previous was [{}].",
|
||||||
reg.producerController,
|
reg.producerController,
|
||||||
s.producerController)
|
s.producerController)
|
||||||
|
retryTimer.start()
|
||||||
reg.producerController ! ProducerController.RegisterConsumer(context.self)
|
reg.producerController ! ProducerController.RegisterConsumer(context.self)
|
||||||
nextBehavior(s.copy(registering = Some(reg.producerController)))
|
nextBehavior(s.copy(registering = Some(reg.producerController)))
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -602,25 +649,17 @@ private class ConsumerControllerImpl[A](
|
||||||
Behaviors.unhandled
|
Behaviors.unhandled
|
||||||
}
|
}
|
||||||
|
|
||||||
private def startRetryTimer(): Unit = {
|
|
||||||
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval)
|
|
||||||
}
|
|
||||||
|
|
||||||
// in case the Request or the SequencedMessage triggering the Request is lost
|
// in case the Request or the SequencedMessage triggering the Request is lost
|
||||||
private def retryRequest(s: State[A]): State[A] = {
|
private def retryRequest(s: State[A]): State[A] = {
|
||||||
if (s.producerController == context.system.deadLetters) {
|
if (s.producerController == context.system.deadLetters) {
|
||||||
s
|
s
|
||||||
} else {
|
} else {
|
||||||
// TODO #28720 Maybe try to adjust the retry frequency. Maybe some exponential backoff and less need for it when
|
|
||||||
// SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each
|
|
||||||
// incoming SequenceMessage.
|
|
||||||
val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2
|
val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2
|
||||||
flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr)
|
flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr)
|
||||||
context.log.debug(
|
context.log.debug(
|
||||||
"Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
|
"Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
|
||||||
s.confirmedSeqNr,
|
s.confirmedSeqNr,
|
||||||
newRequestedSeqNr)
|
newRequestedSeqNr)
|
||||||
// TODO #28720 maybe watch the producer to avoid sending retry Request to dead producer
|
|
||||||
s.producerController ! Request(s.confirmedSeqNr, newRequestedSeqNr, resendLost, viaTimeout = true)
|
s.producerController ! Request(s.confirmedSeqNr, newRequestedSeqNr, resendLost, viaTimeout = true)
|
||||||
s.copy(requestedSeqNr = newRequestedSeqNr)
|
s.copy(requestedSeqNr = newRequestedSeqNr)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ Like `extrapolate`, but does not have the `initial` argument, and the `Iterator`
|
||||||
@apidoc[Source.expand](Source) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" }
|
@apidoc[Source.expand](Source) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" }
|
||||||
@apidoc[Flow.expand](Flow) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" }
|
@apidoc[Flow.expand](Flow) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" }
|
||||||
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original
|
Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original
|
||||||
|
|
@ -18,6 +17,21 @@ element, allowing for it to be rewritten and/or filtered.
|
||||||
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
|
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
|
||||||
and examples.
|
and examples.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit
|
||||||
|
unreliable. It's fine, as long as the audio remains fluent, it doesn't matter if we can't decode
|
||||||
|
a frame or two (or more). But we also want to watermark every decoded frame with the name of
|
||||||
|
our colleague. `expand` provides access to the element flowing through the stream
|
||||||
|
and let's us create extra frames in case the producer slows down:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #expand }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #expand }
|
||||||
|
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ Allow for a faster downstream by expanding the last emitted element to an `Itera
|
||||||
@apidoc[Source.extrapolate](Source) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" }
|
@apidoc[Source.extrapolate](Source) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" }
|
||||||
@apidoc[Flow.extrapolate](Flow) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" }
|
@apidoc[Flow.extrapolate](Flow) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" }
|
||||||
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an
|
Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an
|
||||||
|
|
@ -23,6 +22,19 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh
|
||||||
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
|
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
|
||||||
and examples.
|
and examples.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible
|
||||||
|
the network bandwidth is a bit unreliable. It's fine, as long as the audio remains fluent, it doesn't matter
|
||||||
|
if we can't decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last
|
||||||
|
frame decoded:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #extrapolate }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #extrapolate }
|
||||||
|
|
||||||
## Reactive Streams semantics
|
## Reactive Streams semantics
|
||||||
|
|
||||||
@@@div { .callout }
|
@@@div { .callout }
|
||||||
|
|
@ -34,4 +46,3 @@ and examples.
|
||||||
**completes** when upstream completes
|
**completes** when upstream completes
|
||||||
|
|
||||||
@@@
|
@@@
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -234,7 +234,7 @@ Scala
|
||||||
Java
|
Java
|
||||||
: @@snip [RateTransformationDocTest.java](/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-seed }
|
: @@snip [RateTransformationDocTest.java](/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-seed }
|
||||||
|
|
||||||
`extrapolate` and `expand` also allow to produce metainformation based on demand signalled from the downstream.
|
`extrapolate` and `expand` also allow to produce meta-information based on demand signalled from the downstream.
|
||||||
Leveraging this, here is a flow that tracks and reports a drift between a fast consumer and a slow producer.
|
Leveraging this, here is a flow that tracks and reports a drift between a fast consumer and a slow producer.
|
||||||
|
|
||||||
Scala
|
Scala
|
||||||
|
|
@ -260,3 +260,5 @@ This makes `expand` able to transform or even filter out (by providing an empty
|
||||||
|
|
||||||
Regardless, since we provide a non-empty `Iterator` in both examples, this means that the
|
Regardless, since we provide a non-empty `Iterator` in both examples, this means that the
|
||||||
output of this flow is going to report a drift of zero if the producer is fast enough - or a larger drift otherwise.
|
output of this flow is going to report a drift of zero if the producer is fast enough - or a larger drift otherwise.
|
||||||
|
|
||||||
|
See also @ref:[`extrapolate`](operators/Source-or-Flow/extrapolate.md) and @ref:[`expand`](operators/Source-or-Flow/expand.md) for more information and examples.
|
||||||
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.stream.operators.sourceorflow;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.Cancellable;
|
||||||
|
import akka.japi.Pair;
|
||||||
|
import akka.japi.function.Function;
|
||||||
|
import akka.stream.javadsl.Flow;
|
||||||
|
import akka.stream.javadsl.Sink;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
import akka.util.ByteString;
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon;
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon.Frame;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/** */
|
||||||
|
public class ExtrapolateAndExpand {
|
||||||
|
public static Function<ByteString, Frame> decodeAsFrame =
|
||||||
|
ExtrapolateAndExpandCommon.Frame$.MODULE$::decode;
|
||||||
|
|
||||||
|
public static Frame BLACK_FRAME = ExtrapolateAndExpandCommon.Frame$.MODULE$.blackFrame();
|
||||||
|
|
||||||
|
public static long nowInSeconds() {
|
||||||
|
return ExtrapolateAndExpand.nowInSeconds();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
ActorSystem actorSystem = ActorSystem.create("25fps-stream");
|
||||||
|
|
||||||
|
Source<ByteString, NotUsed> networkSource = ExtrapolateAndExpandCommon.networkSource().asJava();
|
||||||
|
|
||||||
|
Flow<ByteString, Frame, NotUsed> decode = Flow.of(ByteString.class).<Frame>map(decodeAsFrame);
|
||||||
|
|
||||||
|
// #extrapolate
|
||||||
|
// if upstream is too slow, produce copies of the last frame but grayed out.
|
||||||
|
Flow<Frame, Frame, NotUsed> rateControl =
|
||||||
|
Flow.of(Frame.class)
|
||||||
|
.extrapolate(
|
||||||
|
lastFrame -> {
|
||||||
|
Frame gray =
|
||||||
|
new Frame(
|
||||||
|
ByteString.fromString(
|
||||||
|
"gray frame!! - " + lastFrame.pixels().utf8String()));
|
||||||
|
return Stream.iterate(gray, i -> i).iterator();
|
||||||
|
},
|
||||||
|
BLACK_FRAME // initial value
|
||||||
|
);
|
||||||
|
|
||||||
|
Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);
|
||||||
|
|
||||||
|
// let's create a 25fps stream (a Frame every 40.millis)
|
||||||
|
Source<String, Cancellable> tickSource =
|
||||||
|
Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");
|
||||||
|
|
||||||
|
Source<Frame, Cancellable> videoAt25Fps = tickSource.zip(videoSource).map(Pair::second);
|
||||||
|
|
||||||
|
// #extrapolate
|
||||||
|
|
||||||
|
// #expand
|
||||||
|
// each element flowing through the stream is expanded to a watermark copy
|
||||||
|
// of the upstream frame and grayed out copies. The grayed out copies should
|
||||||
|
// only be used downstream if the producer is too slow.
|
||||||
|
Flow<Frame, Frame, NotUsed> watermarkerRateControl =
|
||||||
|
Flow.of(Frame.class)
|
||||||
|
.expand(
|
||||||
|
lastFrame -> {
|
||||||
|
Frame watermarked =
|
||||||
|
new Frame(
|
||||||
|
lastFrame.pixels().$plus$plus(ByteString.fromString(" - watermark")));
|
||||||
|
Frame gray =
|
||||||
|
new Frame(lastFrame.pixels().$plus$plus(ByteString.fromString(" - gray")));
|
||||||
|
return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i))
|
||||||
|
.iterator();
|
||||||
|
});
|
||||||
|
|
||||||
|
Source<Frame, NotUsed> watermakedVideoSource =
|
||||||
|
networkSource.via(decode).via(watermarkerRateControl);
|
||||||
|
|
||||||
|
// let's create a 25fps stream (a Frame every 40.millis)
|
||||||
|
Source<String, Cancellable> ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");
|
||||||
|
|
||||||
|
Source<Frame, Cancellable> watermarkedVideoAt25Fps =
|
||||||
|
ticks.zip(watermakedVideoSource).map(Pair::second);
|
||||||
|
|
||||||
|
// #expand
|
||||||
|
videoAt25Fps
|
||||||
|
.map(Frame::pixels)
|
||||||
|
.map(ByteString::utf8String)
|
||||||
|
.map(pixels -> nowInSeconds() + " - " + pixels)
|
||||||
|
.to(Sink.foreach(System.out::println))
|
||||||
|
.run(actorSystem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.stream.operators.sourceorflow
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.Cancellable
|
||||||
|
import akka.stream.DelayOverflowStrategy
|
||||||
|
import akka.stream.scaladsl.DelayStrategy
|
||||||
|
import akka.stream.scaladsl.Flow
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
import akka.util.ByteString
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.fps
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.nowInSeconds
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.periodInMillis
|
||||||
|
import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.videoAt25Fps
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import scala.util.Random
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
object ExtrapolateAndExpandMain extends App {
|
||||||
|
implicit val sys = ActorSystem("25fps-stream")
|
||||||
|
videoAt25Fps.map(_.pixels.utf8String).map(frame => s"$nowInSeconds - $frame").to(Sink.foreach(println)).run()
|
||||||
|
|
||||||
|
}
|
||||||
|
object ExtrapolateAndExpand {
|
||||||
|
|
||||||
|
val periodInMillis = 40
|
||||||
|
val fps = 1000 / periodInMillis
|
||||||
|
|
||||||
|
import ExtrapolateAndExpandCommon._
|
||||||
|
|
||||||
|
val decode: Flow[ByteString, Frame, NotUsed] =
|
||||||
|
Flow[ByteString].map(Frame.decode)
|
||||||
|
|
||||||
|
// #extrapolate
|
||||||
|
// if upstream is too slow, produce copies of the last frame but grayed out.
|
||||||
|
val rateControl: Flow[Frame, Frame, NotUsed] =
|
||||||
|
Flow[Frame].extrapolate((frame: Frame) => {
|
||||||
|
val grayedOut = frame.withFilter(Gray)
|
||||||
|
Iterator.continually(grayedOut)
|
||||||
|
}, Some(Frame.blackFrame))
|
||||||
|
|
||||||
|
val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)
|
||||||
|
|
||||||
|
// let's create a 25fps stream (a Frame every 40.millis)
|
||||||
|
val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)
|
||||||
|
|
||||||
|
val videoAt25Fps: Source[Frame, Cancellable] =
|
||||||
|
tickSource.zip(videoSource).map(_._2)
|
||||||
|
// #extrapolate
|
||||||
|
|
||||||
|
// #expand
|
||||||
|
// each element flowing through the stream is expanded to a watermark copy
|
||||||
|
// of the upstream frame and grayed out copies. The grayed out copies should
|
||||||
|
// only be used downstream if the producer is too slow.
|
||||||
|
val watermarkerRateControl: Flow[Frame, Frame, NotUsed] =
|
||||||
|
Flow[Frame].expand((frame: Frame) => {
|
||||||
|
val watermarked = frame.withFilter(Watermark)
|
||||||
|
val grayedOut = frame.withFilter(Gray)
|
||||||
|
(Iterator.single(watermarked) ++ Iterator.continually(grayedOut))
|
||||||
|
})
|
||||||
|
|
||||||
|
val watermarkedVideoSource: Source[Frame, NotUsed] =
|
||||||
|
networkSource.via(decode).via(rateControl)
|
||||||
|
|
||||||
|
// let's create a 25fps stream (a Frame every 40.millis)
|
||||||
|
val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)
|
||||||
|
|
||||||
|
val watermarkedVideoAt25Fps: Source[Frame, Cancellable] =
|
||||||
|
ticks.zip(watermarkedVideoSource).map(_._2)
|
||||||
|
|
||||||
|
// #expand
|
||||||
|
|
||||||
|
def nowInSeconds = System.nanoTime() / 1000000000
|
||||||
|
}
|
||||||
|
|
||||||
|
object ExtrapolateAndExpandCommon {
|
||||||
|
// This `networkSource` simulates a client sending frames over the network. There's a
|
||||||
|
// stage throttling the elements at 24fps and then a `delayWith` that randomly delays
|
||||||
|
// frames simulating network latency and bandwidth limitations (uses buffer of
|
||||||
|
// default capacity).
|
||||||
|
val networkSource: Source[ByteString, NotUsed] =
|
||||||
|
Source
|
||||||
|
.fromIterator(() => Iterator.from(0)) // produce frameIds
|
||||||
|
.throttle(fps, 1.second)
|
||||||
|
.map(i => ByteString.fromString(s"fakeFrame-$i"))
|
||||||
|
.delayWith(
|
||||||
|
() =>
|
||||||
|
new DelayStrategy[ByteString] {
|
||||||
|
override def nextDelay(elem: ByteString): FiniteDuration =
|
||||||
|
Random.nextInt(periodInMillis * 10).millis
|
||||||
|
},
|
||||||
|
DelayOverflowStrategy.dropBuffer)
|
||||||
|
|
||||||
|
case object Tick
|
||||||
|
|
||||||
|
sealed trait Filter {
|
||||||
|
def filter(fr: Frame): Frame
|
||||||
|
}
|
||||||
|
object Gray extends Filter {
|
||||||
|
override def filter(fr: Frame): Frame =
|
||||||
|
Frame(ByteString.fromString(s"gray frame!! - ${fr.pixels.utf8String}"))
|
||||||
|
}
|
||||||
|
object Watermark extends Filter {
|
||||||
|
override def filter(fr: Frame): Frame =
|
||||||
|
Frame(fr.pixels.++(ByteString.fromString(" - watermark")))
|
||||||
|
}
|
||||||
|
|
||||||
|
case class Frame(pixels: ByteString) {
|
||||||
|
def withFilter(f: Filter): Frame = f.filter(this)
|
||||||
|
}
|
||||||
|
object Frame {
|
||||||
|
val blackFrame: Frame = Frame(ByteString.empty)
|
||||||
|
def decode(bs: ByteString): Frame = Frame(bs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,12 +4,20 @@
|
||||||
|
|
||||||
package akka.remote.artery
|
package akka.remote.artery
|
||||||
|
|
||||||
|
import java.util.UUID
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
|
import akka.actor.Actor
|
||||||
import akka.actor.{ Actor, ActorIdentity, ActorRef, Deploy, Identify, PoisonPill, Props, RootActorPath }
|
import akka.actor.ActorIdentity
|
||||||
|
import akka.actor.ActorPath
|
||||||
|
import akka.actor.ActorRef
|
||||||
import akka.actor.ActorSelection
|
import akka.actor.ActorSelection
|
||||||
|
import akka.actor.Deploy
|
||||||
|
import akka.actor.Identify
|
||||||
|
import akka.actor.PoisonPill
|
||||||
|
import akka.actor.Props
|
||||||
|
import akka.actor.RootActorPath
|
||||||
import akka.testkit.{ ImplicitSender, TestActors, TestProbe }
|
import akka.testkit.{ ImplicitSender, TestActors, TestProbe }
|
||||||
|
|
||||||
class ArteryUpdSendConsistencyWithOneLaneSpec
|
class ArteryUpdSendConsistencyWithOneLaneSpec
|
||||||
|
|
@ -62,6 +70,17 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config)
|
||||||
val addressB = address(systemB)
|
val addressB = address(systemB)
|
||||||
val rootB = RootActorPath(addressB)
|
val rootB = RootActorPath(addressB)
|
||||||
|
|
||||||
|
private def actorRefBySelection(path: ActorPath) = {
|
||||||
|
|
||||||
|
val correlationId = Some(UUID.randomUUID().toString)
|
||||||
|
system.actorSelection(path) ! Identify(correlationId)
|
||||||
|
|
||||||
|
val actorIdentity = expectMsgType[ActorIdentity](5.seconds)
|
||||||
|
actorIdentity.correlationId shouldBe correlationId
|
||||||
|
|
||||||
|
actorIdentity.ref.get
|
||||||
|
}
|
||||||
|
|
||||||
"Artery" must {
|
"Artery" must {
|
||||||
|
|
||||||
"be able to identify a remote actor and ping it" in {
|
"be able to identify a remote actor and ping it" in {
|
||||||
|
|
@ -71,11 +90,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config)
|
||||||
}
|
}
|
||||||
}), "echo")
|
}), "echo")
|
||||||
|
|
||||||
val echoSel = system.actorSelection(rootB / "user" / "echo")
|
val actorPath = rootB / "user" / "echo"
|
||||||
val echoRef = {
|
val echoSel = system.actorSelection(actorPath)
|
||||||
system.actorSelection(rootB / "user" / "echo") ! Identify(None)
|
val echoRef = actorRefBySelection(actorPath)
|
||||||
expectMsgType[ActorIdentity](5.seconds).ref.get
|
|
||||||
}
|
|
||||||
|
|
||||||
echoRef ! "ping"
|
echoRef ! "ping"
|
||||||
expectMsg("pong")
|
expectMsg("pong")
|
||||||
|
|
@ -119,18 +136,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config)
|
||||||
systemB.actorOf(TestActors.echoActorProps, "echoB")
|
systemB.actorOf(TestActors.echoActorProps, "echoB")
|
||||||
systemB.actorOf(TestActors.echoActorProps, "echoC")
|
systemB.actorOf(TestActors.echoActorProps, "echoC")
|
||||||
|
|
||||||
val remoteRefA = {
|
val remoteRefA = actorRefBySelection(rootB / "user" / "echoA")
|
||||||
system.actorSelection(rootB / "user" / "echoA") ! Identify(None)
|
val remoteRefB = actorRefBySelection(rootB / "user" / "echoB")
|
||||||
expectMsgType[ActorIdentity].ref.get
|
val remoteRefC = actorRefBySelection(rootB / "user" / "echoC")
|
||||||
}
|
|
||||||
val remoteRefB = {
|
|
||||||
system.actorSelection(rootB / "user" / "echoB") ! Identify(None)
|
|
||||||
expectMsgType[ActorIdentity].ref.get
|
|
||||||
}
|
|
||||||
val remoteRefC = {
|
|
||||||
system.actorSelection(rootB / "user" / "echoC") ! Identify(None)
|
|
||||||
expectMsgType[ActorIdentity].ref.get
|
|
||||||
}
|
|
||||||
|
|
||||||
def senderProps(remoteRef: ActorRef) =
|
def senderProps(remoteRef: ActorRef) =
|
||||||
Props(new Actor {
|
Props(new Actor {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue