2015-01-28 14:19:50 +01:00
|
|
|
/**
|
2017-01-04 17:37:10 +01:00
|
|
|
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
|
2015-01-28 14:19:50 +01:00
|
|
|
*/
|
|
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2016-07-07 07:01:28 -04:00
|
|
|
import akka.dispatch.ExecutionContexts
|
|
|
|
|
import akka.stream.ActorAttributes.SupervisionStrategy
|
2016-07-27 13:29:23 +02:00
|
|
|
import akka.stream.Supervision.{ Stop, stoppingDecider }
|
2016-01-14 15:22:25 +01:00
|
|
|
import akka.stream.impl.QueueSink.{ Output, Pull }
|
2016-07-07 07:01:28 -04:00
|
|
|
import akka.stream.impl.fusing.GraphInterpreter
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.{ Done, NotUsed }
|
2016-07-27 13:29:23 +02:00
|
|
|
import akka.actor.{ Actor, ActorRef, Props }
|
2015-12-04 09:37:32 -05:00
|
|
|
import akka.stream.Attributes.InputBuffer
|
2015-08-19 23:04:20 -04:00
|
|
|
import akka.stream._
|
2016-02-12 01:36:21 +08:00
|
|
|
import akka.stream.impl.Stages.DefaultAttributes
|
2016-03-11 17:08:30 +01:00
|
|
|
import akka.stream.impl.StreamLayout.AtomicModule
|
2016-01-14 15:22:25 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
|
|
|
|
import java.util.function.BiConsumer
|
2016-07-27 13:29:23 +02:00
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
import akka.actor.{ ActorRef, Props }
|
|
|
|
|
import akka.stream.Attributes.InputBuffer
|
|
|
|
|
import akka.stream._
|
2016-01-16 12:17:19 -05:00
|
|
|
import akka.stream.stage._
|
2015-11-18 00:09:04 +01:00
|
|
|
import org.reactivestreams.{ Publisher, Subscriber }
|
2016-07-27 13:29:23 +02:00
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
import scala.annotation.unchecked.uncheckedVariance
|
2016-02-12 01:36:21 +08:00
|
|
|
import scala.collection.immutable
|
2016-07-27 13:29:23 +02:00
|
|
|
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
2016-01-14 15:22:25 +01:00
|
|
|
import scala.language.postfixOps
|
|
|
|
|
import scala.util.control.NonFatal
|
2015-12-04 09:37:32 -05:00
|
|
|
import scala.util.{ Failure, Success, Try }
|
2016-07-27 13:29:23 +02:00
|
|
|
import akka.stream.scaladsl.{ Sink, SinkQueue, SinkQueueWithCancel, Source }
|
2016-01-21 16:37:26 +01:00
|
|
|
import java.util.concurrent.CompletionStage
|
2016-07-27 13:29:23 +02:00
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
import scala.compat.java8.FutureConverters._
|
|
|
|
|
import scala.compat.java8.OptionConverters._
|
|
|
|
|
import java.util.Optional
|
2016-07-27 13:29:23 +02:00
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
import akka.event.Logging
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-07-27 13:29:23 +02:00
|
|
|
abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule[SinkShape[In], Mat] {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-02-24 11:55:28 +01:00
|
|
|
/**
|
|
|
|
|
* Create the Subscriber or VirtualPublisher that consumes the incoming
|
|
|
|
|
* stream, plus the materialized value. Since Subscriber and VirtualPublisher
|
|
|
|
|
* do not share a common supertype apart from AnyRef this is what the type
|
|
|
|
|
* union devolves into; unfortunately we do not have union types at our
|
|
|
|
|
* disposal at this point.
|
|
|
|
|
*/
|
|
|
|
|
def create(context: MaterializationContext): (AnyRef, Mat)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-07-27 13:29:23 +02:00
|
|
|
override def traversalBuilder: TraversalBuilder =
|
|
|
|
|
LinearTraversalBuilder.fromModule(this).makeIsland(SinkModuleIslandTag)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
// This is okay since we the only caller of this method is right below.
|
2016-07-27 13:29:23 +02:00
|
|
|
// TODO: Remove this, no longer needed
|
2015-01-28 14:19:50 +01:00
|
|
|
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
|
|
|
|
|
|
2015-07-06 22:00:21 +02:00
|
|
|
protected def amendShape(attr: Attributes): SinkShape[In] = {
|
2016-07-27 13:29:23 +02:00
|
|
|
val thisN = traversalBuilder.attributes.nameOrDefault(null)
|
2015-07-06 22:00:21 +02:00
|
|
|
val thatN = attr.nameOrDefault(null)
|
|
|
|
|
|
|
|
|
|
if ((thatN eq null) || thisN == thatN) shape
|
2015-12-14 14:52:06 +01:00
|
|
|
else shape.copy(in = Inlet(thatN + ".in"))
|
2015-03-05 12:21:17 +01:00
|
|
|
}
|
2016-03-11 17:08:30 +01:00
|
|
|
|
|
|
|
|
protected def label: String = Logging.simpleName(this)
|
|
|
|
|
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
|
|
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
|
|
|
|
|
* The stream will not have any subscribers attached at this point, which means that after prefetching
|
|
|
|
|
* elements to fill the internal buffers it will assert back-pressure until
|
|
|
|
|
* a subscriber connects and creates demand for elements to be emitted.
|
|
|
|
|
*/
|
2015-06-23 17:32:55 +02:00
|
|
|
private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-02-24 11:55:28 +01:00
|
|
|
/*
|
|
|
|
|
* This method is the reason why SinkModule.create may return something that is
|
|
|
|
|
* not a Subscriber: a VirtualPublisher is used in order to avoid the immediate
|
|
|
|
|
* subscription a VirtualProcessor would perform (and it also saves overhead).
|
|
|
|
|
*/
|
|
|
|
|
override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = {
|
|
|
|
|
val proc = new VirtualPublisher[In]
|
2015-06-16 15:34:54 +02:00
|
|
|
(proc, proc)
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape)
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] = new PublisherSink[In](attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] final class FanoutPublisherSink[In](
|
2015-06-23 17:32:55 +02:00
|
|
|
val attributes: Attributes,
|
2016-06-02 14:06:57 +02:00
|
|
|
shape: SinkShape[In])
|
2015-01-28 14:19:50 +01:00
|
|
|
extends SinkModule[In, Publisher[In]](shape) {
|
|
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
|
2016-05-03 18:58:26 -07:00
|
|
|
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
|
2016-05-23 11:31:49 +03:00
|
|
|
val impl = actorMaterializer.actorOf(
|
|
|
|
|
context,
|
2017-03-03 10:49:34 +01:00
|
|
|
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(context.effectiveAttributes)))
|
2016-05-23 11:31:49 +03:00
|
|
|
val fanoutProcessor = new ActorProcessor[In, In](impl)
|
|
|
|
|
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
|
|
|
|
|
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
|
2015-01-28 14:19:50 +01:00
|
|
|
(fanoutProcessor, fanoutProcessor)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
|
2015-11-03 12:53:24 +01:00
|
|
|
new FanoutPublisherSink[In](attributes, shape)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
|
2015-11-03 12:53:24 +01:00
|
|
|
new FanoutPublisherSink[In](attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Attaches a subscriber to this stream.
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
override def create(context: MaterializationContext) = (subscriber, NotUsed)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape)
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* A sink that immediately cancels its upstream upon materialization.
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
|
2016-01-20 10:00:37 +02:00
|
|
|
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed)
|
|
|
|
|
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape)
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
|
|
|
|
|
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext) = {
|
2016-05-03 18:58:26 -07:00
|
|
|
val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
|
2015-01-28 14:19:50 +01:00
|
|
|
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
|
|
|
|
|
}
|
|
|
|
|
|
2015-03-30 14:42:30 +02:00
|
|
|
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape)
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
2015-03-30 14:42:30 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
|
|
|
|
|
val attributes: Attributes,
|
|
|
|
|
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
|
2015-03-30 14:42:30 +02:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext) = {
|
2016-05-03 18:58:26 -07:00
|
|
|
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
|
2015-04-10 14:39:48 +02:00
|
|
|
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
|
2016-06-02 14:06:57 +02:00
|
|
|
val subscriberRef = actorMaterializer.actorOf(
|
|
|
|
|
context,
|
2015-04-10 14:39:48 +02:00
|
|
|
ActorRefSinkActor.props(ref, effectiveSettings.maxInputBufferSize, onCompleteMessage))
|
2016-01-20 10:00:37 +02:00
|
|
|
(akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed)
|
2015-03-30 14:42:30 +02:00
|
|
|
}
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
|
2015-03-30 14:42:30 +02:00
|
|
|
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
|
2016-07-27 13:29:23 +02:00
|
|
|
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
|
2015-03-30 14:42:30 +02:00
|
|
|
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
2015-12-13 17:02:58 +01:00
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
val in: Inlet[T] = Inlet("lastOption.in")
|
2015-12-13 17:02:58 +01:00
|
|
|
|
|
|
|
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
|
|
|
|
|
2015-11-18 00:09:04 +01:00
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
|
|
|
|
val p: Promise[Option[T]] = Promise()
|
2016-08-29 14:00:48 +02:00
|
|
|
(new GraphStageLogic(shape) with InHandler {
|
|
|
|
|
private[this] var prev: T = null.asInstanceOf[T]
|
|
|
|
|
|
2015-11-18 00:09:04 +01:00
|
|
|
override def preStart(): Unit = pull(in)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
prev = grab(in)
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
2015-11-18 00:09:04 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
val head = prev
|
|
|
|
|
prev = null.asInstanceOf[T]
|
|
|
|
|
p.trySuccess(Option(head))
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
2015-11-18 00:09:04 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
|
|
|
|
prev = null.asInstanceOf[T]
|
|
|
|
|
p.tryFailure(ex)
|
|
|
|
|
failStage(ex)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandler(in, this)
|
2015-11-18 00:09:04 +01:00
|
|
|
}, p.future)
|
|
|
|
|
}
|
2015-12-14 17:02:00 +01:00
|
|
|
|
|
|
|
|
override def toString: String = "LastOptionStage"
|
2015-11-18 00:09:04 +01:00
|
|
|
}
|
|
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
2015-12-13 17:02:58 +01:00
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
val in: Inlet[T] = Inlet("headOption.in")
|
2015-12-13 17:02:58 +01:00
|
|
|
|
|
|
|
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
|
|
|
|
|
2015-11-18 00:09:04 +01:00
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
|
|
|
|
val p: Promise[Option[T]] = Promise()
|
2016-08-29 14:00:48 +02:00
|
|
|
(new GraphStageLogic(shape) with InHandler {
|
2015-11-18 00:09:04 +01:00
|
|
|
override def preStart(): Unit = pull(in)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
p.trySuccess(Option(grab(in)))
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
2015-11-18 00:09:04 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
p.trySuccess(None)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
|
|
|
|
p.tryFailure(ex)
|
|
|
|
|
failStage(ex)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandler(in, this)
|
2015-11-18 00:09:04 +01:00
|
|
|
}, p.future)
|
|
|
|
|
}
|
2015-12-14 17:02:00 +01:00
|
|
|
|
|
|
|
|
override def toString: String = "HeadOptionStage"
|
2015-11-18 00:09:04 +01:00
|
|
|
}
|
2015-12-04 09:37:32 -05:00
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
|
2016-02-12 01:36:21 +08:00
|
|
|
val in = Inlet[T]("seq.in")
|
|
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
override def toString: String = "SeqStage"
|
|
|
|
|
|
2016-02-12 01:36:21 +08:00
|
|
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
|
|
|
|
|
|
|
|
|
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
|
|
|
|
|
|
|
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
|
|
|
|
val p: Promise[immutable.Seq[T]] = Promise()
|
2016-08-29 14:00:48 +02:00
|
|
|
val logic = new GraphStageLogic(shape) with InHandler {
|
2016-02-12 01:36:21 +08:00
|
|
|
val buf = Vector.newBuilder[T]
|
|
|
|
|
|
|
|
|
|
override def preStart(): Unit = pull(in)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
buf += grab(in)
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
2016-02-12 01:36:21 +08:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
val result = buf.result()
|
|
|
|
|
p.trySuccess(result)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
2016-02-12 01:36:21 +08:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
|
|
|
|
p.tryFailure(ex)
|
|
|
|
|
failStage(ex)
|
|
|
|
|
}
|
2016-02-12 01:36:21 +08:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
setHandler(in, this)
|
2016-02-12 01:36:21 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(logic, p.future)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
private[stream] object QueueSink {
|
|
|
|
|
sealed trait Output[+T]
|
|
|
|
|
final case class Pull[T](promise: Promise[Option[T]]) extends Output[T]
|
|
|
|
|
case object Cancel extends Output[Nothing]
|
|
|
|
|
}
|
|
|
|
|
|
2015-12-04 09:37:32 -05:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
|
2016-01-16 12:17:19 -05:00
|
|
|
type Requested[E] = Promise[Option[E]]
|
2015-12-04 09:37:32 -05:00
|
|
|
|
|
|
|
|
val in = Inlet[T]("queueSink.in")
|
2016-02-15 13:38:37 +01:00
|
|
|
override def initialAttributes = DefaultAttributes.queueSink
|
2015-12-04 09:37:32 -05:00
|
|
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
|
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
override def toString: String = "QueueSink"
|
|
|
|
|
|
2015-12-04 09:37:32 -05:00
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
2016-08-29 14:00:48 +02:00
|
|
|
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] with InHandler {
|
2016-02-07 14:54:48 +01:00
|
|
|
type Received[E] = Try[Option[E]]
|
2015-12-04 09:37:32 -05:00
|
|
|
|
2016-02-15 13:38:37 +01:00
|
|
|
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
|
2016-02-07 14:54:48 +01:00
|
|
|
require(maxBuffer > 0, "Buffer size must be greater than 0")
|
2015-12-04 09:37:32 -05:00
|
|
|
|
2016-02-07 14:54:48 +01:00
|
|
|
var buffer: Buffer[Received[T]] = _
|
|
|
|
|
var currentRequest: Option[Requested[T]] = None
|
2015-12-04 09:37:32 -05:00
|
|
|
|
|
|
|
|
override def preStart(): Unit = {
|
2016-02-16 13:09:01 +01:00
|
|
|
// Allocates one additional element to hold stream
|
|
|
|
|
// closed/failure indicators
|
2016-02-07 14:54:48 +01:00
|
|
|
buffer = Buffer(maxBuffer + 1, materializer)
|
2015-12-20 12:54:05 +01:00
|
|
|
setKeepGoing(true)
|
2016-01-16 12:17:19 -05:00
|
|
|
initCallback(callback.invoke)
|
2015-12-04 09:37:32 -05:00
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
override def postStop(): Unit = stopCallback {
|
|
|
|
|
case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))
|
|
|
|
|
case _ ⇒ //do nothing
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private val callback: AsyncCallback[Output[T]] =
|
|
|
|
|
getAsyncCallback {
|
|
|
|
|
case QueueSink.Pull(pullPromise) ⇒ currentRequest match {
|
|
|
|
|
case Some(_) ⇒
|
|
|
|
|
pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
|
|
|
|
|
case None ⇒
|
|
|
|
|
if (buffer.isEmpty) currentRequest = Some(pullPromise)
|
|
|
|
|
else {
|
|
|
|
|
if (buffer.used == maxBuffer) tryPull(in)
|
|
|
|
|
sendDownstream(pullPromise)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case QueueSink.Cancel ⇒ completeStage()
|
|
|
|
|
}
|
2015-12-04 09:37:32 -05:00
|
|
|
|
|
|
|
|
def sendDownstream(promise: Requested[T]): Unit = {
|
|
|
|
|
val e = buffer.dequeue()
|
|
|
|
|
promise.complete(e)
|
|
|
|
|
e match {
|
|
|
|
|
case Success(_: Some[_]) ⇒ //do nothing
|
|
|
|
|
case Success(None) ⇒ completeStage()
|
|
|
|
|
case Failure(t) ⇒ failStage(t)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def enqueueAndNotify(requested: Received[T]): Unit = {
|
|
|
|
|
buffer.enqueue(requested)
|
|
|
|
|
currentRequest match {
|
|
|
|
|
case Some(p) ⇒
|
|
|
|
|
sendDownstream(p)
|
|
|
|
|
currentRequest = None
|
|
|
|
|
case None ⇒ //do nothing
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
enqueueAndNotify(Success(Some(grab(in))))
|
|
|
|
|
if (buffer.used < maxBuffer) pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
|
|
|
|
|
|
|
|
|
|
setHandler(in, this)
|
2015-12-04 09:37:32 -05:00
|
|
|
}
|
|
|
|
|
|
2016-01-14 15:22:25 +01:00
|
|
|
(stageLogic, new SinkQueueWithCancel[T] {
|
2015-12-04 09:37:32 -05:00
|
|
|
override def pull(): Future[Option[T]] = {
|
|
|
|
|
val p = Promise[Option[T]]
|
2016-01-14 15:22:25 +01:00
|
|
|
stageLogic.invoke(Pull(p))
|
2015-12-04 09:37:32 -05:00
|
|
|
p.future
|
|
|
|
|
}
|
2016-01-14 15:22:25 +01:00
|
|
|
override def cancel(): Unit = {
|
|
|
|
|
stageLogic.invoke(QueueSink.Cancel)
|
|
|
|
|
}
|
2015-12-04 09:37:32 -05:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-01-21 16:37:26 +01:00
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] {
|
2016-01-21 16:37:26 +01:00
|
|
|
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same }
|
|
|
|
|
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
|
2016-01-14 15:22:25 +01:00
|
|
|
def cancel(): Unit = delegate.cancel()
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*
|
|
|
|
|
* Helper class to be able to express collection as a fold using mutable data
|
|
|
|
|
*/
|
|
|
|
|
private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
|
|
|
|
|
lazy val accumulated = collector.supplier().get()
|
|
|
|
|
private lazy val accumulator = collector.accumulator()
|
|
|
|
|
|
|
|
|
|
def update(elem: T): CollectorState[T, R] = {
|
|
|
|
|
accumulator.accept(accumulated, elem)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def finish(): R = collector.finisher().apply(accumulated)
|
2016-01-21 16:37:26 +01:00
|
|
|
}
|
2016-01-14 15:22:25 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*
|
|
|
|
|
* Helper class to be able to express reduce as a fold for parallel collector
|
|
|
|
|
*/
|
|
|
|
|
private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
|
|
|
|
|
private var reduced: Any = null.asInstanceOf[Any]
|
|
|
|
|
private lazy val combiner = collector.combiner()
|
|
|
|
|
|
|
|
|
|
def update(batch: Any): ReducerState[T, R] = {
|
|
|
|
|
if (reduced == null) reduced = batch
|
|
|
|
|
else reduced = combiner(reduced, batch)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def finish(): R = collector.finisher().apply(reduced)
|
|
|
|
|
}
|
|
|
|
|
|
2016-07-07 07:01:28 -04:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
|
|
|
|
|
val in = Inlet[T]("lazySink.in")
|
|
|
|
|
override def initialAttributes = DefaultAttributes.lazySink
|
|
|
|
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
|
|
|
|
|
|
|
|
|
override def toString: String = "LazySink"
|
|
|
|
|
|
|
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
|
|
|
|
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider)
|
|
|
|
|
|
2017-02-16 03:19:25 -05:00
|
|
|
var completed = false
|
2016-07-07 07:01:28 -04:00
|
|
|
val promise = Promise[M]()
|
|
|
|
|
val stageLogic = new GraphStageLogic(shape) with InHandler {
|
|
|
|
|
override def preStart(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val element = grab(in)
|
2017-02-16 03:19:25 -05:00
|
|
|
val cb: AsyncCallback[Try[Sink[T, M]]] =
|
|
|
|
|
getAsyncCallback {
|
|
|
|
|
case Success(sink) ⇒ initInternalSource(sink, element)
|
|
|
|
|
case Failure(e) ⇒ failure(e)
|
|
|
|
|
}
|
2016-07-07 07:01:28 -04:00
|
|
|
sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
|
2017-02-16 03:19:25 -05:00
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = ()
|
|
|
|
|
override def onUpstreamFinish(): Unit = gotCompletionEvent()
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
|
|
|
|
|
})
|
2016-07-07 07:01:28 -04:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(e) ⇒ decider(e) match {
|
|
|
|
|
case Supervision.Stop ⇒ failure(e)
|
|
|
|
|
case _ ⇒ pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def failure(ex: Throwable): Unit = {
|
|
|
|
|
failStage(ex)
|
|
|
|
|
promise.failure(ex)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
completeStage()
|
|
|
|
|
promise.tryComplete(Try(zeroMat()))
|
|
|
|
|
}
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
|
|
|
|
|
setHandler(in, this)
|
|
|
|
|
|
2017-02-16 03:19:25 -05:00
|
|
|
private def gotCompletionEvent(): Unit = {
|
|
|
|
|
setKeepGoing(true)
|
|
|
|
|
completed = true
|
|
|
|
|
}
|
|
|
|
|
|
2016-07-07 07:01:28 -04:00
|
|
|
private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = {
|
|
|
|
|
val sourceOut = new SubSourceOutlet[T]("LazySink")
|
|
|
|
|
|
|
|
|
|
def switchToFirstElementHandlers(): Unit = {
|
|
|
|
|
sourceOut.setHandler(new OutHandler {
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
sourceOut.push(firstElement)
|
|
|
|
|
if (completed) internalSourceComplete() else switchToFinalHandlers()
|
|
|
|
|
}
|
|
|
|
|
override def onDownstreamFinish(): Unit = internalSourceComplete()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = sourceOut.push(grab(in))
|
2017-02-16 03:19:25 -05:00
|
|
|
override def onUpstreamFinish(): Unit = gotCompletionEvent()
|
2016-07-07 07:01:28 -04:00
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def switchToFinalHandlers(): Unit = {
|
|
|
|
|
sourceOut.setHandler(new OutHandler {
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
override def onDownstreamFinish(): Unit = internalSourceComplete()
|
|
|
|
|
})
|
|
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = sourceOut.push(grab(in))
|
|
|
|
|
override def onUpstreamFinish(): Unit = internalSourceComplete()
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def internalSourceComplete(): Unit = {
|
|
|
|
|
sourceOut.complete()
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def internalSourceFailure(ex: Throwable): Unit = {
|
|
|
|
|
sourceOut.fail(ex)
|
|
|
|
|
failStage(ex)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switchToFirstElementHandlers()
|
|
|
|
|
promise.trySuccess(Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
(stageLogic, promise.future)
|
|
|
|
|
}
|
|
|
|
|
}
|