2015-01-28 14:19:50 +01:00
|
|
|
/**
|
2016-02-23 12:58:39 +01:00
|
|
|
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
2015-01-28 14:19:50 +01:00
|
|
|
*/
|
|
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed
|
2015-08-19 23:04:20 -04:00
|
|
|
import akka.actor._
|
2015-04-16 02:24:01 +02:00
|
|
|
import akka.stream._
|
2016-03-11 17:08:30 +01:00
|
|
|
import akka.stream.impl.StreamLayout.AtomicModule
|
2015-01-28 14:19:50 +01:00
|
|
|
import org.reactivestreams._
|
|
|
|
|
import scala.annotation.unchecked.uncheckedVariance
|
2016-01-16 12:17:19 -05:00
|
|
|
import scala.concurrent.Promise
|
2016-03-11 17:08:30 +01:00
|
|
|
import akka.event.Logging
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-03-11 17:08:30 +01:00
|
|
|
private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule {
|
|
|
|
|
|
|
|
|
|
protected def label: String = Logging.simpleName(this)
|
|
|
|
|
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
override def replaceShape(s: Shape): AtomicModule =
|
2016-02-15 10:37:19 +01:00
|
|
|
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that")
|
|
|
|
|
else this
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
// This is okay since the only caller of this method is right below.
|
|
|
|
|
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
|
|
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
override def carbonCopy: AtomicModule = newInstance(SourceShape(shape.out.carbonCopy()))
|
2015-03-05 12:21:17 +01:00
|
|
|
|
2015-07-06 22:00:21 +02:00
|
|
|
protected def amendShape(attr: Attributes): SourceShape[Out] = {
|
|
|
|
|
val thisN = attributes.nameOrDefault(null)
|
|
|
|
|
val thatN = attr.nameOrDefault(null)
|
2015-03-05 12:21:17 +01:00
|
|
|
|
2015-07-06 22:00:21 +02:00
|
|
|
if ((thatN eq null) || thisN == thatN) shape
|
2015-12-14 14:52:06 +01:00
|
|
|
else shape.copy(out = Outlet(thatN + ".out"))
|
2015-07-06 22:00:21 +02:00
|
|
|
}
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Holds a `Subscriber` representing the input side of the flow.
|
|
|
|
|
* The `Subscriber` can later be connected to an upstream `Publisher`.
|
|
|
|
|
*/
|
2015-06-23 17:32:55 +02:00
|
|
|
private[akka] final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = {
|
2015-06-16 15:34:54 +02:00
|
|
|
val processor = new VirtualProcessor[Out]
|
2015-01-28 14:19:50 +01:00
|
|
|
(processor, processor)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape)
|
2016-03-11 17:08:30 +01:00
|
|
|
override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSource[Out](attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Construct a transformation starting with given publisher. The transformation steps
|
|
|
|
|
* are executed by a series of [[org.reactivestreams.Processor]] instances
|
|
|
|
|
* that mediate the flow of elements downstream and the propagation of
|
|
|
|
|
* back-pressure upstream.
|
|
|
|
|
*/
|
2016-01-20 10:00:37 +02:00
|
|
|
private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) {
|
2016-03-11 17:08:30 +01:00
|
|
|
|
|
|
|
|
override protected def label: String = s"PublisherSource($p)"
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
override def create(context: MaterializationContext) = (p, NotUsed)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape)
|
2016-03-11 17:08:30 +01:00
|
|
|
override def withAttributes(attr: Attributes): AtomicModule = new PublisherSource[Out](p, attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2015-10-21 22:45:39 +02:00
|
|
|
private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
|
2016-03-11 17:08:30 +01:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext) = {
|
2015-10-21 22:45:39 +02:00
|
|
|
val p = Promise[Option[Out]]()
|
|
|
|
|
new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) → p
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
2015-10-21 22:45:39 +02:00
|
|
|
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape)
|
2016-03-11 17:08:30 +01:00
|
|
|
override def withAttributes(attr: Attributes): AtomicModule = new MaybeSource(attr, amendShape(attr))
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-03-05 12:21:17 +01:00
|
|
|
* INTERNAL API
|
2015-01-28 14:19:50 +01:00
|
|
|
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
|
|
|
|
|
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
|
|
|
|
|
*/
|
2015-06-23 17:32:55 +02:00
|
|
|
private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
|
2015-01-28 14:19:50 +01:00
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext) = {
|
2015-06-23 18:28:53 +02:00
|
|
|
val publisherRef = ActorMaterializer.downcast(context.materializer).actorOf(context, props)
|
2015-01-28 14:19:50 +01:00
|
|
|
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
|
|
|
|
|
}
|
|
|
|
|
|
2015-11-19 17:19:45 +01:00
|
|
|
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
|
|
|
|
|
new ActorPublisherSource[Out](props, attributes, shape)
|
2016-03-11 17:08:30 +01:00
|
|
|
override def withAttributes(attr: Attributes): AtomicModule = new ActorPublisherSource(props, attr, amendShape(attr))
|
2015-03-31 15:13:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] final class ActorRefSource[Out](
|
2015-06-23 17:32:55 +02:00
|
|
|
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
|
2015-03-31 15:13:57 +02:00
|
|
|
extends SourceModule[Out, ActorRef](shape) {
|
|
|
|
|
|
2016-03-11 17:08:30 +01:00
|
|
|
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
|
|
|
|
|
|
2015-04-10 14:39:48 +02:00
|
|
|
override def create(context: MaterializationContext) = {
|
2016-02-07 14:54:48 +01:00
|
|
|
val mat = ActorMaterializer.downcast(context.materializer)
|
|
|
|
|
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
|
2015-03-31 15:13:57 +02:00
|
|
|
(akka.stream.actor.ActorPublisher[Out](ref), ref)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
|
|
|
|
|
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape)
|
2016-03-11 17:08:30 +01:00
|
|
|
override def withAttributes(attr: Attributes): AtomicModule =
|
2015-03-31 15:13:57 +02:00
|
|
|
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
|
2015-03-03 10:57:25 +01:00
|
|
|
}
|