diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 890916eff8..8dec5e188c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -181,7 +181,6 @@ import scala.util.control.NonFatal } } - // SourceQueueWithComplete impl override def watchCompletion() = completion.future override def offer(element: T): Future[QueueOfferResult] = { val p = Promise[QueueOfferResult] diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala index 6f2a2dd10a..1503976a1a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala @@ -32,8 +32,8 @@ trait SourceQueue[T] { def offer(elem: T): CompletionStage[QueueOfferResult] /** - * Method returns a [[CompletionStage]] that will be completed if the stream completes, - * or will be failed when the operator faces an internal failure. + * Method returns a [[CompletionStage]] that will be completed if this + * operator completes, or will be failed when the stream is failed. */ def watchCompletion(): CompletionStage[Done] } @@ -45,18 +45,25 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { /** * Complete the stream normally. Use `watchCompletion` to be notified of this * operation’s success. + * + * Note that this only means the elements have been passed downstream, not + * that downstream has successfully processed them. */ def complete(): Unit /** * Complete the stream with a failure. Use `watchCompletion` to be notified of this * operation’s success. + * + * Note that this only means the elements have been passed downstream, not + * that downstream has successfully processed them. */ def fail(ex: Throwable): Unit /** - * Method returns a [[Future]] that will be completed if the stream completes, - * or will be failed when the operator faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked. + * Method returns a [[CompletionStage]] that will be completed if this + * operator completes, or will be failed when the stream fails, + * for example when [[SourceQueueWithComplete.fail]] is invoked. */ override def watchCompletion(): CompletionStage[Done] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 435e7753f2..b1b3b53285 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -399,7 +399,7 @@ object Source { * call when buffer is full. * * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]]. - * It returns future that completes with success when stream is completed or fail when stream is failed. + * It returns a future that completes with success when this operator is completed or fails when stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait * for downstream demand unless there is another message waiting for downstream demand, in that case diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index 9a37e09615..43d078d706 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -30,8 +30,11 @@ trait SourceQueue[T] { def offer(elem: T): Future[QueueOfferResult] /** - * Method returns a [[Future]] that will be completed if the stream completes, - * or will be failed when the operator faces an internal failure. + * Method returns a [[Future]] that will be completed if this operator + * completes, or will be failed when the operator faces an internal failure. + * + * Note that this only means the elements have been passed downstream, not + * that downstream has successfully processed them. */ def watchCompletion(): Future[Done] } @@ -43,6 +46,9 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { /** * Complete the stream normally. Use `watchCompletion` to be notified of this * operation’s success. + * + * Note that this only means the elements have been passed downstream, not + * that downstream has successfully processed them. */ def complete(): Unit @@ -53,8 +59,12 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { def fail(ex: Throwable): Unit /** - * Method returns a [[Future]] that will be completed if the stream completes, - * or will be failed when the operator faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked. + * Method returns a [[Future]] that will be completed if this operator + * completes, or will be failed when the stream fails, + * for example when [[SourceQueueWithComplete.fail]] is invoked. + * + * Note that this only means the elements have been passed downstream, not + * that downstream has successfully processed them. */ def watchCompletion(): Future[Done] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 98e3b44ad6..76e749dcc5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -609,7 +609,7 @@ object Source { * call when buffer is full. * * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]]. - * It returns future that completes with success when stream is completed or fail when stream is failed. + * It returns future that completes with success when the operator is completed or fails when the stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait * for downstream demand unless there is another message waiting for downstream demand, in that case