Merge pull request #16119 from 2m/wip-actor-pub-sub-source-sink-16095

=str #16095 make Actor{Publisher,Subscriber} as {Source,Sink}
This commit is contained in:
Martynas Mickevičius 2014-10-27 12:33:16 +02:00
commit cdfd739778
6 changed files with 190 additions and 84 deletions

View file

@ -6,8 +6,14 @@ package akka.stream.actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.Broadcast
import akka.stream.scaladsl2.Flow
import akka.stream.scaladsl2.FlowGraph
import akka.stream.scaladsl2.FlowGraphImplicits
import akka.stream.scaladsl2.Merge
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.Source
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
@ -107,9 +113,9 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val c = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
p.subscribe(s)
val sub = s.expectSubscription
sub.request(2)
probe.expectMsg(TotalDemand(2))
sub.request(3)
@ -121,68 +127,67 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val c = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
p.subscribe(s)
val sub = s.expectSubscription
sub.request(2)
ref ! Produce("elem-1")
ref ! Produce("elem-2")
ref ! Produce("elem-3")
c.expectNext("elem-1")
c.expectNext("elem-2")
c.expectNoMsg(300.millis)
s.expectNext("elem-1")
s.expectNext("elem-2")
s.expectNoMsg(300.millis)
sub.cancel()
}
"signal error" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
ref ! Err("wrong")
c.expectSubscription
c.expectError.getMessage should be("wrong")
s.expectSubscription
s.expectError.getMessage should be("wrong")
}
"signal error before subscribe" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
ref ! Err("early err")
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
c.expectError.getMessage should be("early err")
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectError.getMessage should be("early err")
}
"drop onNext elements after cancel" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val c = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
p.subscribe(s)
val sub = s.expectSubscription
sub.request(2)
ref ! Produce("elem-1")
sub.cancel()
ref ! Produce("elem-2")
c.expectNext("elem-1")
c.expectNoMsg(300.millis)
sub.cancel()
s.expectNext("elem-1")
s.expectNoMsg(300.millis)
}
"remember requested after restart" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val p = ActorPublisher[String](ref)
val c = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
p.subscribe(s)
val sub = s.expectSubscription
sub.request(3)
probe.expectMsg(TotalDemand(3))
ref ! Produce("elem-1")
ref ! Boom
ref ! Produce("elem-2")
c.expectNext("elem-1")
c.expectNext("elem-2")
s.expectNext("elem-1")
s.expectNext("elem-2")
sub.request(5)
probe.expectMsg(TotalDemand(6))
sub.cancel()
@ -191,54 +196,59 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
"signal onComplete" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
val sub = c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
val sub = s.expectSubscription
sub.request(3)
ref ! Produce("elem-1")
ref ! Complete
c.expectNext("elem-1")
c.expectComplete
s.expectNext("elem-1")
s.expectComplete
}
"signal immediate onComplete" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
ref ! Complete
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
c.expectComplete
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectComplete
}
"only allow one subscriber" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
c.expectSubscription
val c2 = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c2)
c2.expectError.getClass should be(classOf[IllegalStateException])
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription
val s2 = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s2)
s2.expectError.getClass should be(classOf[IllegalStateException])
}
"signal onCompete when actor is stopped" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val c = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(c)
c.expectSubscription
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription
ref ! PoisonPill
c.expectComplete
s.expectComplete
}
"work together with Flow and ActorSubscriber" in {
implicit val materializer = FlowMaterializer()
val probe = TestProbe()
val snd = system.actorOf(senderProps)
val rcv = system.actorOf(receiverProps(probe.ref))
Flow(ActorPublisher[Int](snd)).collect {
val source = Source[Int](senderProps)
val sink = Sink[String](receiverProps(probe.ref))
val mat = source.collect {
case n if n % 2 == 0 "elem-" + n
}.produceTo(ActorSubscriber(rcv))
}.connect(sink).run()
val snd = mat.get(source)
val rcv = mat.get(sink)
(1 to 3) foreach { snd ! _ }
probe.expectMsg("elem-2")
@ -255,6 +265,46 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
expectTerminated(snd)
}
"work in a FlowGraph" in {
implicit val materializer = FlowMaterializer()
val probe1 = TestProbe()
val probe2 = TestProbe()
val senderRef1 = system.actorOf(senderProps)
val source1 = Source(ActorPublisher[Int](senderRef1))
val source2 = Source[Int](senderProps)
val sink1 = Sink(ActorSubscriber[String](system.actorOf(receiverProps(probe1.ref))))
val sink2 = Sink[String](receiverProps(probe2.ref))
val mat = FlowGraph { implicit b
import FlowGraphImplicits._
val merge = Merge[Int]
val bcast = Broadcast[String]
source1 ~> merge
source2 ~> merge
merge ~> Flow[Int].map(_.toString) ~> bcast
bcast ~> Flow[String].map(_ + "mark") ~> sink1
bcast ~> sink2
}.run()
val senderRef2 = mat.get(source2)
(0 to 10).foreach {
senderRef1 ! _
senderRef2 ! _
}
(0 to 10).foreach { msg
probe1.expectMsg(msg.toString + "mark")
probe2.expectMsg(msg.toString)
}
}
}
}

View file

@ -5,9 +5,10 @@ package akka.stream.actor
import akka.actor.{ Actor, ActorRef, Props }
import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.Source
import akka.stream.testkit.AkkaSpec
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.testkit.ImplicitSender
import scala.concurrent.duration._
@ -100,8 +101,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
"An ActorSubscriber" must {
"receive requested elements" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(List(1, 2, 3)).produceTo(ActorSubscriber(ref))
val ref = Source(List(1, 2, 3)).runWith(Sink(manualSubscriberProps(testActor)))
expectNoMsg(200.millis)
ref ! "ready" // requesting 2
expectMsg(OnNext(1))
@ -113,16 +113,16 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
}
"signal error" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
val e = new RuntimeException("simulated") with NoStackTrace
Flow(() throw e).produceTo(ActorSubscriber(ref))
val ref = Source(() throw e).runWith(Sink(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnError(e))
}
"remember requested after restart" in {
// creating actor with default supervision, because stream supervisor default strategy is to stop
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(1 to 7).produceTo(ActorSubscriber(ref))
Source(1 to 7).connect(Sink(ActorSubscriber[Int](ref))).run()
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
@ -139,8 +139,7 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
}
"not deliver more after cancel" in {
val ref = system.actorOf(manualSubscriberProps(testActor))
Flow(1 to 5).produceTo(ActorSubscriber(ref))
val ref = Source(1 to 5).runWith(Sink(manualSubscriberProps(testActor)))
ref ! "ready"
expectMsg(OnNext(1))
expectMsg(OnNext(2))
@ -149,24 +148,21 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender {
}
"work with OneByOneRequestStrategy" in {
val ref = system.actorOf(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))
Flow(1 to 17).produceTo(ActorSubscriber(ref))
Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy)))
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"work with WatermarkRequestStrategy" in {
val ref = system.actorOf(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10)))
Flow(1 to 17).produceTo(ActorSubscriber(ref))
Source(1 to 17).runWith(Sink(requestStrategySubscriberProps(testActor, WatermarkRequestStrategy(highWatermark = 10))))
for (n 1 to 17) expectMsg(OnNext(n))
expectMsg(OnComplete)
}
"suport custom max in flight request strategy with child workers" in {
val ref = system.actorOf(streamerProps)
val N = 117
Flow(1 to N).map(Msg(_, testActor)).produceTo(ActorSubscriber(ref))
receiveN(N).toSet should be((1 to N).map(Done(_)).toSet)
Source(1 to N).map(Msg(_, testActor)).runWith(Sink(streamerProps))
receiveN(N).toSet should be((1 to N).map(Done).toSet)
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl2
import akka.actor.ActorRef
import akka.actor.Props
import scala.collection.immutable
@ -252,3 +253,25 @@ private[scaladsl2] final case object CancelSink extends SimpleActorFlowSink[Any]
})
}
}
/**
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
private[scaladsl2] final case class PropsSink[In](props: Props) extends KeyedActorFlowSink[In] {
type MaterializedType = ActorRef
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String): ActorRef = {
val (subscriber, subscriberRef) = create(materializer, flowName)
flowPublisher.subscribe(subscriber)
subscriberRef
}
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val subscriberRef = materializer.actorOf(props, name = s"$flowName-props")
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.scaladsl2
import akka.actor.ActorRef
import akka.actor.Props
import akka.stream.impl._
import akka.stream.impl2.ActorBasedFlowMaterializer
import akka.stream.impl2.Ast.AstNode
@ -208,3 +210,22 @@ private[scaladsl2] final case class ConcatSource[Out](source1: Source[Out], sour
override def isActive: Boolean = false
}
/**
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
private[scaladsl2] final case class PropsSource[Out](props: Props) extends KeyedActorFlowSource[Out] {
override type MaterializedType = ActorRef
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val (publisher, publisherRef) = create(materializer, flowName)
publisher.subscribe(flowSubscriber)
publisherRef
}
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val publisherRef = materializer.actorOf(props, name = s"$flowName-0-props")
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl2
import akka.actor.Props
import org.reactivestreams.Subscriber
import scala.util.Try
@ -51,6 +52,13 @@ object Sink {
builder.partialBuild().toSink(in)
}
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def apply[T](props: Props): PropsSink[T] = PropsSink[T](props)
/**
* A `Sink` that immediately cancels its upstream after materialization.
*/

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl2
import akka.actor.Props
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousPublisherFromIterable }
import org.reactivestreams.Publisher
import scala.collection.immutable
@ -133,23 +134,6 @@ object Source {
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] =
TickSource(initialDelay, interval, tick)
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element)))
/**
* Create a `Source` with no elements, i.e. an empty stream that is completed immediately
* for every connected `Sink`.
*/
def empty[T](): Source[T] = apply(EmptyPublisher[T])
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause))
/**
* Creates a `Source` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and
* returns the `UndefinedSink`.
@ -169,6 +153,30 @@ object Source {
builder.partialBuild().toSource(out)
}
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def apply[T](props: Props): PropsSource[T] = PropsSource(props)
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element)))
/**
* Create a `Source` with no elements, i.e. an empty stream that is completed immediately
* for every connected `Sink`.
*/
def empty[T](): Source[T] = apply(EmptyPublisher[T])
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause))
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first