Merge remote-tracking branch 'pr/19097' into release-2.3-dev
This commit is contained in:
commit
dcfa56e547
9 changed files with 146 additions and 167 deletions
|
|
@ -3,17 +3,19 @@
|
|||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.stream.actor.ActorPublisherMessage.Request
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.Attributes.InputBuffer
|
||||
import akka.stream._
|
||||
import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, GraphStageLogic }
|
||||
import akka.util.Timeout
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.concurrent.duration.{ FiniteDuration, _ }
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.language.postfixOps
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -167,32 +169,6 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
|
|||
override def toString: String = "ActorRefSink"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: Attributes,
|
||||
shape: SinkShape[In], timeout: FiniteDuration) extends SinkModule[In, SinkQueue[In]](shape) {
|
||||
|
||||
override def create(context: MaterializationContext) = {
|
||||
import akka.pattern.ask
|
||||
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
|
||||
|
||||
implicit val t = Timeout(timeout)
|
||||
val subscriberRef = actorMaterializer.actorOf(context,
|
||||
AcknowledgeSubscriber.props(bufferSize))
|
||||
(akka.stream.actor.ActorSubscriber[In](subscriberRef),
|
||||
new SinkQueue[In] {
|
||||
override def pull(): Future[Option[In]] = (subscriberRef ? Request(1)).mapTo[Option[In]]
|
||||
})
|
||||
}
|
||||
|
||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, SinkQueue[In]] =
|
||||
new AcknowledgeSink[In](bufferSize, attributes, shape, timeout)
|
||||
override def withAttributes(attr: Attributes): Module =
|
||||
new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout)
|
||||
override def toString: String = "AcknowledgeSink"
|
||||
}
|
||||
|
||||
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
||||
|
||||
val in = Inlet[T]("lastOption.in")
|
||||
|
|
@ -261,3 +237,92 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV
|
|||
|
||||
override def toString: String = "HeadOptionStage"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] {
|
||||
trait RequestElementCallback[E] {
|
||||
val requestElement = new AtomicReference[AnyRef](Nil)
|
||||
}
|
||||
|
||||
type Requested[E] = Promise[Option[T]]
|
||||
|
||||
val in = Inlet[T]("queueSink.in")
|
||||
override val shape: SinkShape[T] = SinkShape.of(in)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||
type Received[E] = Try[Option[E]]
|
||||
|
||||
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
|
||||
require(maxBuffer > 0, "Buffer size must be greater than 0")
|
||||
|
||||
val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1)
|
||||
var currentRequest: Option[Requested[T]] = None
|
||||
|
||||
val stageLogic = new GraphStageLogic(shape) with RequestElementCallback[Requested[T]] {
|
||||
override def keepGoingAfterAllPortsClosed = true
|
||||
|
||||
override def preStart(): Unit = {
|
||||
val list = requestElement.getAndSet(callback.invoke _).asInstanceOf[List[Requested[T]]]
|
||||
list.reverse.foreach(callback.invoke)
|
||||
pull(in)
|
||||
}
|
||||
|
||||
private val callback: AsyncCallback[Requested[T]] =
|
||||
getAsyncCallback(promise ⇒ currentRequest match {
|
||||
case Some(_) ⇒
|
||||
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
|
||||
case None ⇒
|
||||
if (buffer.isEmpty) currentRequest = Some(promise)
|
||||
else sendDownstream(promise)
|
||||
})
|
||||
|
||||
def sendDownstream(promise: Requested[T]): Unit = {
|
||||
val e = buffer.dequeue()
|
||||
promise.complete(e)
|
||||
e match {
|
||||
case Success(_: Some[_]) ⇒ //do nothing
|
||||
case Success(None) ⇒ completeStage()
|
||||
case Failure(t) ⇒ failStage(t)
|
||||
}
|
||||
}
|
||||
|
||||
def enqueueAndNotify(requested: Received[T]): Unit = {
|
||||
buffer.enqueue(requested)
|
||||
currentRequest match {
|
||||
case Some(p) ⇒
|
||||
sendDownstream(p)
|
||||
currentRequest = None
|
||||
case None ⇒ //do nothing
|
||||
}
|
||||
}
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
enqueueAndNotify(Success(Some(grab(in))))
|
||||
if (buffer.used < maxBuffer - 1) pull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
|
||||
})
|
||||
}
|
||||
|
||||
(stageLogic, new SinkQueue[T] {
|
||||
override def pull(): Future[Option[T]] = {
|
||||
val ref = stageLogic.requestElement
|
||||
val p = Promise[Option[T]]
|
||||
ref.get() match {
|
||||
case l: List[_] ⇒
|
||||
if (!ref.compareAndSet(l, p :: l))
|
||||
ref.get() match {
|
||||
case _: List[_] ⇒ throw new IllegalStateException("Concurrent call of SinkQueue.pull() is detected")
|
||||
case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p)
|
||||
}
|
||||
case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p)
|
||||
}
|
||||
p.future
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue