+str #19443 add support or Java Stream
This commit is contained in:
parent
b307a0ead7
commit
093d82ce00
18 changed files with 684 additions and 59 deletions
|
|
@ -3,19 +3,28 @@
|
|||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.stream.impl.QueueSink.{ Output, Pull }
|
||||
import akka.{ Done, NotUsed }
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.stream.Attributes.InputBuffer
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.StreamLayout.AtomicModule
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.function.BiConsumer
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.stream.Attributes.InputBuffer
|
||||
import akka.stream._
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.stage._
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.concurrent.{ Promise, Future }
|
||||
import scala.language.postfixOps
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import akka.stream.scaladsl.SinkQueue
|
||||
import akka.stream.scaladsl.{ SinkQueueWithCancel, SinkQueue }
|
||||
import java.util.concurrent.CompletionStage
|
||||
import scala.compat.java8.FutureConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
|
@ -183,7 +192,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
|
|||
|
||||
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
||||
|
||||
val in = Inlet[T]("lastOption.in")
|
||||
val in: Inlet[T] = Inlet("lastOption.in")
|
||||
|
||||
override val shape: SinkShape[T] = SinkShape.of(in)
|
||||
|
||||
|
|
@ -220,7 +229,7 @@ private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedV
|
|||
|
||||
private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
||||
|
||||
val in = Inlet[T]("headOption.in")
|
||||
val in: Inlet[T] = Inlet("headOption.in")
|
||||
|
||||
override val shape: SinkShape[T] = SinkShape.of(in)
|
||||
|
||||
|
|
@ -290,10 +299,16 @@ private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[Si
|
|||
}
|
||||
}
|
||||
|
||||
private[stream] object QueueSink {
|
||||
sealed trait Output[+T]
|
||||
final case class Pull[T](promise: Promise[Option[T]]) extends Output[T]
|
||||
case object Cancel extends Output[Nothing]
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] {
|
||||
final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
|
||||
type Requested[E] = Promise[Option[E]]
|
||||
|
||||
val in = Inlet[T]("queueSink.in")
|
||||
|
|
@ -303,7 +318,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
|
|||
override def toString: String = "QueueSink"
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
|
||||
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] {
|
||||
type Received[E] = Try[Option[E]]
|
||||
|
||||
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
|
||||
|
|
@ -321,20 +336,25 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
|
|||
pull(in)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = stopCallback(promise ⇒
|
||||
promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")))
|
||||
override def postStop(): Unit = stopCallback {
|
||||
case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))
|
||||
case _ ⇒ //do nothing
|
||||
}
|
||||
|
||||
private val callback: AsyncCallback[Requested[T]] =
|
||||
getAsyncCallback(promise ⇒ currentRequest match {
|
||||
case Some(_) ⇒
|
||||
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
|
||||
case None ⇒
|
||||
if (buffer.isEmpty) currentRequest = Some(promise)
|
||||
else {
|
||||
if (buffer.used == maxBuffer) tryPull(in)
|
||||
sendDownstream(promise)
|
||||
}
|
||||
})
|
||||
private val callback: AsyncCallback[Output[T]] =
|
||||
getAsyncCallback {
|
||||
case QueueSink.Pull(pullPromise) ⇒ currentRequest match {
|
||||
case Some(_) ⇒
|
||||
pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
|
||||
case None ⇒
|
||||
if (buffer.isEmpty) currentRequest = Some(pullPromise)
|
||||
else {
|
||||
if (buffer.used == maxBuffer) tryPull(in)
|
||||
sendDownstream(pullPromise)
|
||||
}
|
||||
}
|
||||
case QueueSink.Cancel ⇒ completeStage()
|
||||
}
|
||||
|
||||
def sendDownstream(promise: Requested[T]): Unit = {
|
||||
val e = buffer.dequeue()
|
||||
|
|
@ -366,17 +386,58 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
|
|||
})
|
||||
}
|
||||
|
||||
(stageLogic, new SinkQueue[T] {
|
||||
(stageLogic, new SinkQueueWithCancel[T] {
|
||||
override def pull(): Future[Option[T]] = {
|
||||
val p = Promise[Option[T]]
|
||||
stageLogic.invoke(p)
|
||||
stageLogic.invoke(Pull(p))
|
||||
p.future
|
||||
}
|
||||
override def cancel(): Unit = {
|
||||
stageLogic.invoke(QueueSink.Cancel)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] final class SinkQueueAdapter[T](delegate: SinkQueue[T]) extends akka.stream.javadsl.SinkQueue[T] {
|
||||
private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] {
|
||||
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same }
|
||||
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
|
||||
def cancel(): Unit = delegate.cancel()
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Helper class to be able to express collection as a fold using mutable data
|
||||
*/
|
||||
private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
|
||||
lazy val accumulated = collector.supplier().get()
|
||||
private lazy val accumulator = collector.accumulator()
|
||||
|
||||
def update(elem: T): CollectorState[T, R] = {
|
||||
accumulator.accept(accumulated, elem)
|
||||
this
|
||||
}
|
||||
|
||||
def finish(): R = collector.finisher().apply(accumulated)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Helper class to be able to express reduce as a fold for parallel collector
|
||||
*/
|
||||
private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
|
||||
private var reduced: Any = null.asInstanceOf[Any]
|
||||
private lazy val combiner = collector.combiner()
|
||||
|
||||
def update(batch: Any): ReducerState[T, R] = {
|
||||
if (reduced == null) reduced = batch
|
||||
else reduced = combiner(reduced, batch)
|
||||
this
|
||||
}
|
||||
|
||||
def finish(): R = collector.finisher().apply(reduced)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue