!str #16520 Add ActorRefSource

* and rename the factory for ActorPublisherSource,
  from Source.apply to Source.actorPublisher

* including internal buffer, with OverflowStrategy

* support to complete/fail stream
This commit is contained in:
Patrik Nordwall 2015-03-31 15:13:57 +02:00
parent 946faedd95
commit f4ed62b84c
13 changed files with 318 additions and 33 deletions

View file

@ -8,7 +8,8 @@ Integrating with Actors
=======================
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``.
``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is
materialized by ``Source.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
@ -28,6 +29,25 @@ Akka Streams :class:`Source` or :class:`Sink`.
prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used
easily from any language level.
Source.actorRef
^^^^^^^^^^^^^^^
Messages sent to the actor that is materialized by ``Source.actorRef`` will be emitted to the
stream if there is demand from downstream, otherwise they will be buffered until request for
demand is received.
Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space
available in the buffer.
The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or
``akka.actor.Status.Success`` to the actor reference.
The stream can be completed with failure by sending ``akka.actor.Status.Failure`` to the
actor reference.
The actor will be stopped when the stream is completed, failed or cancelled from downstream,
i.e. you can watch it to get notified when that happens.
Sink.actorRef
^^^^^^^^^^^^^

View file

@ -76,7 +76,7 @@ class ActorPublisherDocSpec extends AkkaSpec {
testActor ! s
//#actor-publisher-usage
val jobManagerSource = Source[JobManager.Job](JobManager.props)
val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props)
val ref = Flow[JobManager.Job]
.map(_.payload.toUpperCase)
.map { elem => println(elem); elem }

View file

@ -8,7 +8,8 @@ Integrating with Actors
=======================
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``.
``Sink.actorRef``. Messages can be sent to a stream via the :class:`ActorRef` that is
materialized by ``Source.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
@ -23,10 +24,29 @@ Akka Streams :class:`Source` or :class:`Sink`.
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
Source.actorRef
^^^^^^^^^^^^^^^
Messages sent to the actor that is materialized by ``Source.actorRef`` will be emitted to the
stream if there is demand from downstream, otherwise they will be buffered until request for
demand is received.
Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space
available in the buffer.
The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or
``akka.actor.Status.Success`` to the actor reference.
The stream can be completed with failure by sending ``akka.actor.Status.Failure`` to the
actor reference.
The actor will be stopped when the stream is completed, failed or cancelled from downstream,
i.e. you can watch it to get notified when that happens.
Sink.actorRef
^^^^^^^^^^^^^
The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates
The sink sends the elements of the stream to the given :class:`ActorRef`. If the target actor terminates
the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage``
will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure``
message will be sent to the destination actor.

View file

@ -63,7 +63,7 @@ private[http] object HttpServer {
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
val oneHundredContinueSource = Source.actorPublisher[OneHundredContinue.type] {
Props {
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)

View file

@ -23,11 +23,9 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.util.Try;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class SourceTest extends StreamTest {
@ -459,4 +457,19 @@ public class SourceTest extends StreamTest {
assertEquals(result.size(), 10000);
for (Integer i: result) assertEquals(i, (Integer) 42);
}
@Test
public void mustBeAbleToUseActorRefSource() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, ActorRef> actorRefSource = Source.actorRef(10, OverflowStrategy.fail());
final ActorRef ref = actorRefSource.to(Sink.foreach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
})).run(materializer);
ref.tell(1, ActorRef.noSender());
probe.expectMsgEquals(1);
ref.tell(2, ActorRef.noSender());
probe.expectMsgEquals(2);
}
}

View file

@ -253,7 +253,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
implicit val materializer = ActorFlowMaterializer()
val probe = TestProbe()
val source = Source[Int](senderProps)
val source: Source[Int, ActorRef] = Source.actorPublisher(senderProps)
val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref))
val (snd, rcv) = source.collect {
@ -286,7 +286,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref))))
val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref))
val senderRef2 = FlowGraph.closed(Source[Int](senderProps)) { implicit b
val senderRef2 = FlowGraph.closed(Source.actorPublisher[Int](senderProps)) { implicit b
source2
import FlowGraph.Implicits._

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.OverflowStrategy
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.actor.PoisonPill
import akka.actor.Status
class ActorRefSourceSpec extends AkkaSpec {
implicit val mat = ActorFlowMaterializer()
"A ActorRefSource" must {
"emit received messages to the stream" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
sub.request(2)
ref ! 1
s.expectNext(1)
ref ! 2
s.expectNext(2)
ref ! 3
s.expectNoMsg(500.millis)
}
"buffer when needed" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink(s)).run()
val sub = s.expectSubscription
for (n 1 to 20) ref ! n
sub.request(10)
for (n 1 to 10) s.expectNext(n)
sub.request(10)
for (n 11 to 20) s.expectNext(n)
for (n 200 to 399) ref ! n
sub.request(100)
for (n 300 to 399) s.expectNext(n)
}
"terminate when the stream is cancelled" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink(s)).run()
watch(ref)
val sub = s.expectSubscription
sub.cancel()
expectTerminated(ref)
}
"complete the stream when receiving PoisonPill" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
ref ! PoisonPill
s.expectComplete()
}
"complete the stream when receiving Status.Success" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
ref ! Status.Success("ok")
s.expectComplete()
}
"fail the stream when receiving Status.Failure" in {
val s = StreamTestKit.SubscriberProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
val exc = StreamTestKit.TE("testfailure")
ref ! Status.Failure(exc)
s.expectError(exc)
}
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Status
import akka.stream.OverflowStrategy
/**
* INTERNAL API
*/
private[akka] object ActorRefSourceActor {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = {
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
Props(new ActorRefSourceActor(bufferSize, overflowStrategy))
}
}
/**
* INTERNAL API
*/
private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy)
extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.OverflowStrategy._
// when bufferSize is 0 there the buffer is not used
private val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize)
def receive = {
case _: Request
// totalDemand is tracked by super
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
case Cancel
context.stop(self)
case _: Status.Success
context.stop(self) // will complete the stream successfully
case Status.Failure(cause) if isActive
onError(cause)
context.stop(self)
case elem if isActive
if (totalDemand > 0L)
onNext(elem)
else if (bufferSize == 0)
log.debug("Dropping element because there is no downstream demand: [{}]", elem)
else if (!buffer.isFull)
buffer.enqueue(elem)
else overflowStrategy match {
case DropHead
buffer.dropHead()
buffer.enqueue(elem)
case DropTail
buffer.dropTail()
buffer.enqueue(elem)
case DropBuffer
buffer.clear()
buffer.enqueue(elem)
case Fail
onError(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
context.stop(self)
case Backpressure
// there is a precondition check in Source.actorRefSource factory method
}
}
}

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props }
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.OperationAttributes
import akka.stream.{ Outlet, Shape, SourceShape }
import akka.stream.{ Outlet, OverflowStrategy, Shape, SourceShape }
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
@ -173,13 +173,32 @@ private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
private[akka] final class PropsSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props")
val publisherRef = materializer.actorOf(props, name = s"$flowName-0-actorPublisher")
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, amendShape(attr))
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorPublisherSource[Out](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new ActorPublisherSource(props, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
private[akka] final class ActorRefSource[Out](
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: OperationAttributes, shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val ref = materializer.actorOf(ActorRefSourceActor.props(bufferSize, overflowStrategy),
name = s"$flowName-0-actorRef")
(akka.stream.actor.ActorPublisher[Out](ref), ref)
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
}

View file

@ -101,7 +101,7 @@ object Sink {
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this stage.
* limiting stage in front of this `Sink`.
*
*/
def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] =

View file

@ -7,7 +7,7 @@ import java.util.concurrent.Callable
import akka.actor.{ Cancellable, ActorRef, Props }
import akka.japi.Util
import akka.stream._
import akka.stream.impl.PropsSource
import akka.stream.impl.ActorPublisherSource
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance
@ -118,14 +118,6 @@ object Source {
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] =
new Source(scaladsl.Source(initialDelay, interval, tick))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def from[T](props: Props): Source[T, ActorRef] =
new Source(scaladsl.Source.apply(props))
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
@ -157,6 +149,41 @@ object Source {
def subscriber[T](): Source[T, Subscriber[T]] =
new Source(scaladsl.Source.subscriber)
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def actorPublisher[T](props: Props): Source[T, ActorRef] =
new Source(scaladsl.Source.actorPublisher(props))
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
*
* The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or
* [[akka.actor.Status.Success]] to the actor reference.
*
* The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the
* actor reference.
*
* The actor will be stopped when the stream is completed, failed or cancelled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy))
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first

View file

@ -212,7 +212,7 @@ object Sink extends SinkApply {
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this stage.
* limiting stage in front of this `Sink`.
*/
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] =
new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink")))

View file

@ -23,6 +23,7 @@ import akka.actor.ActorRef
import scala.concurrent.Promise
import org.reactivestreams.Subscriber
import akka.stream.stage.SyncDirective
import akka.stream.OverflowStrategy
/**
* A `Source` is a set of stream processing steps that has one open output. It can comprise
@ -262,14 +263,6 @@ object Source extends SourceApply {
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource")))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def apply[T](props: Props): Source[T, ActorRef] =
new Source(new PropsSource(props, none, shape("PropsSource")))
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
@ -327,4 +320,42 @@ object Source extends SourceApply {
def subscriber[T]: Source[T, Subscriber[T]] =
new Source(new SubscriberSource[T](none, shape("SubscriberSource")))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def actorPublisher[T](props: Props): Source[T, ActorRef] =
new Source(new ActorPublisherSource(props, none, shape("ActorPublisherSource")))
/**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
*
* The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or
* [[akka.actor.Status.Success]] to the actor reference.
*
* The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the
* actor reference.
*
* The actor will be stopped when the stream is completed, failed or cancelled from downstream,
* i.e. you can watch it to get notified when that happens.
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
new Source(new ActorRefSource(bufferSize, overflowStrategy, none, shape("ActorRefSource")))
}
}