clarify watchCompletion only watches the stage, not the stream (#26224)

* clarify watchCompletion only watches the stage, not the stream

* stage->operator
This commit is contained in:
Arnout Engelen 2019-01-13 18:23:15 +01:00 committed by Patrik Nordwall
parent eaca6de25f
commit 5664f4ae88
5 changed files with 27 additions and 11 deletions

View file

@ -181,7 +181,6 @@ import scala.util.control.NonFatal
}
}
// SourceQueueWithComplete impl
override def watchCompletion() = completion.future
override def offer(element: T): Future[QueueOfferResult] = {
val p = Promise[QueueOfferResult]

View file

@ -32,8 +32,8 @@ trait SourceQueue[T] {
def offer(elem: T): CompletionStage[QueueOfferResult]
/**
* Method returns a [[CompletionStage]] that will be completed if the stream completes,
* or will be failed when the operator faces an internal failure.
* Method returns a [[CompletionStage]] that will be completed if this
* operator completes, or will be failed when the stream is failed.
*/
def watchCompletion(): CompletionStage[Done]
}
@ -45,18 +45,25 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
/**
* Complete the stream normally. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
* that downstream has successfully processed them.
*/
def complete(): Unit
/**
* Complete the stream with a failure. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
* that downstream has successfully processed them.
*/
def fail(ex: Throwable): Unit
/**
* Method returns a [[Future]] that will be completed if the stream completes,
* or will be failed when the operator faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked.
* Method returns a [[CompletionStage]] that will be completed if this
* operator completes, or will be failed when the stream fails,
* for example when [[SourceQueueWithComplete.fail]] is invoked.
*/
override def watchCompletion(): CompletionStage[Done]
}

View file

@ -399,7 +399,7 @@ object Source {
* call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed.
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
* for downstream demand unless there is another message waiting for downstream demand, in that case

View file

@ -30,8 +30,11 @@ trait SourceQueue[T] {
def offer(elem: T): Future[QueueOfferResult]
/**
* Method returns a [[Future]] that will be completed if the stream completes,
* or will be failed when the operator faces an internal failure.
* Method returns a [[Future]] that will be completed if this operator
* completes, or will be failed when the operator faces an internal failure.
*
* Note that this only means the elements have been passed downstream, not
* that downstream has successfully processed them.
*/
def watchCompletion(): Future[Done]
}
@ -43,6 +46,9 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
/**
* Complete the stream normally. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
* that downstream has successfully processed them.
*/
def complete(): Unit
@ -53,8 +59,12 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
def fail(ex: Throwable): Unit
/**
* Method returns a [[Future]] that will be completed if the stream completes,
* or will be failed when the operator faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked.
* Method returns a [[Future]] that will be completed if this operator
* completes, or will be failed when the stream fails,
* for example when [[SourceQueueWithComplete.fail]] is invoked.
*
* Note that this only means the elements have been passed downstream, not
* that downstream has successfully processed them.
*/
def watchCompletion(): Future[Done]
}

View file

@ -609,7 +609,7 @@ object Source {
* call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed.
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
* for downstream demand unless there is another message waiting for downstream demand, in that case