diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index dd1dd36412..d46518c614 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -716,4 +716,16 @@ public class SourceTest extends StreamTest { final Source f = Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); } + + @Test + public void mustBeAbleToUseThrottle() throws Exception { + Integer result = + Source.from(Arrays.asList(0, 1, 2)) + .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping()) + .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.enforcing()) + .runWith(Sink.head(), materializer) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals((Object) 0, result); + } } diff --git a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala index 58300f5ad3..992c741758 100644 --- a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala +++ b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala @@ -20,6 +20,15 @@ object ThrottleMode { */ case object Enforcing extends ThrottleMode + /** + * Java API: Tells throttle to make pauses before emitting messages to meet throttle rate + */ + def shaping = Shaping + + /** + * Java API: Makes throttle fail with exception when upstream is faster than throttle rate + */ + def enforcing = Enforcing } /**