From 8459bfaef204ce7d3bd425e1d05345149fb94bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 31 Mar 2016 14:18:36 +0200 Subject: [PATCH] 19958: Add tests for throttle reuse --- .../akka/stream/javadsl/FlowThrottleTest.java | 50 +++++++++++++++++++ .../stream/scaladsl/FlowThrottleSpec.scala | 17 +++++++ .../scala/akka/stream/scaladsl/Source.scala | 2 +- 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java new file mode 100644 index 0000000000..30d5bb39a8 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java @@ -0,0 +1,50 @@ +package akka.stream.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorRef; +import akka.japi.Pair; +import akka.japi.function.Function; +import akka.stream.*; +import akka.testkit.JavaTestKit; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import akka.testkit.AkkaSpec; + +import static org.junit.Assert.assertEquals; + +public class FlowThrottleTest extends StreamTest { + public FlowThrottleTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf()); + + @Test + public void mustWorksForTwoStreams() throws Exception { + final Flow sharedThrottle = + Flow.of(Integer.class) + .throttle(1, FiniteDuration.create(1, TimeUnit.DAYS), 1, ThrottleMode.enforcing()); + + CompletionStage> result1 = + Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer); + + // If there is accidental shared state then we would not be able to pass through the single element + assertEquals(result1.toCompletableFuture().get(3, TimeUnit.SECONDS), Collections.singletonList(1)); + + // It works with a new stream, too + CompletionStage> result2 = + Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer); + + assertEquals(result2.toCompletableFuture().get(3, TimeUnit.SECONDS), Collections.singletonList(1)); + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index 5c50fccab4..1d1b6cc133 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -46,6 +46,23 @@ class FlowThrottleSpec extends AkkaSpec { .cancel() // We won't wait 100 days, sorry } + "work if there are two throttles in different streams" in Utils.assertAllStagesStopped { + val sharedThrottle = Flow[Int].throttle(1, 1.day, 1, Enforcing) + + // If there is accidental shared state then we would not be able to pass through the single element + Source.single(1) + .via(sharedThrottle) + .via(sharedThrottle) + .runWith(Sink.seq).futureValue should ===(Seq(1)) + + // It works with a new stream, too + Source.single(2) + .via(sharedThrottle) + .via(sharedThrottle) + .runWith(Sink.seq).futureValue should ===(Seq(2)) + + } + "emit single element per tick" in Utils.assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0d6a7ec994..4915d9127d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -28,7 +28,7 @@ import scala.compat.java8.FutureConverters._ * a Reactive Streams `Publisher` (at least conceptually). */ final class Source[+Out, +Mat](private[stream] override val module: Module) - extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { + extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { override type Repr[+O] = Source[O, Mat @uncheckedVariance] override type ReprMat[+O, +M] = Source[O, M]