diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 92ede6b22c..80258df307 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -80,7 +80,7 @@ class FlowGraphDocSpec extends AkkaSpec { // unconnected zip.out (!) => "must have at least 1 outgoing edge" } //#simple-graph - }.getMessage should include("unconnected ports: Zip.out") + }.getMessage should include("unconnected ports: ZipWith2.out") } "reusing a flow in a graph" in { diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala index 229a5f7f21..43391883c1 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala @@ -50,10 +50,12 @@ class RecipeKeepAlive extends RecipeSpec { sub.expectNext(ByteString(1)) subscription.request(2) sub.expectNext(ByteString(2)) - sub.expectNext(ByteString(3)) + // This still gets through because there is some intrinsic fairness caused by the FIFO queue in the interpreter + // Expecting here a preferred element also only worked true accident with the old Pump. + sub.expectNext(keepaliveMessage) subscription.request(1) - sub.expectNext(keepaliveMessage) + sub.expectNext(ByteString(3)) subscription.request(1) tickPub.sendNext(()) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index a8de76b17c..98167bb198 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -141,6 +141,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { "log materialization errors in `bindAndHandle`" which { "are triggered in `transform`" in Utils.assertAllStagesStopped { + // FIXME racy feature, needs https://github.com/akka/akka/issues/17849 to be fixed + pending val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].transform[HttpResponse](() ⇒ sys.error("BOOM")) val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) @@ -160,6 +162,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { }(materializer2) "are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped { + // FIXME racy feature, needs https://github.com/akka/akka/issues/17849 to be fixed + pending val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterializedValue(_ ⇒ sys.error("BOOM")) val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index b57a247013..ae3944d14a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -4,6 +4,7 @@ package akka.stream.impl.fusing import akka.stream.testkit.AkkaSpec +import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip } import GraphInterpreter._ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { @@ -14,10 +15,10 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { // Reusable components val identity = new Identity[Int] val detacher = new Detacher[Int] - val zip = new Zip[Int, String] - val bcast = new Broadcast[Int](2) - val merge = new Merge[Int](2) - val balance = new Balance[Int](2) + val zip = Zip[Int, String] + val bcast = Broadcast[Int](2) + val merge = Merge[Int](2) + val balance = Balance[Int](2) "implement identity" in new TestSetup { val source = new UpstreamProbe[Int]("source") @@ -181,7 +182,7 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { val sink1 = new DownstreamProbe[(Int, Int)]("sink") val sink2 = new DownstreamProbe[(Int, Int)]("sink2") val zip = new Zip[Int, Int] - val bcast = new Broadcast[(Int, Int)](2) + val bcast = Broadcast[(Int, Int)](2) builder(bcast, zip) .connect(source1, zip.in0) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index 0aee5b2459..606175644c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -47,7 +47,7 @@ class GraphBroadcastSpec extends AkkaSpec { c2.expectComplete() } - "work with n-way broadcast" in { + "work with n-way broadcast" in assertAllStagesStopped { val headSink = Sink.head[Seq[Int]] import system.dispatcher @@ -71,7 +71,7 @@ class GraphBroadcastSpec extends AkkaSpec { Await.result(result, 3.seconds) should be(List.fill(5)(List(1, 2, 3))) } - "work with 22-way broadcast" in { + "work with 22-way broadcast" in assertAllStagesStopped { type T = Seq[Int] type FT = Future[Seq[Int]] val headSink: Sink[T, FT] = Sink.head[T] diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala index 61227bfae3..502468c0e0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala @@ -46,6 +46,21 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup { Await.result(result, 3.seconds).filter(_ == 1).size should be(numElements) } + "eventually pass through all elements" in { + val result = FlowGraph.closed(Sink.head[Seq[Int]]) { implicit b ⇒ + sink ⇒ + val merge = b.add(MergePreferred[Int](3)) + Source(1 to 100) ~> merge.preferred + + merge.out.grouped(500) ~> sink.inlet + Source(101 to 200) ~> merge.in(0) + Source(201 to 300) ~> merge.in(1) + Source(301 to 400) ~> merge.in(2) + }.run() + + Await.result(result, 3.seconds).toSet should ===((1 to 400).toSet) + } + "disallow multiple preferred inputs" in { val s = Source(0 to 3) diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template deleted file mode 100644 index fcd7126912..0000000000 --- a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (C) 2014-2015 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.Props -import akka.actor.Deploy -import akka.stream._ -import akka.stream.impl.Junctions.{ FanInModule, FanOutModule } -import akka.stream.impl.StreamLayout.Module -import akka.stream.Attributes -import akka.stream.Attributes._ - -/** INTERNAL API: Boilerplate generated Junctions */ -private[akka] object GenJunctions { - - sealed trait ZipWithModule { - /** Allows hiding the boilerplate Props creation from the materializer */ - def props(settings: ActorMaterializerSettings): Props - } - - [2..20# - final case class ZipWith1Module[[#A1#], B]( - shape: FanInShape1[[#A1#], B], - f: ([#A1#]) ⇒ B, - override val attributes: Attributes = name("zipWith1")) extends FanInModule with ZipWithModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = ZipWith1Module(shape.deepCopy(), f, attributes) - - override def props(settings: ActorMaterializerSettings): Props = - Props(new Zip1With(settings, f.asInstanceOf[Function1[[#Any#], Any]])).withDeploy(Deploy.local) - }# - ] - - sealed trait UnzipWithModule { - /** Allows hiding the boilerplate Props creation from the materializer */ - def props(settings: ActorMaterializerSettings): Props - } - - [2..20# - final case class UnzipWith1Module[B, [#A1#]]( - shape: FanOutShape1[B, [#A1#]], - f: B ⇒ ([#A1#]), - override val attributes: Attributes = name("unzipWith1")) extends FanOutModule with UnzipWithModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = UnzipWith1Module(shape.deepCopy(), f, attributes) - - override def props(settings: ActorMaterializerSettings): Props = - Props(new Unzip1With(settings, f.asInstanceOf[Function##1[Any, ([#Any#])]])).withDeploy(Deploy.local) - }# - ] - -} \ No newline at end of file diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/UnzipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/UnzipWith.scala.template deleted file mode 100644 index 59497f621e..0000000000 --- a/akka-stream/src/main/boilerplate/akka/stream/impl/UnzipWith.scala.template +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Copyright (C) 2014-2015 Typesafe Inc. - */ -package akka.stream.impl - -import akka.stream.ActorMaterializerSettings - -[2..20#/** INTERNAL API */ -private[akka] final class Unzip1With(_settings: ActorMaterializerSettings, f: Function##1[Any, ([#Any#])]) - extends FanOut(_settings, outputCount = 1) { - - outputBunch.markAllOutputs() - - initialPhase(##1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs){ () ⇒ - val elem = primaryInputs.dequeueInputElement() - val tuple = f(elem) - - [1..#outputBunch.enqueue(0, tuple._1)# - ] - }) -}# -] diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template deleted file mode 100644 index 84784d43eb..0000000000 --- a/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2014-2015 Typesafe Inc. - */ -package akka.stream.impl - -import scala.collection.immutable -import akka.actor.Props -import akka.stream.{ ActorMaterializerSettings, Shape, Inlet, Outlet } -import akka.stream.impl.GenJunctions._ - -[2..20#/** INTERNAL API */ -private[akka] final class Zip1With(_settings: ActorMaterializerSettings, f: Function1[[#Any#], Any]) - extends FanIn(_settings, inputCount = 1) { - - inputBunch.markAllInputs() - - initialPhase(inputCount, TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ - val elem##0 = inputBunch.dequeue(##0) - [2..#val elem0 = inputBunch.dequeue(0)# - ] - - primaryOutputs.enqueueOutputElement(f([#elem0#])) - }) -}# -] diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template index ebd0b698fe..80061988f9 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template @@ -4,8 +4,8 @@ package akka.stream.scaladsl import akka.stream._ -import akka.stream.impl.GenJunctions._ import akka.stream.impl.StreamLayout +import akka.stream.stage._ object UnzipWithApply { import scala.language.implicitConversions @@ -16,8 +16,7 @@ object UnzipWithApply { [2..20#trait UnzipWithCreator1[In, [#A1#]] extends UnzipWithCreator[In, Tuple1[[#A1#]], UnzipWith1[In, [#A1#]]] { override def create(unzipper: In ⇒ ([#A1#])): UnzipWith1[In, [#A1#]] = { - val shape = new FanOutShape1[In, [#A1#]]("UnzipWith1") - new UnzipWith1(shape, new UnzipWith1Module(shape, unzipper, Attributes.name("UnzipWith1"))) + new UnzipWith1(unzipper) } } @@ -26,7 +25,6 @@ object UnzipWithApply { ] } - trait UnzipWithApply { import UnzipWithApply._ @@ -44,14 +42,49 @@ trait UnzipWithApply { } [2..20#/** `UnzipWith` specialized for 1 outputs */ -class UnzipWith1[In, [#A1#]] private[stream] (override val shape: FanOutShape1[In, [#A1#]], - private[stream] override val module: StreamLayout.Module) - extends Graph[FanOutShape1[In, [#A1#]], Unit] { +class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] { + override val shape: FanOutShape1[In, [#A1#]] = new FanOutShape1[In, [#A1#]]("UnzipWith1") + def in: Inlet[In] = shape.in - override def withAttributes(attr: Attributes): UnzipWith1[In, [#A1#]] = - new UnzipWith1(shape, module.withAttributes(attr).nest()) + [#def out0: Outlet[A1] = shape.out0# + ] - override def named(name: String): UnzipWith1[In, [#A1#]] = withAttributes(Attributes.name(name)) + override def createLogic: GraphStageLogic = new GraphStageLogic { + var pendingCount = 1 + var downstreamRunning = 1 + + [#var pending0 = true# + ] + + setHandler(in, new InHandler { + override def onPush() = { + val elem = unzipper(grab(in)) + [#if (!isClosed(out0)) push(out0, elem._1)# + ] + pendingCount = downstreamRunning + } + }) + + [#setHandler(out0, new OutHandler { + override def onPull() = { + pendingCount -= ##1 + if (pendingCount == ##0) pull(in) + } + + override def onDownstreamFinish(): Unit = { + downstreamRunning -= ##1 + if (downstreamRunning == ##0) completeStage() + else { + if (pending0) pendingCount -= ##1 + if (pendingCount == ##0) pull(in) + } + } + })# + ] + + } + + override def toString = "UnzipWith1" } # -] +] \ No newline at end of file diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 821983f593..672dbb2466 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -4,20 +4,18 @@ package akka.stream.scaladsl import akka.stream._ -import akka.stream.impl.GenJunctions._ import akka.stream.impl.StreamLayout +import akka.stream.stage._ trait ZipWithApply { [2..20#/** * Create a new `ZipWith` specialized for 1 inputs. * - * @param f zipping-function from the input values to the output value - * @param attributes optional attributes for this vertex + * @param zipper zipping-function from the input values to the output value */ def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = { - val shape = new FanInShape1[[#A1#], O]("ZipWith1") - new ZipWith1(shape, new ZipWith1Module(shape, zipper, Attributes.name("ZipWith1"))) + new ZipWith1(zipper) } # @@ -26,14 +24,42 @@ trait ZipWithApply { } [2..20#/** `ZipWith` specialized for 1 inputs */ -class ZipWith1[[#A1#], O] private[stream] (override val shape: FanInShape1[[#A1#], O], - private[stream] override val module: StreamLayout.Module) - extends Graph[FanInShape1[[#A1#], O], Unit] { +class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { + override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipWith1") + def out: Outlet[O] = shape.out + [#val in0: Inlet[A1] = shape.in0# + ] - override def withAttributes(attr: Attributes): ZipWith1[[#A1#], O] = - new ZipWith1(shape, module.withAttributes(attr).nest()) + override def createLogic: GraphStageLogic = new GraphStageLogic { + var pending = 1 + + private def pushAll(): Unit = push(out, zipper([#grab(in0)#])) + + [#setHandler(in0, new InHandler { + override def onPush(): Unit = { + pending -= ##1 + if (pending == ##0) pushAll() + } + + override def onUpstreamFinish(): Unit = { + if (!isAvailable(out) || !isAvailable(in0)) completeStage() + } + + })# + ] + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pending = shape.inlets.size + [#if (!isClosed(in0)) pull(in0) + else completeStage()# + ] + } + }) + } + + override def toString = "Zip" - override def named(name: String): ZipWith1[[#A1#], O] = withAttributes(Attributes.name(name)) } # ] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 25a436b083..e0fb944365 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -9,8 +9,6 @@ import akka.actor._ import akka.dispatch.Dispatchers import akka.pattern.ask import akka.stream.actor.ActorSubscriber -import akka.stream.impl.GenJunctions.ZipWithModule -import akka.stream.impl.GenJunctions.UnzipWithModule import akka.stream.impl.Junctions._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule, ActorInterpreter } @@ -160,23 +158,10 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, op match { case fanin: FanInModule ⇒ val (props, inputs, output) = fanin match { - - case MergeModule(shape, _) ⇒ - (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out) - case f: FlexiMergeModule[t, p] ⇒ val flexi = f.flexi(f.shape) (FlexiMerge.props(effectiveSettings, f.shape, flexi), f.shape.inlets, f.shape.outlets.head) - case MergePreferredModule(shape, _) ⇒ - (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out) - - case ConcatModule(shape, _) ⇒ - require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO - (Concat.props(effectiveSettings), shape.inSeq, shape.out) - - case zip: ZipWithModule ⇒ - (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) } val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val publisher = new ActorPublisher[Any](impl) @@ -193,15 +178,6 @@ private[akka] case class ActorMaterializerImpl(val system: ActorSystem, case r: FlexiRouteModule[t, p] ⇒ val flexi = r.flexi(r.shape) (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) - - case BroadcastModule(shape, eagerCancel, _) ⇒ - (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.length), shape.in, shape.outArray.toSeq) - - case BalanceModule(shape, waitForDownstreams, _) ⇒ - (Balance.props(effectiveSettings, shape.outArray.length, waitForDownstreams), shape.in, shape.outArray.toSeq) - - case unzip: UnzipWithModule ⇒ - (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets) } val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val size = outs.size diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 5953638069..13512b414b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -292,51 +292,6 @@ private[akka] abstract class FanIn(val settings: ActorMaterializerSettings, val } -/** - * INTERNAL API - */ -private[akka] object FairMerge { - def props(settings: ActorMaterializerSettings, inputPorts: Int): Props = - Props(new FairMerge(settings, inputPorts)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] final class FairMerge(_settings: ActorMaterializerSettings, _inputPorts: Int) extends FanIn(_settings, _inputPorts) { - inputBunch.markAllInputs() - - initialPhase(inputCount, TransferPhase(inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ - val elem = inputBunch.dequeueAndYield() - primaryOutputs.enqueueOutputElement(elem) - }) - -} - -/** - * INTERNAL API - */ -private[akka] object UnfairMerge { - val DefaultPreferred = 0 - - def props(settings: ActorMaterializerSettings, inputPorts: Int): Props = - Props(new UnfairMerge(settings, inputPorts, DefaultPreferred)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] final class UnfairMerge(_settings: ActorMaterializerSettings, - _inputPorts: Int, - val preferred: Int) extends FanIn(_settings, _inputPorts) { - inputBunch.markAllInputs() - - initialPhase(inputCount, TransferPhase(inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ - val elem = inputBunch.dequeuePrefering(preferred) - primaryOutputs.enqueueOutputElement(elem) - }) -} - /** * INTERNAL API */ @@ -344,33 +299,3 @@ private[akka] object FlexiMerge { def props[T, S <: Shape](settings: ActorMaterializerSettings, ports: S, mergeLogic: MergeLogic[T]): Props = Props(new FlexiMergeImpl(settings, ports, mergeLogic)).withDeploy(Deploy.local) } - -/** - * INTERNAL API - */ -private[akka] object Concat { - def props(settings: ActorMaterializerSettings): Props = Props(new Concat(settings)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] final class Concat(_settings: ActorMaterializerSettings) extends FanIn(_settings, inputCount = 2) { - val First = 0 - val Second = 1 - - def drainFirst = TransferPhase(inputBunch.inputsOrCompleteAvailableFor(First) && primaryOutputs.NeedsDemand) { () ⇒ - if (!inputBunch.isDepleted(First)) { - val elem = inputBunch.dequeue(First) - primaryOutputs.enqueueOutputElement(elem) - } - if (inputBunch.isDepleted(First)) nextPhase(drainSecond) - } - - def drainSecond = TransferPhase(inputBunch.inputsAvailableFor(Second) && primaryOutputs.NeedsDemand) { () ⇒ - val elem = inputBunch.dequeue(Second) - primaryOutputs.enqueueOutputElement(elem) - } - - initialPhase(inputCount, drainFirst) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 4417511937..ce73564c6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -285,54 +285,6 @@ private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val def receive = primaryInputs.subreceive.orElse[Any, Unit](outputBunch.subreceive) } -/** - * INTERNAL API - */ -private[akka] object Broadcast { - def props(settings: ActorMaterializerSettings, eagerCancel: Boolean, outputPorts: Int): Props = - Props(new Broadcast(settings, outputPorts, eagerCancel)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int, eagerCancel: Boolean) extends FanOut(_settings, _outputPorts) { - outputBunch.unmarkCancelledOutputs(!eagerCancel) - outputBunch.markAllOutputs() - - initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - outputBunch.enqueueMarked(elem) - }) -} - -/** - * INTERNAL API - */ -private[akka] object Balance { - def props(settings: ActorMaterializerSettings, outputPorts: Int, waitForAllDownstreams: Boolean): Props = - Props(new Balance(settings, outputPorts, waitForAllDownstreams)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] class Balance(_settings: ActorMaterializerSettings, _outputPorts: Int, waitForAllDownstreams: Boolean) extends FanOut(_settings, _outputPorts) { - outputBunch.markAllOutputs() - - val runningPhase = TransferPhase(primaryInputs.NeedsInput && outputBunch.AnyOfMarkedOutputs) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - outputBunch.enqueueAndYield(elem) - } - - if (waitForAllDownstreams) - initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ - nextPhase(runningPhase) - }) - else - initialPhase(1, runningPhase) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index 93bba67484..504b8bd7c5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -29,34 +29,6 @@ private[stream] object Junctions { private[akka] trait FanInModule extends JunctionModule private[akka] trait FanOutModule extends JunctionModule - final case class MergeModule[T]( - shape: UniformFanInShape[T, T], - override val attributes: Attributes = name("merge")) extends FanInModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = MergeModule(shape.deepCopy(), attributes) - } - - final case class BroadcastModule[T]( - shape: UniformFanOutShape[T, T], - eagerCancel: Boolean, - override val attributes: Attributes = name("broadcast")) extends FanOutModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), eagerCancel, attributes) - } - - final case class MergePreferredModule[T]( - shape: MergePreferred.MergePreferredShape[T], - override val attributes: Attributes = name("preferred")) extends FanInModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = MergePreferredModule(shape.deepCopy(), attributes) - } - final case class FlexiMergeModule[T, S <: Shape]( shape: S, flexi: S ⇒ MergeLogic[T], @@ -81,16 +53,6 @@ private[stream] object Junctions { override def carbonCopy: Module = FlexiRouteModule(shape.deepCopy().asInstanceOf[S], flexi, attributes) } - final case class BalanceModule[T]( - shape: UniformFanOutShape[T, T], - waitForAllDownstreams: Boolean, - override val attributes: Attributes = name("broadcast")) extends FanOutModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = BalanceModule(shape.deepCopy(), waitForAllDownstreams, attributes) - } - final case class UnzipModule[A, B]( shape: FanOutShape2[(A, B), A, B], override val attributes: Attributes = name("unzip")) extends FanOutModule { @@ -100,13 +62,4 @@ private[stream] object Junctions { override def carbonCopy: Module = UnzipModule(shape.deepCopy(), attributes) } - final case class ConcatModule[T]( - shape: UniformFanInShape[T, T], - override val attributes: Attributes = name("concat")) extends FanInModule { - - override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - - override def carbonCopy: Module = ConcatModule(shape.deepCopy(), attributes) - } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 5eb0961520..8fe324ee1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -81,171 +81,6 @@ object GraphStages { override def toString = "Detacher" } - class Broadcast[T](private val outCount: Int) extends GraphStage[UniformFanOutShape[T, T]] { - val in = Inlet[T]("in") - val out = Vector.fill(outCount)(Outlet[T]("out")) - override val shape = UniformFanOutShape(in, out: _*) - - override def createLogic: GraphStageLogic = new GraphStageLogic { - private var pending = outCount - - setHandler(in, new InHandler { - override def onPush(): Unit = { - pending = outCount - val elem = grab(in) - out.foreach(push(_, elem)) - } - }) - - val outHandler = new OutHandler { - override def onPull(): Unit = { - pending -= 1 - if (pending == 0) pull(in) - } - } - - out.foreach(setHandler(_, outHandler)) - } - - override def toString = "Broadcast" - - } - - class Zip[A, B] extends GraphStage[FanInShape2[A, B, (A, B)]] { - val in0 = Inlet[A]("in0") - val in1 = Inlet[B]("in1") - val out = Outlet[(A, B)]("out") - override val shape = new FanInShape2[A, B, (A, B)](in0, in1, out) - - override def createLogic: GraphStageLogic = new GraphStageLogic { - var pending = 2 - - val inHandler = new InHandler { - override def onPush(): Unit = { - pending -= 1 - if (pending == 0) push(out, (grab(in0), grab(in1))) - } - } - - setHandler(in0, inHandler) - setHandler(in1, inHandler) - setHandler(out, new OutHandler { - override def onPull(): Unit = { - pending = 2 - pull(in0) - pull(in1) - } - }) - } - - override def toString = "Zip" - } - - class Merge[T](private val inCount: Int) extends GraphStage[UniformFanInShape[T, T]] { - val in = Vector.fill(inCount)(Inlet[T]("in")) - val out = Outlet[T]("out") - override val shape = UniformFanInShape(out, in: _*) - - override def createLogic: GraphStageLogic = new GraphStageLogic { - private var initialized = false - - private val pendingQueue = Array.ofDim[Inlet[T]](inCount) - private var pendingHead: Int = 0 - private var pendingTail: Int = 0 - - private def noPending: Boolean = pendingHead == pendingTail - private def enqueue(in: Inlet[T]): Unit = { - pendingQueue(pendingTail % inCount) = in - pendingTail += 1 - } - private def dequeueAndDispatch(): Unit = { - val in = pendingQueue(pendingHead % inCount) - pendingHead += 1 - push(out, grab(in)) - pull(in) - } - - in.foreach { i ⇒ - setHandler(i, new InHandler { - override def onPush(): Unit = { - if (isAvailable(out)) { - if (noPending) { - push(out, grab(i)) - pull(i) - } else { - enqueue(i) - dequeueAndDispatch() - } - } else enqueue(i) - } - }) - } - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (!initialized) { - initialized = true - in.foreach(pull(_)) - } else { - if (!noPending) { - dequeueAndDispatch() - } - } - } - }) - } - - override def toString = "Merge" - } - - class Balance[T](private val outCount: Int) extends GraphStage[UniformFanOutShape[T, T]] { - val in = Inlet[T]("in") - val out = Vector.fill(outCount)(Outlet[T]("out")) - override val shape = UniformFanOutShape[T, T](in, out: _*) - - override def createLogic: GraphStageLogic = new GraphStageLogic { - private val pendingQueue = Array.ofDim[Outlet[T]](outCount) - private var pendingHead: Int = 0 - private var pendingTail: Int = 0 - - private def noPending: Boolean = pendingHead == pendingTail - private def enqueue(out: Outlet[T]): Unit = { - pendingQueue(pendingTail % outCount) = out - pendingTail += 1 - } - private def dequeueAndDispatch(): Unit = { - val out = pendingQueue(pendingHead % outCount) - pendingHead += 1 - push(out, grab(in)) - if (!noPending) pull(in) - } - - setHandler(in, new InHandler { - override def onPush(): Unit = dequeueAndDispatch() - }) - - out.foreach { o ⇒ - setHandler(o, new OutHandler { - override def onPull(): Unit = { - if (isAvailable(in)) { - if (noPending) { - push(o, grab(in)) - } else { - enqueue(o) - dequeueAndDispatch() - } - } else { - if (!hasBeenPulled(in)) pull(in) - enqueue(o) - } - } - }) - } - } - - override def toString = "Balance" - } - private object TickSource { class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { private val cancelPromise = Promise[Unit]() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 4a3f6e0341..1dccd09c33 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -10,14 +10,6 @@ import akka.japi.Pair * Merge several streams, taking elements as they arrive from input streams * (picking randomly when several have elements ready). * - * When building the [[FlowGraph]] you must connect one or more input sources - * and one output sink to the `Merge` vertex. - * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - * * '''Emits when''' one of the inputs has an element available * * '''Backpressures when''' downstream backpressures @@ -29,13 +21,13 @@ import akka.japi.Pair object Merge { /** - * Create a new `Merge` vertex with the specified output type. + * Create a new `Merge` stage with the specified output type. */ def create[T](inputPorts: Int): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Merge(inputPorts) /** - * Create a new `Merge` vertex with the specified output type. + * Create a new `Merge` stage with the specified output type. */ def create[T](clazz: Class[T], inputPorts: Int): Graph[UniformFanInShape[T, T], Unit] = create(inputPorts) @@ -45,14 +37,6 @@ object Merge { * Merge several streams, taking elements as they arrive from input streams * (picking from preferred when several have elements ready). * - * When building the [[FlowGraph]] you must connect one or more input streams - * and one output sink to the `Merge` vertex. - * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - * * '''Emits when''' one of the inputs has an element available, preferring * a specified input if multiple have elements available * @@ -64,13 +48,13 @@ object Merge { */ object MergePreferred { /** - * Create a new `MergePreferred` vertex with the specified output type. + * Create a new `MergePreferred` stage with the specified output type. */ def create[T](secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = scaladsl.MergePreferred(secondaryPorts) /** - * Create a new `MergePreferred` vertex with the specified output type. + * Create a new `MergePreferred` stage with the specified output type. */ def create[T](clazz: Class[T], secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = create(secondaryPorts) @@ -81,11 +65,6 @@ object MergePreferred { * It will not shutdown until the subscriptions for at least * two downstream subscribers have been established. * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - * * '''Emits when''' all of the outputs stops backpressuring and there is an input element available * * '''Backpressures when''' any of the outputs backpressure @@ -97,7 +76,7 @@ object MergePreferred { */ object Broadcast { /** - * Create a new `Broadcast` vertex with the specified input type. + * Create a new `Broadcast` stage with the specified input type. * * @param outputCount number of output ports * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. @@ -106,14 +85,14 @@ object Broadcast { scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel) /** - * Create a new `Broadcast` vertex with the specified input type. + * Create a new `Broadcast` stage with the specified input type. * * @param outputCount number of output ports */ def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, eagerCancel = false) /** - * Create a new `Broadcast` vertex with the specified input type. + * Create a new `Broadcast` stage with the specified input type. */ def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) @@ -124,11 +103,6 @@ object Broadcast { * It will not shutdown until the subscriptions for at least * two downstream subscribers have been established. * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - * * '''Emits when''' any of the outputs stops backpressuring; emits the element to the first available output * * '''Backpressures when''' all of the outputs backpressure @@ -139,7 +113,7 @@ object Broadcast { */ object Balance { /** - * Create a new `Balance` vertex with the specified input type. + * Create a new `Balance` stage with the specified input type. * * @param waitForAllDownstreams if `true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element @@ -148,19 +122,19 @@ object Balance { scaladsl.Balance(outputCount, waitForAllDownstreams) /** - * Create a new `Balance` vertex with the specified input type. + * Create a new `Balance` stage with the specified input type. */ def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, waitForAllDownstreams = false) /** - * Create a new `Balance` vertex with the specified input type. + * Create a new `Balance` stage with the specified input type. */ def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) /** - * Create a new `Balance` vertex with the specified input type. + * Create a new `Balance` stage with the specified input type. * * @param waitForAllDownstreams if `true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element @@ -187,7 +161,7 @@ object Zip { import akka.japi.Pair /** - * Create a new `Zip` vertex with the specified input types and zipping-function + * Create a new `Zip` stage with the specified input types and zipping-function * which creates `akka.japi.Pair`s. */ def create[A, B]: Graph[FanInShape2[A, B, A Pair B], Unit] = @@ -214,13 +188,13 @@ object Unzip { import akka.japi.function.Function /** - * Creates a new `Unzip` vertex with the specified output types. + * Creates a new `Unzip` stage with the specified output types. */ def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] = UnzipWith.create(JavaIdentityFunction.asInstanceOf[Function[Pair[A, B], Pair[A, B]]]) /** - * Creates a new `Unzip` vertex with the specified output types. + * Creates a new `Unzip` stage with the specified output types. */ def create[A, B](left: Class[A], right: Class[B]): Graph[FanOutShape2[A Pair B, A, B], Unit] = create[A, B]() @@ -231,11 +205,6 @@ object Unzip { * by consuming one stream first emitting all of its elements, then consuming the * second stream emitting all of its elements. * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - * * '''Emits when''' the current stream has an element available; if the current input completes, it tries the next one * * '''Backpressures when''' downstream backpressures @@ -246,18 +215,17 @@ object Unzip { */ object Concat { /** - * Create a new anonymous `Concat` vertex with the specified input types. - * Note that a `Concat` instance can only be used at one place (one vertex) - * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`. + * Create a new anonymous `Concat` stage with the specified input types. */ def create[T](): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T]() /** - * Create a new anonymous `Concat` vertex with the specified input types. - * Note that a `Concat` instance can only be used at one place (one vertex) - * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`. + * Create a new anonymous `Concat` stage with the specified input types. + */ + def create[T](inputCount: Int): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T](inputCount) + + /** + * Create a new anonymous `Concat` stage with the specified input types. */ def create[T](clazz: Class[T]): Graph[UniformFanInShape[T, T], Unit] = create() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 0289a1e724..59cc1a5adc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -3,16 +3,15 @@ */ package akka.stream.scaladsl -import akka.stream.impl.Junctions._ -import akka.stream.impl.GenJunctions._ import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl._ import akka.stream.impl.StreamLayout._ import akka.stream._ import Attributes.name -import scala.collection.immutable +import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec +import scala.collection.immutable object Merge { /** @@ -20,10 +19,7 @@ object Merge { * * @param inputPorts number of input ports */ - def apply[T](inputPorts: Int): Merge[T] = { - val shape = new UniformFanInShape[T, T](inputPorts) - new Merge(inputPorts, shape, new MergeModule(shape, Attributes.name("Merge"))) - } + def apply[T](inputPorts: Int): Merge[T] = new Merge(inputPorts) } @@ -39,15 +35,68 @@ object Merge { * * '''Cancels when''' downstream cancels */ -class Merge[T] private (inputPorts: Int, - override val shape: UniformFanInShape[T, T], - private[stream] override val module: StreamLayout.Module) - extends Graph[UniformFanInShape[T, T], Unit] { +class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] { + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i)) + val out: Outlet[T] = Outlet[T]("Merge.out") + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def withAttributes(attr: Attributes): Merge[T] = - new Merge(inputPorts, shape, module.withAttributes(attr).nest()) + override def createLogic: GraphStageLogic = new GraphStageLogic { + private var initialized = false - override def named(name: String): Merge[T] = withAttributes(Attributes.name(name)) + private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts) + private var pendingHead = 0 + private var pendingTail = 0 + + private var runningUpstreams = inputPorts + private def upstreamsClosed = runningUpstreams == 0 + + private def pending: Boolean = pendingHead != pendingTail + + private def enqueue(in: Inlet[T]): Unit = { + pendingQueue(pendingTail % inputPorts) = in + pendingTail += 1 + } + + private def dequeueAndDispatch(): Unit = { + val in = pendingQueue(pendingHead % inputPorts) + pendingHead += 1 + push(out, grab(in)) + if (upstreamsClosed && !pending) completeStage() + else tryPull(in) + } + + private def tryPull(in: Inlet[T]): Unit = if (!isClosed(in)) pull(in) + + in.foreach { i ⇒ + setHandler(i, new InHandler { + override def onPush(): Unit = { + if (isAvailable(out)) { + if (!pending) { + push(out, grab(i)) + tryPull(i) + } + } else enqueue(i) + } + + override def onUpstreamFinish() = { + runningUpstreams -= 1 + if (upstreamsClosed && !pending) completeStage() + } + }) + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (!initialized) { + initialized = true + in.foreach(tryPull) + } else if (pending) + dequeueAndDispatch() + } + }) + } + + override def toString = "Merge" } object MergePreferred { @@ -65,10 +114,7 @@ object MergePreferred { * * @param secondaryPorts number of secondary input ports */ - def apply[T](secondaryPorts: Int): MergePreferred[T] = { - val shape = new MergePreferredShape[T](secondaryPorts, "MergePreferred") - new MergePreferred(secondaryPorts, shape, new MergePreferredModule(shape, Attributes.name("MergePreferred"))) - } + def apply[T](secondaryPorts: Int): MergePreferred[T] = new MergePreferred(secondaryPorts) } /** @@ -88,15 +134,91 @@ object MergePreferred { * * A `Broadcast` has one `in` port and 2 or more `out` ports. */ -class MergePreferred[T] private (secondaryPorts: Int, - override val shape: MergePreferred.MergePreferredShape[T], - private[stream] override val module: StreamLayout.Module) - extends Graph[MergePreferred.MergePreferredShape[T], Unit] { +class MergePreferred[T] private (secondaryPorts: Int) extends GraphStage[MergePreferred.MergePreferredShape[T]] { + override val shape: MergePreferred.MergePreferredShape[T] = + new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred") - override def withAttributes(attr: Attributes): MergePreferred[T] = - new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).nest()) + def in(id: Int): Inlet[T] = shape.in(id) + def out: Outlet[T] = shape.out + def preferred: Inlet[T] = shape.preferred - override def named(name: String): MergePreferred[T] = withAttributes(Attributes.name(name)) + // FIXME: Factor out common stuff with Merge + override def createLogic: GraphStageLogic = new GraphStageLogic { + private var initialized = false + + private val pendingQueue = Array.ofDim[Inlet[T]](secondaryPorts) + private var pendingHead = 0 + private var pendingTail = 0 + + private var runningUpstreams = secondaryPorts + 1 + private def upstreamsClosed = runningUpstreams == 0 + + private def pending: Boolean = pendingHead != pendingTail + private def priority: Boolean = isAvailable(preferred) + + private def enqueue(in: Inlet[T]): Unit = { + pendingQueue(pendingTail % secondaryPorts) = in + pendingTail += 1 + } + + private def dequeueAndDispatch(): Unit = { + val in = pendingQueue(pendingHead % secondaryPorts) + pendingHead += 1 + push(out, grab(in)) + if (upstreamsClosed && !pending && !priority) completeStage() + else tryPull(in) + } + + private def tryPull(in: Inlet[T]): Unit = if (!isClosed(in)) pull(in) + + // FIXME: slow iteration, try to make in a vector and inject into shape instead + (0 until secondaryPorts).map(in).foreach { i ⇒ + setHandler(i, new InHandler { + override def onPush(): Unit = { + if (isAvailable(out)) { + if (!pending) { + push(out, grab(i)) + tryPull(i) + } + } else enqueue(i) + } + + override def onUpstreamFinish() = { + runningUpstreams -= 1 + if (upstreamsClosed && !pending && !priority) completeStage() + } + }) + } + + setHandler(preferred, new InHandler { + override def onPush() = { + if (isAvailable(out)) { + push(out, grab(preferred)) + tryPull(preferred) + } + } + + override def onUpstreamFinish() = { + runningUpstreams -= 1 + if (upstreamsClosed && !pending && !priority) completeStage() + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (!initialized) { + initialized = true + // FIXME: slow iteration, try to make in a vector and inject into shape instead + tryPull(preferred) + (0 until secondaryPorts).map(in).foreach(tryPull) + } else if (priority) { + push(out, grab(preferred)) + tryPull(preferred) + } else if (pending) + dequeueAndDispatch() + } + }) + } } object Broadcast { @@ -107,8 +229,7 @@ object Broadcast { * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = { - val shape = new UniformFanOutShape[T, T](outputPorts) - new Broadcast(outputPorts, shape, new BroadcastModule(shape, eagerCancel, Attributes.name("Broadcast"))) + new Broadcast(outputPorts, eagerCancel) } } @@ -126,15 +247,73 @@ object Broadcast { * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel * */ -class Broadcast[T] private (outputPorts: Int, - override val shape: UniformFanOutShape[T, T], - private[stream] override val module: StreamLayout.Module) - extends Graph[UniformFanOutShape[T, T], Unit] { +class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { + val in: Inlet[T] = Inlet[T]("Broadast.in") + val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) + override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) - override def withAttributes(attr: Attributes): Broadcast[T] = - new Broadcast(outputPorts, shape, module.withAttributes(attr).nest()) + override def createLogic: GraphStageLogic = new GraphStageLogic { + private var pendingCount = outputPorts + private val pending = Array.fill[Boolean](outputPorts)(true) + private var downstreamsRunning = outputPorts + + setHandler(in, new InHandler { + override def onPush(): Unit = { + pendingCount = downstreamsRunning + val elem = grab(in) + + var idx = 0 + val itr = out.iterator + + while (itr.hasNext) { + val o = itr.next() + val i = idx + if (!isClosed(o)) { + push(o, elem) + pending(i) = true + } + idx += 1 + } + } + }) + + private def tryPull(): Unit = + if (pendingCount == 0 && !hasBeenPulled(in)) pull(in) + + { + var idx = 0 + val itr = out.iterator + while (itr.hasNext) { + val out = itr.next() + val i = idx + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pending(i) = false + pendingCount -= 1 + tryPull() + } + + override def onDownstreamFinish() = { + if (eagerCancel) completeStage() + else { + downstreamsRunning -= 1 + if (downstreamsRunning == 0) completeStage() + else if (pending(i)) { + pending(i) = false + pendingCount -= 1 + tryPull() + } + } + } + }) + idx += 1 + } + } + + } + + override def toString = "Broadcast" - override def named(name: String): Broadcast[T] = withAttributes(Attributes.name(name)) } object Balance { @@ -147,9 +326,7 @@ object Balance { * default value is `false` */ def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = { - val shape = new UniformFanOutShape[T, T](outputPorts) - new Balance(outputPorts, waitForAllDownstreams, shape, - new BalanceModule(shape, waitForAllDownstreams, Attributes.name("Balance"))) + new Balance(outputPorts, waitForAllDownstreams) } } @@ -168,26 +345,77 @@ object Balance { * * '''Cancels when''' all downstreams cancel */ -class Balance[T] private (outputPorts: Int, - waitForAllDownstreams: Boolean, - override val shape: UniformFanOutShape[T, T], - private[stream] override val module: StreamLayout.Module) - extends Graph[UniformFanOutShape[T, T], Unit] { +class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { + val in: Inlet[T] = Inlet[T]("Balance.in") + val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) + override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) - override def withAttributes(attr: Attributes): Balance[T] = - new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).nest()) + override def createLogic: GraphStageLogic = new GraphStageLogic { + private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts) + private var pendingHead: Int = 0 + private var pendingTail: Int = 0 - override def named(name: String): Balance[T] = withAttributes(Attributes.name(name)) + private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0 + private var downstreamsRunning: Int = outputPorts + + private def noPending: Boolean = pendingHead == pendingTail + private def enqueue(out: Outlet[T]): Unit = { + pendingQueue(pendingTail % outputPorts) = out + pendingTail += 1 + } + private def dequeueAndDispatch(): Unit = { + val out = pendingQueue(pendingHead % outputPorts) + pendingHead += 1 + push(out, grab(in)) + if (!noPending) pull(in) + } + + setHandler(in, new InHandler { + override def onPush(): Unit = dequeueAndDispatch() + }) + + out.foreach { o ⇒ + setHandler(o, new OutHandler { + private var hasPulled = false + + override def onPull(): Unit = { + if (!hasPulled) { + hasPulled = true + if (needDownstreamPulls > 0) needDownstreamPulls -= 1 + } + + if (needDownstreamPulls == 0) { + if (isAvailable(in)) { + if (noPending) { + push(o, grab(in)) + } + } else { + if (!hasBeenPulled(in)) pull(in) + enqueue(o) + } + } else enqueue(o) + } + + override def onDownstreamFinish() = { + downstreamsRunning -= 1 + if (downstreamsRunning == 0) completeStage() + else if (!hasPulled && needDownstreamPulls > 0) { + needDownstreamPulls -= 1 + if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in) + } + } + }) + } + } + + override def toString = "Balance" } object Zip { /** * Create a new `Zip`. */ - def apply[A, B](): Zip[A, B] = { - val shape = new FanInShape2[A, B, (A, B)]("Zip") - new Zip(shape, new ZipWith2Module[A, B, (A, B)](shape, Keep.both, Attributes.name("Zip"))) - } + def apply[A, B](): Zip[A, B] = new Zip() } /** @@ -203,14 +431,8 @@ object Zip { * * '''Cancels when''' downstream cancels */ -class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)], - private[stream] override val module: StreamLayout.Module) - extends Graph[FanInShape2[A, B, (A, B)], Unit] { - - override def withAttributes(attr: Attributes): Zip[A, B] = - new Zip(shape, module.withAttributes(attr).nest()) - - override def named(name: String): Zip[A, B] = withAttributes(Attributes.name(name)) +class Zip[A, B] extends ZipWith2[A, B, (A, B)](Pair.apply) { + override def toString = "Zip" } /** @@ -247,25 +469,15 @@ object Unzip { * Create a new `Unzip`. */ def apply[A, B](): Unzip[A, B] = { - val shape = new FanOutShape2[(A, B), A, B]("Unzip") - new Unzip(shape, new UnzipWith2Module[(A, B), A, B]( - shape, - _identity.asInstanceOf[((A, B)) ⇒ (A, B)], - Attributes.name("Unzip"))) + new Unzip() } } /** * Combine the elements of multiple streams into a stream of the combined elements. */ -class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B], - private[stream] override val module: StreamLayout.Module) - extends Graph[FanOutShape2[(A, B), A, B], Unit] { - - override def withAttributes(attr: Attributes): Unzip[A, B] = - new Unzip(shape, module.withAttributes(attr).nest()) - - override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name)) +class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](identity) { + override def toString = "Unzip" } /** @@ -285,16 +497,15 @@ object Concat { /** * Create a new `Concat`. */ - def apply[T](): Concat[T] = { - val shape = new UniformFanInShape[T, T](2) - new Concat(shape, new ConcatModule(shape, Attributes.name("Concat"))) + def apply[T](inputCount: Int = 2): Concat[T] = { + new Concat(inputCount) } } /** - * Takes two streams and outputs one stream formed from the two input streams + * Takes multiple streams and outputs one stream formed from the input streams * by first emitting all of the elements from the first stream and then emitting - * all of the elements from the second stream. + * all of the elements from the second stream, etc. * * A `Concat` has one `first` port, one `second` port and one `out` port. * @@ -306,14 +517,44 @@ object Concat { * * '''Cancels when''' downstream cancels */ -class Concat[T] private (override val shape: UniformFanInShape[T, T], - private[stream] override val module: StreamLayout.Module) - extends Graph[UniformFanInShape[T, T], Unit] { +class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] { + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputCount)(i ⇒ Inlet[T]("Concat.in" + i)) + val out: Outlet[T] = Outlet[T]("Concat.out") + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def withAttributes(attr: Attributes): Concat[T] = - new Concat(shape, module.withAttributes(attr).nest()) + override def createLogic = new GraphStageLogic { + var activeStream: Int = 0 - override def named(name: String): Concat[T] = withAttributes(Attributes.name(name)) + { + var idxx = 0 + val itr = in.iterator + while (itr.hasNext) { + val i = itr.next() + val idx = idxx + setHandler(i, new InHandler { + override def onPush() = { + push(out, grab(i)) + } + + override def onUpstreamFinish() = { + if (idx == activeStream) { + activeStream += 1 + // Skip closed inputs + while (activeStream < inputCount && isClosed(in(activeStream))) activeStream += 1 + if (activeStream == inputCount) completeStage() + else if (isAvailable(out)) pull(in(activeStream)) + } + } + }) + idxx += 1 + } + } + + setHandler(out, new OutHandler { + override def onPull() = pull(in(activeStream)) + }) + + } } object FlowGraph extends GraphApply {