diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index 5b93cf8abf..f1322de9cf 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -8,7 +8,8 @@ Integrating with Actors ======================= For piping the elements of a stream as messages to an ordinary actor you can use the -``Sink.actorRef``. +``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is +materialized by ``Source.actorRef``. For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with @@ -28,6 +29,25 @@ Akka Streams :class:`Source` or :class:`Sink`. prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used easily from any language level. +Source.actorRef +^^^^^^^^^^^^^^^ + +Messages sent to the actor that is materialized by ``Source.actorRef`` will be emitted to the +stream if there is demand from downstream, otherwise they will be buffered until request for +demand is received. + +Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space +available in the buffer. + +The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or +``akka.actor.Status.Success`` to the actor reference. + +The stream can be completed with failure by sending ``akka.actor.Status.Failure`` to the +actor reference. + +The actor will be stopped when the stream is completed, failed or cancelled from downstream, +i.e. you can watch it to get notified when that happens. + Sink.actorRef ^^^^^^^^^^^^^ diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala index 5b39c6208b..977918cab7 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ActorPublisherDocSpec.scala @@ -76,7 +76,7 @@ class ActorPublisherDocSpec extends AkkaSpec { testActor ! s //#actor-publisher-usage - val jobManagerSource = Source[JobManager.Job](JobManager.props) + val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props) val ref = Flow[JobManager.Job] .map(_.payload.toUpperCase) .map { elem => println(elem); elem } diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index 7016c3070f..e63d4d2f0a 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -8,7 +8,8 @@ Integrating with Actors ======================= For piping the elements of a stream as messages to an ordinary actor you can use the -``Sink.actorRef``. +``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is +materialized by ``Source.actorRef``. For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with @@ -23,10 +24,29 @@ Akka Streams :class:`Source` or :class:`Sink`. because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the the stream may deadlock. +Source.actorRef +^^^^^^^^^^^^^^^ + +Messages sent to the actor that is materialized by ``Source.actorRef`` will be emitted to the +stream if there is demand from downstream, otherwise they will be buffered until request for +demand is received. + +Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space +available in the buffer. + +The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or +``akka.actor.Status.Success`` to the actor reference. + +The stream can be completed with failure by sending ``akka.actor.Status.Failure`` to the +actor reference. + +The actor will be stopped when the stream is completed, failed or cancelled from downstream, +i.e. you can watch it to get notified when that happens. + Sink.actorRef ^^^^^^^^^^^^^ -The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates +The sink sends the elements of the stream to the given :class:`ActorRef`. If the target actor terminates the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage`` will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure`` message will be sent to the destination actor. diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 1f73d58101..3fc153a3a7 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -63,7 +63,7 @@ private[http] object HttpServer { val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 - val oneHundredContinueSource = Source[OneHundredContinue.type] { + val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] { Props { val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 276f6a8194..b5ab867b66 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -23,11 +23,9 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import scala.util.Try; - import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; public class SourceTest extends StreamTest { @@ -459,4 +457,19 @@ public class SourceTest extends StreamTest { assertEquals(result.size(), 10000); for (Integer i: result) assertEquals(i, (Integer) 42); } + + @Test + public void mustBeAbleToUseActorRefSource() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source actorRefSource = Source.actorRef(10, OverflowStrategy.fail()); + final ActorRef ref = actorRefSource.to(Sink.foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + })).run(materializer); + ref.tell(1, ActorRef.noSender()); + probe.expectMsgEquals(1); + ref.tell(2, ActorRef.noSender()); + probe.expectMsgEquals(2); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 79a96679ba..0369327bb3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -253,7 +253,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { implicit val materializer = ActorFlowMaterializer() val probe = TestProbe() - val source = Source[Int](senderProps) + val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps) val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref)) val (snd, rcv) = source.collect { @@ -286,7 +286,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref)))) val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref)) - val senderRef2 = FlowGraph.closed(Source[Int](senderProps)) { implicit b ⇒ + val senderRef2 = FlowGraph.closed(Source.actorPublisher[Int](senderProps)) { implicit b ⇒ source2 ⇒ import FlowGraph.Implicits._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala new file mode 100644 index 0000000000..d2b2e24281 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import scala.concurrent.duration._ +import akka.stream.ActorFlowMaterializer +import akka.stream.OverflowStrategy +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.actor.PoisonPill +import akka.actor.Status + +class ActorRefSourceSpec extends AkkaSpec { + implicit val mat = ActorFlowMaterializer() + + "A ActorRefSource" must { + + "emit received messages to the stream" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() + val sub = s.expectSubscription + sub.request(2) + ref ! 1 + s.expectNext(1) + ref ! 2 + s.expectNext(2) + ref ! 3 + s.expectNoMsg(500.millis) + } + + "buffer when needed" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink(s)).run() + val sub = s.expectSubscription + for (n ← 1 to 20) ref ! n + sub.request(10) + for (n ← 1 to 10) s.expectNext(n) + sub.request(10) + for (n ← 11 to 20) s.expectNext(n) + + for (n ← 200 to 399) ref ! n + sub.request(100) + for (n ← 300 to 399) s.expectNext(n) + } + + "terminate when the stream is cancelled" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink(s)).run() + watch(ref) + val sub = s.expectSubscription + sub.cancel() + expectTerminated(ref) + } + + "complete the stream when receiving PoisonPill" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() + val sub = s.expectSubscription + ref ! PoisonPill + s.expectComplete() + } + + "complete the stream when receiving Status.Success" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() + val sub = s.expectSubscription + ref ! Status.Success("ok") + s.expectComplete() + } + + "fail the stream when receiving Status.Failure" in { + val s = StreamTestKit.SubscriberProbe[Int]() + val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() + val sub = s.expectSubscription + val exc = StreamTestKit.TE("testfailure") + ref ! Status.Failure(exc) + s.expectError(exc) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala new file mode 100644 index 0000000000..809d449a01 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.ActorLogging +import akka.actor.Props +import akka.actor.Status +import akka.stream.OverflowStrategy + +/** + * INTERNAL API + */ +private[akka] object ActorRefSourceActor { + def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = { + require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") + Props(new ActorRefSourceActor(bufferSize, overflowStrategy)) + } +} + +/** + * INTERNAL API + */ +private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy) + extends akka.stream.actor.ActorPublisher[Any] with ActorLogging { + import akka.stream.actor.ActorPublisherMessage._ + import akka.stream.OverflowStrategy._ + + // when bufferSize is 0 there the buffer is not used + private val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize) + + def receive = { + case _: Request ⇒ + // totalDemand is tracked by super + while (totalDemand > 0L && !buffer.isEmpty) + onNext(buffer.dequeue()) + + case Cancel ⇒ + context.stop(self) + + case _: Status.Success ⇒ + context.stop(self) // will complete the stream successfully + + case Status.Failure(cause) if isActive ⇒ + onError(cause) + context.stop(self) + + case elem if isActive ⇒ + if (totalDemand > 0L) + onNext(elem) + else if (bufferSize == 0) + log.debug("Dropping element because there is no downstream demand: [{}]", elem) + else if (!buffer.isFull) + buffer.enqueue(elem) + else overflowStrategy match { + case DropHead ⇒ + buffer.dropHead() + buffer.enqueue(elem) + case DropTail ⇒ + buffer.dropTail() + buffer.enqueue(elem) + case DropBuffer ⇒ + buffer.clear() + buffer.enqueue(elem) + case Fail ⇒ + onError(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) + context.stop(self) + case Backpressure ⇒ + // there is a precondition check in Source.actorRefSource factory method + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 885a38e146..201294f3d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.OperationAttributes -import akka.stream.{ Outlet, Shape, SourceShape } +import akka.stream.{ Outlet, OverflowStrategy, Shape, SourceShape } import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance @@ -173,13 +173,32 @@ private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -private[akka] final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { +private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props") + val publisherRef = materializer.actorOf(props, name = s"$flowName-0-actorPublisher") (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) } - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, amendShape(attr)) + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorPublisherSource[Out](props, attributes, shape) + override def withAttributes(attr: OperationAttributes): Module = new ActorPublisherSource(props, attr, amendShape(attr)) +} + +/** + * INTERNAL API + */ +private[akka] final class ActorRefSource[Out]( + bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: OperationAttributes, shape: SourceShape[Out]) + extends SourceModule[Out, ActorRef](shape) { + + override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { + val ref = materializer.actorOf(ActorRefSourceActor.props(bufferSize, overflowStrategy), + name = s"$flowName-0-actorRef") + (akka.stream.actor.ActorPublisher[Out](ref), ref) + } + + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = + new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) + override def withAttributes(attr: OperationAttributes): Module = + new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 86362cd658..7f0acde809 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -101,7 +101,7 @@ object Sink { * i.e. if the actor is not consuming the messages fast enough the mailbox * of the actor will grow. For potentially slow consumer actors it is recommended * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate - * limiting stage in front of this stage. + * limiting stage in front of this `Sink`. * */ def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b02c3c885c..0123c873cc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -7,7 +7,7 @@ import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util import akka.stream._ -import akka.stream.impl.PropsSource +import akka.stream.impl.ActorPublisherSource import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.annotation.unchecked.uncheckedVariance @@ -118,14 +118,6 @@ object Source { def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] = new Source(scaladsl.Source(initialDelay, interval, tick)) - /** - * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorPublisher]]. - */ - def from[T](props: Props): Source[T, ActorRef] = - new Source(scaladsl.Source.apply(props)) - /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. @@ -157,6 +149,41 @@ object Source { def subscriber[T](): Source[T, Subscriber[T]] = new Source(scaladsl.Source.subscriber) + /** + * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorPublisher]]. + */ + def actorPublisher[T](props: Props): Source[T, ActorRef] = + new Source(scaladsl.Source.actorPublisher(props)) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped + * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does + * not matter. + * + * The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or + * [[akka.actor.Status.Success]] to the actor reference. + * + * The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the + * actor reference. + * + * The actor will be stopped when the stream is completed, failed or cancelled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = + new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) + /** * Concatenates two sources so that the first element * emitted by the second source is emitted after the last element of the first diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index e19dfc21cc..2a6451e6ea 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -212,7 +212,7 @@ object Sink extends SinkApply { * i.e. if the actor is not consuming the messages fast enough the mailbox * of the actor will grow. For potentially slow consumer actors it is recommended * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate - * limiting stage in front of this stage. + * limiting stage in front of this `Sink`. */ def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] = new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink"))) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c2937dfd7e..e897583f82 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -23,6 +23,7 @@ import akka.actor.ActorRef import scala.concurrent.Promise import org.reactivestreams.Subscriber import akka.stream.stage.SyncDirective +import akka.stream.OverflowStrategy /** * A `Source` is a set of stream processing steps that has one open output. It can comprise @@ -262,14 +263,6 @@ object Source extends SourceApply { def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource"))) - /** - * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorPublisher]]. - */ - def apply[T](props: Props): Source[T, ActorRef] = - new Source(new PropsSource(props, none, shape("PropsSource"))) - /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. @@ -327,4 +320,42 @@ object Source extends SourceApply { def subscriber[T]: Source[T, Subscriber[T]] = new Source(new SubscriberSource[T](none, shape("SubscriberSource"))) + /** + * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorPublisher]]. + */ + def actorPublisher[T](props: Props): Source[T, ActorRef] = + new Source(new ActorPublisherSource(props, none, shape("ActorPublisherSource"))) + + /** + * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. + * Messages sent to this actor will be emitted to the stream if there is demand from downstream, + * otherwise they will be buffered until request for demand is received. + * + * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if + * there is no space available in the buffer. + * + * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped + * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does + * not matter. + * + * The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or + * [[akka.actor.Status.Success]] to the actor reference. + * + * The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the + * actor reference. + * + * The actor will be stopped when the stream is completed, failed or cancelled from downstream, + * i.e. you can watch it to get notified when that happens. + * + * @param bufferSize The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { + require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") + require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") + new Source(new ActorRefSource(bufferSize, overflowStrategy, none, shape("ActorRefSource"))) + } + }