2015-01-28 14:19:50 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2015-05-29 16:43:02 +02:00
|
|
|
import akka.actor.{ Deploy, ActorRef, Props }
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream.impl.StreamLayout.Module
|
2015-07-06 22:00:21 +02:00
|
|
|
import akka.stream.{ Attributes, Inlet, Shape, SinkShape, MaterializationContext, ActorMaterializer }
|
2015-01-28 14:19:50 +01:00
|
|
|
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
|
|
|
|
import scala.annotation.unchecked.uncheckedVariance
|
|
|
|
|
import scala.concurrent.{ Future, Promise }
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
 * INTERNAL API
 *
 * Common base for the sink modules in this file: a [[Module]] with a [[SinkShape]] and
 * no sub-modules, whose `create` method produces the [[org.reactivestreams.Subscriber]]
 * for the sink together with its materialized value.
 */
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {

  /**
   * Creates the subscriber for this sink and the value it materializes to.
   *
   * @param context the materialization context handed in by the caller
   * @return the subscriber paired with the materialized value
   */
  def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat)

  /**
   * Accepts only a shape equal to the current one (returning this module unchanged);
   * any other shape is rejected.
   *
   * @throws UnsupportedOperationException if `s` differs from the current shape
   */
  override def replaceShape(s: Shape): Module = {
    if (s != shape)
      throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that")
    this
  }

  // This is okay since the only caller of this method is right below.
  protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]

  /**
   * Builds a new instance of this module around a carbon copy of the current inlet.
   */
  override def carbonCopy: Module = {
    val copiedInlet = shape.inlet.carbonCopy()
    newInstance(SinkShape(copiedInlet))
  }

  /** Sink modules have no nested modules. */
  override def subModules: Set[Module] = Set.empty

  /**
   * Chooses the shape to use when `attr` replaces the current attributes.
   *
   * The current shape is kept when `nameOrDefault(null)` yields null for `attr` or
   * yields the same name as for the current attributes; otherwise a copy of the shape
   * is returned whose inlet is named after the new name with a ".in" suffix.
   */
  protected def amendShape(attr: Attributes): SinkShape[In] = {
    val currentName = attributes.nameOrDefault(null)
    val requestedName = attr.nameOrDefault(null)

    if ((requestedName ne null) && currentName != requestedName)
      shape.copy(inlet = Inlet(requestedName + ".in"))
    else
      shape
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
 * The stream will not have any subscribers attached at this point, which means that after
 * prefetching elements to fill the internal buffers it will assert back-pressure until
 * a subscriber connects and creates demand for elements to be emitted.
 */
private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In])
  extends SinkModule[In, Publisher[In]](shape) {

  /**
   * Creates a single `VirtualProcessor` and returns it in both roles: as the subscriber
   * of this sink and as the materialized publisher.
   */
  override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
    val processor = new VirtualProcessor[In]
    (processor, processor)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
    new PublisherSink[In](attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new PublisherSink[In](attr, amendShape(attr))

  override def toString: String = "PublisherSink"
}
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
 * INTERNAL API
 *
 * Sink that materializes to a [[org.reactivestreams.Publisher]] backed by a fan-out
 * processor actor, which is created with the given initial and maximum buffer sizes.
 *
 * @param initialBufferSize passed through to `FanoutProcessorImpl`
 * @param maximumBufferSize passed through to `FanoutProcessorImpl`
 * @param attributes        the attributes of this module
 * @param shape             the shape of this sink
 */
private[akka] final class FanoutPublisherSink[In](
  initialBufferSize: Int,
  maximumBufferSize: Int,
  val attributes: Attributes,
  shape: SinkShape[In])
  extends SinkModule[In, Publisher[In]](shape) {

  /**
   * Starts a `FanoutProcessorImpl` actor through the `ActorMaterializer` of the given
   * context and wraps it with `ActorProcessorFactory`; the resulting processor is
   * returned both as the subscriber of this sink and as the materialized publisher.
   */
  override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
    val actorMaterializer = ActorMaterializer.downcast(context.materializer)
    val fanoutActor = actorMaterializer.actorOf(context,
      Props(new FanoutProcessorImpl(actorMaterializer.effectiveSettings(context.effectiveAttributes),
        initialBufferSize, maximumBufferSize)).withDeploy(Deploy.local))
    val fanoutProcessor = ActorProcessorFactory[In, In](fanoutActor)
    (fanoutProcessor, fanoutProcessor)
  }

  /** Same buffer sizes and attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
    new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new FanoutPublisherSink[In](initialBufferSize, maximumBufferSize, attr, amendShape(attr))

  // Every other sink module in this file reports its class name; do the same here so
  // that this sink is just as recognizable in debug output.
  override def toString: String = "FanoutPublisherSink"
}
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
 * INTERNAL API
 */
private[akka] object HeadSink {

  /**
   * Subscriber that requests exactly one element and exposes the outcome through `future`:
   * the first element completes it successfully, an error fails it with that error, and
   * completion without an element fails it with a `NoSuchElementException`.
   * Only the first of these signals has an effect on the future.
   */
  final class HeadSinkSubscriber[In] extends Subscriber[In] {
    // The currently active subscription; null before onSubscribe and after the first element.
    private[this] var subscription: Subscription = null
    private[this] val promise: Promise[In] = Promise[In]()

    /** The future that is completed by the first signal received by this subscriber. */
    def future: Future[In] = promise.future

    override def onSubscribe(s: Subscription): Unit = {
      ReactiveStreamsCompliance.requireNonNullSubscription(s)
      // Reject the subscription if one is already active, or if the outcome has already
      // been decided: `subscription` is reset to null after the first element, so without
      // the second check a late subscription would be accepted and asked for an element
      // that nobody is interested in.
      if ((subscription ne null) || promise.isCompleted) s.cancel()
      else {
        subscription = s
        s.request(1)
      }
    }

    override def onNext(elem: In): Unit = {
      ReactiveStreamsCompliance.requireNonNullElement(elem)
      promise.trySuccess(elem)
      // Guard against being signalled without an active subscription (before onSubscribe,
      // or again after the first element) instead of failing with a NullPointerException.
      if (subscription ne null) {
        subscription.cancel()
        subscription = null
      }
    }

    override def onError(t: Throwable): Unit = {
      ReactiveStreamsCompliance.requireNonNullException(t)
      promise.tryFailure(t)
    }

    override def onComplete(): Unit =
      promise.tryFailure(new NoSuchElementException("empty stream"))
  }

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
 * thing that is signaled to this stream, which can be either an element (after
 * which the upstream subscription is canceled), an error condition (putting
 * the Future into the corresponding failed state) or the end-of-stream
 * (failing the Future with a NoSuchElementException).
 */
private[akka] final class HeadSink[In](val attributes: Attributes, shape: SinkShape[In])
  extends SinkModule[In, Future[In]](shape) {

  /**
   * Creates a fresh `HeadSink.HeadSinkSubscriber` and returns it together with its future,
   * which is the materialized value of this sink.
   */
  override def create(context: MaterializationContext) = {
    val headSubscriber = new HeadSink.HeadSinkSubscriber[In]
    (headSubscriber, headSubscriber.future)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] =
    new HeadSink[In](attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new HeadSink[In](attr, amendShape(attr))

  override def toString: String = "HeadSink"
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Attaches a subscriber to this stream which will just discard all received
 * elements.
 */
private[akka] final class BlackholeSink(val attributes: Attributes, shape: SinkShape[Any])
  extends SinkModule[Any, Future[Unit]](shape) {

  /**
   * Creates a `BlackholeSubscriber` sized by the effective `maxInputBufferSize` setting
   * and hands it a fresh promise; the future of that promise is the materialized value.
   */
  override def create(context: MaterializationContext) = {
    val materializer = ActorMaterializer.downcast(context.materializer)
    val settings = materializer.effectiveSettings(context.effectiveAttributes)
    val completion = Promise[Unit]()
    (new BlackholeSubscriber[Any](settings.maxInputBufferSize, completion), completion.future)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] =
    new BlackholeSink(attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new BlackholeSink(attr, amendShape(attr))

  override def toString: String = "BlackholeSink"
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Attaches a subscriber to this stream.
 */
private[akka] final class SubscriberSink[In](
  subscriber: Subscriber[In],
  val attributes: Attributes,
  shape: SinkShape[In])
  extends SinkModule[In, Unit](shape) {

  /**
   * Returns the subscriber given at construction; there is no meaningful materialized value.
   */
  override def create(context: MaterializationContext) = {
    (subscriber, ())
  }

  /** Same subscriber and attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] =
    new SubscriberSink[In](subscriber, attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new SubscriberSink[In](subscriber, attr, amendShape(attr))

  override def toString: String = "SubscriberSink"
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * A sink that immediately cancels its upstream upon materialization.
 */
private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any])
  extends SinkModule[Any, Unit](shape) {

  /**
   * Creates a new `CancellingSubscriber`; there is no meaningful materialized value.
   */
  override def create(context: MaterializationContext): (Subscriber[Any], Unit) = {
    val canceller = new CancellingSubscriber[Any]
    (canceller, ())
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] =
    new CancelSink(attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new CancelSink(attr, amendShape(attr))

  override def toString: String = "CancelSink"
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
 * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
 */
private[akka] final class ActorSubscriberSink[In](
  props: Props,
  val attributes: Attributes,
  shape: SinkShape[In])
  extends SinkModule[In, ActorRef](shape) {

  /**
   * Starts an actor from `props` through the `ActorMaterializer` of the given context and
   * returns it wrapped as a subscriber; the actor reference is the materialized value.
   */
  override def create(context: MaterializationContext) = {
    val materializer = ActorMaterializer.downcast(context.materializer)
    val actor = materializer.actorOf(context, props)
    (akka.stream.actor.ActorSubscriber[In](actor), actor)
  }

  /** Same props and attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] =
    new ActorSubscriberSink[In](props, attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new ActorSubscriberSink[In](props, attr, amendShape(attr))

  override def toString: String = "ActorSubscriberSink"
}
|
2015-03-30 14:42:30 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Sink backed by an `ActorRefSinkActor` that is created for the given target `ref`
 * and `onCompleteMessage`.
 */
private[akka] final class ActorRefSink[In](
  ref: ActorRef,
  onCompleteMessage: Any,
  val attributes: Attributes,
  shape: SinkShape[In])
  extends SinkModule[In, Unit](shape) {

  /**
   * Starts an `ActorRefSinkActor` (configured with the target `ref`, the effective
   * `maxInputBufferSize` setting and `onCompleteMessage`) through the `ActorMaterializer`
   * of the given context and returns it wrapped as a subscriber; there is no meaningful
   * materialized value.
   */
  override def create(context: MaterializationContext) = {
    val materializer = ActorMaterializer.downcast(context.materializer)
    val settings = materializer.effectiveSettings(context.effectiveAttributes)
    val sinkActorProps = ActorRefSinkActor.props(ref, settings.maxInputBufferSize, onCompleteMessage)
    val sinkActor = materializer.actorOf(context, sinkActorProps)
    (akka.stream.actor.ActorSubscriber[In](sinkActor), ())
  }

  /** Same target, completion message and attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] =
    new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)

  /** New attributes, with the shape adjusted by `amendShape`. */
  override def withAttributes(attr: Attributes): Module =
    new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))

  override def toString: String = "ActorRefSink"
}
|
|
|
|
|
|