Add completion timeout example (#29673)

* Adding example for completionTimeout

* Adding header to file

* Modifying example

* Modifying duration

* Modifying syntax for duration
This commit is contained in:
Muskan Gupta 2020-09-29 19:17:36 +05:30 committed by GitHub
parent 289f665445
commit 94d62f34c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 0 deletions

View file

@ -15,6 +15,17 @@ If the completion of the stream does not happen until the provided timeout, the
If the completion of the stream does not happen until the provided timeout, the stream is failed
with a `TimeoutException`.
## Example
This example reads the numbers from a source and do some calculation in the flow with a completion timeout of 10 milliseconds. It will fail the stream, leading to failing the materialized @scala[`Future`] @java[`CompletionStage`] if the stream has not completed mapping the numbers from the source when the timeout hits.
Scala
: @@snip [CompletionTimeout.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/CompletionTimeout.scala) { #completionTimeout }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #completionTimeout }
## Reactive Streams semantics
@@@div { .callout }

View file

@ -4,9 +4,11 @@
package jdocs.stream.operators;
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.pf.PFBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.NotUsed;
@ -475,6 +477,14 @@ class SourceOrFlow {
// #watch
}
static CompletionStage<Done> completionTimeoutExample() {
// #completionTimeout
Source<Integer, NotUsed> source = Source.range(1, 100000).map(number -> number * number);
CompletionStage<Done> result = source.completionTimeout(Duration.ofMillis(10)).run(system);
return result;
// #completionTimeout
}
private ActorRef someActor() {
return null;
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContextExecutor, Future }
object CompletionTimeout {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def completionTimeoutExample: Future[Done] = {
//#completionTimeout
val source = Source(1 to 10000).map(number => number * number)
source.completionTimeout(10.milliseconds).run()
//#completionTimeout
}
}