diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 69404da9d7..4fb3c513b2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -176,5 +176,15 @@ class QueueSinkSpec extends StreamSpec { Source.single(()).runWith(Sink.queue().withAttributes(inputBuffer(0, 0))) } } + + "materialize to a queue which is seamlessly translatable between scala and java DSL" in assertAllStagesStopped { + val expected = List(Some(1), Some(2), Some(3), None) + val javadslQueue = Source(expected.flatten).runWith(Sink.queue()).asJava + val scaladslQueue = akka.stream.javadsl.SinkQueueWithCancel.asScala(javadslQueue) + expected.foreach { v ⇒ + scaladslQueue.pull().pipeTo(testActor) + expectMsg(v) + } + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 1da9a4719c..41c009c7e6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -356,6 +356,24 @@ class QueueSourceSpec extends StreamSpec { } } + "materialize to a queue which is seamlessly translatable between scala and java DSL" in { + val s = TestSubscriber.manualProbe[Int]() + + val javadslQueue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run().asJava + val scaladslQueue = akka.stream.javadsl.SourceQueueWithComplete.asScala(javadslQueue) + val sub = s.expectSubscription() + + sub.request(1) + assertSuccess(scaladslQueue.offer(42)) + s.expectNext(42) + + scaladslQueue.watchCompletion().pipeTo(testActor) + expectNoMessage(pause) + + sub.cancel() + expectMsg(Done) + } + } } diff --git a/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes index ddb5a46860..edade4e1ec 100644 --- a/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes @@ -11,3 +11,7 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.Conne ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.ConnectionSourceStage.this") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.OutgoingConnectionStage.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub#MergedSourceLogic.this") + +# #25045 adding Java/Scala interop to SourceQueue and SinkQueue +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SourceQueueAdapter") diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 1990ee4a18..0902270639 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -4,15 +4,12 @@ package akka.stream.impl -import java.util.concurrent.CompletionStage - import akka.Done import akka.annotation.InternalApi import akka.stream.OverflowStrategies._ import akka.stream._ import akka.stream.stage._ import akka.stream.scaladsl.SourceQueueWithComplete -import scala.compat.java8.FutureConverters._ import scala.concurrent.{ Future, Promise } /** @@ -214,14 +211,3 @@ import scala.concurrent.{ Future, Promise } (stageLogic, stageLogic) } } - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) - extends akka.stream.javadsl.SourceQueueWithComplete[T] { - def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava - def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava - def complete(): Unit = delegate.complete() - def fail(ex: Throwable): Unit = delegate.fail(ex) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 70f2b1b6a9..f516390331 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -4,32 +4,36 @@ package akka.stream.impl -import java.util.Optional -import java.util.concurrent.CompletionStage +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import scala.util.control.NonFatal import akka.NotUsed -import akka.actor.{ ActorRef, Props } -import akka.annotation.{ DoNotInherit, InternalApi } +import akka.actor.ActorRef +import akka.actor.Props +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.stream.Attributes.InputBuffer import akka.stream._ -import akka.stream.impl.QueueSink.{ Output, Pull } +import akka.stream.impl.QueueSink.Output +import akka.stream.impl.QueueSink.Pull import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule -import akka.stream.scaladsl.{ Sink, SinkQueueWithCancel, Source } +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.SinkQueueWithCancel +import akka.stream.scaladsl.Source import akka.stream.stage._ -import org.reactivestreams.{ Publisher, Subscriber } - -import scala.annotation.unchecked.uncheckedVariance -import scala.collection.{ immutable, mutable } -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ -import scala.concurrent.{ Future, Promise } -import scala.util.control.NonFatal -import scala.util.{ Failure, Success, Try } -import scala.collection.immutable import akka.util.ccompat._ +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber /** * INTERNAL API @@ -441,17 +445,6 @@ import akka.util.ccompat._ } } -/** - * INTERNAL API - */ -@InternalApi private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) - extends akka.stream.javadsl.SinkQueueWithCancel[T] { - import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same } - def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava - def cancel(): Unit = delegate.cancel() - -} - /** * INTERNAL API * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala index d00da3cc4b..e534a71a1c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala @@ -10,29 +10,33 @@ import java.util.concurrent.CompletionStage import akka.Done import akka.stream.QueueOfferResult +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ +import scala.concurrent.Future + /** - * This trait allows to have the queue as a data source for some stream. + * This trait allows to have a queue as a data source for some stream. */ trait SourceQueue[T] { /** - * Method offers next element to a stream and returns future that: - * - completes with `Enqueued` if element is consumed by a stream - * - completes with `Dropped` when stream dropped offered element - * - completes with `QueueClosed` when stream is completed during future is active - * - completes with `Failure(f)` when failure to enqueue element from upstream - * - fails when stream is completed + * Offers an element to a stream and returns a [[CompletionStage]] that: + * - completes with `Enqueued` if the element is consumed by a stream + * - completes with `Dropped` when the stream dropped the offered element + * - completes with `QueueClosed` when the stream is completed whilst the [[CompletionStage]] is active + * - completes with `Failure(f)` in case of failure to enqueue element from upstream + * - fails when stream is already completed * * Additionally when using the backpressure overflowStrategy: - * - If the buffer is full the Future won't be completed until there is space in the buffer - * - Calling offer before the Future is completed in this case will return a failed Future + * - If the buffer is full the [[CompletionStage]] won't be completed until there is space in the buffer + * - Calling offer before the [[CompletionStage]] is completed, in this case it will return a failed [[CompletionStage]] * * @param elem element to send to a stream */ def offer(elem: T): CompletionStage[QueueOfferResult] /** - * Method returns a [[CompletionStage]] that will be completed if this + * Returns a [[CompletionStage]] that will be completed if this * operator completes, or will be failed when the stream is failed. */ def watchCompletion(): CompletionStage[Done] @@ -44,7 +48,7 @@ trait SourceQueue[T] { trait SourceQueueWithComplete[T] extends SourceQueue[T] { /** - * Complete the stream normally. Use `watchCompletion` to be notified of this + * Completes the stream normally. Use `watchCompletion` to be notified of this * operation’s success. * * Note that this only means the elements have been passed downstream, not @@ -53,7 +57,7 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { def complete(): Unit /** - * Complete the stream with a failure. Use `watchCompletion` to be notified of this + * Completes the stream with a failure. Use `watchCompletion` to be notified of this * operation’s success. * * Note that this only means the elements have been passed downstream, not @@ -69,17 +73,37 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { override def watchCompletion(): CompletionStage[Done] } +object SourceQueueWithComplete { + + /** + * Converts the queue into a `scaladsl.SourceQueueWithComplete` + */ + def asScala[T](queue: SourceQueueWithComplete[T]): akka.stream.scaladsl.SourceQueueWithComplete[T] = { + // would have been better to add `asScala` in SourceQueueWithComplete trait, but not doing + // that for backwards compatibility reasons + new akka.stream.scaladsl.SourceQueueWithComplete[T] { + def offer(elem: T): Future[QueueOfferResult] = queue.offer(elem).toScala + + def watchCompletion(): Future[Done] = queue.watchCompletion().toScala + + def complete(): Unit = queue.complete() + + def fail(ex: Throwable): Unit = queue.fail(ex) + } + } +} + /** - * Trait allows to have the queue as a sink for some stream. - * "SinkQueue" pulls data from stream with backpressure mechanism. + * This trait allows to have a queue as a sink for a stream. + * A [[SinkQueue]] pulls data from stream with a backpressure mechanism. */ trait SinkQueue[T] { /** - * Method pulls elements from stream and returns future that: - * - fails if stream is failed - * - completes with None in case if stream is completed - * - completes with `Some(element)` in case next element is available from stream. + * Pulls elements from the stream and returns a [[CompletionStage]] that: + * - fails if the stream is failed + * - completes with Empty in case the stream is completed + * - completes with `element` in case the next element is available from the stream. */ def pull(): CompletionStage[Optional[T]] } @@ -90,7 +114,26 @@ trait SinkQueue[T] { trait SinkQueueWithCancel[T] extends SinkQueue[T] { /** - * Cancel the stream. + * Cancels the stream. This method returns right away without waiting for actual finalizing the stream. */ def cancel(): Unit } + +object SinkQueueWithCancel { + + /** + * Converts the queue into a `scaladsl.SinkQueueWithCancel` + */ + def asScala[T](queue: SinkQueueWithCancel[T]): akka.stream.scaladsl.SinkQueueWithCancel[T] = { + // would have been better to add `asScala` in SinkQueueWithCancel trait, but not doing + // that for backwards compatibility reasons + new akka.stream.scaladsl.SinkQueueWithCancel[T] { + import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same } + + override def pull(): Future[Option[T]] = + queue.pull().toScala.map(_.asScala)(same) + + override def cancel(): Unit = queue.cancel() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 9afc1f6116..37d3420952 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -10,7 +10,7 @@ import akka.{ japi, Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function -import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter } +import akka.stream.impl.LinearTraversalBuilder import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -299,8 +299,8 @@ object Sink { } /** - * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueue]]. - * [[akka.stream.javadsl.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. + * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]]. + * [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. * `CompletionStage` completes when element is available. * * Before calling pull method second time you need to wait until previous CompletionStage completes. @@ -310,13 +310,13 @@ object Sink { * upstream and then stop back pressure. You can configure size of input * buffer by using [[Sink.withAttributes]] method. * - * For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueue]] including last None + * For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueueWithCancel]] including last None * as completion marker * * @see [[akka.stream.javadsl.SinkQueueWithCancel]] */ def queue[T](): Sink[T, SinkQueueWithCancel[T]] = - new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_))) + new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(_.asJava)) /** * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9bf1d1b821..ded1962f7d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -12,7 +12,7 @@ import akka.annotation.ApiMayChange import akka.event.LoggingAdapter import akka.japi.{ function, Pair, Util } import akka.stream._ -import akka.stream.impl.{ LinearTraversalBuilder, SourceQueueAdapter } +import akka.stream.impl.{ LinearTraversalBuilder } import akka.util.{ ConstantFun, Timeout } import akka.util.JavaDurationConverters._ import akka.{ Done, NotUsed } @@ -403,7 +403,7 @@ object Source { } /** - * Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueue]]. + * Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * if downstream is terminated. @@ -412,7 +412,7 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage` which completes with + * [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage` which completes with * `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with * `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. @@ -420,7 +420,7 @@ object Source { * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage` * call when buffer is full. * - * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]]. + * You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]]. * It returns a future that completes with success when this operator is completed or fails when stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait @@ -433,7 +433,7 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = - new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(new SourceQueueAdapter(_))) + new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(_.asJava)) /** * Start a new `Source` from some resource which can be opened, read and closed. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index 2bb5008b4e..c593d82d03 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -4,33 +4,41 @@ package akka.stream.scaladsl +import java.util.Optional +import java.util.concurrent.CompletionStage + import scala.concurrent.Future + import akka.Done import akka.stream.QueueOfferResult +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ + +import akka.annotation.InternalApi /** - * This trait allows to have the queue as a data source for some stream. + * This trait allows to have a queue as a data source for some stream. */ trait SourceQueue[T] { /** - * Method offers next element to a stream and returns future that: - * - completes with `Enqueued` if element is consumed by a stream - * - completes with `Dropped` when stream dropped offered element - * - completes with `QueueClosed` when stream is completed during future is active - * - completes with `Failure(f)` when failure to enqueue element from upstream - * - fails when stream is completed + * Offers an element to a stream and returns a [[Future]] that: + * - completes with `Enqueued` if the element is consumed by a stream + * - completes with `Dropped` when the stream dropped the offered element + * - completes with `QueueClosed` when the stream is completed whilst the [[Future]] is active + * - completes with `Failure(f)` in case of failure to enqueue element from upstream + * - fails when stream is already completed * * Additionally when using the backpressure overflowStrategy: - * - If the buffer is full the Future won't be completed until there is space in the buffer - * - Calling offer before the Future is completed in this case will return a failed Future + * - If the buffer is full the [[Future]] won't be completed until there is space in the buffer + * - Calling offer before the [[Future]] is completed, in this case it will return a failed [[Future]] * * @param elem element to send to a stream */ def offer(elem: T): Future[QueueOfferResult] /** - * Method returns a [[Future]] that will be completed if this operator + * Returns a [[Future]] that will be completed if this operator * completes, or will be failed when the operator faces an internal failure. * * Note that this only means the elements have been passed downstream, not @@ -45,7 +53,7 @@ trait SourceQueue[T] { trait SourceQueueWithComplete[T] extends SourceQueue[T] { /** - * Complete the stream normally. Use `watchCompletion` to be notified of this + * Completes the stream normally. Use `watchCompletion` to be notified of this * operation’s success. * * Note that this only means the elements have been passed downstream, not @@ -54,7 +62,7 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { def complete(): Unit /** - * Complete the stream with a failure. Use `watchCompletion` to be notified of this + * Completes the stream with a failure. Use `watchCompletion` to be notified of this * operation’s success. */ def fail(ex: Throwable): Unit @@ -70,17 +78,44 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { def watchCompletion(): Future[Done] } +object SourceQueueWithComplete { + final implicit class QueueOps[T](val queue: SourceQueueWithComplete[T]) extends AnyVal { + // would have been better to add `asJava` in SourceQueueWithComplete trait, but not doing + // that for backwards compatibility reasons + + /** + * Converts the queue into a `javadsl.SourceQueueWithComplete` + */ + def asJava: akka.stream.javadsl.SourceQueueWithComplete[T] = + SourceQueueWithComplete.asJava(queue) + } + + /** + * INTERNAL API: Converts the queue into a `javadsl.SourceQueueWithComplete` + */ + @InternalApi private[akka] def asJava[T]( + queue: SourceQueueWithComplete[T]): akka.stream.javadsl.SourceQueueWithComplete[T] = + new akka.stream.javadsl.SourceQueueWithComplete[T] { + def offer(elem: T): CompletionStage[QueueOfferResult] = + queue.offer(elem).toJava + def watchCompletion(): CompletionStage[Done] = + queue.watchCompletion().toJava + def complete(): Unit = queue.complete() + def fail(ex: Throwable): Unit = queue.fail(ex) + } +} + /** - * Trait allows to have the queue as a sink for some stream. - * "SinkQueue" pulls data from stream with backpressure mechanism. + * This trait allows to have a queue as a sink for a stream. + * A [[SinkQueue]] pulls data from a stream with a backpressure mechanism. */ trait SinkQueue[T] { /** - * Method pulls elements from stream and returns future that: - * - fails if stream is failed - * - completes with None in case if stream is completed - * - completes with `Some(element)` in case next element is available from stream. + * Pulls elements from the stream and returns a [[Future]] that: + * - fails if the stream is failed + * - completes with None in case the stream is completed + * - completes with `Some(element)` in case the next element is available from stream. */ def pull(): Future[Option[T]] } @@ -91,7 +126,28 @@ trait SinkQueue[T] { trait SinkQueueWithCancel[T] extends SinkQueue[T] { /** - * Cancel the stream. This method returns right away without waiting for actual finalizing stream. + * Cancels the stream. This method returns right away without waiting for actual finalizing the stream. */ def cancel(): Unit } + +object SinkQueueWithCancel { + final implicit class QueueOps[T](val queue: SinkQueueWithCancel[T]) extends AnyVal { + // would have been better to add `asJava` in SinkQueueWithCancel trait, but not doing + // that for backwards compatibility reasons + + def asJava: akka.stream.javadsl.SinkQueueWithCancel[T] = + SinkQueueWithCancel.asJava(queue) + } + + /** + * INTERNAL API: Converts the queue into a `javadsl.SinkQueueWithCancel` + */ + @InternalApi private[akka] def asJava[T](queue: SinkQueueWithCancel[T]): akka.stream.javadsl.SinkQueueWithCancel[T] = + new akka.stream.javadsl.SinkQueueWithCancel[T] { + import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same } + override def pull(): CompletionStage[Optional[T]] = + queue.pull().map(_.asJava)(same).toJava + override def cancel(): Unit = queue.cancel() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 93a282f9f8..fe7420687a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -533,8 +533,8 @@ object Sink { } /** - * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]]. - * [[akka.stream.scaladsl.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. + * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]]. + * [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. * `Future` completes when element is available. * * Before calling pull method second time you need to wait until previous Future completes. @@ -544,7 +544,7 @@ object Sink { * upstream and then stop back pressure. You can configure size of input * buffer by using [[Sink.withAttributes]] method. * - * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None + * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueueWithCancel]] including last None * as completion marker * * See also [[akka.stream.scaladsl.SinkQueueWithCancel]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a438fb3da4..28a2ed9c27 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -615,7 +615,7 @@ object Source { } /** - * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]]. + * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded * if downstream is terminated. @@ -624,7 +624,7 @@ object Source { * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with + * [[akka.stream.scaladsl.SourceQueueWithComplete.offer]] returns `Future[QueueOfferResult]` which completes with * `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with * `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` - * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. @@ -632,7 +632,7 @@ object Source { * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` * call when buffer is full. * - * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]]. + * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]]. * It returns future that completes with success when the operator is completed or fails when the stream is failed. * * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait