Merge pull request #15141 from akka/wip-15105-tee2-patriknw

+str #15105 Add Flow tee
This commit is contained in:
Patrik Nordwall 2014-05-12 11:10:10 +02:00
commit 90e4ca04d2
7 changed files with 189 additions and 1 deletions

View file

@ -29,6 +29,7 @@ private[akka] object Ast {
case class Merge(other: Producer[Any]) extends AstNode
case class Zip(other: Producer[Any]) extends AstNode
case class Concat(next: Producer[Any]) extends AstNode
case class Tee(other: Consumer[Any]) extends AstNode
trait ProducerNode[I] {
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I]

View file

@ -22,6 +22,7 @@ private[akka] object ActorProcessor {
case m: Merge Props(new MergeImpl(settings, m.other))
case z: Zip Props(new ZipImpl(settings, z.other))
case c: Concat Props(new ConcatImpl(settings, c.next))
case t: Tee Props(new TeeImpl(settings, t.other))
}
}

View file

@ -35,7 +35,10 @@ private[akka] trait ActorProducerLike[T] extends Producer[T] {
getPublisher.subscribe(consumer.getSubscriber)
}
class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]
/**
* INTERNAL API
*/
private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]
/**
* INTERNAL API

View file

@ -15,6 +15,7 @@ import scala.util.Success
import scala.util.Failure
import akka.stream.scaladsl.Transformer
import akka.stream.scaladsl.RecoveryTransformer
import org.reactivestreams.api.Consumer
/**
* INTERNAL API
@ -141,6 +142,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
override def groupBy[K](f: (O) K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any Any]))
override def tee(other: Consumer[_ >: O]): Flow[O] = andThen(Tee(other.asInstanceOf[Consumer[Any]]))
override def toFuture(materializer: FlowMaterializer): Future[O] = {
val p = Promise[O]()
transformRecover(new RecoveryTransformer[O, Unit] {

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import org.reactivestreams.api.Producer
import akka.stream.MaterializerSettings
import org.reactivestreams.api.Consumer
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.spi.Subscription
/**
* INTERNAL API
*/
private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any])
extends ActorProcessorImpl(_settings) {
lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && primaryOutputs.NeedsDemand
override def initialTransferState = needsBothInputAndDemand
override def primaryOutputsReady(): Unit = {
primaryOutputs.addSubscriber(other.getSubscriber)
super.primaryOutputsReady()
}
override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize) {
var hasOtherSubscription = false
var hasDownstreamSubscription = false
var pendingRemoveSubscription: List[S] = Nil
override type S = ActorSubscription[Any]
override def createSubscription(subscriber: Subscriber[Any]): S =
new ActorSubscription(self, subscriber)
override def afterShutdown(completed: Boolean): Unit = {
primaryOutputsFinished(completed)
}
override val NeedsDemand: TransferState = new TransferState {
def isReady = demandAvailable
def isCompleted = isClosed
}
override def addSubscriber(subscriber: Subscriber[Any]): Unit = {
super.addSubscriber(subscriber)
if (subscriber == other.getSubscriber)
hasOtherSubscription = true
else
hasDownstreamSubscription = true
if (pendingRemoveSubscription.nonEmpty && hasOtherSubscription && hasDownstreamSubscription) {
pendingRemoveSubscription foreach removeSubscription
pendingRemoveSubscription = Nil
}
}
override def removeSubscription(subscription: S): Unit = {
// make sure that we don't shutdown because of premature cancel
if (hasOtherSubscription && hasDownstreamSubscription)
super.removeSubscription(subscription)
else
pendingRemoveSubscription :+= subscription // defer these until both subscriptions have been registered
}
}
override def transfer(): TransferState = {
val in = primaryInputs.dequeueInputElement()
primaryOutputs.enqueueOutputElement(in)
needsBothInputAndDemand
}
}

View file

@ -222,6 +222,14 @@ trait Flow[+T] {
*/
def concat[U >: T](next: Producer[U]): Flow[U]
/**
* Fan-out the stream to another consumer. Each element is produced to
* the `other` consumer as well as to downstream consumers. It will
* not shutdown until the subscriptions for `other` and at least
* one downstream consumer have been established.
*/
def tee(other: Consumer[_ >: T]): Flow[T]
/**
* Returns a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowTeeSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 16))
"A Tee" must {
"tee to other consumer" in {
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
val p = Flow(List(1, 2, 3)).
tee(c2).
toProducer(materializer)
p.produceTo(c1)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
sub2.requestMore(2)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNoMsg(100.millis)
sub1.requestMore(3)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
sub2.requestMore(3)
c2.expectNext(3)
c2.expectComplete()
}
"produce to other even though downstream cancels" in {
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
val p = Flow(List(1, 2, 3)).
tee(c2).
toProducer(materializer)
p.produceTo(c1)
val sub1 = c1.expectSubscription()
sub1.cancel()
val sub2 = c2.expectSubscription()
sub2.requestMore(3)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
}
"produce to downstream even though other cancels" in {
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
val p = Flow(List(1, 2, 3)).
tee(c1).
toProducer(materializer)
p.produceTo(c2)
val sub1 = c1.expectSubscription()
sub1.cancel()
val sub2 = c2.expectSubscription()
sub2.requestMore(3)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
}
"produce to downstream even though other cancels before downstream has subscribed" in {
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
val p = Flow(List(1, 2, 3)).
tee(c1).
toProducer(materializer)
val sub1 = c1.expectSubscription()
sub1.cancel()
p.produceTo(c2)
val sub2 = c2.expectSubscription()
sub2.requestMore(3)
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
}
}
}