diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala index 32f15a4677..6f3a62c14b 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala @@ -4,6 +4,8 @@ package akka.actor.typed.delivery +import scala.concurrent.duration._ + import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -12,12 +14,12 @@ import akka.actor.typed.delivery.ConsumerController.DeliverThenStop import akka.actor.typed.delivery.internal.ConsumerControllerImpl import akka.actor.typed.delivery.internal.ProducerControllerImpl -class ConsumerControllerSpec - extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) - with AnyWordSpecLike - with LogCapturing { +class ConsumerControllerSpec extends ScalaTestWithActorTestKit(""" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 1s + } + """) with AnyWordSpecLike with LogCapturing { import TestConsumer.sequencedMessage private var idCount = 0 @@ -62,6 +64,7 @@ class ConsumerControllerSpec val producerControllerProbe2 = createTestProbe[ProducerControllerImpl.InternalCommand]() consumerController ! ConsumerController.RegisterToProducerController(producerControllerProbe2.ref) producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController)) + consumerProbe.receiveMessage().confirmTo ! ConsumerController.Confirmed // expected resend producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController)) @@ -80,14 +83,12 @@ class ConsumerControllerSpec val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]() consumerController ! ConsumerController.Start(consumerProbe.ref) - consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]] - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false)) - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, true)) - consumerController ! ConsumerController.Confirmed producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, false)) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, true)) + testKit.stop(consumerController) } @@ -185,15 +186,17 @@ class ConsumerControllerSpec consumerController ! sequencedMessage(producerId, 2, producerControllerProbe.ref) consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]] consumerController ! ConsumerController.Confirmed + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true)) consumerController ! sequencedMessage(producerId, 3, producerControllerProbe.ref) consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].seqNr should ===(3) - - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true)) - consumerController ! ConsumerController.Confirmed producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true)) + // exponential back, so now it should be more than 1 sec + producerControllerProbe.expectNoMessage(1.1.second) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true)) + testKit.stop(consumerController) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala index d09a8aa488..6fffc2bb2d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala @@ -19,6 +19,7 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl class DurableProducerControllerSpec extends ScalaTestWithActorTestKit(""" akka.reliable-delivery.consumer-controller.flow-control-window = 20 + akka.reliable-delivery.consumer-controller.resend-interval-min = 1s """) with AnyWordSpecLike with LogCapturing { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala index ff7839ae79..9ef9fd1fd9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala @@ -42,12 +42,13 @@ object ReliableDeliveryRandomSpec { } } -class ReliableDeliveryRandomSpec - extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) - with AnyWordSpecLike - with LogCapturing { +class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 500 ms + resend-interval-max = 2 s + } + """) with AnyWordSpecLike with LogCapturing { import ReliableDeliveryRandomSpec._ private var idCount = 0 diff --git a/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes b/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes index e59db0fb17..07a92c4b2e 100644 --- a/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes +++ b/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes @@ -1,2 +1,9 @@ # Changes to internals of reliable delivery ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*") + +# #28720 Dynamically adjust the ConsumerController's resend interval +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.resendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.getResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.this") diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index b3f085dfb5..f3f36a1f40 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -90,8 +90,10 @@ akka.reliable-delivery { flow-control-window = 50 # The ConsumerController resends flow control messages to the - # ProducerController with this interval. - resend-interval = 1s + # ProducerController with the resend-interval-min, and increasing + # it gradually to resend-interval-max when idle. + resend-interval-min = 2s + resend-interval-max = 30s # If this is enabled lost messages will not be resent, but flow control is used. # This can be more efficient since messages don't have to be diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala index 7b587fc84c..7df92fe216 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala @@ -162,7 +162,8 @@ object ConsumerController { def apply(config: Config): Settings = { new Settings( flowControlWindow = config.getInt("flow-control-window"), - resendInterval = config.getDuration("resend-interval").asScala, + resendIntervalMin = config.getDuration("resend-interval-min").asScala, + resendIntervalMax = config.getDuration("resend-interval-max").asScala, onlyFlowControl = config.getBoolean("only-flow-control")) } @@ -183,7 +184,8 @@ object ConsumerController { final class Settings private ( val flowControlWindow: Int, - val resendInterval: FiniteDuration, + val resendIntervalMin: FiniteDuration, + val resendIntervalMax: FiniteDuration, val onlyFlowControl: Boolean) { def withFlowControlWindow(newFlowControlWindow: Int): Settings = @@ -192,20 +194,32 @@ object ConsumerController { /** * Scala API */ - def withResendInterval(newResendInterval: FiniteDuration): Settings = - copy(resendInterval = newResendInterval) + def withResendIntervalMin(newResendIntervalMin: FiniteDuration): Settings = + copy(resendIntervalMin = newResendIntervalMin) + + /** + * Scala API + */ + def withResendIntervalMax(newResendIntervalMax: FiniteDuration): Settings = + copy(resendIntervalMax = newResendIntervalMax) /** * Java API */ - def withResendInterval(newResendInterval: JavaDuration): Settings = - copy(resendInterval = newResendInterval.asScala) + def withResendIntervalMin(newResendIntervalMin: JavaDuration): Settings = + copy(resendIntervalMin = newResendIntervalMin.asScala) /** * Java API */ - def getResendInterval(): JavaDuration = - resendInterval.asJava + def withResendIntervalMax(newResendIntervalMax: JavaDuration): Settings = + copy(resendIntervalMax = newResendIntervalMax.asScala) + + /** + * Java API + */ + def getResendIntervalMax(): JavaDuration = + resendIntervalMax.asJava def withOnlyFlowControl(newOnlyFlowControl: Boolean): Settings = copy(onlyFlowControl = newOnlyFlowControl) @@ -215,12 +229,13 @@ object ConsumerController { */ private def copy( flowControlWindow: Int = flowControlWindow, - resendInterval: FiniteDuration = resendInterval, + resendIntervalMin: FiniteDuration = resendIntervalMin, + resendIntervalMax: FiniteDuration = resendIntervalMax, onlyFlowControl: Boolean = onlyFlowControl) = - new Settings(flowControlWindow, resendInterval, onlyFlowControl) + new Settings(flowControlWindow, resendIntervalMin, resendIntervalMax, onlyFlowControl) override def toString: String = - s"Settings($flowControlWindow, $resendInterval, $onlyFlowControl)" + s"Settings($flowControlWindow, $resendIntervalMin, $onlyFlowControl)" } def apply[A](): Behavior[Command[A]] = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 1649ef6bcb..362b6efc87 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -4,6 +4,8 @@ package akka.actor.typed.delivery.internal +import scala.concurrent.duration.FiniteDuration + import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.PostStop @@ -128,8 +130,9 @@ import akka.util.ConstantFun.scalaIdentityFunction context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo)) flightRecorder.consumerStarted(context.self.path) + val retryTimer = new RetryTimer(timers, settings.resendIntervalMin, settings.resendIntervalMax) val activeBehavior = - new ConsumerControllerImpl[A](context, timers, stashBuffer, settings) + new ConsumerControllerImpl[A](context, retryTimer, stashBuffer, settings) .active(initialState(context, s, registering, stopping)) context.log.debug("Received Start, unstash [{}] messages.", stashBuffer.size) stashBuffer.unstash(activeBehavior, 1, scalaIdentityFunction) @@ -171,7 +174,7 @@ import akka.util.ConstantFun.scalaIdentityFunction } - timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval) + timers.startTimerWithFixedDelay(Retry, Retry, settings.resendIntervalMin) waitForStart(None, stopping = false) } } @@ -207,11 +210,47 @@ import akka.util.ConstantFun.scalaIdentityFunction if (ref.path.address.hasGlobalScope) throw new IllegalArgumentException(s"Consumer [$ref] should be local.") } + + private class RetryTimer( + timers: TimerScheduler[ConsumerControllerImpl.InternalCommand], + val minBackoff: FiniteDuration, + maxBackoff: FiniteDuration) { + private var _interval = minBackoff + + def interval(): FiniteDuration = + _interval + + def start(): Unit = { + _interval = minBackoff + timers.startTimerWithFixedDelay(Retry, _interval) + } + + def scheduleNext(): Unit = { + val newInterval = + if (_interval eq maxBackoff) + maxBackoff + else + maxBackoff.min(_interval * 1.5) match { + case f: FiniteDuration => f + case _ => maxBackoff + } + if (newInterval != _interval) { + _interval = newInterval + timers.startTimerWithFixedDelay(Retry, _interval) + } + } + + def reset(): Unit = { + if (_interval ne minBackoff) + start() + } + + } } private class ConsumerControllerImpl[A]( context: ActorContext[ConsumerControllerImpl.InternalCommand], - timers: TimerScheduler[ConsumerControllerImpl.InternalCommand], + retryTimer: ConsumerControllerImpl.RetryTimer, stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand], settings: ConsumerController.Settings) { @@ -230,7 +269,7 @@ private class ConsumerControllerImpl[A]( private val traceEnabled = context.log.isTraceEnabled - startRetryTimer() + retryTimer.start() private def resendLost = !settings.onlyFlowControl @@ -245,6 +284,7 @@ private class ConsumerControllerImpl[A]( val expectedSeqNr = s.receivedSeqNr + 1 flightRecorder.consumerReceived(pid, seqNr) + retryTimer.reset() if (s.isProducerChanged(seqMsg)) { if (seqMsg.first && traceEnabled) @@ -269,6 +309,7 @@ private class ConsumerControllerImpl[A]( if (resendLost) { seqMsg.producerController ! Resend(fromSeqNr = expectedSeqNr) stashBuffer.clear() + retryTimer.start() resending(s) } else { deliver(s.copy(receivedSeqNr = seqNr), seqMsg) @@ -336,6 +377,7 @@ private class ConsumerControllerImpl[A]( // request resend of all unconfirmed, and mark first seqMsg.producerController ! Resend(0) stashBuffer.clear() + retryTimer.start() resending(s) } else { context.log.warnN( @@ -469,7 +511,7 @@ private class ConsumerControllerImpl[A]( seqNr, newRequestedSeqNr) s.producerController ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaTimeout = false) - startRetryTimer() // reset interval since Request was just sent + retryTimer.start() // reset interval since Request was just sent newRequestedSeqNr } else { if (seqMsg.ack) { @@ -518,7 +560,8 @@ private class ConsumerControllerImpl[A]( Behaviors.same case Retry => - receiveRetry(s, () => waitingForConfirmation(retryRequest(s), seqMsg)) + // no retries when waitingForConfirmation, will be performed from (idle) active + Behaviors.same case start: Start[A] => start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr) @@ -542,6 +585,9 @@ private class ConsumerControllerImpl[A]( } private def receiveRetry(s: State[A], nextBehavior: () => Behavior[InternalCommand]): Behavior[InternalCommand] = { + retryTimer.scheduleNext() + if (retryTimer.interval() != retryTimer.minBackoff) + context.log.debug("Schedule next retry in [{} ms]", retryTimer.interval().toMillis) s.registering match { case None => nextBehavior() case Some(reg) => @@ -574,6 +620,7 @@ private class ConsumerControllerImpl[A]( "Register to new ProducerController [{}], previous was [{}].", reg.producerController, s.producerController) + retryTimer.start() reg.producerController ! ProducerController.RegisterConsumer(context.self) nextBehavior(s.copy(registering = Some(reg.producerController))) } else { @@ -602,25 +649,17 @@ private class ConsumerControllerImpl[A]( Behaviors.unhandled } - private def startRetryTimer(): Unit = { - timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval) - } - // in case the Request or the SequencedMessage triggering the Request is lost private def retryRequest(s: State[A]): State[A] = { if (s.producerController == context.system.deadLetters) { s } else { - // TODO #28720 Maybe try to adjust the retry frequency. Maybe some exponential backoff and less need for it when - // SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each - // incoming SequenceMessage. val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2 flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr) context.log.debug( "Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", s.confirmedSeqNr, newRequestedSeqNr) - // TODO #28720 maybe watch the producer to avoid sending retry Request to dead producer s.producerController ! Request(s.confirmedSeqNr, newRequestedSeqNr, resendLost, viaTimeout = true) s.copy(requestedSeqNr = newRequestedSeqNr) } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md index dbf5cc3a37..b081eecef3 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md @@ -9,7 +9,6 @@ Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` @apidoc[Source.expand](Source) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" } @apidoc[Flow.expand](Flow) { scala="#expand[U](expander:Out=>Iterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" } - ## Description Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original @@ -18,6 +17,21 @@ element, allowing for it to be rewritten and/or filtered. See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +## Example + +Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit +unreliable. It's fine, as long as the audio remains fluent, it doesn't matter if we can't decode +a frame or two (or more). But we also want to watermark every decoded frame with the name of +our colleague. `expand` provides access to the element flowing through the stream +and let's us create extra frames in case the producer slows down: + +Scala +: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #expand } + +Java +: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #expand } + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md index 1b99c108a2..cb5b38b6f4 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md @@ -9,7 +9,6 @@ Allow for a faster downstream by expanding the last emitted element to an `Itera @apidoc[Source.extrapolate](Source) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" } @apidoc[Flow.extrapolate](Flow) { scala="#extrapolate[U>:Out](extrapolator:U=>Iterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function,java.lang.Object)" } - ## Description Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an @@ -23,6 +22,19 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +## Example + +Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible +the network bandwidth is a bit unreliable. It's fine, as long as the audio remains fluent, it doesn't matter +if we can't decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last +frame decoded: + +Scala +: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #extrapolate } + +Java +: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #extrapolate } + ## Reactive Streams semantics @@@div { .callout } @@ -34,4 +46,3 @@ and examples. **completes** when upstream completes @@@ - diff --git a/akka-docs/src/main/paradox/stream/stream-rate.md b/akka-docs/src/main/paradox/stream/stream-rate.md index fb4e405066..5482fdf30b 100644 --- a/akka-docs/src/main/paradox/stream/stream-rate.md +++ b/akka-docs/src/main/paradox/stream/stream-rate.md @@ -234,7 +234,7 @@ Scala Java : @@snip [RateTransformationDocTest.java](/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-seed } -`extrapolate` and `expand` also allow to produce metainformation based on demand signalled from the downstream. +`extrapolate` and `expand` also allow to produce meta-information based on demand signalled from the downstream. Leveraging this, here is a flow that tracks and reports a drift between a fast consumer and a slow producer. Scala @@ -260,3 +260,5 @@ This makes `expand` able to transform or even filter out (by providing an empty Regardless, since we provide a non-empty `Iterator` in both examples, this means that the output of this flow is going to report a drift of zero if the producer is fast enough - or a larger drift otherwise. + +See also @ref:[`extrapolate`](operators/Source-or-Flow/extrapolate.md) and @ref:[`expand`](operators/Source-or-Flow/expand.md) for more information and examples. \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java new file mode 100644 index 0000000000..4593d5ded3 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.japi.function.Function; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon; +import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon.Frame; + +import java.time.Duration; +import java.util.stream.Stream; + +/** */ +public class ExtrapolateAndExpand { + public static Function decodeAsFrame = + ExtrapolateAndExpandCommon.Frame$.MODULE$::decode; + + public static Frame BLACK_FRAME = ExtrapolateAndExpandCommon.Frame$.MODULE$.blackFrame(); + + public static long nowInSeconds() { + return ExtrapolateAndExpand.nowInSeconds(); + } + + public static void main(String[] args) { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + Source networkSource = ExtrapolateAndExpandCommon.networkSource().asJava(); + + Flow decode = Flow.of(ByteString.class).map(decodeAsFrame); + + // #extrapolate + // if upstream is too slow, produce copies of the last frame but grayed out. + Flow rateControl = + Flow.of(Frame.class) + .extrapolate( + lastFrame -> { + Frame gray = + new Frame( + ByteString.fromString( + "gray frame!! - " + lastFrame.pixels().utf8String())); + return Stream.iterate(gray, i -> i).iterator(); + }, + BLACK_FRAME // initial value + ); + + Source videoSource = networkSource.via(decode).via(rateControl); + + // let's create a 25fps stream (a Frame every 40.millis) + Source tickSource = + Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); + + Source videoAt25Fps = tickSource.zip(videoSource).map(Pair::second); + + // #extrapolate + + // #expand + // each element flowing through the stream is expanded to a watermark copy + // of the upstream frame and grayed out copies. The grayed out copies should + // only be used downstream if the producer is too slow. + Flow watermarkerRateControl = + Flow.of(Frame.class) + .expand( + lastFrame -> { + Frame watermarked = + new Frame( + lastFrame.pixels().$plus$plus(ByteString.fromString(" - watermark"))); + Frame gray = + new Frame(lastFrame.pixels().$plus$plus(ByteString.fromString(" - gray"))); + return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i)) + .iterator(); + }); + + Source watermakedVideoSource = + networkSource.via(decode).via(watermarkerRateControl); + + // let's create a 25fps stream (a Frame every 40.millis) + Source ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); + + Source watermarkedVideoAt25Fps = + ticks.zip(watermakedVideoSource).map(Pair::second); + + // #expand + videoAt25Fps + .map(Frame::pixels) + .map(ByteString::utf8String) + .map(pixels -> nowInSeconds() + " - " + pixels) + .to(Sink.foreach(System.out::println)) + .run(actorSystem); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala new file mode 100644 index 0000000000..59fc31e473 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.Cancellable +import akka.stream.DelayOverflowStrategy +import akka.stream.scaladsl.DelayStrategy +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.fps +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.nowInSeconds +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.periodInMillis +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.videoAt25Fps + +import scala.concurrent.duration._ +import scala.util.Random + +/** + * + */ +object ExtrapolateAndExpandMain extends App { + implicit val sys = ActorSystem("25fps-stream") + videoAt25Fps.map(_.pixels.utf8String).map(frame => s"$nowInSeconds - $frame").to(Sink.foreach(println)).run() + +} +object ExtrapolateAndExpand { + + val periodInMillis = 40 + val fps = 1000 / periodInMillis + + import ExtrapolateAndExpandCommon._ + + val decode: Flow[ByteString, Frame, NotUsed] = + Flow[ByteString].map(Frame.decode) + + // #extrapolate + // if upstream is too slow, produce copies of the last frame but grayed out. + val rateControl: Flow[Frame, Frame, NotUsed] = + Flow[Frame].extrapolate((frame: Frame) => { + val grayedOut = frame.withFilter(Gray) + Iterator.continually(grayedOut) + }, Some(Frame.blackFrame)) + + val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl) + + // let's create a 25fps stream (a Frame every 40.millis) + val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) + + val videoAt25Fps: Source[Frame, Cancellable] = + tickSource.zip(videoSource).map(_._2) + // #extrapolate + + // #expand + // each element flowing through the stream is expanded to a watermark copy + // of the upstream frame and grayed out copies. The grayed out copies should + // only be used downstream if the producer is too slow. + val watermarkerRateControl: Flow[Frame, Frame, NotUsed] = + Flow[Frame].expand((frame: Frame) => { + val watermarked = frame.withFilter(Watermark) + val grayedOut = frame.withFilter(Gray) + (Iterator.single(watermarked) ++ Iterator.continually(grayedOut)) + }) + + val watermarkedVideoSource: Source[Frame, NotUsed] = + networkSource.via(decode).via(rateControl) + + // let's create a 25fps stream (a Frame every 40.millis) + val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) + + val watermarkedVideoAt25Fps: Source[Frame, Cancellable] = + ticks.zip(watermarkedVideoSource).map(_._2) + + // #expand + + def nowInSeconds = System.nanoTime() / 1000000000 +} + +object ExtrapolateAndExpandCommon { + // This `networkSource` simulates a client sending frames over the network. There's a + // stage throttling the elements at 24fps and then a `delayWith` that randomly delays + // frames simulating network latency and bandwidth limitations (uses buffer of + // default capacity). + val networkSource: Source[ByteString, NotUsed] = + Source + .fromIterator(() => Iterator.from(0)) // produce frameIds + .throttle(fps, 1.second) + .map(i => ByteString.fromString(s"fakeFrame-$i")) + .delayWith( + () => + new DelayStrategy[ByteString] { + override def nextDelay(elem: ByteString): FiniteDuration = + Random.nextInt(periodInMillis * 10).millis + }, + DelayOverflowStrategy.dropBuffer) + + case object Tick + + sealed trait Filter { + def filter(fr: Frame): Frame + } + object Gray extends Filter { + override def filter(fr: Frame): Frame = + Frame(ByteString.fromString(s"gray frame!! - ${fr.pixels.utf8String}")) + } + object Watermark extends Filter { + override def filter(fr: Frame): Frame = + Frame(fr.pixels.++(ByteString.fromString(" - watermark"))) + } + + case class Frame(pixels: ByteString) { + def withFilter(f: Filter): Frame = f.filter(this) + } + object Frame { + val blackFrame: Frame = Frame(ByteString.empty) + def decode(bs: ByteString): Frame = Frame(bs) + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 350687af66..d4d4fd532b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -4,12 +4,20 @@ package akka.remote.artery +import java.util.UUID + import scala.concurrent.duration._ - import com.typesafe.config.{ Config, ConfigFactory } - -import akka.actor.{ Actor, ActorIdentity, ActorRef, Deploy, Identify, PoisonPill, Props, RootActorPath } +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorPath +import akka.actor.ActorRef import akka.actor.ActorSelection +import akka.actor.Deploy +import akka.actor.Identify +import akka.actor.PoisonPill +import akka.actor.Props +import akka.actor.RootActorPath import akka.testkit.{ ImplicitSender, TestActors, TestProbe } class ArteryUpdSendConsistencyWithOneLaneSpec @@ -62,6 +70,17 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) val addressB = address(systemB) val rootB = RootActorPath(addressB) + private def actorRefBySelection(path: ActorPath) = { + + val correlationId = Some(UUID.randomUUID().toString) + system.actorSelection(path) ! Identify(correlationId) + + val actorIdentity = expectMsgType[ActorIdentity](5.seconds) + actorIdentity.correlationId shouldBe correlationId + + actorIdentity.ref.get + } + "Artery" must { "be able to identify a remote actor and ping it" in { @@ -71,11 +90,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) } }), "echo") - val echoSel = system.actorSelection(rootB / "user" / "echo") - val echoRef = { - system.actorSelection(rootB / "user" / "echo") ! Identify(None) - expectMsgType[ActorIdentity](5.seconds).ref.get - } + val actorPath = rootB / "user" / "echo" + val echoSel = system.actorSelection(actorPath) + val echoRef = actorRefBySelection(actorPath) echoRef ! "ping" expectMsg("pong") @@ -119,18 +136,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) systemB.actorOf(TestActors.echoActorProps, "echoB") systemB.actorOf(TestActors.echoActorProps, "echoC") - val remoteRefA = { - system.actorSelection(rootB / "user" / "echoA") ! Identify(None) - expectMsgType[ActorIdentity].ref.get - } - val remoteRefB = { - system.actorSelection(rootB / "user" / "echoB") ! Identify(None) - expectMsgType[ActorIdentity].ref.get - } - val remoteRefC = { - system.actorSelection(rootB / "user" / "echoC") ! Identify(None) - expectMsgType[ActorIdentity].ref.get - } + val remoteRefA = actorRefBySelection(rootB / "user" / "echoA") + val remoteRefB = actorRefBySelection(rootB / "user" / "echoB") + val remoteRefC = actorRefBySelection(rootB / "user" / "echoC") def senderProps(remoteRef: ActorRef) = Props(new Actor {