diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 6b9ef7faa8..1b06a9b093 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -26,6 +26,7 @@ private[akka] object Ast { case class Merge(other: Producer[Any]) extends AstNode case class Zip(other: Producer[Any]) extends AstNode case class Concat(next: Producer[Any]) extends AstNode + case class Tee(other: Consumer[Any]) extends AstNode trait ProducerNode[I] { def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 1e9bcd7e22..21d2d4215a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -22,6 +22,7 @@ private[akka] object ActorProcessor { case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) + case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index fe4b3c02fd..04581a5a63 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -35,7 +35,10 @@ private[akka] trait ActorProducerLike[T] extends Producer[T] { getPublisher.subscribe(consumer.getSubscriber) } -class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] +/** + * INTERNAL API + */ +private[akka] class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 5d4e3d0a05..2c4847bdd5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -15,6 +15,7 @@ import scala.util.Success import scala.util.Failure import akka.stream.scaladsl.Transformer import akka.stream.scaladsl.RecoveryTransformer +import org.reactivestreams.api.Consumer /** * INTERNAL API @@ -141,6 +142,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + override def tee(other: Consumer[_ >: O]): Flow[O] = andThen(Tee(other.asInstanceOf[Consumer[Any]])) + override def toFuture(materializer: FlowMaterializer): Future[O] = { val p = Promise[O]() transformRecover(new RecoveryTransformer[O, Unit] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala new file mode 100644 index 0000000000..ac63f918a0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import org.reactivestreams.api.Producer +import akka.stream.MaterializerSettings +import org.reactivestreams.api.Consumer +import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.Subscription + +/** + * INTERNAL API + */ +private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any]) + extends ActorProcessorImpl(_settings) { + + lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && primaryOutputs.NeedsDemand + + override def initialTransferState = needsBothInputAndDemand + + override def primaryOutputsReady(): Unit = { + primaryOutputs.addSubscriber(other.getSubscriber) + super.primaryOutputsReady() + } + + override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize) { + + var hasOtherSubscription = false + var hasDownstreamSubscription = false + var pendingRemoveSubscription: List[S] = Nil + + override type S = ActorSubscription[Any] + override def createSubscription(subscriber: Subscriber[Any]): S = + new ActorSubscription(self, subscriber) + override def afterShutdown(completed: Boolean): Unit = { + primaryOutputsFinished(completed) + } + + override val NeedsDemand: TransferState = new TransferState { + def isReady = demandAvailable + def isCompleted = isClosed + } + + override def addSubscriber(subscriber: Subscriber[Any]): Unit = { + super.addSubscriber(subscriber) + if (subscriber == other.getSubscriber) + hasOtherSubscription = true + else + hasDownstreamSubscription = true + if (pendingRemoveSubscription.nonEmpty && hasOtherSubscription && hasDownstreamSubscription) { + pendingRemoveSubscription foreach removeSubscription + pendingRemoveSubscription = Nil + } + } + + override def removeSubscription(subscription: S): Unit = { + // make sure that we don't shutdown because of premature cancel + if (hasOtherSubscription && hasDownstreamSubscription) + super.removeSubscription(subscription) + else + pendingRemoveSubscription :+= subscription // defer these until both subscriptions have been registered + } + } + + override def transfer(): TransferState = { + val in = primaryInputs.dequeueInputElement() + primaryOutputs.enqueueOutputElement(in) + needsBothInputAndDemand + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c2e4b8688c..d10ad296f0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -213,6 +213,14 @@ trait Flow[+T] { */ def concat[U >: T](next: Producer[U]): Flow[U] + /** + * Fan-out the stream to another consumer. Each element is produced to + * the `other` consumer as well as to downstream consumers. It will + * not shutdown until the subscriptions for `other` and at least + * one downstream consumer have been established. + */ + def tee(other: Consumer[_ >: T]): Flow[T] + /** * Returns a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after diff --git a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala new file mode 100644 index 0000000000..98269bfc2f --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTeeSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 16, + initialFanOutBufferSize = 1, + maxFanOutBufferSize = 16)) + + "A Tee" must { + + "tee to other consumer" in { + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + val p = Flow(List(1, 2, 3)). + tee(c2). + toProducer(materializer) + p.produceTo(c1) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + c2.expectNext(1) + c2.expectNext(2) + c2.expectNoMsg(100.millis) + sub1.requestMore(3) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + sub2.requestMore(3) + c2.expectNext(3) + c2.expectComplete() + } + + "produce to other even though downstream cancels" in { + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + val p = Flow(List(1, 2, 3)). + tee(c2). + toProducer(materializer) + p.produceTo(c1) + val sub1 = c1.expectSubscription() + sub1.cancel() + val sub2 = c2.expectSubscription() + sub2.requestMore(3) + c2.expectNext(1) + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + } + + "produce to downstream even though other cancels" in { + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + val p = Flow(List(1, 2, 3)). + tee(c1). + toProducer(materializer) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + sub1.cancel() + val sub2 = c2.expectSubscription() + sub2.requestMore(3) + c2.expectNext(1) + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + } + + "produce to downstream even though other cancels before downstream has subscribed" in { + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + val p = Flow(List(1, 2, 3)). + tee(c1). + toProducer(materializer) + val sub1 = c1.expectSubscription() + sub1.cancel() + p.produceTo(c2) + val sub2 = c2.expectSubscription() + sub2.requestMore(3) + c2.expectNext(1) + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + } + + } + +} \ No newline at end of file