Merge pull request #26729 from akka/wip-25045-queues-interop-patriknw

=str #25045 adding Java/Scala interop to SourceQueue and SinkQueue
This commit is contained in:
Patrik Nordwall 2019-04-26 11:12:05 +02:00 committed by GitHub
commit 2e99b2921d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 95 deletions

View file

@ -176,5 +176,15 @@ class QueueSinkSpec extends StreamSpec {
Source.single(()).runWith(Sink.queue().withAttributes(inputBuffer(0, 0)))
}
}
"materialize to a queue which is seamlessly translatable between scala and java DSL" in assertAllStagesStopped {
val expected = List(Some(1), Some(2), Some(3), None)
val javadslQueue = Source(expected.flatten).runWith(Sink.queue()).asJava
val scaladslQueue = akka.stream.javadsl.SinkQueueWithCancel.asScala(javadslQueue)
expected.foreach { v
scaladslQueue.pull().pipeTo(testActor)
expectMsg(v)
}
}
}
}

View file

@ -356,6 +356,24 @@ class QueueSourceSpec extends StreamSpec {
}
}
"materialize to a queue which is seamlessly translatable between scala and java DSL" in {
val s = TestSubscriber.manualProbe[Int]()
val javadslQueue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run().asJava
val scaladslQueue = akka.stream.javadsl.SourceQueueWithComplete.asScala(javadslQueue)
val sub = s.expectSubscription()
sub.request(1)
assertSuccess(scaladslQueue.offer(42))
s.expectNext(42)
scaladslQueue.watchCompletion().pipeTo(testActor)
expectNoMessage(pause)
sub.cancel()
expectMsg(Done)
}
}
}

View file

@ -11,3 +11,7 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.Conne
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.ConnectionSourceStage.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.io.OutgoingConnectionStage.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub#MergedSourceLogic.this")
# #25045 adding Java/Scala interop to SourceQueue and SinkQueue
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SourceQueueAdapter")

View file

@ -4,15 +4,12 @@
package akka.stream.impl
import java.util.concurrent.CompletionStage
import akka.Done
import akka.annotation.InternalApi
import akka.stream.OverflowStrategies._
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.SourceQueueWithComplete
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ Future, Promise }
/**
@ -214,14 +211,3 @@ import scala.concurrent.{ Future, Promise }
(stageLogic, stageLogic)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T])
extends akka.stream.javadsl.SourceQueueWithComplete[T] {
def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava
def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava
def complete(): Unit = delegate.complete()
def fail(ex: Throwable): Unit = delegate.fail(ex)
}

View file

@ -4,32 +4,36 @@
package akka.stream.impl
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.NotUsed
import akka.actor.{ ActorRef, Props }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.ActorRef
import akka.actor.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.QueueSink.{ Output, Pull }
import akka.stream.impl.QueueSink.Output
import akka.stream.impl.QueueSink.Pull
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.scaladsl.{ Sink, SinkQueueWithCancel, Source }
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.SinkQueueWithCancel
import akka.stream.scaladsl.Source
import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, mutable }
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.collection.immutable
import akka.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
/**
* INTERNAL API
@ -441,17 +445,6 @@ import akka.util.ccompat._
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T])
extends akka.stream.javadsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same }
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
def cancel(): Unit = delegate.cancel()
}
/**
* INTERNAL API
*

View file

@ -10,29 +10,33 @@ import java.util.concurrent.CompletionStage
import akka.Done
import akka.stream.QueueOfferResult
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
/**
* This trait allows to have the queue as a data source for some stream.
* This trait allows to have a queue as a data source for some stream.
*/
trait SourceQueue[T] {
/**
* Method offers next element to a stream and returns future that:
* - completes with `Enqueued` if element is consumed by a stream
* - completes with `Dropped` when stream dropped offered element
* - completes with `QueueClosed` when stream is completed during future is active
* - completes with `Failure(f)` when failure to enqueue element from upstream
* - fails when stream is completed
* Offers an element to a stream and returns a [[CompletionStage]] that:
* - completes with `Enqueued` if the element is consumed by a stream
* - completes with `Dropped` when the stream dropped the offered element
* - completes with `QueueClosed` when the stream is completed whilst the [[CompletionStage]] is active
* - completes with `Failure(f)` in case of failure to enqueue element from upstream
* - fails when stream is already completed
*
* Additionally when using the backpressure overflowStrategy:
* - If the buffer is full the Future won't be completed until there is space in the buffer
* - Calling offer before the Future is completed in this case will return a failed Future
* - If the buffer is full the [[CompletionStage]] won't be completed until there is space in the buffer
* - Calling offer before the [[CompletionStage]] is completed, in this case it will return a failed [[CompletionStage]]
*
* @param elem element to send to a stream
*/
def offer(elem: T): CompletionStage[QueueOfferResult]
/**
* Method returns a [[CompletionStage]] that will be completed if this
* Returns a [[CompletionStage]] that will be completed if this
* operator completes, or will be failed when the stream is failed.
*/
def watchCompletion(): CompletionStage[Done]
@ -44,7 +48,7 @@ trait SourceQueue[T] {
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
/**
* Complete the stream normally. Use `watchCompletion` to be notified of this
* Completes the stream normally. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
@ -53,7 +57,7 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
def complete(): Unit
/**
* Complete the stream with a failure. Use `watchCompletion` to be notified of this
* Completes the stream with a failure. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
@ -69,17 +73,37 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
override def watchCompletion(): CompletionStage[Done]
}
object SourceQueueWithComplete {
/**
* Converts the queue into a `scaladsl.SourceQueueWithComplete`
*/
def asScala[T](queue: SourceQueueWithComplete[T]): akka.stream.scaladsl.SourceQueueWithComplete[T] = {
// would have been better to add `asScala` in SourceQueueWithComplete trait, but not doing
// that for backwards compatibility reasons
new akka.stream.scaladsl.SourceQueueWithComplete[T] {
def offer(elem: T): Future[QueueOfferResult] = queue.offer(elem).toScala
def watchCompletion(): Future[Done] = queue.watchCompletion().toScala
def complete(): Unit = queue.complete()
def fail(ex: Throwable): Unit = queue.fail(ex)
}
}
}
/**
* Trait allows to have the queue as a sink for some stream.
* "SinkQueue" pulls data from stream with backpressure mechanism.
* This trait allows to have a queue as a sink for a stream.
* A [[SinkQueue]] pulls data from stream with a backpressure mechanism.
*/
trait SinkQueue[T] {
/**
* Method pulls elements from stream and returns future that:
* - fails if stream is failed
* - completes with None in case if stream is completed
* - completes with `Some(element)` in case next element is available from stream.
* Pulls elements from the stream and returns a [[CompletionStage]] that:
* - fails if the stream is failed
* - completes with Empty in case the stream is completed
* - completes with `element` in case the next element is available from the stream.
*/
def pull(): CompletionStage[Optional[T]]
}
@ -90,7 +114,26 @@ trait SinkQueue[T] {
trait SinkQueueWithCancel[T] extends SinkQueue[T] {
/**
* Cancel the stream.
* Cancels the stream. This method returns right away without waiting for actual finalizing the stream.
*/
def cancel(): Unit
}
object SinkQueueWithCancel {
/**
* Converts the queue into a `scaladsl.SinkQueueWithCancel`
*/
def asScala[T](queue: SinkQueueWithCancel[T]): akka.stream.scaladsl.SinkQueueWithCancel[T] = {
// would have been better to add `asScala` in SinkQueueWithCancel trait, but not doing
// that for backwards compatibility reasons
new akka.stream.scaladsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext same }
override def pull(): Future[Option[T]] =
queue.pull().toScala.map(_.asScala)(same)
override def cancel(): Unit = queue.cancel()
}
}
}

View file

@ -10,7 +10,7 @@ import akka.{ japi, Done, NotUsed }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter }
import akka.stream.impl.LinearTraversalBuilder
import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
@ -299,8 +299,8 @@ object Sink {
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueue]].
* [[akka.stream.javadsl.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``.
* Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueueWithCancel]].
* [[akka.stream.javadsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``.
* `CompletionStage` completes when element is available.
*
* Before calling pull method second time you need to wait until previous CompletionStage completes.
@ -310,13 +310,13 @@ object Sink {
* upstream and then stop back pressure. You can configure size of input
* buffer by using [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueue]] including last None
* For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueueWithCancel]] including last None
* as completion marker
*
* @see [[akka.stream.javadsl.SinkQueueWithCancel]]
*/
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_)))
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(_.asJava))
/**
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,

View file

@ -12,7 +12,7 @@ import akka.annotation.ApiMayChange
import akka.event.LoggingAdapter
import akka.japi.{ function, Pair, Util }
import akka.stream._
import akka.stream.impl.{ LinearTraversalBuilder, SourceQueueAdapter }
import akka.stream.impl.{ LinearTraversalBuilder }
import akka.util.{ ConstantFun, Timeout }
import akka.util.JavaDurationConverters._
import akka.{ Done, NotUsed }
@ -403,7 +403,7 @@ object Source {
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueue]].
* Creates a `Source` that is materialized as an [[akka.stream.javadsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
@ -412,7 +412,7 @@ object Source {
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.javadsl.SourceQueue.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* [[akka.stream.javadsl.SourceQueueWithComplete.offer]] returns `CompletionStage<QueueOfferResult>` which completes with
* `QueueOfferResult.enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
@ -420,7 +420,7 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():CompletionStage`
* call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueue.watchCompletion]].
* You can watch accessibility of stream with [[akka.stream.javadsl.SourceQueueWithComplete.watchCompletion]].
* It returns a future that completes with success when this operator is completed or fails when stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
@ -433,7 +433,7 @@ object Source {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(new SourceQueueAdapter(_)))
new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(_.asJava))
/**
* Start a new `Source` from some resource which can be opened, read and closed.

View file

@ -4,33 +4,41 @@
package akka.stream.scaladsl
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.Future
import akka.Done
import akka.stream.QueueOfferResult
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import akka.annotation.InternalApi
/**
* This trait allows to have the queue as a data source for some stream.
* This trait allows to have a queue as a data source for some stream.
*/
trait SourceQueue[T] {
/**
* Method offers next element to a stream and returns future that:
* - completes with `Enqueued` if element is consumed by a stream
* - completes with `Dropped` when stream dropped offered element
* - completes with `QueueClosed` when stream is completed during future is active
* - completes with `Failure(f)` when failure to enqueue element from upstream
* - fails when stream is completed
* Offers an element to a stream and returns a [[Future]] that:
* - completes with `Enqueued` if the element is consumed by a stream
* - completes with `Dropped` when the stream dropped the offered element
* - completes with `QueueClosed` when the stream is completed whilst the [[Future]] is active
* - completes with `Failure(f)` in case of failure to enqueue element from upstream
* - fails when stream is already completed
*
* Additionally when using the backpressure overflowStrategy:
* - If the buffer is full the Future won't be completed until there is space in the buffer
* - Calling offer before the Future is completed in this case will return a failed Future
* - If the buffer is full the [[Future]] won't be completed until there is space in the buffer
* - Calling offer before the [[Future]] is completed, in this case it will return a failed [[Future]]
*
* @param elem element to send to a stream
*/
def offer(elem: T): Future[QueueOfferResult]
/**
* Method returns a [[Future]] that will be completed if this operator
* Returns a [[Future]] that will be completed if this operator
* completes, or will be failed when the operator faces an internal failure.
*
* Note that this only means the elements have been passed downstream, not
@ -45,7 +53,7 @@ trait SourceQueue[T] {
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
/**
* Complete the stream normally. Use `watchCompletion` to be notified of this
* Completes the stream normally. Use `watchCompletion` to be notified of this
* operations success.
*
* Note that this only means the elements have been passed downstream, not
@ -54,7 +62,7 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
def complete(): Unit
/**
* Complete the stream with a failure. Use `watchCompletion` to be notified of this
* Completes the stream with a failure. Use `watchCompletion` to be notified of this
* operations success.
*/
def fail(ex: Throwable): Unit
@ -70,17 +78,44 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] {
def watchCompletion(): Future[Done]
}
object SourceQueueWithComplete {
final implicit class QueueOps[T](val queue: SourceQueueWithComplete[T]) extends AnyVal {
// would have been better to add `asJava` in SourceQueueWithComplete trait, but not doing
// that for backwards compatibility reasons
/**
* Converts the queue into a `javadsl.SourceQueueWithComplete`
*/
def asJava: akka.stream.javadsl.SourceQueueWithComplete[T] =
SourceQueueWithComplete.asJava(queue)
}
/**
* INTERNAL API: Converts the queue into a `javadsl.SourceQueueWithComplete`
*/
@InternalApi private[akka] def asJava[T](
queue: SourceQueueWithComplete[T]): akka.stream.javadsl.SourceQueueWithComplete[T] =
new akka.stream.javadsl.SourceQueueWithComplete[T] {
def offer(elem: T): CompletionStage[QueueOfferResult] =
queue.offer(elem).toJava
def watchCompletion(): CompletionStage[Done] =
queue.watchCompletion().toJava
def complete(): Unit = queue.complete()
def fail(ex: Throwable): Unit = queue.fail(ex)
}
}
/**
* Trait allows to have the queue as a sink for some stream.
* "SinkQueue" pulls data from stream with backpressure mechanism.
* This trait allows to have a queue as a sink for a stream.
* A [[SinkQueue]] pulls data from a stream with a backpressure mechanism.
*/
trait SinkQueue[T] {
/**
* Method pulls elements from stream and returns future that:
* - fails if stream is failed
* - completes with None in case if stream is completed
* - completes with `Some(element)` in case next element is available from stream.
* Pulls elements from the stream and returns a [[Future]] that:
* - fails if the stream is failed
* - completes with None in case the stream is completed
* - completes with `Some(element)` in case the next element is available from stream.
*/
def pull(): Future[Option[T]]
}
@ -91,7 +126,28 @@ trait SinkQueue[T] {
trait SinkQueueWithCancel[T] extends SinkQueue[T] {
/**
* Cancel the stream. This method returns right away without waiting for actual finalizing stream.
* Cancels the stream. This method returns right away without waiting for actual finalizing the stream.
*/
def cancel(): Unit
}
object SinkQueueWithCancel {
final implicit class QueueOps[T](val queue: SinkQueueWithCancel[T]) extends AnyVal {
// would have been better to add `asJava` in SinkQueueWithCancel trait, but not doing
// that for backwards compatibility reasons
def asJava: akka.stream.javadsl.SinkQueueWithCancel[T] =
SinkQueueWithCancel.asJava(queue)
}
/**
* INTERNAL API: Converts the queue into a `javadsl.SinkQueueWithCancel`
*/
@InternalApi private[akka] def asJava[T](queue: SinkQueueWithCancel[T]): akka.stream.javadsl.SinkQueueWithCancel[T] =
new akka.stream.javadsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext same }
override def pull(): CompletionStage[Optional[T]] =
queue.pull().map(_.asJava)(same).toJava
override def cancel(): Unit = queue.cancel()
}
}

View file

@ -533,8 +533,8 @@ object Sink {
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]].
* [[akka.stream.scaladsl.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]].
* [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* `Future` completes when element is available.
*
* Before calling pull method second time you need to wait until previous Future completes.
@ -544,7 +544,7 @@ object Sink {
* upstream and then stop back pressure. You can configure size of input
* buffer by using [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None
* For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueueWithCancel]] including last None
* as completion marker
*
* See also [[akka.stream.scaladsl.SinkQueueWithCancel]]

View file

@ -615,7 +615,7 @@ object Source {
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
* Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueueWithComplete]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
@ -624,7 +624,7 @@ object Source {
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with
* [[akka.stream.scaladsl.SourceQueueWithComplete.offer]] returns `Future[QueueOfferResult]` which completes with
* `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with
* `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
* when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
@ -632,7 +632,7 @@ object Source {
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
* You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion]].
* It returns future that completes with success when the operator is completed or fails when the stream is failed.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait