!str #16521 Add ActorRefSink

* also rename the factory for ActorSubscriber props Sink,
  from apply to actorSubscriber
This commit is contained in:
Patrik Nordwall 2015-03-30 14:42:30 +02:00
parent 8f47b6dfcc
commit 946faedd95
11 changed files with 247 additions and 43 deletions

View file

@ -7,8 +7,12 @@ Integration
Integrating with Actors
=======================
:class:`AbstractActorPublisher` and :class:`AbstractActorSubscriber` are two traits that provides support for
implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`.
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as a
Akka Streams :class:`Source` or :class:`Sink`.
@ -24,6 +28,20 @@ Akka Streams :class:`Source` or :class:`Sink`.
prior to 8, Akka provides :class:`UntypedActorPublisher` and :class:`UntypedActorSubscriber` which can be used
easily from any language level.
Sink.actorRef
^^^^^^^^^^^^^
The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates
the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage``
will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure``
message will be sent to the destination actor.
.. warning::
There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming
the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors
it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
limiting stage in front of this stage.
ActorPublisher
^^^^^^^^^^^^^^

View file

@ -80,7 +80,7 @@ class ActorSubscriberDocSpec extends AkkaSpec {
//#actor-subscriber-usage
val N = 117
Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink(WorkerPool.props))
.runWith(Sink.actorSubscriber(WorkerPool.props))
//#actor-subscriber-usage
receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet)

View file

@ -7,8 +7,12 @@ Integration
Integrating with Actors
=======================
:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provides support for
implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`.
For piping the elements of a stream as messages to an ordinary actor you can use the
``Sink.actorRef``.
For more advanced use cases the :class:`ActorPublisher` and :class:`ActorSubscriber` traits are
provided to support implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with
an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as a
Akka Streams :class:`Source` or :class:`Sink`.
@ -19,6 +23,21 @@ Akka Streams :class:`Source` or :class:`Sink`.
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
Sink.actorRef
^^^^^^^^^^^^^
The sink sends the elements of the stream to the given `ActorRef`. If the target actor terminates
the stream will be cancelled. When the stream is completed successfully the given ``onCompleteMessage``
will be sent to the destination actor. When the stream is completed with failure a ``akka.actor.Status.Failure``
message will be sent to the destination actor.
.. warning::
There is no back-pressure signal from the destination actor, i.e. if the actor is not consuming
the messages fast enough the mailbox of the actor will grow. For potentially slow consumer actors
it is recommended to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
limiting stage in front of this stage.
ActorPublisher
^^^^^^^^^^^^^^

View file

@ -3,12 +3,11 @@
*/
package akka.stream.javadsl;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;
import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
@ -16,8 +15,10 @@ import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.List;
import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
public class SinkTest extends StreamTest {
public SinkTest() {
@ -31,14 +32,14 @@ public class SinkTest extends StreamTest {
@Test
public void mustBeAbleToUseFanoutPublisher() throws Exception {
final Sink<Object, Publisher<Object>> pubSink = Sink.fanoutPublisher(2, 2);
@SuppressWarnings("unused")
final Publisher<Object> publisher = Source.from(new ArrayList<Object>()).runWith(pubSink, materializer);
}
@Test
public void mustBeAbleToUseFuture() throws Exception {
final Sink<Integer, Future<Integer>> futSink = Sink.head();
final List<Integer> list = new ArrayList<Integer>();
list.add(1);
final List<Integer> list = Collections.singletonList(1);
final Future<Integer> future = Source.from(list).runWith(futSink, materializer);
assert Await.result(future, Duration.create("1 second")).equals(1);
}
@ -50,7 +51,19 @@ public class SinkTest extends StreamTest {
return arg1 + arg2;
}
});
@SuppressWarnings("unused")
Future<Integer> integerFuture = Source.from(new ArrayList<Integer>()).runWith(foldSink, materializer);
}
@Test
public void mustBeAbleToUseActorRefSink() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Sink<Integer, ?> actorRefSink = Sink.actorRef(probe.getRef(), "done");
Source.from(Arrays.asList(1, 2, 3)).runWith(actorRefSink, materializer);
probe.expectMsgEquals(1);
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
probe.expectMsgEquals("done");
}
}

View file

@ -254,7 +254,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val probe = TestProbe()
val source = Source[Int](senderProps)
val sink = Sink[String](receiverProps(probe.ref))
val sink: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe.ref))
val (snd, rcv) = source.collect {
case n if n % 2 == 0 "elem-" + n
@ -284,7 +284,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val source1 = Source(ActorPublisher[Int](senderRef1))
val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref))))
val sink2 = Sink[String](receiverProps(probe2.ref))
val sink2: Sink[String, ActorRef] = Sink.actorSubscriber(receiverProps(probe2.ref))
val senderRef2 = FlowGraph.closed(Source[Int](senderProps)) { implicit b
source2

View file

@ -100,7 +100,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"An ActorSubscriber" must {
"receive requested elements" in {
val ref = Source(List(1, 2, 3)).runWith(Sink(manualSubscriberProps(testActor)))
val ref = Source(List(1, 2, 3)).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
expectNoMsg(200.millis)
ref ! "ready" // requesting 2
expectMsg(OnNext(1))
@ -113,7 +113,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"signal error" in {
val e = new RuntimeException("simulated") with NoStackTrace
val ref = Source(() throw e).runWith(Sink(manualSubscriberProps(testActor)))
val ref = Source(() throw e).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnError(e))
}
@ -138,7 +138,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
}
"not deliver more after cancel" in {
val ref = Source(1 to 5).runWith(Sink(manualSubscriberProps(testActor)))
val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
@ -147,20 +147,20 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
}
"work with OneByOneRequestStrategy" in {
Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)))
Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)))
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"work with WatermarkRequestStrategy" in {
Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))))
Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))))
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"suport custom max in flight request strategy with child workers" in {
val N = 117
Source(1 to N).map(Msg(_, testActor)).runWith(Sink(streamerProps))
Source(1 to N).map(Msg(_, testActor)).runWith(Sink.actorSubscriber(streamerProps))
receiveN(N).toSet should be((1 to N).map(Done).toSet)
}

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorFlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
object ActorRefSinkSpec {
case class Fw(ref: ActorRef) extends Actor {
def receive = {
case msg ref forward msg
}
}
}
class ActorRefSinkSpec extends AkkaSpec {
import ActorRefSinkSpec._
implicit val mat = ActorFlowMaterializer()
"A ActorRefSink" must {
"send the elements to the ActorRef" in {
Source(List(1, 2, 3)).runWith(Sink.actorRef(testActor, onCompleteMessage = "done"))
expectMsg(1)
expectMsg(2)
expectMsg(3)
expectMsg("done")
}
"cancel stream when actor terminates" in {
val publisher = StreamTestKit.PublisherProbe[Int]()
val fw = system.actorOf(Props(classOf[Fw], testActor).withDispatcher("akka.test.stream-dispatcher"))
Source(publisher).runWith(Sink.actorRef(fw, onCompleteMessage = "done"))
val autoPublisher = new StreamTestKit.AutoPublisher(publisher)
autoPublisher.sendNext(1)
autoPublisher.sendNext(2)
expectMsg(1)
expectMsg(2)
system.stop(fw)
autoPublisher.subscription.expectCancellation()
}
}
}

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.actor.ActorSubscriber
import akka.actor.ActorRef
import akka.stream.actor.ActorSubscriberMessage
import akka.actor.Status
import akka.stream.actor.WatermarkRequestStrategy
import akka.actor.Props
import akka.actor.Terminated
/**
* INTERNAL API
*/
private[akka] object ActorRefSinkActor {
def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props =
Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage))
}
/**
* INTERNAL API
*/
private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber {
import ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(highWatermark)
context.watch(ref)
def receive = {
case OnNext(elem)
ref ! elem
case OnError(cause)
ref ! Status.Failure(cause)
context.stop(self)
case OnComplete
ref ! onCompleteMessage
context.stop(self)
case Terminated(`ref`)
context.stop(self) // will cancel upstream
}
}

View file

@ -190,13 +190,34 @@ private[akka] final class CancelSink(val attributes: OperationAttributes, shape:
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
private[akka] final class PropsSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val subscriberRef = materializer.actorOf(props, name = s"$flowName-props")
val subscriberRef = materializer.actorOf(props, name = s"$flowName-actorSubscriber")
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new PropsSink[In](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new PropsSink[In](props, attr, amendShape(attr))
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
val attributes: OperationAttributes,
shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val subscriberRef = materializer.actorOf(
ActorRefSinkActor.props(ref, materializer.settings.maxInputBufferSize, onCompleteMessage),
name = s"$flowName-actorRef")
(akka.stream.actor.ActorSubscriber[In](subscriberRef), ())
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] =
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module =
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
}

View file

@ -39,14 +39,6 @@ object Sink {
def create[In](subs: Subscriber[In]): Sink[In, Unit] =
new Sink(scaladsl.Sink(subs))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def create[T](props: Props): Sink[T, ActorRef] =
new Sink(scaladsl.Sink.apply(props))
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
@ -96,6 +88,33 @@ object Sink {
def head[In](): Sink[In, Future[In]] =
new Sink(scaladsl.Sink.head[In])
/**
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be cancelled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure a [[akka.actor.Status.Failure]]
* message will be sent to the destination actor.
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream, but there is no back-pressure signal from the destination actor,
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this stage.
*
*/
def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] =
new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(scaladsl.Sink.actorSubscriber(props))
/**
* A graph with the shape of a sink logically is a sink, this method makes
* it so also in type.

View file

@ -65,14 +65,6 @@ object Sink extends SinkApply {
def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] =
new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink")))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def apply[T](props: Props): Sink[T, ActorRef] =
new Sink(new PropsSink(props, none, shape("PropsSink")))
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/
@ -206,4 +198,31 @@ object Sink extends SinkApply {
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink")
}
/**
* Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be cancelled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure a [[akka.actor.Status.Failure]]
* message will be sent to the destination actor.
*
* It will request at most `maxInputBufferSize` number of elements from
* upstream, but there is no back-pressure signal from the destination actor,
* i.e. if the actor is not consuming the messages fast enough the mailbox
* of the actor will grow. For potentially slow consumer actors it is recommended
* to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
* limiting stage in front of this stage.
*/
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] =
new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink")))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(new ActorSubscriberSink(props, none, shape("ActorSubscriberSink")))
}