diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala index 725d211e6c..e6de86da9f 100644 --- a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala @@ -42,7 +42,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin val requestSubscriber = Duct[(HttpRequest, Any)] - .tee(contextBypassSubscriber) + .broadcast(contextBypassSubscriber) .map(requestMethodByPass) .transform(responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) diff --git a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala index 0ba396f560..afdcc33e8c 100644 --- a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala @@ -44,7 +44,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, .transform(rootParser.copyWith(warnOnIllegalHeader)) .splitWhen(_.isInstanceOf[MessageStart]) .headAndTail(materializer) - .tee(applicationBypassSubscriber) + .broadcast(applicationBypassSubscriber) .collect { case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index d28bd373da..68b9c7def7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -51,8 +51,8 @@ private[akka] object Ast { case class Concat(next: Publisher[Any]) extends AstNode { override def name = "concat" } - case class Tee(other: Subscriber[Any]) extends AstNode { - override def name = "tee" + case class Broadcast(other: Subscriber[Any]) extends AstNode { + override def name = "broadcast" } case class PrefixAndTail(n: Int) extends AstNode { override def name = "prefixAndTail" diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 2549dc4b75..bd71906e74 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -26,7 +26,7 @@ private[akka] object ActorProcessor { case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) - case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + case b: Broadcast ⇒ Props(new BroadcastImpl(settings, b.other)) case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index ce2000bc56..2eebb8ba50 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -313,7 +313,7 @@ private[akka] trait Builder[Out] { def groupBy[K](f: (Out) ⇒ K): Thing[(K, Publisher[Out])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - def tee(other: Subscriber[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Subscriber[Any]])) + def broadcast(other: Subscriber[_ >: Out]): Thing[Out] = andThen(Broadcast(other.asInstanceOf[Subscriber[Any]])) def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Thing[S] = andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala index edc8430bd5..6374e6db07 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -9,7 +9,7 @@ import org.reactivestreams.{ Subscriber, Subscription, Publisher } /** * INTERNAL API */ -private[akka] class TeeImpl(_settings: MaterializerSettings, other: Subscriber[Any]) +private[akka] class BroadcastImpl(_settings: MaterializerSettings, other: Subscriber[Any]) extends ActorProcessorImpl(_settings) { override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 1e0768a1aa..56359e48ae 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -226,7 +226,7 @@ abstract class Duct[In, Out] { * not shutdown until the subscriptions for `other` and at least * one downstream subscriber have been established. */ - def tee(other: Subscriber[_ >: Out]): Duct[In, Out] + def broadcast(other: Subscriber[_ >: Out]): Duct[In, Out] /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. @@ -393,8 +393,8 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def concat[U >: T](next: Publisher[U]): Duct[In, U] = new DuctAdapter(delegate.concat(next)) - override def tee(other: Subscriber[_ >: T]): Duct[In, T] = - new DuctAdapter(delegate.tee(other)) + override def broadcast(other: Subscriber[_ >: T]): Duct[In, T] = + new DuctAdapter(delegate.broadcast(other)) override def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, T] = new DuctAdapter(delegate.buffer(size, overflowStrategy)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1599d79ec..ec7b96e65f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -287,7 +287,7 @@ abstract class Flow[T] { * not shutdown until the subscriptions for `other` and at least * one downstream subscriber have been established. */ - def tee(other: Subscriber[_ >: T]): Flow[T] + def broadcast(other: Subscriber[_ >: T]): Flow[T] /** * Append the operations of a [[Duct]] to this flow. @@ -460,8 +460,8 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def concat[U >: T](next: Publisher[U]): Flow[U] = new FlowAdapter(delegate.concat(next)) - override def tee(other: Subscriber[_ >: T]): Flow[T] = - new FlowAdapter(delegate.tee(other)) + override def broadcast(other: Subscriber[_ >: T]): Flow[T] = + new FlowAdapter(delegate.broadcast(other)) override def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] = new FlowAdapter(delegate.flatten(strategy)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 6315c1095a..9106c78a67 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -215,7 +215,7 @@ trait Duct[In, +Out] { * not shutdown until the subscriptions for `other` and at least * one downstream subscriber have been established. */ - def tee(other: Subscriber[_ >: Out]): Duct[In, Out] + def broadcast(other: Subscriber[_ >: Out]): Duct[In, Out] /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0bdee671ac..70c4dc6d71 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -283,7 +283,7 @@ trait Flow[+T] { * not shutdown until the subscriptions for `other` and at least * one downstream subscriber have been established. */ - def tee(other: Subscriber[_ >: T]): Flow[T] + def broadcast(other: Subscriber[_ >: T]): Flow[T] /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. diff --git a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala index d8e42434bc..d95450b791 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTeeSpec.scala @@ -9,7 +9,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FlowTeeSpec extends AkkaSpec { +class FlowBroadcastSpec extends AkkaSpec { val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, @@ -18,13 +18,13 @@ class FlowTeeSpec extends AkkaSpec { maxFanOutBufferSize = 16, dispatcher = "akka.test.stream-dispatcher")) - "A Tee" must { + "A broadcast" must { - "tee to other subscriber" in { + "broadcast to other subscriber" in { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() val p = Flow(List(1, 2, 3)). - tee(c2). + broadcast(c2). toPublisher(materializer) p.subscribe(c1) val sub1 = c1.expectSubscription() @@ -49,7 +49,7 @@ class FlowTeeSpec extends AkkaSpec { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() val p = Flow(List(1, 2, 3)). - tee(c2). + broadcast(c2). toPublisher(materializer) p.subscribe(c1) val sub1 = c1.expectSubscription() @@ -66,7 +66,7 @@ class FlowTeeSpec extends AkkaSpec { val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() val p = Flow(List(1, 2, 3)). - tee(c1). + broadcast(c1). toPublisher(materializer) p.subscribe(c2) val sub1 = c1.expectSubscription()