diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextThrottleTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextThrottleTest.java new file mode 100644 index 0000000000..b5a925ffd2 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextThrottleTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.stream.ThrottleMode; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class FlowWithContextThrottleTest extends StreamTest { + + public FlowWithContextThrottleTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf()); + + @Test + public void mustWorksForTwoStreams() throws Exception { + final FlowWithContext sharedThrottle = + FlowWithContext.create() + .throttle(1, java.time.Duration.ofDays(1), 1, (a) -> 1, ThrottleMode.enforcing()); + + CompletionStage>> result1 = + Source.single(new Pair<>(1, "context-a")) + .via(sharedThrottle.asFlow()) + .via(sharedThrottle.asFlow()) + .runWith(Sink.seq(), system); + + // If there is accidental shared state then we would not be able to pass through the single + // element + List> pairs1 = result1.toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(1, pairs1.size()); + assertEquals(Integer.valueOf(1), pairs1.get(0).first()); + assertEquals("context-a", pairs1.get(0).second()); + + // It works with a new stream, too + CompletionStage>> result2 = + Source.single(new Pair<>(2, "context-b")) + .via(sharedThrottle.asFlow()) + .via(sharedThrottle.asFlow()) + .runWith(Sink.seq(), system); + + List> pairs2 = result2.toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(1, pairs2.size()); + assertEquals(Integer.valueOf(2), pairs2.get(0).first()); + assertEquals("context-b", pairs2.get(0).second()); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceWithContextThrottleTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceWithContextThrottleTest.java new file mode 100644 index 0000000000..bd56bbbf08 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceWithContextThrottleTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.stream.ThrottleMode; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class SourceWithContextThrottleTest extends StreamTest { + + public SourceWithContextThrottleTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf()); + + @Test + public void mustBeAbleToUseThrottle() throws Exception { + List> list = + Arrays.asList( + new Pair<>(0, "context-a"), new Pair<>(1, "context-b"), new Pair<>(2, "context-c")); + Pair result = + SourceWithContext.fromPairs(Source.from(list)) + .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping()) + .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing()) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(list.get(0), result); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextThrottleSpec.scala new file mode 100644 index 0000000000..c42aba23fc --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextThrottleSpec.scala @@ -0,0 +1,208 @@ +/* + * Copyright (C) 2015-2020 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.ThrottleMode.Shaping +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.scaladsl.TestSink + +import scala.concurrent.duration._ + +class FlowWithContextThrottleSpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 2 + akka.stream.materializer.max-input-buffer-size = 2 + """) { + + private def toMessage(i: Int) = Message(s"data-$i", i.toLong) + + private def genMessage(length: Int, i: Int) = Message("a" * length, i.toLong) + + "throttle() on FlowWithContextOps" must { + "on FlowWithContext" must { + "work for the happy case" in assertAllStagesStopped { + val throttle = FlowWithContext[Message, Long].throttle(19, 1000.millis, -1, Shaping) + val input = (1 to 5).map(toMessage) + val expected = input.map(message => (message, message.offset)) + + Source(input) + .asSourceWithContext(m => m.offset) + .via(throttle) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNextN(expected) + .expectComplete() + } + + "accept very high rates" in assertAllStagesStopped { + val throttle = FlowWithContext[Message, Long].throttle(1, 1.nanos, 0, Shaping) + val input = (1 to 5).map(toMessage) + val expected = input.map(message => (message, message.offset)) + + Source(input) + .asSourceWithContext(m => m.offset) + .via(throttle) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNextN(expected) + .expectComplete() + } + + "accept very low rates" in assertAllStagesStopped { + val throttle = FlowWithContext[Message, Long].throttle(1, 100.days, 1, Shaping) + val input = (1 to 5).map(toMessage) + val expected = (input.head, input.head.offset) + + Source(input) + .asSourceWithContext(m => m.offset) + .via(throttle) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNext(expected) + .expectNoMessage(100.millis) + .cancel() // We won't wait 100 days, sorry + } + + "emit single element per tick" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Message]() + val downstream = TestSubscriber.probe[(Message, Long)]() + val throttle = FlowWithContext[Message, Long].throttle(1, 300.millis, 0, Shaping) + + Source + .fromPublisher(upstream) + .asSourceWithContext(m => m.offset) + .via(throttle) + .asSource + .runWith(Sink.fromSubscriber(downstream)) + + downstream.request(20) + upstream.sendNext(Message("a", 1L)) + downstream.expectNoMessage(150.millis) + downstream.expectNext((Message("a", 1L), 1L)) + + upstream.sendNext(Message("b", 2L)) + downstream.expectNoMessage(150.millis) + downstream.expectNext((Message("b", 2L), 2L)) + + upstream.sendComplete() + downstream.expectComplete() + } + + "emit elements according to cost" in assertAllStagesStopped { + val list = (1 to 4).map(i => genMessage(i * 2, i)) + val throttle = FlowWithContext[Message, Long].throttle(2, 200.millis, 0, _.data.length, Shaping) + + Source(list) + .asSourceWithContext(m => m.offset) + .via(throttle) + .asSource + .map(_._1) + .runWith(TestSink.probe[Message]) + .request(4) + .expectNext(list(0)) + .expectNoMessage(300.millis) + .expectNext(list(1)) + .expectNoMessage(500.millis) + .expectNext(list(2)) + .expectNoMessage(700.millis) + .expectNext(list(3)) + .expectComplete() + } + } + + "on SourceWithContext" must { + "work for the happy case" in assertAllStagesStopped { + val input = (1 to 5).map(toMessage) + val expected = input.map(message => (message, message.offset)) + + Source(input) + .asSourceWithContext(m => m.offset) + .throttle(19, 1000.millis, -1, Shaping) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNextN(expected) + .expectComplete() + } + + "accept very high rates" in assertAllStagesStopped { + val input = (1 to 5).map(toMessage) + val expected = input.map(message => (message, message.offset)) + + Source(input) + .asSourceWithContext(m => m.offset) + .throttle(1, 1.nanos, 0, Shaping) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNextN(expected) + .expectComplete() + } + + "accept very low rates" in assertAllStagesStopped { + val input = (1 to 5).map(toMessage) + val expected = (input.head, input.head.offset) + + Source(input) + .asSourceWithContext(m => m.offset) + .throttle(1, 100.days, 1, Shaping) + .asSource + .runWith(TestSink.probe[(Message, Long)]) + .request(5) + .expectNext(expected) + .expectNoMessage(100.millis) + .cancel() // We won't wait 100 days, sorry + } + + "emit single element per tick" in assertAllStagesStopped { + val upstream = TestPublisher.probe[Message]() + val downstream = TestSubscriber.probe[(Message, Long)]() + + Source + .fromPublisher(upstream) + .asSourceWithContext(m => m.offset) + .throttle(1, 300.millis, 0, Shaping) + .asSource + .runWith(Sink.fromSubscriber(downstream)) + + downstream.request(20) + upstream.sendNext(Message("a", 1L)) + downstream.expectNoMessage(150.millis) + downstream.expectNext((Message("a", 1L), 1L)) + + upstream.sendNext(Message("b", 2L)) + downstream.expectNoMessage(150.millis) + downstream.expectNext((Message("b", 2L), 2L)) + + upstream.sendComplete() + downstream.expectComplete() + } + + "emit elements according to cost" in assertAllStagesStopped { + val list = (1 to 4).map(i => genMessage(i * 2, i)) + + Source(list) + .asSourceWithContext(m => m.offset) + .throttle(2, 200.millis, 0, _.data.length, Shaping) + .asSource + .map(_._1) + .runWith(TestSink.probe[Message]) + .request(4) + .expectNext(list(0)) + .expectNoMessage(300.millis) + .expectNext(list(1)) + .expectNoMessage(500.millis) + .expectNext(list(2)) + .expectNoMessage(700.millis) + .expectNext(list(3)) + .expectComplete() + } + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 53aff73403..765d3da160 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -14,6 +14,7 @@ import akka.japi.{ function, Pair, Util } import akka.stream._ import akka.util.ConstantFun import akka.util.ccompat.JavaConverters._ +import akka.util.JavaDurationConverters._ object FlowWithContext { @@ -286,6 +287,50 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( marker: function.Function2[Out, CtxOut, LogMarker]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]]. + * + * @see [[akka.stream.javadsl.Flow.throttle]] + */ + def throttle(elements: Int, per: java.time.Duration): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.throttle(elements, per.asScala)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]]. + * + * @see [[akka.stream.javadsl.Flow.throttle]] + */ + def throttle( + elements: Int, + per: java.time.Duration, + maximumBurst: Int, + mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.throttle(elements, per.asScala, maximumBurst, mode)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]]. + * + * @see [[akka.stream.javadsl.Flow.throttle]] + */ + def throttle( + cost: Int, + per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.throttle(cost, per.asScala, costCalculation.apply)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]]. + * + * @see [[akka.stream.javadsl.Flow.throttle]] + */ + def throttle( + cost: Int, + per: java.time.Duration, + maximumBurst: Int, + costCalculation: function.Function[Out, Integer], + mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode)) + def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = scaladsl.FlowWithContext.fromTuples( scaladsl diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index a08c16b46c..8bde2c5de9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -17,6 +17,7 @@ import akka.japi.function import akka.stream._ import akka.util.ConstantFun import akka.util.ccompat.JavaConverters._ +import akka.util.JavaDurationConverters._ object SourceWithContext { @@ -266,6 +267,50 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def logWithMarker(name: String, marker: function.Function2[Out, Ctx, LogMarker]): SourceWithContext[Out, Ctx, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.throttle]]. + * + * @see [[akka.stream.javadsl.Source.throttle]] + */ + def throttle(elements: Int, per: java.time.Duration): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.throttle(elements, per.asScala)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.throttle]]. + * + * @see [[akka.stream.javadsl.Source.throttle]] + */ + def throttle( + elements: Int, + per: java.time.Duration, + maximumBurst: Int, + mode: ThrottleMode): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.throttle(elements, per.asScala, maximumBurst, mode)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.throttle]]. + * + * @see [[akka.stream.javadsl.Source.throttle]] + */ + def throttle( + cost: Int, + per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.throttle(cost, per.asScala, costCalculation.apply)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.throttle]]. + * + * @see [[akka.stream.javadsl.Source.throttle]] + */ + def throttle( + cost: Int, + per: java.time.Duration, + maximumBurst: Int, + costCalculation: function.Function[Out, Integer], + mode: ThrottleMode): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode)) + /** * Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]], * concatenating the processing steps of both. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 530db2353a..94be79902a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -7,11 +7,13 @@ package akka.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration import akka.NotUsed import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream._ +import akka.stream.impl.Throttle import akka.util.ConstantFun /** @@ -207,5 +209,42 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { via(flow.logWithMarker(name, marker.tupled, extractWithContext)(log)) } + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]]. + * + * @see [[akka.stream.scaladsl.FlowOps.throttle]] + */ + def throttle(elements: Int, per: FiniteDuration): Repr[Out, Ctx] = + throttle(elements, per, Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, ThrottleMode.Shaping) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]]. + * + * @see [[akka.stream.scaladsl.FlowOps.throttle]] + */ + def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out, Ctx] = + throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]]. + * + * @see [[akka.stream.scaladsl.FlowOps.throttle]] + */ + def throttle(cost: Int, per: FiniteDuration, costCalculation: (Out) => Int): Repr[Out, Ctx] = + throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, ThrottleMode.Shaping) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]]. + * + * @see [[akka.stream.scaladsl.FlowOps.throttle]] + */ + def throttle( + cost: Int, + per: FiniteDuration, + maximumBurst: Int, + costCalculation: (Out) => Int, + mode: ThrottleMode): Repr[Out, Ctx] = + via(flow.throttle(cost, per, maximumBurst, a => costCalculation(a._1), mode)) + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] }