diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/completionTimeout.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/completionTimeout.md index ebdcffe0a5..d2033d65a3 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/completionTimeout.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/completionTimeout.md @@ -15,6 +15,17 @@ If the completion of the stream does not happen until the provided timeout, the If the completion of the stream does not happen until the provided timeout, the stream is failed with a `TimeoutException`. +## Example + +This example reads the numbers from a source and do some calculation in the flow with a completion timeout of 10 milliseconds. It will fail the stream, leading to failing the materialized @scala[`Future`] @java[`CompletionStage`] if the stream has not completed mapping the numbers from the source when the timeout hits. + +Scala +: @@snip [CompletionTimeout.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/CompletionTimeout.scala) { #completionTimeout } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #completionTimeout } + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 3198ca4488..2e5a39eceb 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -4,9 +4,11 @@ package jdocs.stream.operators; +import akka.Done; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.japi.pf.PFBuilder; +import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.NotUsed; @@ -475,6 +477,14 @@ class SourceOrFlow { // #watch } + static CompletionStage completionTimeoutExample() { + // #completionTimeout + Source source = Source.range(1, 100000).map(number -> number * number); + CompletionStage result = source.completionTimeout(Duration.ofMillis(10)).run(system); + return result; + // #completionTimeout + } + private ActorRef someActor() { return null; } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/CompletionTimeout.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/CompletionTimeout.scala new file mode 100644 index 0000000000..c66d8bcc28 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/CompletionTimeout.scala @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.Done +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ Flow, Sink, Source } + +import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContextExecutor, Future } + +object CompletionTimeout { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def completionTimeoutExample: Future[Done] = { + //#completionTimeout + val source = Source(1 to 10000).map(number => number * number) + source.completionTimeout(10.milliseconds).run() + //#completionTimeout + } +}