From 0fc3fceb8bd9df7dc15eeb5f6a4cdc1b3bb438b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Wed, 10 Feb 2016 10:17:02 +0200 Subject: [PATCH] #19719 add ThrottleMode Java API --- .../test/java/akka/stream/javadsl/SourceTest.java | 12 ++++++++++++ .../src/main/scala/akka/stream/ThrottleMode.scala | 9 +++++++++ 2 files changed, 21 insertions(+) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index dd1dd36412..d46518c614 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -716,4 +716,16 @@ public class SourceTest extends StreamTest { final Source f = Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); } + + @Test + public void mustBeAbleToUseThrottle() throws Exception { + Integer result = + Source.from(Arrays.asList(0, 1, 2)) + .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping()) + .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.enforcing()) + .runWith(Sink.head(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals((Object) 0, result); + } } diff --git a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala index 58300f5ad3..992c741758 100644 --- a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala +++ b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala @@ -20,6 +20,15 @@ object ThrottleMode { */ case object Enforcing extends ThrottleMode + /** + * Java API: Tells throttle to make pauses before emitting messages to meet throttle rate + */ + def shaping = Shaping + + /** + * Java API: Makes throttle fail with exception when upstream is faster than throttle rate + */ + def enforcing = Enforcing } /**