diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala index 8271c61ce5..ea94481982 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -101,7 +101,7 @@ class FlowMapBenchmark { flow = mkMaps(Source(syncTestPublisher), numberOfMapOps) { if (UseGraphStageIdentity) - new GraphStages.Identity[Int] + GraphStages.identity[Int] else Flow[Int].map(identity) } diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index 04773f199d..0ec9e9dee5 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -2,7 +2,7 @@ package akka.stream import akka.event._ import akka.stream.impl.fusing.{ GraphInterpreterSpecKit, GraphStages, Map => MapStage } -import akka.stream.impl.fusing.GraphStages.Identity +import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.stage._ import org.openjdk.jmh.annotations._ @@ -28,7 +28,7 @@ class InterpreterBenchmark { def graph_interpreter_100k_elements() { new GraphInterpreterSpecKit { new TestSetup { - val identities = Vector.fill(numberOfIds)(new Identity[Int]) + val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int]) val source = new GraphDataSource("source", data100k) val sink = new GraphDataSink[Int]("sink", data100k.size) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index e91377e5e2..a7fbcca813 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -115,6 +115,8 @@ private[http] object OutgoingConnectionBlueprint { private val responses = Inlet[HttpResponse]("responses") private val out = Outlet[HttpRequest]("out") + override def initialAttributes = Attributes.name("TerminationMerge") + val shape = new FanInShape2(requests, responses, out) override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -145,6 +147,8 @@ private[http] object OutgoingConnectionBlueprint { private val methodBypassInput = Inlet[HttpMethod]("method") private val out = Outlet[List[ResponseOutput]]("out") + override def initialAttributes = Attributes.name("ResponseParsingMerge") + val shape = new FanInShape2(dataInput, methodBypassInput, out) override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala index c3dcbe748a..401efa22b5 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolConductor.scala @@ -45,19 +45,19 @@ private object PoolConductor { /* Stream Setup ============ -                                            Request-  +                                            Request- Request- +-----------+ +-----------+   Switch-   +-------------+    +-----------+  Context Context |  retry | |  slot- |   Command  | doubler |    |  route  +--------------> +--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->|  (Flexi  +--------------> | |  | |    | |    |  Route) +--------------> - +----+------+ +-----+-----+             +-------------+    +-----------+     to slots      -   ^     ^  + +----+------+ +-----+-----+             +-------------+    +-----------+     to slots +   ^     ^ | | SlotEvent | +----+----+ | | flatten | mapAsync | +----+----+   |     | RawSlotEvent -   | Request-    |                                           +   | Request-    | | Context +---------+ +-------------+  retry |<-------- RawSlotEvent (from slotEventMerge) |  Split  | @@ -113,6 +113,8 @@ private object PoolConductor { private val slotIn = Inlet[SlotEvent]("slotEvents") private val out = Outlet[SwitchCommand]("switchCommand") + override def initialAttributes = Attributes.name("SlotSelector") + override val shape = new FanInShape2(ctxIn, slotIn, out) override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -205,6 +207,8 @@ private object PoolConductor { private class Route(slotCount: Int) extends GraphStage[UniformFanOutShape[SwitchCommand, RequestContext]] { + override def initialAttributes = Attributes.name("PoolConductor.Route") + override val shape = new UniformFanOutShape[SwitchCommand, RequestContext](slotCount) override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 052f2d2ac3..99836bd515 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -179,6 +179,9 @@ private[http] object HttpServerBluePrint { private val requestPrepOut = Outlet[RequestOutput]("requestPrepOut") private val httpResponseIn = Inlet[HttpResponse]("httpResponseIn") private val responseCtxOut = Outlet[ResponseRenderingContext]("responseCtxOut") + + override def initialAttributes = Attributes.name("ControllerStage") + val shape = new BidiShape(requestParsingIn, requestPrepOut, httpResponseIn, responseCtxOut) def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -412,6 +415,8 @@ private[http] object HttpServerBluePrint { private val toWs = Outlet[ByteString]("toWs") private val fromWs = Inlet[ByteString]("fromWs") + override def initialAttributes = Attributes.name("ProtocolSwitchStage") + def shape: ProtocolSwitchShape = ProtocolSwitchShape(fromNet, toNet, fromHttp, toHttp, fromWs, toWs) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala index 44c74fbfd2..ceaf4a7afe 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameEventParser.scala @@ -136,4 +136,6 @@ private[http] object FrameEventParser extends ByteStringParser[FrameEvent] { } else if (data.length == 1) invalid("close code must be length 2 but was 1") // must be >= length 2 if not empty else None } + + override def toString: String = "FrameEventParser" } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala index ceb56c25fe..ed0128a07b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala @@ -26,6 +26,8 @@ private[http] object FrameHandler { type Ctx = Context[Output] def initial: State = Idle + override def toString: String = s"HandlerStage(server=$server)" + private object Idle extends StateWithControlFrameHandling { def handleRegularFrameStart(start: FrameStart)(implicit ctx: Ctx): SyncDirective = (start.header.opcode, start.isFullMessage) match { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala index 65a571eb3e..bd6e886d24 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Masking.scala @@ -37,6 +37,7 @@ private[http] object Masking { if (header.mask.isDefined) throw new ProtocolException("Frame mustn't already be masked") header.copy(mask = Some(mask)) } + override def toString: String = s"Masking($random)" } private class Unmasking extends Masker { def extractMask(header: FrameHeader): Int = header.mask match { @@ -44,6 +45,7 @@ private[http] object Masking { case None ⇒ throw new ProtocolException("Frame wasn't masked") } def setNewMask(header: FrameHeader, mask: Int): FrameHeader = header.copy(mask = None) + override def toString: String = "Unmasking" } /** Implements both masking and unmasking which is mostly symmetric (because of XOR) */ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index ce51579aba..09ccbd8d52 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -152,11 +152,13 @@ private[http] object Websocket { }.named("ws-message-api")) } - private object BypassRouter extends GraphStage[FanOutShape2[Output, BypassEvent, MessagePart]] { + private case object BypassRouter extends GraphStage[FanOutShape2[Output, BypassEvent, MessagePart]] { private val in = Inlet[Output]("in") private val bypass = Outlet[BypassEvent]("bypass-out") private val user = Outlet[MessagePart]("message-out") + override def initialAttributes = Attributes.name("BypassRouter") + val shape = new FanOutShape2(in, bypass, user) def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -182,12 +184,14 @@ private[http] object Websocket { } } - private object BypassMerge extends GraphStage[FanInShape3[BypassEvent, AnyRef, Tick.type, AnyRef]] { + private case object BypassMerge extends GraphStage[FanInShape3[BypassEvent, AnyRef, Tick.type, AnyRef]] { private val bypass = Inlet[BypassEvent]("bypass-in") private val user = Inlet[AnyRef]("message-in") private val tick = Inlet[Tick.type]("tick-in") private val out = Outlet[AnyRef]("out") + override def initialAttributes = Attributes.name("BypassMerge") + val shape = new FanInShape3(bypass, user, tick, out) def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -207,10 +211,12 @@ private[http] object Websocket { } } - private object LiftCompletions extends GraphStage[FlowShape[FrameStart, AnyRef]] { + private case object LiftCompletions extends GraphStage[FlowShape[FrameStart, AnyRef]] { private val in = Inlet[FrameStart]("in") private val out = Outlet[AnyRef]("out") + override def initialAttributes = Attributes.name("LiftCompletions") + val shape = new FlowShape(in, out) def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { @@ -225,5 +231,5 @@ private[http] object Websocket { } } - object Tick + case object Tick } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 8f1c60656c..1e69fa95b1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -123,6 +123,9 @@ package util { val in = Inlet[ByteString]("in") val out = Outlet[HttpEntity.Strict]("out") + + override def initialAttributes = Attributes.name("ToStrict") + override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index f1589e1ad9..486a9b1539 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -253,6 +253,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") + responsesSub.request(1) val error @ IllegalResponseException(info) = responses.expectError() info.summary shouldEqual "The server-side HTTP version is not supported" netOut.expectError(error) @@ -296,6 +297,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. sendWireData("HTTP/1.1 200 OK") netInSub.sendComplete() + responsesSub.request(1) val error @ IllegalResponseException(info) = responses.expectError() info.summary shouldEqual "Illegal HTTP message start" netOut.expectError(error) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 05d67c1f95..f4751b05cf 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -634,6 +634,7 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") rec(100000) netIn.sendComplete() + requests.request(1) requests.expectComplete() netOut.expectComplete() } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala index 8c5fc1ed35..aa599f574a 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala @@ -18,6 +18,7 @@ import akka.http.impl.util._ import akka.http.ServerSettings import akka.http.scaladsl.model.headers.{ ProductVersion, Server } import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.stream.OverflowStrategy abstract class HttpServerTestSetupBase { implicit def system: ActorSystem @@ -38,7 +39,7 @@ abstract class HttpServerTestSetupBase { server ⇒ import GraphDSL.Implicits._ Source(netIn) ~> Flow[ByteString].map(SessionBytes(null, _)) ~> server.in2 - server.out1 ~> Flow[SslTlsOutbound].collect { case SendBytes(x) ⇒ x } ~> netOut.sink + server.out1 ~> Flow[SslTlsOutbound].collect { case SendBytes(x) ⇒ x }.buffer(1, OverflowStrategy.backpressure) ~> netOut.sink server.out2 ~> Sink(requests) Source(responses) ~> server.in1 ClosedShape @@ -71,4 +72,4 @@ abstract class HttpServerTestSetupBase { def send(string: String): Unit = send(ByteString(string.stripMarginWithNewline("\r\n"), "UTF8")) def closeNetworkInput(): Unit = netIn.sendComplete() -} \ No newline at end of file +} diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 175f1063e7..eee6e23920 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -14,6 +14,7 @@ import akka.util.ByteString import akka.http.scaladsl.model.ws._ import Protocol.Opcode import akka.testkit.EventFilter +import akka.stream.OverflowStrategy class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { import WSTestUtils._ @@ -821,22 +822,20 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val messageOut = TestPublisher.probe[Message]() val messageHandler: Flow[Message, Message, Unit] = - Flow.fromGraph { - GraphDSL.create() { implicit b ⇒ - val in = b.add(Sink(messageIn)).inlet - val out = b.add(Source(messageOut)).outlet - - FlowShape[Message, Message](in, out) - } - } + Flow.fromSinkAndSource( + Flow[Message].buffer(1, OverflowStrategy.backpressure).to(Sink(messageIn)), // alternatively need to request(1) before expectComplete + Source(messageOut)) Source(netIn) .via(printEvent("netIn")) .via(FrameEventParser) - .via(Websocket.stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout, log = system.log).join(messageHandler)) + .via(Websocket + .stack(serverSide, maskingRandomFactory = Randoms.SecureRandomInstances, closeTimeout = closeTimeout, log = system.log) + .join(messageHandler)) .via(printEvent("frameRendererIn")) .transform(() ⇒ new FrameEventRenderer) .via(printEvent("frameRendererOut")) + .buffer(1, OverflowStrategy.backpressure) // alternatively need to request(1) before expectComplete .to(netOut.sink) .run() diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala index fd7d5697fb..40244f2612 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketClientSpec.scala @@ -344,7 +344,10 @@ class WebsocketClientSpec extends FreeSpec with Matchers with WithMaterializerSp def expectWireData(bs: ByteString) = netOut.expectBytes(bs) def expectNoWireData() = netOut.expectNoBytes(noMsgTimeout) - def expectNetworkClose(): Unit = netOut.expectComplete() + def expectNetworkClose(): Unit = { + netOut.request(1) + netOut.expectComplete() + } def expectNetworkAbort(): Unit = netOut.expectError() def closeNetworkInput(): Unit = netIn.sendComplete() diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index d9b5fea9b9..26705d52a6 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -392,8 +392,10 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks clientOutSub.sendComplete() + serverInSub.request(1) serverIn.expectComplete() serverOutSub.expectCancellation() + clientInSub.request(1) clientIn.expectComplete() connSourceSub.cancel() diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 717c04da22..447a731320 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -5,7 +5,7 @@ package akka.stream.tck import akka.stream.impl.Stages import akka.stream._ -import akka.stream.impl.Stages.Identity +import akka.stream.impl.fusing.GraphStages import akka.stream.scaladsl.Flow import org.reactivestreams.Processor @@ -18,7 +18,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { implicit val materializer = ActorMaterializer(settings)(system) // withAttributes "wraps" the underlying identity and protects it from automatic removal - Flow[Int].via(Stages.identityGraph.asInstanceOf[Graph[FlowShape[Int, Int], Unit]]).named("identity").toProcessor.run() + Flow[Int].via(GraphStages.Identity.asInstanceOf[Graph[FlowShape[Int, Int], Unit]]).named("identity").toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala deleted file mode 100644 index 2c6181d7b6..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.impl.MaterializedValuePublisher -import org.reactivestreams.Publisher - -class MaterializedValuePublisherTest extends AkkaPublisherVerification[Any] { - - override def createPublisher(elements: Long): Publisher[Any] = { - val pub = new MaterializedValuePublisher() - - // it contains a value already - pub.setValue("Hello") - - pub - } - - override def maxElementsFromPublisher = 1 -} \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala deleted file mode 100644 index 32201ca5b1..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.impl.SingleElementPublisher - -import scala.collection.immutable -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import org.reactivestreams._ - -class SingleElementPublisherTest extends AkkaPublisherVerification[Int] { - - def createPublisher(elements: Long): Publisher[Int] = { - Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher(false)) - } - - override def maxElementsFromPublisher(): Long = 1 -} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index 2f6b418481..d550551f69 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -5,7 +5,6 @@ package akka.stream.tck import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.impl.ActorMaterializerImpl -import akka.stream.impl.Stages.Identity import akka.stream.scaladsl.Flow import akka.stream.Attributes import akka.stream.stage.{ Context, PushStage } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java index d1fe0bf758..e49d4eb134 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java @@ -9,6 +9,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.net.BindException; import org.junit.ClassRule; import org.junit.Test; import scala.concurrent.Await; @@ -22,6 +23,7 @@ import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.TestUtils; import akka.util.ByteString; +import akka.testkit.JavaTestKit; public class TcpTest extends StreamTest { public TcpTest() { @@ -83,12 +85,22 @@ public class TcpTest extends StreamTest { final ServerBinding b = Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)); assertEquals(b.localAddress().getPort(), serverAddress.getPort()); - try { - Await.result(binding.to(echoHandler).run(materializer), FiniteDuration.create(5, TimeUnit.SECONDS)); - assertTrue("Expected BindFailedException, but nothing was reported", false); - } catch (BindFailedException e) { - // expected - } + new JavaTestKit(system) {{ + new EventFilter(BindException.class) { + @Override + protected Void run() { + try { + Await.result(binding.to(echoHandler).run(materializer), FiniteDuration.create(5, TimeUnit.SECONDS)); + assertTrue("Expected BindFailedException, but nothing was reported", false); + } catch (BindFailedException e) { + // expected + } catch (Exception e) { + throw new AssertionError("failed", e); + } + return null; + } + }.occurrences(1).exec(); + }}; } @Test diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala new file mode 100644 index 0000000000..64134a7682 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.testkit.AkkaSpec +import org.scalactic.ConversionCheckedTripleEquals + +class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals { + + implicit val materializer = ActorMaterializer() + + "Fusing" must { + + "fuse a moderately complex graph" in { + val g = Source.unfoldInf(1)(x ⇒ (x, x)).filter(_ % 2 == 1).alsoTo(Sink.fold(0)(_ + _)).to(Sink.fold(1)(_ + _)) + val fused = Fusing.aggressive(g) + val module = fused.module + module.subModules.size should ===(1) + module.info.downstreams.size should be > 5 + module.info.upstreams.size should be > 5 + } + + "not fuse across AsyncBoundary" in { + val g = + Source.unfoldInf(1)(x ⇒ (x, x)).filter(_ % 2 == 1) + .alsoTo(Sink.fold(0)(_ + (_: Int)).addAttributes(Attributes.asyncBoundary)) + .to(Sink.fold(1)(_ + _)) + val fused = Fusing.aggressive(g) + val module = fused.module + module.subModules.size should ===(2) + module.info.downstreams.size should be > 5 + module.info.upstreams.size should be > 5 + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index f424557893..9e8ce95a7e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -68,7 +68,7 @@ class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with Con class FusedGraph[S <: Shape](ga: GraphAssembly, s: S, a: Attributes = Attributes.none) extends Graph[S, Unit] { override def shape = s - override val module = GraphModule(ga, s, a) + override val module = GraphModule(ga, s, a, ga.stages.map(_.module)) override def withAttributes(attr: Attributes) = new FusedGraph(ga, s, attr) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index 29695f242b..2a1674a25f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -175,7 +175,8 @@ class StreamLayoutSpec extends AkkaSpec { var publishers = Vector.empty[TestPublisher] var subscribers = Vector.empty[TestSubscriber] - override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Unit = { + override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, + matVal: java.util.Map[Module, Any]): Unit = { for (inPort ← atomic.inPorts) { val subscriber = TestSubscriber(atomic, inPort) subscribers :+= subscriber diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index f098807dc9..8d3bb153b9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -18,7 +18,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { "ActorGraphInterpreter" must { "be able to interpret a simple identity graph stage" in assertAllStagesStopped { - val identity = new GraphStages.Identity[Int] + val identity = GraphStages.identity[Int] Await.result( Source(1 to 100).via(identity).grouped(200).runWith(Sink.head), @@ -27,7 +27,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { } "be able to reuse a simple identity graph stage" in assertAllStagesStopped { - val identity = new GraphStages.Identity[Int] + val identity = GraphStages.identity[Int] Await.result( Source(1 to 100) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index df61390cde..f875e36f40 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -15,8 +15,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { "GraphInterpreter" must { // Reusable components - val identity = new Identity[Int] - val detacher = new Detacher[Int] + val identity = GraphStages.identity[Int] + val detach = detacher[Int] val zip = Zip[Int, String] val bcast = Broadcast[Int](2) val merge = Merge[Int](2) @@ -71,9 +71,9 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { val source = new UpstreamProbe[Int]("source") val sink = new DownstreamProbe[Int]("sink") - builder(detacher) - .connect(source, detacher.in) - .connect(detacher.out, sink) + builder(detach) + .connect(source, detach.shape.inlet) + .connect(detach.shape.outlet, sink) .init() lastEvents() should ===(Set.empty) @@ -309,12 +309,12 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { val source = new UpstreamProbe[Int]("source") val sink = new DownstreamProbe[Int]("sink") - builder(detacher, balance, merge) + builder(detach, balance, merge) .connect(source, merge.in(0)) .connect(merge.out, balance.in) .connect(balance.out(0), sink) - .connect(balance.out(1), detacher.in) - .connect(detacher.out, merge.in(1)) + .connect(balance.out(1), detach.shape.inlet) + .connect(detach.shape.outlet, merge.in(1)) .init() lastEvents() should ===(Set.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 9d2642426d..478d515ee4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -70,7 +70,8 @@ trait GraphInterpreterSpecKit { def init(): Unit = { val assembly = buildAssembly() - val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) + val (inHandlers, outHandlers, logics) = + assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ (), fuzzingMode = false) @@ -87,7 +88,8 @@ trait GraphInterpreterSpecKit { } def manualInit(assembly: GraphAssembly): Unit = { - val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) + val (inHandlers, outHandlers, logics) = + assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ (), fuzzingMode = false) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala index c3e095e46a..3b5f3022e7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowJoinSpec.scala @@ -8,8 +8,10 @@ import akka.stream.testkit._ import com.typesafe.config.ConfigFactory import scala.concurrent.Await import scala.concurrent.duration._ +import akka.stream.OverflowStrategy +import org.scalatest.concurrent.ScalaFutures -class FlowJoinSpec extends AkkaSpec(ConfigFactory.parseString("akka.loglevel=INFO")) { +class FlowJoinSpec extends AkkaSpec(ConfigFactory.parseString("akka.loglevel=INFO")) with ScalaFutures { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) @@ -35,7 +37,11 @@ class FlowJoinSpec extends AkkaSpec(ConfigFactory.parseString("akka.loglevel=INF FlowShape(merge.in(1), broadcast.out(1)) }) - val flow2 = Flow[Int].filter(_ % 2 == 1).map(_ * 10).take((end + 1) / 2) + val flow2 = Flow[Int] + .filter(_ % 2 == 1) + .map(_ * 10) + .buffer((end + 1) / 2, OverflowStrategy.backpressure) + .take((end + 1) / 2) val mm = flow1.join(flow2).run() @@ -44,5 +50,23 @@ class FlowJoinSpec extends AkkaSpec(ConfigFactory.parseString("akka.loglevel=INF probe.expectNext().toSet should be(result) sub.cancel() } + + "propagate one element" in { + val source = Source.single("lonely traveler") + + val flow1 = Flow.fromGraph(GraphDSL.create(Sink.head[String]) { implicit b ⇒ + sink ⇒ + import GraphDSL.Implicits._ + val merge = b.add(Merge[String](2)) + val broadcast = b.add(Broadcast[String](2)) + source ~> merge.in(0) + merge.out ~> broadcast.in + broadcast.out(0) ~> sink + + FlowShape(merge.in(1), broadcast.out(1)) + }) + + whenReady(flow1.join(Flow[String]).run())(_ shouldBe "lonely traveler") + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala index 3f854da5d5..51e5d7a3a6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -69,7 +69,7 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { val customDispatcher = TestProbe() val f1 = Flow[Int].map(sendThreadNameTo(defaultDispatcher.ref)) - val f2 = Flow[Int].map(sendThreadNameTo(customDispatcher.ref)) + val f2 = Flow[Int].map(sendThreadNameTo(customDispatcher.ref)).map(x ⇒ x) .withAttributes(dispatcher("my-dispatcher1") and name("separate-disptacher")) Source(0 to 2).via(f1).via(f2).runWith(Sink.ignore) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index a0052ea5c5..ad3d3d7c5b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -74,7 +74,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece Array(null, stage.shape.outlet), Array(-1, 0)) - val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) + val (inHandlers, outHandlers, logics) = + assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) val props = Props(new BrokenActorInterpreter(assembly, inHandlers, outHandlers, logics, stage.shape, settings, materializer, "a3")) .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index f2430f97d9..c59c85ef98 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -23,13 +23,11 @@ class SinkSpec extends AkkaSpec { SinkShape(bcast.in) }) Source(List(0, 1, 2)).runWith(sink) - for (i ← 0 to 2) { - val p = probes(i) - val s = p.expectSubscription() - s.request(3) - p.expectNext(i) - p.expectComplete() - } + + val subscriptions = probes.map(_.expectSubscription()) + subscriptions.foreach { s ⇒ s.request(3) } + probes.zipWithIndex.foreach { case (p, i) ⇒ p.expectNext(i) } + probes.foreach { case p ⇒ p.expectComplete() } } "be composable with importing 1 module" in { @@ -42,13 +40,11 @@ class SinkSpec extends AkkaSpec { SinkShape(bcast.in) }) Source(List(0, 1, 2)).runWith(sink) - for (i ← 0 to 2) { - val p = probes(i) - val s = p.expectSubscription() - s.request(3) - p.expectNext(i) - p.expectComplete() - } + + val subscriptions = probes.map(_.expectSubscription()) + subscriptions.foreach { s ⇒ s.request(3) } + probes.zipWithIndex.foreach { case (p, i) ⇒ p.expectNext(i) } + probes.foreach { case p ⇒ p.expectComplete() } } "be composable with importing 2 modules" in { @@ -62,13 +58,11 @@ class SinkSpec extends AkkaSpec { SinkShape(bcast.in) }) Source(List(0, 1, 2)).runWith(sink) - for (i ← 0 to 2) { - val p = probes(i) - val s = p.expectSubscription() - s.request(3) - p.expectNext(i) - p.expectComplete() - } + + val subscriptions = probes.map(_.expectSubscription()) + subscriptions.foreach { s ⇒ s.request(3) } + probes.zipWithIndex.foreach { case (p, i) ⇒ p.expectNext(i) } + probes.foreach { case p ⇒ p.expectComplete() } } "be composable with importing 3 modules" in { @@ -82,13 +76,11 @@ class SinkSpec extends AkkaSpec { SinkShape(bcast.in) }) Source(List(0, 1, 2)).runWith(sink) - for (i ← 0 to 2) { - val p = probes(i) - val s = p.expectSubscription() - s.request(3) - p.expectNext(i) - p.expectComplete() - } + + val subscriptions = probes.map(_.expectSubscription()) + subscriptions.foreach { s ⇒ s.request(3) } + probes.zipWithIndex.foreach { case (p, i) ⇒ p.expectNext(i) } + probes.foreach { case p ⇒ p.expectComplete() } } "combine to many outputs with simplified API" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index ba20233ba6..2c330715c8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -10,6 +10,7 @@ import scala.util.control.NoStackTrace import akka.stream.{ SourceShape, ActorMaterializer } import akka.stream.testkit._ import akka.stream.impl.{ PublisherSource, ReactiveStreamsCompliance } +import scala.concurrent.Future class SourceSpec extends AkkaSpec { @@ -223,8 +224,6 @@ class SourceSpec extends AkkaSpec { val expected = List(9227465, 5702887, 3524578, 2178309, 1346269, 832040, 514229, 317811, 196418, 121393, 75025, 46368, 28657, 17711, 10946, 6765, 4181, 2584, 1597, 987, 610, 377, 233, 144, 89, 55, 34, 21, 13, 8, 5, 3, 2, 1, 1, 0) "generate a finite fibonacci sequence" in { - import GraphDSL.Implicits._ - val source = Source.unfold((0, 1)) { case (a, _) if a > 10000000 ⇒ None case (a, b) ⇒ Some((b, a + b) → a) @@ -234,21 +233,15 @@ class SourceSpec extends AkkaSpec { } "generate a finite fibonacci sequence asynchronously" in { - import GraphDSL.Implicits._ - import scala.concurrent.Future - import scala.concurrent.ExecutionContext.Implicits.global - val source = Source.unfoldAsync((0, 1)) { case (a, _) if a > 10000000 ⇒ Future.successful(None) - case (a, b) ⇒ Future(Some((b, a + b) → a)) + case (a, b) ⇒ Future.successful(Some((b, a + b) → a)) } val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) result should ===(expected) } "generate an infinite fibonacci sequence" in { - import GraphDSL.Implicits._ - val source = Source.unfoldInf((0, 1)) { case (a, b) ⇒ (b, a + b) → a } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template index 5a5eba8b21..d99ea32cab 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template @@ -43,6 +43,7 @@ trait UnzipWithApply { [2..20#/** `UnzipWith` specialized for 1 outputs */ class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOutShape1[In, [#A1#]]] { + override def initialAttributes = Attributes.name("UnzipWith1") override val shape: FanOutShape1[In, [#A1#]] = new FanOutShape1[In, [#A1#]]("UnzipWith1") def in: Inlet[In] = shape.in diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 8432b45503..4afdec4342 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -24,6 +24,7 @@ trait ZipWithApply { [2..20#/** `ZipWith` specialized for 1 inputs */ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { + override def initialAttributes = Attributes.name("ZipWith1") override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipWith1") def out: Outlet[O] = shape.out [#val in0: Inlet[A1] = shape.in0# diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index e0bd4800c2..c1f4918269 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -40,6 +40,11 @@ akka { # Maximum number of elements emitted in batch if downstream signals large demand output-burst-limit = 1000 + + # Enable automatic fusing of all graphs that are run. For short-lived streams + # this may cause an initial runtime overhead, but most of the time fusing is + # desirable since it reduces the number of Actors that are created. + auto-fusing = on debug { # Enables the fuzzing mode which increases the chance of race conditions diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index c814bed6fb..a482f8224d 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -196,6 +196,9 @@ final case class AbruptTerminationException(actor: ActorRef) object ActorMaterializerSettings { + /** + * Create [[ActorMaterializerSettings]] from individual settings (Scala). + */ def apply( initialInputBufferSize: Int, maxInputBufferSize: Int, @@ -204,25 +207,20 @@ object ActorMaterializerSettings { subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, debugLogging: Boolean, outputBurstLimit: Int, - fuzzingMode: Boolean) = + fuzzingMode: Boolean, + autoFusing: Boolean) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode) + outputBurstLimit, fuzzingMode, autoFusing) /** - * Create [[ActorMaterializerSettings]]. - * - * You can refine the configuration based settings using [[ActorMaterializerSettings#withInputBuffer]], - * [[ActorMaterializerSettings#withDispatcher]] + * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala). */ def apply(system: ActorSystem): ActorMaterializerSettings = apply(system.settings.config.getConfig("akka.stream.materializer")) /** - * Create [[ActorMaterializerSettings]]. - * - * You can refine the configuration based settings using [[ActorMaterializerSettings#withInputBuffer]], - * [[ActorMaterializerSettings#withDispatcher]] + * Create [[ActorMaterializerSettings]] from a Config subsection (Scala). */ def apply(config: Config): ActorMaterializerSettings = ActorMaterializerSettings( @@ -233,22 +231,34 @@ object ActorMaterializerSettings { subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config), debugLogging = config.getBoolean("debug-logging"), outputBurstLimit = config.getInt("output-burst-limit"), - fuzzingMode = config.getBoolean("debug.fuzzing-mode")) + fuzzingMode = config.getBoolean("debug.fuzzing-mode"), + autoFusing = config.getBoolean("auto-fusing")) /** - * Java API - * - * You can refine the configuration based settings using [[ActorMaterializerSettings#withInputBuffer]], - * [[ActorMaterializerSettings#withDispatcher]] + * Create [[ActorMaterializerSettings]] from individual settings (Java). + */ + def create( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean) = + new ActorMaterializerSettings( + initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing) + + /** + * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java). */ def create(system: ActorSystem): ActorMaterializerSettings = apply(system) /** - * Java API - * - * You can refine the configuration based settings using [[ActorMaterializerSettings#withInputBuffer]], - * [[ActorMaterializerSettings#withDispatcher]] + * Create [[ActorMaterializerSettings]] from a Config subsection (Java). */ def create(config: Config): ActorMaterializerSettings = apply(config) @@ -256,10 +266,8 @@ object ActorMaterializerSettings { } /** - * The buffers employed by the generated Processors can be configured by - * creating an appropriate instance of this class. - * - * This will likely be replaced in the future by auto-tuning these values at runtime. + * This class describes the configurable properties of the [[ActorMaterializer]]. + * Please refer to the `withX` methods for descriptions of the individual settings. */ final class ActorMaterializerSettings( val initialInputBufferSize: Int, @@ -269,7 +277,8 @@ final class ActorMaterializerSettings( val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, val debugLogging: Boolean, val outputBurstLimit: Int, - val fuzzingMode: Boolean) { + val fuzzingMode: Boolean, + val autoFusing: Boolean) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -284,16 +293,29 @@ final class ActorMaterializerSettings( subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings, debugLogging: Boolean = this.debugLogging, outputBurstLimit: Int = this.outputBurstLimit, - fuzzingMode: Boolean = this.fuzzingMode) = + fuzzingMode: Boolean = this.fuzzingMode, + autoFusing: Boolean = this.autoFusing) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode) + outputBurstLimit, fuzzingMode, autoFusing) + /** + * Each asynchronous piece of a materialized stream topology is executed by one Actor + * that manages an input buffer for all inlets of its shape. This setting configures + * the initial and maximal input buffer in number of elements for each inlet. + * + * FIXME: Currently only the initialSize is used, auto-tuning is not yet implemented. + */ def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings = { if (initialSize == this.initialInputBufferSize && maxSize == this.maxInputBufferSize) this else copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize) } + /** + * This setting configures the default dispatcher to be used by streams materialized + * with the [[ActorMaterializer]]. This can be overridden for individual parts of the + * stream topology by using [[akka.stream.Attributes#dispatcher]]. + */ def withDispatcher(dispatcher: String): ActorMaterializerSettings = { if (this.dispatcher == dispatcher) this else copy(dispatcher = dispatcher) @@ -324,35 +346,91 @@ final class ActorMaterializerSettings( }) } - def withSubscriptionTimeoutSettings(sub: StreamSubscriptionTimeoutSettings): ActorMaterializerSettings = - copy(subscriptionTimeoutSettings = sub) - - def withFuzzing(enable: Boolean): ActorMaterializerSettings = { + /** + * Test utility: fuzzing mode means that GraphStage events are not processed + * in FIFO order within a fused subgraph, but randomized. + */ + def withFuzzing(enable: Boolean): ActorMaterializerSettings = if (enable == this.fuzzingMode) this else copy(fuzzingMode = enable) - } - def withDebugLogging(enable: Boolean): ActorMaterializerSettings = { + /** + * Maximum number of elements emitted in batch if downstream signals large demand. + */ + def withOutputBurstLimit(limit: Int): ActorMaterializerSettings = + if (limit == this.outputBurstLimit) this + else copy(outputBurstLimit = limit) + + /** + * Enable to log all elements that are dropped due to failures (at DEBUG level). + */ + def withDebugLogging(enable: Boolean): ActorMaterializerSettings = if (enable == this.debugLogging) this else copy(debugLogging = enable) - } + + /** + * Enable automatic fusing of all graphs that are run. For short-lived streams + * this may cause an initial runtime overhead, but most of the time fusing is + * desirable since it reduces the number of Actors that are created. + */ + def withAutoFusing(enable: Boolean): ActorMaterializerSettings = + if (enable == this.autoFusing) this + else copy(autoFusing = enable) + + /** + * Leaked publishers and subscribers are cleaned up when they are not used within a given + * deadline, configured by [[StreamSubscriptionTimeoutSettings]]. + */ + def withSubscriptionTimeoutSettings(settings: StreamSubscriptionTimeoutSettings): ActorMaterializerSettings = + if (settings == this.subscriptionTimeoutSettings) this + else copy(subscriptionTimeoutSettings = settings) private def requirePowerOfTwo(n: Integer, name: String): Unit = { require(n > 0, s"$name must be > 0") require((n & (n - 1)) == 0, s"$name must be a power of two") } + + override def equals(other: Any): Boolean = other match { + case s: ActorMaterializerSettings ⇒ + s.initialInputBufferSize == initialInputBufferSize && + s.maxInputBufferSize == maxInputBufferSize && + s.dispatcher == dispatcher && + s.supervisionDecider == supervisionDecider && + s.subscriptionTimeoutSettings == subscriptionTimeoutSettings && + s.debugLogging == debugLogging && + s.outputBurstLimit == outputBurstLimit && + s.fuzzingMode == fuzzingMode && + s.autoFusing == autoFusing + case _ ⇒ false + } + + override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$fuzzingMode,$autoFusing)" } object StreamSubscriptionTimeoutSettings { import akka.stream.StreamSubscriptionTimeoutTerminationMode._ + /** + * Create settings from individual values (Java). + */ + def create(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration): StreamSubscriptionTimeoutSettings = + new StreamSubscriptionTimeoutSettings(mode, timeout) + + /** + * Create settings from individual values (Scala). + */ def apply(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration): StreamSubscriptionTimeoutSettings = new StreamSubscriptionTimeoutSettings(mode, timeout) - /** Java API */ + /** + * Create settings from a Config subsection (Java). + */ def create(config: Config): StreamSubscriptionTimeoutSettings = apply(config) + /** + * Create settings from a Config subsection (Scala). + */ def apply(config: Config): StreamSubscriptionTimeoutSettings = { val c = config.getConfig("subscription-timeout") StreamSubscriptionTimeoutSettings( @@ -365,8 +443,22 @@ object StreamSubscriptionTimeoutSettings { } } -final class StreamSubscriptionTimeoutSettings(val mode: StreamSubscriptionTimeoutTerminationMode, val timeout: FiniteDuration) +/** + * Leaked publishers and subscribers are cleaned up when they are not used within a given + * deadline, configured by [[StreamSubscriptionTimeoutSettings]]. + */ +final class StreamSubscriptionTimeoutSettings(val mode: StreamSubscriptionTimeoutTerminationMode, val timeout: FiniteDuration) { + override def equals(other: Any): Boolean = other match { + case s: StreamSubscriptionTimeoutSettings ⇒ s.mode == mode && s.timeout == timeout + case _ ⇒ false + } + override def toString: String = s"StreamSubscriptionTimeoutSettings($mode,$timeout)" +} +/** + * This mode describes what shall happen when the subscription timeout expires for + * substream Publishers created by operations like `prefixAndTail`. + */ sealed abstract class StreamSubscriptionTimeoutTerminationMode object StreamSubscriptionTimeoutTerminationMode { @@ -374,11 +466,19 @@ object StreamSubscriptionTimeoutTerminationMode { case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode - /** Java API */ - def noop = NoopTermination - /** Java API */ - def warn = WarnTermination - /** Java API */ - def cancel = CancelTermination + /** + * Do not do anything when timeout expires. + */ + def noop: StreamSubscriptionTimeoutTerminationMode = NoopTermination + + /** + * Log a warning when the timeout expires. + */ + def warn: StreamSubscriptionTimeoutTerminationMode = WarnTermination + + /** + * When the timeout expires attach a Subscriber that will immediately cancel its subscription. + */ + def cancel: StreamSubscriptionTimeoutTerminationMode = CancelTermination } diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index b3b3809b68..e2972a2c77 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -74,31 +74,46 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { attributeList.find(c isInstance _).map(c cast _) /** - * Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: get all attributes of a given type (or subtypes thereof). + */ + def filtered[T <: Attribute: ClassTag]: List[T] = { + val c = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] + attributeList.collect { + case a if c.isAssignableFrom(a.getClass) ⇒ c.cast(a) + } + } + + /** + * Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. * If no such attribute exists the `default` value is returned. */ - def get[T <: Attribute: ClassTag](default: T) = + def get[T <: Attribute: ClassTag](default: T): T = getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]], default) /** - * Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. * If no such attribute exists the `default` value is returned. */ - def getFirst[T <: Attribute: ClassTag](default: T) = + def getFirst[T <: Attribute: ClassTag](default: T): T = getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]], default) /** - * Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the last (most specific) attribute of a given type parameter T `Class` or subclass thereof. */ - def get[T <: Attribute: ClassTag] = + def get[T <: Attribute: ClassTag]: Option[T] = getAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]]) /** - * Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. + * Scala API: Get the first (least specific) attribute of a given type parameter T `Class` or subclass thereof. */ - def getFirst[T <: Attribute: ClassTag] = + def getFirst[T <: Attribute: ClassTag]: Option[T] = getFirstAttribute(classTag[T].runtimeClass.asInstanceOf[Class[T]]) + /** + * Test whether the given attribute is contained within this attributes list. + */ + def contains(attr: Attribute): Boolean = attributeList.contains(attr) + /** * Adds given attributes to the end of these attributes. */ @@ -114,9 +129,9 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { Attributes(attributeList :+ other) /** - * INTERNAL API + * Extracts Name attributes and concatenates them. */ - private[akka] def nameLifted: Option[String] = Option(nameOrDefault(null)) + def nameLifted: Option[String] = Option(nameOrDefault(null)) /** * INTERNAL API @@ -157,6 +172,7 @@ object Attributes { /** Use to disable logging on certain operations when configuring [[Attributes.LogLevels]] */ final val Off: Logging.LogLevel = Logging.levelFor("off").get } + final case object AsyncBoundary extends Attribute /** * INTERNAL API @@ -166,6 +182,8 @@ object Attributes { val none: Attributes = Attributes() + val asyncBoundary: Attributes = Attributes(AsyncBoundary) + /** * Specifies the name of the operation. * If the name is null or empty the name is ignored, i.e. [[#none]] is returned. diff --git a/akka-stream/src/main/scala/akka/stream/Fusing.scala b/akka-stream/src/main/scala/akka/stream/Fusing.scala new file mode 100644 index 0000000000..e84dfaece3 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Fusing.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +import java.{ util ⇒ ju } +import scala.collection.immutable +import scala.collection.JavaConverters._ +import akka.stream.impl.StreamLayout._ + +/** + * This class holds some graph transformation functions that can fuse together + * multiple operation stages into synchronous execution islands. The purpose is + * to reduce the number of Actors that are created in order to execute the stream + * and thereby improve start-up cost as well as reduce element traversal latency + * for large graphs. Fusing itself is a time-consuming operation, meaning that + * usually it is best to cache the result of this computation and reuse it instead + * of fusing the same graph many times. + * + * Fusing together all operations which allow this treatment will reduce the + * parallelism that is available in the stream graph’s execution—in the worst case + * it will become single-threaded and not benefit from multiple CPU cores at all. + * Where parallelism is required, the [[akka.stream.Attributes#AsyncBoundary]] + * attribute can be used to declare subgraph boundaries across which the graph + * shall not be fused. + */ +object Fusing { + + /** + * Fuse all operations where this is technically possible (i.e. all + * implementations based on [[akka.stream.stage.GraphStage]]) and not forbidden + * via [[akka.stream.Attributes#AsyncBoundary]]. + */ + def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = + akka.stream.impl.fusing.Fusing.aggressive(g) + + /** + * A fused graph of the right shape, containing a [[FusedModule]] which + * holds more information on the operation structure of the contained stream + * topology for convenient graph traversal. + */ + case class FusedGraph[S <: Shape, M](override val module: FusedModule, + override val shape: S) extends Graph[S, M] { + override def withAttributes(attr: Attributes) = copy(module = module.withAttributes(attr)) + } + + /** + * When fusing a [[Graph]] a part of the internal stage wirings are hidden within + * [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are + * optimized for high-speed execution. This structural information bundle contains + * the wirings in a more accessible form, allowing traversal from port to upstream + * or downstream port and from there to the owning module (or graph vertex). + */ + final case class StructuralInfo(upstreams: immutable.Map[InPort, OutPort], + downstreams: immutable.Map[OutPort, InPort], + inOwners: immutable.Map[InPort, Module], + outOwners: immutable.Map[OutPort, Module]) + +} diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 8bc58d63a9..ca64f92017 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -25,4 +25,6 @@ trait Graph[+S <: Shape, +M] { def withAttributes(attr: Attributes): Graph[S, M] def named(name: String): Graph[S, M] = withAttributes(Attributes.name(name)) + + def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(module.attributes and attr) } diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index a1a983baa5..910a770f48 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -22,6 +22,10 @@ sealed abstract class InPort { self: Inlet[_] ⇒ * INTERNAL API */ private[stream] var id: Int = -1 + /** + * INTERNAL API + */ + private[stream] def inlet: Inlet[_] = this } /** * An output port of a StreamLayout.Module. This type logically belongs @@ -37,6 +41,10 @@ sealed abstract class OutPort { self: Outlet[_] ⇒ * INTERNAL API */ private[stream] var id: Int = -1 + /** + * INTERNAL API + */ + private[stream] def outlet: Outlet[_] = this } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 276f28f7b3..1f19fe22c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -4,7 +4,7 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } - +import java.{ util ⇒ ju } import akka.actor._ import akka.event.Logging import akka.dispatch.Dispatchers @@ -15,9 +15,11 @@ import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.io.SslTlsCipherActor import akka.stream.io.SslTls.TlsModule import org.reactivestreams._ - import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } +import akka.stream.impl.fusing.GraphStageModule +import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly +import akka.stream.impl.fusing.Fusing /** * INTERNAL API @@ -71,7 +73,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, override def scheduleOnce(delay: FiniteDuration, task: Runnable) = system.scheduler.scheduleOnce(delay, task)(executionContext) - override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { + override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = { + val runnableGraph = + if (settings.autoFusing) Fusing.aggressive(_runnableGraph) + else _runnableGraph + if (haveShutDown.get()) throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") if (StreamLayout.Debug) StreamLayout.validate(runnableGraph.module) @@ -85,7 +91,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, name } - override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Any = { + override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { def newMaterializationContext() = new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) @@ -93,18 +99,18 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, case sink: SinkModule[_, _] ⇒ val (sub, mat) = sink.create(newMaterializationContext()) assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]]) - mat + matVal.put(atomic, mat) case source: SourceModule[_, _] ⇒ val (pub, mat) = source.create(newMaterializationContext()) assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]]) - mat + matVal.put(atomic, mat) // FIXME: Remove this, only stream-of-stream ops need it case stage: StageModule ⇒ val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) assignPort(stage.inPort, processor) assignPort(stage.outPort, processor) - mat + matVal.put(atomic, mat) case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) @@ -123,24 +129,35 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, assignPort(tls.plainIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn)) assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn)) + matVal.put(atomic, ()) + case graph: GraphModule ⇒ - val calculatedSettings = effectiveSettings(effectiveAttributes) - val (inHandlers, outHandlers, logics, mat) = graph.assembly.materialize(effectiveAttributes) + matGraph(graph, effectiveAttributes, matVal) - val props = ActorGraphInterpreter.props( - graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) + case stage: GraphStageModule ⇒ + val graph = + GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage), + stage.shape, stage.attributes, Array(stage)) + matGraph(graph, effectiveAttributes, matVal) + } + } - val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) - for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) { - val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) - assignPort(inlet, subscriber) - } - for ((outlet, i) ← graph.shape.outlets.iterator.zipWithIndex) { - val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(i) } - impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher) - assignPort(outlet, publisher) - } - mat + private def matGraph(graph: GraphModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { + val calculatedSettings = effectiveSettings(effectiveAttributes) + val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) + + val props = ActorGraphInterpreter.props( + graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) + + val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) + for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) { + val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) + assignPort(inlet, subscriber) + } + for ((outlet, i) ← graph.shape.outlets.iterator.zipWithIndex) { + val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(i) } + impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher) + assignPort(outlet, publisher) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 89d51285e8..411c5ec6ec 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -42,41 +42,6 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend override def toString: String = name } -/** - * INTERNAL API - */ -private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] { - import ReactiveStreamsCompliance._ - - requireNonNullElement(value) - - private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription { - private[this] var done: Boolean = false - override def cancel(): Unit = done = true - - override def request(elements: Long): Unit = if (!done) { - if (elements < 1) rejectDueToNonPositiveDemand(subscriber) - done = true - try { - tryOnNext(subscriber, value) - tryOnComplete(subscriber) - } catch { - case _: SpecViolation ⇒ // TODO log? - } - } - } - - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = - try { - requireNonNullSubscriber(subscriber) - tryOnSubscribe(subscriber, new SingleElementSubscription(subscriber)) - } catch { - case _: SpecViolation ⇒ // nothing we can do - } - - override def toString: String = name -} - /** * INTERNAL API */ @@ -151,4 +116,3 @@ private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] override def toString: String = "already-subscribed-publisher" } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 69c277fd53..2c2d33b414 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -226,6 +226,8 @@ private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedV }) }, p.future) } + + override def toString: String = "LastOptionStage" } private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { @@ -256,4 +258,6 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV }) }, p.future) } + + override def toString: String = "HeadOptionStage" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 2c5ab5a1dc..ed5e79fc9d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -118,8 +118,6 @@ private[stream] object Stages { symbolicStage.attributes) { } - val identityGraph = SymbolicGraphStage[Any, Any, Any](Identity) - sealed trait SymbolicStage[-In, +Out] { def attributes: Attributes def create(effectiveAttributes: Attributes): Stage[In, Out] @@ -131,14 +129,6 @@ private[stream] object Stages { } - object Identity extends SymbolicStage[Any, Any] { - override val attributes: Attributes = identityOp - - def apply[T]: SymbolicStage[T, T] = this.asInstanceOf[SymbolicStage[T, T]] - - override def create(attr: Attributes): Stage[Any, Any] = fusing.Map(conforms, supervision(attr)) - } - final case class Map[In, Out](f: In ⇒ Out, attributes: Attributes = map) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.Map(f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index f35a165fd0..0474aa89fa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -4,16 +4,22 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } +import java.{ util ⇒ ju } import akka.stream.impl.MaterializerSession.MaterializationPanic import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber } -import scala.collection.mutable import scala.util.control.{ NoStackTrace, NonFatal } import akka.event.Logging.simpleName import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ +import akka.stream.impl.fusing.GraphStageModule +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource +import akka.stream.impl.fusing.GraphModule /** * INTERNAL API @@ -23,14 +29,14 @@ private[akka] object StreamLayout { // compile-time constant final val Debug = false - final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = { + final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: ju.Map[AnyRef, Integer] = new ju.HashMap): Unit = { val ids = Iterator from 1 def id(obj: AnyRef) = idMap get obj match { - case Some(x) ⇒ x - case None ⇒ + case null ⇒ val x = ids.next() - idMap(obj) = x + idMap.put(obj, x) x + case x ⇒ x } def in(i: InPort) = s"${i.toString}@${id(i)}" def out(o: OutPort) = s"${o.toString}@${id(o)}" @@ -78,7 +84,12 @@ private[akka] object StreamLayout { case Combine(f, left, right) ⇒ atomics(left) ++ atomics(right) } val atomic = atomics(materializedValueComputation) - if ((atomic -- subModules - m).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - m mkString ","}]" + val graphValues = subModules.flatMap { + case GraphModule(_, _, _, mvids) ⇒ mvids + case _ ⇒ Nil + } + if ((atomic -- subModules -- graphValues - m).nonEmpty) + problems ::= s"computation refers to non-existent modules [${atomic -- subModules -- graphValues - m mkString ","}]" val print = doPrint || problems.nonEmpty @@ -98,10 +109,24 @@ private[akka] object StreamLayout { // TODO: Special case linear composites // TODO: Cycles - sealed trait MaterializedValueNode - case class Combine(f: (Any, Any) ⇒ Any, dep1: MaterializedValueNode, dep2: MaterializedValueNode) extends MaterializedValueNode - case class Atomic(module: Module) extends MaterializedValueNode - case class Transform(f: Any ⇒ Any, dep: MaterializedValueNode) extends MaterializedValueNode + sealed trait MaterializedValueNode { + /* + * These nodes are used in hash maps and therefore must have efficient implementations + * of hashCode and equals. There is no value in allowing aliases to be equal, so using + * reference equality. + */ + override def hashCode: Int = super.hashCode + override def equals(other: Any): Boolean = super.equals(other) + } + case class Combine(f: (Any, Any) ⇒ Any, dep1: MaterializedValueNode, dep2: MaterializedValueNode) extends MaterializedValueNode { + override def toString: String = s"Combine($dep1,$dep2)" + } + case class Atomic(module: Module) extends MaterializedValueNode { + override def toString: String = s"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)})" + } + case class Transform(f: Any ⇒ Any, dep: MaterializedValueNode) extends MaterializedValueNode { + override def toString: String = s"Transform($dep)" + } case object Ignore extends MaterializedValueNode trait Module { @@ -179,7 +204,7 @@ private[akka] object StreamLayout { if (Debug) validate(this) CompositeModule( - subModules = if (this.isSealed) Set(this) else this.subModules, + if (this.isSealed) Set(this) else this.subModules, shape, downstreams, upstreams, @@ -228,6 +253,40 @@ private[akka] object StreamLayout { Attributes.none) } + /** + * Creates a new Module which is `this` Module composed with `that` Module. + * + * The difference to compose(that) is that this version completely ignores the materialized value + * computation of `that` while the normal version executes the computation and discards its result. + * This means that this version must not be used for user-provided `that` modules because users may + * transform materialized values only to achieve some side-effect; it can only be + * used where we know that there is no meaningful computation to be done (like for + * MaterializedValueSource). + * + * @param that a Module to be composed with (cannot be itself) + * @return a Module that represents the composition of `this` and `that` + */ + def composeNoMat(that: Module): Module = { + if (Debug) validate(this) + + require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().") + require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.") + + val modules1 = if (this.isSealed) Set(this) else this.subModules + val modules2 = if (that.isSealed) Set(that) else that.subModules + + val matComputation = if (this.isSealed) Atomic(this) else this.materializedValueComputation + + CompositeModule( + modules1 ++ modules2, + AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), + downstreams ++ that.downstreams, + upstreams ++ that.upstreams, + // would like to optimize away this allocation for Keep.{left,right} but that breaks side-effecting transformations + matComputation, + Attributes.none) + } + /** * Creates a new Module which contains `this` Module * @return a new Module @@ -236,7 +295,7 @@ private[akka] object StreamLayout { if (Debug) validate(this) CompositeModule( - subModules = Set(this), + Set(this), shape, /* * Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the @@ -296,7 +355,9 @@ private[akka] object StreamLayout { override def materializedValueComputation: MaterializedValueNode = Ignore } - final case class CopiedModule(shape: Shape, attributes: Attributes, copyOf: Module) extends Module { + final case class CopiedModule(override val shape: Shape, + override val attributes: Attributes, + copyOf: Module) extends Module { override val subModules: Set[Module] = Set(copyOf) override def withAttributes(attr: Attributes): Module = this.copy(attributes = attr) @@ -316,12 +377,12 @@ private[akka] object StreamLayout { } final case class CompositeModule( - subModules: Set[Module], - shape: Shape, + override val subModules: Set[Module], + override val shape: Shape, override val downstreams: Map[OutPort, InPort], override val upstreams: Map[InPort, OutPort], override val materializedValueComputation: MaterializedValueNode, - attributes: Attributes) extends Module { + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) @@ -340,6 +401,34 @@ private[akka] object StreamLayout { | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} |""".stripMargin } + + final case class FusedModule( + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { + + override def replaceShape(s: Shape): Module = { + shape.requireSamePortsAs(s) + copy(shape = s) + } + + override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this) + + override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes) + + override def toString = + s""" + | Module: ${this.attributes.nameOrDefault("unnamed")} + | Modules: + | ${subModules.iterator.map(m ⇒ m.toString.split("\n").mkString("\n ")).mkString("\n ")} + | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} + | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} + |""".stripMargin + } } private[stream] object VirtualProcessor { @@ -442,152 +531,13 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] { } } -/** - * INTERNAL API - */ -private[stream] final case class MaterializedValueSource[M]( - shape: SourceShape[M] = SourceShape[M](Outlet[M]("Materialized.out")), - attributes: Attributes = Attributes.name("Materialized")) extends StreamLayout.Module { - - override def subModules: Set[Module] = Set.empty - override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr) - override def carbonCopy: Module = this.copy(shape = SourceShape(Outlet[M]("Materialized.out"))) - - override def replaceShape(s: Shape): Module = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource") - - private def amendShape(attr: Attributes): SourceShape[M] = { - val thisN = attributes.nameOrDefault(null) - val thatN = attr.nameOrDefault(null) - - if ((thatN eq null) || thisN == thatN) shape - else shape.copy(outlet = Outlet(thatN + ".out")) - } -} - -/** - * INTERNAL API - */ -private[stream] object MaterializedValuePublisher { - final val NotRequested = 0 - final val Requested = 1 - final val Completed = 2 - - final val NoValue = new AnyRef -} - -/** - * INTERNAL API - */ -private[stream] class MaterializedValuePublisher extends Publisher[Any] { - import MaterializedValuePublisher._ - - private val value = new AtomicReference[AnyRef](NoValue) - private val registeredSubscriber = new AtomicReference[Subscriber[_ >: Any]](null) - private val requestState = new AtomicInteger(NotRequested) - - private def close(): Unit = { - requestState.set(Completed) - value.set(NoValue) - registeredSubscriber.set(null) - } - - private def tryOrClose(block: ⇒ Unit): Unit = { - try block catch { - case v: ReactiveStreamsCompliance.SpecViolation ⇒ - close() - // What else can we do here? - case NonFatal(e) ⇒ - val sub = registeredSubscriber.get() - if ((sub ne null) && - requestState.compareAndSet(NotRequested, Completed) || requestState.compareAndSet(Requested, Completed)) { - sub.onError(e) - } - close() - throw e - } - } - - def setValue(m: Any): Unit = - tryOrClose { - if (value.compareAndSet(NoValue, m.asInstanceOf[AnyRef]) && requestState.get() == Requested) - pushAndClose(m) - } - - /* - * Both call-sites do a CAS on their "own" side and a GET on the other side. The possible overlaps - * are (removing symmetric cases where you can relabel A->B, B->A): - * - * A-CAS - * A-GET - * B-CAS - * B-GET - pushAndClose fires here - * - * A-CAS - * B-CAS - * A-GET - pushAndClose fires here - * B-GET - pushAndClose fires here - * - * A-CAS - * B-CAS - * B-GET - pushAndClose fires here - * A-GET - pushAndClose fires here - * - * The proof that there are no other cases: - * - * - all permutations of 4 operations are 4! = 24 - * - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings - * - if we don't count cases which are a simple relabeling A->B, B->A, we get 6 / 2 = 3 reorderings - * which are all enumerated above. - * - * pushAndClose protects against double onNext by doing a CAS itself. - */ - private def pushAndClose(m: Any): Unit = { - if (requestState.compareAndSet(Requested, Completed)) { - val sub = registeredSubscriber.get() - ReactiveStreamsCompliance.tryOnNext(sub, m) - ReactiveStreamsCompliance.tryOnComplete(sub) - close() - } - } - - override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = { - tryOrClose { - ReactiveStreamsCompliance.requireNonNullSubscriber(subscriber) - if (registeredSubscriber.compareAndSet(null, subscriber)) { - ReactiveStreamsCompliance.tryOnSubscribe(subscriber, new Subscription { - override def cancel(): Unit = close() - - override def request(n: Long): Unit = { - if (n <= 0) { - ReactiveStreamsCompliance.tryOnError( - subscriber, - ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) - } else { - if (requestState.compareAndSet(NotRequested, Requested)) { - val m = value.get() - if (m ne NoValue) pushAndClose(m) - } - } - } - }) - } else { - if (subscriber == registeredSubscriber.get()) - ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber) - else - ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "MaterializedValuePublisher") - } - } - } - -} - /** * INERNAL API */ private[stream] object MaterializerSession { class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace + + final val Debug = false } /** @@ -596,10 +546,10 @@ private[stream] object MaterializerSession { private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) { import StreamLayout._ - private var subscribersStack: List[mutable.Map[InPort, Subscriber[Any]]] = - mutable.Map.empty[InPort, Subscriber[Any]].withDefaultValue(null) :: Nil - private var publishersStack: List[mutable.Map[OutPort, Publisher[Any]]] = - mutable.Map.empty[OutPort, Publisher[Any]].withDefaultValue(null) :: Nil + private var subscribersStack: List[ju.Map[InPort, Subscriber[Any]]] = + new ju.HashMap[InPort, Subscriber[Any]] :: Nil + private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] = + new ju.HashMap[OutPort, Publisher[Any]] :: Nil /* * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule @@ -611,16 +561,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo */ private var moduleStack: List[Module] = topLevel :: Nil - private def subscribers: mutable.Map[InPort, Subscriber[Any]] = subscribersStack.head - private def publishers: mutable.Map[OutPort, Publisher[Any]] = publishersStack.head + private def subscribers: ju.Map[InPort, Subscriber[Any]] = subscribersStack.head + private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head private def currentLayout: Module = moduleStack.head // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies // of the same module. // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter private def enterScope(enclosing: CopiedModule): Unit = { - subscribersStack ::= mutable.Map.empty.withDefaultValue(null) - publishersStack ::= mutable.Map.empty.withDefaultValue(null) + subscribersStack ::= new ju.HashMap + publishersStack ::= new ju.HashMap moduleStack ::= enclosing.copyOf } @@ -638,11 +588,11 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of // the original module and assign them to the copy ports in the outer scope that we will return to enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { - case (original, exposed) ⇒ assignPort(exposed, scopeSubscribers(original)) + case (original, exposed) ⇒ assignPort(exposed, scopeSubscribers.get(original)) } enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach { - case (original, exposed) ⇒ assignPort(exposed, scopePublishers(original)) + case (original, exposed) ⇒ assignPort(exposed, scopePublishers.get(original)) } } @@ -658,10 +608,10 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo // Cancels all intermediate Publishers and fails all intermediate Subscribers. // (This is an attempt to clean up after an exception during materialization) val errorPublisher = new ErrorPublisher(new MaterializationPanic(cause), "") - for (subMap ← subscribersStack; sub ← subMap.valuesIterator) + for (subMap ← subscribersStack; sub ← subMap.asScala.valuesIterator) errorPublisher.subscribe(sub) - for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) + for (pubMap ← publishersStack; pub ← pubMap.asScala.valuesIterator) pub.subscribe(new CancellingSubscriber) throw cause @@ -671,20 +621,27 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes = parent and current + private val matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = new ju.HashMap + def registerSrc(ms: MaterializedValueSource[Any]): Unit = { + if (MaterializerSession.Debug) println(s"registering source $ms") + matValSrc.get(ms.computation) match { + case null ⇒ matValSrc.put(ms.computation, ms :: Nil) + case xs ⇒ matValSrc.put(ms.computation, ms :: xs) + } + } + protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = { - val materializedValues = collection.mutable.HashMap.empty[Module, Any] - var materializedValuePublishers: List[MaterializedValuePublisher] = Nil + val materializedValues: ju.Map[Module, Any] = new ju.HashMap for (submodule ← module.subModules) { val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes) submodule match { - case mv: MaterializedValueSource[_] ⇒ - val pub = new MaterializedValuePublisher - materializedValuePublishers ::= pub - materializedValues.put(mv, ()) - assignPort(mv.shape.outlet, pub) + case GraphStageModule(shape, attributes, mv: MaterializedValueSource[_]) ⇒ + val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]] + registerSrc(copy) + materializeAtomic(copy.module, subEffectiveAttributes, materializedValues) case atomic if atomic.isAtomic ⇒ - materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes)) + materializeAtomic(atomic, subEffectiveAttributes, materializedValues) case copied: CopiedModule ⇒ enterScope(copied) materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes)) @@ -694,38 +651,54 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } } - val mat = resolveMaterialized(module.materializedValueComputation, materializedValues) - materializedValuePublishers foreach { pub ⇒ pub.setValue(mat) } - mat + if (MaterializerSession.Debug) { + println("RESOLVING") + println(s" module = $module") + println(s" computation = ${module.materializedValueComputation}") + println(s" matValSrc = $matValSrc") + println(s" matVals = $materializedValues") + } + resolveMaterialized(module.materializedValueComputation, materializedValues, " ") } protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any = { materializeModule(composite, effectiveAttributes) } - protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Any + protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit - private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, Any]): Any = matNode match { - case Atomic(m) ⇒ materializedValues(m) - case Combine(f, d1, d2) ⇒ f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) ⇒ f(resolveMaterialized(d, materializedValues)) - case Ignore ⇒ () + private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], indent: String): Any = { + if (MaterializerSession.Debug) println(indent + matNode) + val ret = matNode match { + case Atomic(m) ⇒ matVal.get(m) + case Combine(f, d1, d2) ⇒ f(resolveMaterialized(d1, matVal, indent + " "), resolveMaterialized(d2, matVal, indent + " ")) + case Transform(f, d) ⇒ f(resolveMaterialized(d, matVal, indent + " ")) + case Ignore ⇒ () + } + if (MaterializerSession.Debug) println(indent + s"result = $ret") + matValSrc.remove(matNode) match { + case null ⇒ // nothing to do + case srcs ⇒ + if (MaterializerSession.Debug) println(indent + s"triggering sources $srcs") + srcs.foreach(_.setValue(ret)) + } + ret } final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = { - subscribers(in) = subscriber + subscribers.put(in, subscriber) // Interface (unconnected) ports of the current scope will be wired when exiting the scope if (!currentLayout.inPorts(in)) { - val publisher = publishers(currentLayout.upstreams(in)) + val publisher = publishers.get(currentLayout.upstreams(in)) if (publisher ne null) publisher.subscribe(subscriber) } } final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = { - publishers(out) = publisher + publishers.put(out, publisher) // Interface (unconnected) ports of the current scope will be wired when exiting the scope if (!currentLayout.outPorts(out)) { - val subscriber = subscribers(currentLayout.downstreams(out)) + val subscriber = subscribers.get(currentLayout.downstreams(out)) if (subscriber ne null) publisher.subscribe(subscriber) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index db97a5f3e3..039d6cc5cf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -100,11 +100,13 @@ private[stream] object Timers { override def toString = "IdleTimeout" } - final class IdleBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { + final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { val in1 = Inlet[I]("in1") val in2 = Inlet[O]("in2") val out1 = Outlet[I]("out1") val out2 = Outlet[O]("out2") + + override def initialAttributes = Attributes.name("IdleTimeoutBidi") val shape = BidiShape(in1, out1, in2, out2) override def toString = "IdleTimeoutBidi" @@ -151,6 +153,7 @@ private[stream] object Timers { final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { val in: Inlet[T] = Inlet("IdleInject.in") val out: Outlet[T] = Outlet("IdleInject.out") + override def initialAttributes = Attributes.name("DelayInitial") override val shape: FlowShape[T, T] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { @@ -181,6 +184,7 @@ private[stream] object Timers { final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () ⇒ O) extends GraphStage[FlowShape[I, O]] { val in: Inlet[I] = Inlet("IdleInject.in") val out: Outlet[O] = Outlet("IdleInject.out") + override def initialAttributes = Attributes.name("IdleInject") override val shape: FlowShape[I, O] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 4bb626196f..fb6fc6e236 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -21,7 +21,8 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes) extends Module { +private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, + matValIDs: Array[Module]) extends Module { override def subModules: Set[Module] = Set.empty override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) @@ -32,6 +33,8 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at override final def replaceShape(newShape: Shape): Module = CopiedModule(newShape, attributes, copyOf = this) + + override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" } /** @@ -191,8 +194,11 @@ private[stream] object ActorGraphInterpreter { } override def onDownstreamFinish(): Unit = cancel() + + override def toString: String = BatchingActorInputBoundary.this.toString }) + override def toString: String = s"BatchingActorInputBoundary(id=$id, fill=$inputBufferElements/$size, completed=$upstreamCompleted, canceled=$downstreamCanceled)" } private[stream] class ActorOutputBoundary(actor: ActorRef, id: Int) extends DownstreamBoundaryStageLogic[Any] { @@ -243,6 +249,8 @@ private[stream] object ActorGraphInterpreter { override def onUpstreamFinish(): Unit = complete() override def onUpstreamFailure(cause: Throwable): Unit = fail(cause) + + override def toString: String = ActorOutputBoundary.this.toString }) def subscribePending(): Unit = @@ -282,6 +290,7 @@ private[stream] object ActorGraphInterpreter { cancel(in) } + override def toString: String = s"ActorOutputBoundary(id=$id, demand=$downstreamDemand, finished=$downstreamCompleted)" } } @@ -314,10 +323,14 @@ private[stream] class ActorGraphInterpreter( private var subscribesPending = inputs.length - // Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other - // actors. - // TODO: Better heuristic here (take into account buffer size, connection count, 4 events per element, have a max) - val eventLimit = settings.maxInputBufferSize * (inputs.length + outputs.length) * 2 + /* + * Limits the number of events processed by the interpreter before scheduling + * a self-message for fairness with other actors. The basic assumption here is + * to give each input buffer slot a chance to run through the whole pipeline + * and back (for the demand). + */ + val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) + // Limits the number of events processed by the interpreter on an abort event. // TODO: Better heuristic here private val abortLimit = eventLimit * 2 @@ -336,6 +349,7 @@ private[stream] class ActorGraphInterpreter( i += 1 } interpreter.init() + runBatch() } override def receive: Receive = { @@ -349,6 +363,7 @@ private[stream] class ActorGraphInterpreter( outputs(id).requestMore(demand) runBatch() case Resume ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") resumeScheduled = false if (interpreter.isSuspended) runBatch() case AsyncInput(logic, event, handler) ⇒ @@ -372,8 +387,10 @@ private[stream] class ActorGraphInterpreter( inputs(id).onComplete() runBatch() case OnSubscribe(id: Int, subscription: Subscription) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") subscribesPending -= 1 inputs(id).onSubscribe(subscription) + runBatch() case Cancel(id: Int) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") outputs(id).cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala new file mode 100644 index 0000000000..d8905f982f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -0,0 +1,557 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import java.{ util ⇒ ju } +import java.util.Arrays +import scala.collection.immutable +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal +import scala.annotation.tailrec +import akka.stream._ +import akka.stream.Attributes.AsyncBoundary +import akka.stream.Fusing.{ FusedGraph, StructuralInfo } +import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.impl.StreamLayout +import akka.stream.impl.StreamLayout._ +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource + +/** + * INTERNAL API + */ +private[stream] object Fusing { + + final val Debug = false + + /** + * Fuse everything that is not forbidden via AsyncBoundary attribute. + */ + def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = { + val struct = new BuildStructuralInfo + /* + * First perform normalization by descending the module tree and recording + * information in the BuildStructuralInfo instance. + */ + val matValue = + try descend(g.module, Attributes.none, struct, struct.newGroup(""), "") + catch { + case NonFatal(ex) ⇒ + if (Debug) struct.dump() + throw ex + } + /* + * Then create a copy of the original Shape with the new copied ports. + */ + val shape = g.shape.copyFromPorts( + struct.newInlets(g.shape.inlets), + struct.newOutlets(g.shape.outlets)).asInstanceOf[S] + /* + * Extract the full topological information from the builder before + * removing assembly-internal (fused) wirings in the next step. + */ + val info = struct.toInfo + /* + * Perform the fusing of `struct.groups` into GraphModules (leaving them + * as they are for non-fusable modules). + */ + struct.breakUpGroupsByDispatcher() + val modules = fuse(struct) + /* + * Now we have everything ready for a FusedModule. + */ + val module = FusedModule( + modules, + shape, + immutable.Map.empty ++ struct.downstreams.asScala, + immutable.Map.empty ++ struct.upstreams.asScala, + matValue, + Attributes.none, + info) + + if (StreamLayout.Debug) validate(module) + if (Debug) println(module) + + FusedGraph(module, shape) + } + + /** + * Take the fusable islands identified by `descend` in the `groups` list + * and execute their fusion; only fusable islands will have multiple modules + * in their set. + */ + private def fuse(struct: BuildStructuralInfo): Set[Module] = + struct.groups.asScala.flatMap { group ⇒ + if (group.size == 0) Nil + else if (group.size == 1) group.iterator.next() :: Nil + else fuseGroup(struct, group) :: Nil + }(collection.breakOut) + + /** + * Transform a set of GraphStageModules into a single GraphModule. This is done + * by performing a traversal of all their Inlets, sorting them into those without + * internal connections (the exposed inlets) and those with internal connections + * (where the corresponding Outlet is recorded in a map so that it will be wired + * to the same slot number in the GraphAssembly). Then all Outlets are traversed, + * completing internal connections using the aforementioned maps and appending + * the others to the list of exposed Outlets. + */ + private def fuseGroup(struct: BuildStructuralInfo, group: ju.Set[Module]): GraphModule = { + val stages = new Array[GraphStageWithMaterializedValue[Shape, Any]](group.size) + val matValIDs = new Array[Module](group.size) + val attributes = new Array[Attributes](group.size) + + /* + * The overall GraphAssembly arrays are constructed in three parts: + * - 1) exposed inputs (ins) + * - 2) connections (ins and outs) + * - 3) exposed outputs (outs) + */ + val insB1, insB2 = new ju.ArrayList[Inlet[_]] + val outsB3 = new ju.ArrayList[Outlet[_]] + val inOwnersB1, inOwnersB2 = new ju.ArrayList[Int] + val outOwnersB3 = new ju.ArrayList[Int] + + // for the shape of the GraphModule + val inlets = new ju.ArrayList[Inlet[_]] + val outlets = new ju.ArrayList[Outlet[_]] + + // connection slots are allocated from the inputs side, outs find their place by this map + val outConns: ju.Map[OutPort, Int] = new ju.HashMap + + /* + * First traverse all Inlets and sort them into exposed and internal, + * taking note of their partner Outlets where appropriate. + */ + var pos = 0 + var it = group.iterator + val ups = struct.upstreams + val downs = struct.downstreams + val outGroup = struct.outGroup + while (it.hasNext) it.next() match { + case copy @ CopiedModule(shape, attr, gsm: GraphStageModule) ⇒ + stages(pos) = gsm.stage + matValIDs(pos) = copy + attributes(pos) = attr and gsm.attributes + + shape.inlets.iterator.zip(gsm.shape.inlets.iterator).foreach { + case (in, orig) ⇒ + val out = ups.get(in) + val internal = (out != null) && (outGroup.get(out) eq group) + if (internal) { + ups.remove(in) + downs.remove(out) + outConns.put(out, insB2.size) + insB2.add(orig) + inOwnersB2.add(pos) + } else { + insB1.add(orig) + inOwnersB1.add(pos) + inlets.add(in) + } + } + + pos += 1 + } + + val outsB2 = new Array[Outlet[_]](insB2.size) + val outOwnersB2 = new Array[Int](insB2.size) + + /* + * Then traverse all Outlets and complete connections. + */ + pos = 0 + it = group.iterator + while (it.hasNext) it.next() match { + case CopiedModule(shape, _, gsm: GraphStageModule) ⇒ + shape.outlets.iterator.zip(gsm.shape.outlets.iterator).foreach { + case (out, orig) ⇒ + if (outConns.containsKey(out)) { + val idx = outConns.remove(out) + outsB2(idx) = orig + outOwnersB2(idx) = pos + } else { + outsB3.add(orig) + outOwnersB3.add(pos) + outlets.add(out) + } + } + pos += 1 + } + + /* + * Now mechanically gather together the GraphAssembly arrays from their various pieces. + */ + + val shape = AmorphousShape(inlets.asScala.to[immutable.Seq], outlets.asScala.to[immutable.Seq]) + + val connStart = insB1.size + val conns = insB2.size + val outStart = connStart + conns + val size = outStart + outsB3.size + + val ins = new Array[Inlet[_]](size) + copyToArray(insB2.iterator, ins, copyToArray(insB1.iterator, ins, 0)) + + val inOwners = new Array[Int](size) + Arrays.fill(inOwners, copyToArray(inOwnersB2.iterator, inOwners, copyToArray(inOwnersB1.iterator, inOwners, 0)), size, -1) + + val outs = new Array[Outlet[_]](size) + System.arraycopy(outsB2, 0, outs, connStart, conns) + copyToArray(outsB3.iterator, outs, outStart) + + val outOwners = new Array[Int](size) + Arrays.fill(outOwners, 0, connStart, -1) + System.arraycopy(outOwnersB2, 0, outOwners, connStart, conns) + copyToArray(outOwnersB3.iterator, outOwners, outStart) + + // FIXME attributes should contain some naming info and async boundary where needed + val firstModule = group.iterator.next() + val async = if (isAsync(firstModule)) Attributes(AsyncBoundary) else Attributes.none + val disp = dispatcher(firstModule) match { + case None ⇒ Attributes.none + case Some(d) ⇒ Attributes(d) + } + val attr = async and disp + + GraphModule(new GraphInterpreter.GraphAssembly(stages, attributes, ins, inOwners, outs, outOwners), shape, attr, matValIDs) + } + + @tailrec private def copyToArray[T](it: ju.Iterator[T], array: Array[T], idx: Int): Int = + if (it.hasNext) { + array(idx) = it.next() + copyToArray(it, array, idx + 1) + } else idx + + /** + * This is a normalization step for the graph that also collects the needed + * information for later fusing. The goal is to transform an arbitrarily deep + * module tree into one that has exactly two levels: all direct submodules are + * CopiedModules where each contains exactly one atomic module. This way all + * modules have their own identity and all necessary port copies have been + * made. The upstreams/downstreams in the BuildStructuralInfo are rewritten + * to point to the shapes of the copied modules. + * + * The materialized value computation is rewritten as well in that all + * leaf nodes point to the copied modules and all nested computations are + * “inlined”, resulting in only one big computation tree for the whole + * normalized overall module. The contained MaterializedValueSource stages + * are also rewritten to point to the copied MaterializedValueNodes. This + * correspondence is then used during materialization to trigger these sources + * when “their” node has received its value. + */ + private def descend(m: Module, + inheritedAttributes: Attributes, + struct: BuildStructuralInfo, + openGroup: ju.Set[Module], + indent: String): MaterializedValueNode = { + def log(msg: String): Unit = println(indent + msg) + val async = m match { + case _: GraphStageModule ⇒ m.attributes.contains(AsyncBoundary) + case _ if m.isAtomic ⇒ true // non-GraphStage atomic or has AsyncBoundary + case _ ⇒ m.attributes.contains(AsyncBoundary) + } + if (Debug) log(s"entering ${m.getClass} (async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})") + val localGroup = + if (async) struct.newGroup(indent) + else openGroup + + if (m.isAtomic) { + if (Debug) log(s"atomic module $m") + struct.addModule(m, localGroup, inheritedAttributes, indent) + } else { + val attributes = inheritedAttributes and m.attributes + m match { + case CopiedModule(shape, _, copyOf) ⇒ + val ret = descend(copyOf, attributes, struct, localGroup, indent + " ") + struct.rewire(copyOf.shape, shape, indent) + ret + case _ ⇒ + // we need to keep track of all MaterializedValueSource nodes that get pushed into the current + // computation context (i.e. that need the same value). + struct.enterMatCtx() + // now descend into submodules and collect their computations (plus updates to `struct`) + val subMat: Predef.Map[Module, MaterializedValueNode] = + m.subModules.map(sub ⇒ sub -> descend(sub, attributes, struct, localGroup, indent + " "))(collection.breakOut) + // we need to remove all wirings that this module copied from nested modules so that we + // don’t do wirings twice + val down = m.subModules.foldLeft(m.downstreams.toSet)((set, m) ⇒ set -- m.downstreams) + down.foreach { + case (start, end) ⇒ struct.wire(start, end, indent) + } + // now rewrite the materialized value computation based on the copied modules and their computation nodes + val newMat = rewriteMat(subMat, m.materializedValueComputation) + // and finally rewire all MaterializedValueSources to their new computation nodes + val matSrcs = struct.exitMatCtx() + matSrcs.foreach { c ⇒ + if (Debug) log(s"materialized value source: ${struct.hash(c)}") + val ms = c.copyOf match { + case g: GraphStageModule ⇒ g.stage.asInstanceOf[MaterializedValueSource[Any]] + } + if (Debug) require(find(ms.computation, m.materializedValueComputation), s"mismatch:\n ${ms.computation}\n ${m.materializedValueComputation}") + val replacement = CopiedModule(c.shape, c.attributes, new MaterializedValueSource[Any](newMat, ms.out).module) + struct.replace(c, replacement, localGroup) + } + // the result for each level is the materialized value computation + newMat + } + } + } + + private def find(node: MaterializedValueNode, inTree: MaterializedValueNode): Boolean = + if (node == inTree) true + else + inTree match { + case Atomic(_) ⇒ false + case Ignore ⇒ false + case Transform(_, dep) ⇒ find(node, dep) + case Combine(_, left, right) ⇒ find(node, left) || find(node, right) + } + + private def rewriteMat(subMat: Predef.Map[Module, MaterializedValueNode], + mat: MaterializedValueNode): MaterializedValueNode = + mat match { + case Atomic(sub) ⇒ subMat(sub) + case Combine(f, left, right) ⇒ Combine(f, rewriteMat(subMat, left), rewriteMat(subMat, right)) + case Transform(f, dep) ⇒ Transform(f, rewriteMat(subMat, dep)) + case Ignore ⇒ Ignore + } + + private implicit class NonNull[T](val x: T) extends AnyVal { + def nonNull(msg: String): T = + if (x != null) x + else throw new IllegalArgumentException("null encountered: " + msg) + } + + /** + * INTERNAL API + * + * Collect structural information about a module tree while descending into + * it and performing normalization. + */ + final class BuildStructuralInfo { + def toInfo: StructuralInfo = + StructuralInfo( + immutable.Map.empty ++ upstreams.asScala, + immutable.Map.empty ++ downstreams.asScala, + immutable.Map.empty ++ inOwners.asScala, + immutable.Map.empty ++ outOwners.asScala) + + /** + * the set of all contained modules + */ + val modules: ju.Set[Module] = new ju.HashSet + + /** + * the list of all groups of modules that are within each async boundary + */ + val groups: ju.Deque[ju.Set[Module]] = new ju.LinkedList + + /** + * Fusable groups may contain modules with differing dispatchers, in which + * case the group needs to be broken up. + */ + def breakUpGroupsByDispatcher(): Unit = { + val newGroups: ju.List[ju.Set[Module]] = new ju.LinkedList + val it = groups.iterator() + while (it.hasNext) { + val group = it.next() + if (group.size > 1) { + val subgroups = group.asScala.groupBy(dispatcher) + if (subgroups.size > 1) { + group.clear() + subgroups.valuesIterator.foreach(g ⇒ newGroups.add(g.asJava)) + } + } + } + groups.addAll(newGroups) + } + + /** + * a mapping from OutPort to its containing group, needed when determining + * whether an upstream connection is internal or not + */ + val outGroup: ju.Map[OutPort, ju.Set[Module]] = new ju.HashMap + + def replace(oldMod: Module, newMod: Module, localGroup: ju.Set[Module]): Unit = { + modules.remove(oldMod) + modules.add(newMod) + localGroup.remove(oldMod) + localGroup.add(newMod) + } + + /** + * A stack of mappings for a given non-copied InPort. + */ + val newIns: ju.Map[InPort, List[InPort]] = new ju.HashMap + /** + * A stack of mappings for a given non-copied OutPort. + */ + val newOuts: ju.Map[OutPort, List[OutPort]] = new ju.HashMap + + private def addMapping[T](orig: T, mapd: T, map: ju.Map[T, List[T]]): Unit = { + if (map.containsKey(orig)) { + map.put(orig, mapd :: map.get(orig)) + } else map.put(orig, mapd :: Nil) + } + + private def removeMapping[T](orig: T, map: ju.Map[T, List[T]]): T = + map.remove(orig) match { + case null ⇒ null.asInstanceOf[T] + case x :: Nil ⇒ x + case x :: xs ⇒ + map.put(orig, xs) + x + } + + /** + * A stack of materialized value sources, grouped by materialized computation context. + */ + private var matSrc: List[List[CopiedModule]] = Nil + + def enterMatCtx(): Unit = matSrc ::= Nil + def exitMatCtx(): List[CopiedModule] = + matSrc match { + case x :: xs ⇒ + matSrc = xs + x + case Nil ⇒ throw new IllegalArgumentException("exitMatCtx with empty stack") + } + def pushMatSrc(m: CopiedModule): Unit = + matSrc match { + case x :: xs ⇒ matSrc = (m :: x) :: xs + case Nil ⇒ throw new IllegalArgumentException("pushMatSrc without context") + } + + /** + * The downstreams relationships of the original module rewritten in terms of + * the copied ports. + */ + val downstreams: ju.Map[OutPort, InPort] = new ju.HashMap + /** + * The upstreams relationships of the original module rewritten in terms of + * the copied ports. + */ + val upstreams: ju.Map[InPort, OutPort] = new ju.HashMap + + /** + * The owner mapping for the copied InPorts. + */ + val inOwners: ju.Map[InPort, Module] = new ju.HashMap + /** + * The owner mapping for the copied OutPorts. + */ + val outOwners: ju.Map[OutPort, Module] = new ju.HashMap + + def dump(): Unit = { + println("StructuralInfo:") + println(" newIns:") + newIns.asScala.foreach { case (k, v) ⇒ println(s" $k (${hash(k)}) -> ${v.map(hash).mkString(",")}") } + println(" newOuts:") + newOuts.asScala.foreach { case (k, v) ⇒ println(s" $k (${hash(k)}) -> ${v.map(hash).mkString(",")}") } + } + + def hash(obj: AnyRef) = f"${System.identityHashCode(obj)}%08x" + def printShape(s: Shape) = s"${s.getClass.getSimpleName}(ins=${s.inlets.map(hash).mkString(",")} outs=${s.outlets.map(hash).mkString(",")})" + + /** + * Create and return a new grouping (i.e. an AsyncBoundary-delimited context) + */ + def newGroup(indent: String): ju.Set[Module] = { + val group = new ju.HashSet[Module] + if (Debug) println(indent + s"creating new group ${hash(group)}") + groups.add(group) + group + } + + /** + * Add a module to the given group, performing normalization (i.e. giving it a unique port identity). + */ + def addModule(m: Module, group: ju.Set[Module], inheritedAttributes: Attributes, indent: String): Atomic = { + val copy = CopiedModule(m.shape.deepCopy(), inheritedAttributes, realModule(m)) + if (Debug) println(indent + s"adding copy ${hash(copy)} ${printShape(copy.shape)} of ${printShape(m.shape)}") + group.add(copy) + modules.add(copy) + copy.shape.outlets.foreach(o ⇒ outGroup.put(o, group)) + val orig1 = m.shape.inlets.iterator + val mapd1 = copy.shape.inlets.iterator + while (orig1.hasNext) { + val orig = orig1.next() + val mapd = mapd1.next() + addMapping(orig, mapd, newIns) + inOwners.put(mapd, copy) + } + val orig2 = m.shape.outlets.iterator + val mapd2 = copy.shape.outlets.iterator + while (orig2.hasNext) { + val orig = orig2.next() + val mapd = mapd2.next() + addMapping(orig, mapd, newOuts) + outOwners.put(mapd, copy) + } + copy.copyOf match { + case GraphStageModule(_, _, _: MaterializedValueSource[_]) ⇒ pushMatSrc(copy) + case _ ⇒ + } + Atomic(copy) + } + + /** + * Record a wiring between two copied ports, using (and reducing) the port + * mappings. + */ + def wire(out: OutPort, in: InPort, indent: String): Unit = { + if (Debug) println(indent + s"wiring $out (${hash(out)}) -> $in (${hash(in)})") + val newOut = removeMapping(out, newOuts) nonNull out.toString + val newIn = removeMapping(in, newIns) nonNull in.toString + downstreams.put(newOut, newIn) + upstreams.put(newIn, newOut) + } + + /** + * Replace all mappings for a given shape with its new (copied) form. + */ + def rewire(oldShape: Shape, newShape: Shape, indent: String): Unit = { + if (Debug) println(indent + s"rewiring ${printShape(oldShape)} -> ${printShape(newShape)}") + oldShape.inlets.iterator.zip(newShape.inlets.iterator).foreach { + case (oldIn, newIn) ⇒ addMapping(newIn, removeMapping(oldIn, newIns) nonNull oldIn.toString, newIns) + } + oldShape.outlets.iterator.zip(newShape.outlets.iterator).foreach { + case (oldOut, newOut) ⇒ addMapping(newOut, removeMapping(oldOut, newOuts) nonNull oldOut.toString, newOuts) + } + } + + /** + * Transform original into copied Inlets. + */ + def newInlets(old: immutable.Seq[Inlet[_]]): immutable.Seq[Inlet[_]] = + old.map(i ⇒ newIns.get(i).head.inlet) + + /** + * Transform original into copied Inlets. + */ + def newOutlets(old: immutable.Seq[Outlet[_]]): immutable.Seq[Outlet[_]] = + old.map(o ⇒ newOuts.get(o).head.outlet) + } + + private def isAsync(m: Module): Boolean = m match { + case CopiedModule(_, inherited, orig) ⇒ + val attr = inherited and orig.attributes + attr.contains(AsyncBoundary) + } + + /** + * Figure out the dispatcher setting of a module. + */ + private def dispatcher(m: Module): Option[ActorAttributes.Dispatcher] = m match { + case CopiedModule(_, inherited, orig) ⇒ + val attr = inherited and orig.attributes + attr.get[ActorAttributes.Dispatcher] + case x ⇒ x.attributes.get[ActorAttributes.Dispatcher] + } + + private def realModule(m: Module): Module = m match { + case CopiedModule(_, _, of) ⇒ realModule(of) + case other ⇒ other + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index ae4f1174cf..16f9abe539 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -4,14 +4,17 @@ package akka.stream.impl.fusing import java.util.Arrays - import akka.event.LoggingAdapter import akka.stream.stage._ import scala.annotation.tailrec import scala.collection.immutable import akka.stream._ +import akka.stream.impl.StreamLayout._ +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal +import java.{ util ⇒ ju } +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource /** * INTERNAL API @@ -113,9 +116,11 @@ private[stream] object GraphInterpreter { * - array of the logics * - materialized value */ - def materialize(inheritedAttributes: Attributes): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic], Any) = { + def materialize(inheritedAttributes: Attributes, + copiedModules: Array[Module], + matVal: ju.Map[Module, Any], + register: MaterializedValueSource[Any] ⇒ Unit): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = { val logics = Array.ofDim[GraphStageLogic](stages.length) - var finalMat: Any = () var i = 0 while (i < stages.length) { @@ -140,10 +145,16 @@ private[stream] object GraphInterpreter { idx += 1 } - // FIXME: Support for materialized values in fused islands is not yet figured out! - val logicAndMat = stages(i).createLogicAndMaterializedValue(inheritedAttributes and originalAttributes(i)) - // FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0. - if (i == 0) finalMat = logicAndMat._2 + val stage = stages(i) match { + case mv: MaterializedValueSource[_] ⇒ + val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]] + register(copy) + copy + case x ⇒ x + } + + val logicAndMat = stage.createLogicAndMaterializedValue(inheritedAttributes and originalAttributes(i)) + matVal.put(copiedModules(i), logicAndMat._2) logics(i) = logicAndMat._1 i += 1 @@ -174,17 +185,17 @@ private[stream] object GraphInterpreter { i += 1 } - (inHandlers, outHandlers, logics, finalMat) + (inHandlers, outHandlers, logics) } override def toString: String = - "GraphAssembly(" + - stages.mkString("[", ",", "]") + ", " + - ins.mkString("[", ",", "]") + ", " + - inOwners.mkString("[", ",", "]") + ", " + - outs.mkString("[", ",", "]") + ", " + - outOwners.mkString("[", ",", "]") + - ")" + "GraphAssembly\n " + + stages.mkString("[", ",", "]") + "\n " + + originalAttributes.mkString("[", ",", "]") + "\n " + + ins.mkString("[", ",", "]") + "\n " + + inOwners.mkString("[", ",", "]") + "\n " + + outs.mkString("[", ",", "]") + "\n " + + outOwners.mkString("[", ",", "]") } object GraphAssembly { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 412105e165..cd170aaa87 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -4,13 +4,31 @@ package akka.stream.impl.fusing import java.util.concurrent.atomic.AtomicBoolean - import akka.actor.Cancellable +import akka.dispatch.ExecutionContexts +import akka.event.Logging import akka.stream._ import akka.stream.stage._ - import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration +import akka.stream.impl.StreamLayout._ +import akka.stream.impl.ReactiveStreamsCompliance + +/** + * INTERNAL API + */ +private[akka] final case class GraphStageModule(shape: Shape, + attributes: Attributes, + stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { + def carbonCopy: Module = replaceShape(shape.deepCopy()) + + def replaceShape(s: Shape): Module = + CopiedModule(s, Attributes.none, this) + + def subModules: Set[Module] = Set.empty + + def withAttributes(attributes: Attributes): Module = new GraphStageModule(shape, attributes, stage) +} /** * INTERNAL API @@ -21,12 +39,13 @@ object GraphStages { * INERNAL API */ private[stream] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("in") - val out = Outlet[T]("out") + val in = Inlet[T](Logging.simpleName(this) + ".in") + val out = Outlet[T](Logging.simpleName(this) + ".out") override val shape = FlowShape(in, out) } - class Identity[T] extends SimpleLinearGraphStage[T] { + object Identity extends SimpleLinearGraphStage[Any] { + override def initialAttributes = Attributes.name("identityOp") override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler { @@ -41,9 +60,12 @@ object GraphStages { override def toString = "Identity" } - class Detacher[T] extends GraphStage[FlowShape[T, T]] { + def identity[T] = Identity.asInstanceOf[SimpleLinearGraphStage[T]] + + private class Detacher[T] extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("in") val out = Outlet[T]("out") + override def initialAttributes = Attributes.name("Detacher") override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -53,28 +75,29 @@ object GraphStages { override def onPush(): Unit = { if (isAvailable(out)) { push(out, grab(in)) - pull(in) + tryPull(in) } } }) setHandler(out, new OutHandler { override def onPull(): Unit = { - if (!initialized) { - pull(in) - initialized = true - } else if (isAvailable(in)) { + if (isAvailable(in)) { push(out, grab(in)) - if (!hasBeenPulled(in)) pull(in) + tryPull(in) } } }) + override def preStart(): Unit = tryPull(in) } override def toString = "Detacher" } + private val _detacher = new Detacher[Any] + def detacher[T]: GraphStage[FlowShape[T, T]] = _detacher.asInstanceOf[GraphStage[FlowShape[T, T]]] + private object TickSource { class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { private val cancelPromise = Promise[Unit]() @@ -94,6 +117,7 @@ object GraphStages { extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { val out = Outlet[T]("TimerSource.out") + override def initialAttributes = Attributes.name("TickSource") override val shape = SourceShape(out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = { @@ -119,9 +143,54 @@ object GraphStages { override protected def onTimer(timerKey: Any) = if (isAvailable(out)) push(out, tick) + + override def toString: String = "TickSourceLogic" } (logic, cancellable) } + + override def toString: String = "TickSource" + } + + /** + * INTERNAL API. + * + * This source is not reusable, it is only created internally. + */ + private[stream] class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] { + def this(computation: MaterializedValueNode) = this(computation, Outlet[T]("matValue")) + override def initialAttributes: Attributes = Attributes.name("matValueSource") + override val shape = SourceShape(out) + + private val promise = Promise[T] + def setValue(t: T): Unit = promise.success(t) + + def copySrc: MaterializedValueSource[T] = new MaterializedValueSource(computation, out) + + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + setHandler(out, eagerTerminateOutput) + override def preStart(): Unit = { + val cb = getAsyncCallback[T](t ⇒ emit(out, t, () ⇒ completeStage())) + promise.future.foreach(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + } + } + + override def toString: String = s"MatValSrc($computation)" + } + + private[stream] class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { + ReactiveStreamsCompliance.requireNonNullElement(elem) + val out = Outlet[T]("single.out") + val shape = SourceShape(out) + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, elem) + completeStage() + } + }) + } + override def toString: String = s"SingleSource($elem)" } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index a818e87275..b650a15045 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -8,6 +8,7 @@ import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage._ +import java.{ util ⇒ ju } /** * INTERNAL API @@ -136,7 +137,8 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S } val assembly = new GraphAssembly(stages, attributes, ins, inOwners, outs, outOwners) - val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) + val (inHandlers, outHandlers, logics) = + assembly.materialize(Attributes.none, assembly.stages.map(_.module), new ju.HashMap, _ ⇒ ()) val interpreter = new GraphInterpreter( assembly, NoMaterializer, diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9bfcbfaa0e..b1f14bd228 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -523,6 +523,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut private val in = Inlet[In]("in") private val out = Outlet[Out]("out") + override def initialAttributes = Attributes.name("MapAsync") override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { @@ -600,6 +601,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I private val in = Inlet[In]("in") private val out = Outlet[Out]("out") + override def initialAttributes = Attributes.name("MapAsyncUnordered") override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { @@ -767,6 +769,7 @@ private[stream] object TimerKeys { private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { val in = Inlet[T]("in") val out = Outlet[immutable.Seq[T]]("out") + override def initialAttributes = Attributes.name("GroupedWithin") val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index d243d08b6f..a182cb4fe0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -16,6 +16,8 @@ import scala.concurrent._ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] { private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in") private val out = Outlet[T]("flatten.out") + + override def initialAttributes = Attributes.name("FlattenMerge") override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { @@ -187,6 +189,8 @@ private[fusing] object StreamOfStreams { extends GraphStageWithMaterializedValue[SinkShape[T], Future[LocalSinkSubscription]] { private val in = Inlet[T]("LocalSink.in") + + override def initialAttributes = Attributes.name("LocalSink") override val shape = SinkShape(in) override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[LocalSinkSubscription]) = { @@ -213,4 +217,4 @@ private[fusing] object StreamOfStreams { logic -> sub.future } } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 2b091252ef..9928e267f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -13,7 +13,7 @@ import akka.io.Tcp import akka.io.Tcp._ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance -import akka.stream.impl.fusing.GraphStages.Detacher +import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding } import akka.stream.scaladsl.{ BidiFlow, Flow, Tcp ⇒ StreamTcp } import akka.stream.stage.GraphStageLogic.StageActorRef @@ -38,6 +38,7 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef, import ConnectionSourceStage._ val out: Outlet[StreamTcp.IncomingConnection] = Outlet("IncomingConnections.out") + override def initialAttributes = Attributes.name("ConnectionSource") val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out) private val connectionFlowsAwaitingInitialization = new AtomicLong() @@ -100,7 +101,7 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef, val tcpFlow = Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose)) - .via(new Detacher[ByteString]) // must read ahead for proper completions + .via(detacher[ByteString]) // must read ahead for proper completions .mapMaterializedValue { m ⇒ connectionFlowsAwaitingInitialization.decrementAndGet() m @@ -291,6 +292,7 @@ private[stream] class IncomingConnectionStage(connection: ActorRef, remoteAddres val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in") val bytesOut: Outlet[ByteString] = Outlet("IncomingTCP.out") + override def initialAttributes = Attributes.name("IncomingConnection") val shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { @@ -318,6 +320,7 @@ private[stream] class OutgoingConnectionStage(manager: ActorRef, val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in") val bytesOut: Outlet[ByteString] = Outlet("IncomingTCP.out") + override def initialAttributes = Attributes.name("OutgoingConnection") val shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[StreamTcp.OutgoingConnection]) = { @@ -338,4 +341,4 @@ private[stream] class OutgoingConnectionStage(manager: ActorRef, } override def toString = s"TCP-to($remoteAddress)" -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala index c442eefcae..64817557d6 100644 --- a/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/io/ByteStringParser.scala @@ -15,6 +15,7 @@ abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] private val bytesIn = Inlet[ByteString]("bytesIn") private val objOut = Outlet[T]("objOut") + override def initialAttributes = Attributes.name("ByteStringParser") final override val shape = FlowShape(bytesIn, objOut) class ParsingLogic extends GraphStageLogic(shape) { @@ -116,4 +117,4 @@ object ByteStringParser { else throw NeedMoreData def skipZeroTerminatedString(): Unit = while (readByte() != 0) {} } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 92d68d98cd..ff2ee432ad 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1154,7 +1154,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create(that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 5e17d0c2af..a36b2478fb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -212,5 +212,5 @@ object BidiFlow { * the *joint* frequencies of the elements in both directions. */ def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] = - fromGraph(new Timers.IdleBidi(timeout)) + fromGraph(new Timers.IdleTimeoutBidi(timeout)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 57ddcaa37b..827b76ba88 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -19,8 +19,6 @@ import scala.concurrent.Future import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.language.higherKinds import akka.stream.impl.fusing.FlattenMerge -import akka.stream.impl.SubFlowImpl -import akka.stream.impl.fusing.GraphInterpreter /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -36,7 +34,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) override type Closed = Sink[In @uncheckedVariance, Mat @uncheckedVariance] override type ClosedMat[+M] = Sink[In @uncheckedVariance, M] - private[stream] def isIdentity: Boolean = this.module eq Stages.identityGraph.module + private[stream] def isIdentity: Boolean = this.module eq GraphStages.Identity.module override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) @@ -248,7 +246,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) } object Flow { - private[this] val identity: Flow[Any, Any, Unit] = new Flow[Any, Any, Unit](SymbolicGraphStage(Stages.Identity).module) + private[this] val identity: Flow[Any, Any, Unit] = new Flow[Any, Any, Unit](GraphStages.Identity.module) /** * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index c81dab74c1..c20052b4a0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -11,6 +11,7 @@ import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec import scala.collection.immutable +import akka.stream.impl.fusing.GraphStages.MaterializedValueSource object Merge { /** @@ -39,6 +40,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte require(inputPorts > 1, "A Merge must have more than 1 input port") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i)) val out: Outlet[T] = Outlet[T]("Merge.out") + override def initialAttributes = Attributes.name("Merge") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -53,6 +55,8 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte private def pending: Boolean = pendingHead != pendingTail + override def preStart(): Unit = in.foreach(tryPull) + private def enqueue(in: Inlet[T]): Unit = { pendingQueue(pendingTail % inputPorts) = in pendingTail += 1 @@ -91,10 +95,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte setHandler(out, new OutHandler { override def onPull(): Unit = { - if (!initialized) { - initialized = true - in.foreach(tryPull) - } else if (pending) + if (pending) dequeueAndDispatch() } }) @@ -141,6 +142,8 @@ object MergePreferred { */ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") + + override def initialAttributes = Attributes.name("MergePreferred") override val shape: MergePreferred.MergePreferredShape[T] = new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred") @@ -396,6 +399,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext require(outputPorts > 1, "A Broadcast must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Broadast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) + override def initialAttributes = Attributes.name("Broadcast") override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -494,6 +498,7 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext require(outputPorts > 1, "A Balance must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) + override def initialAttributes = Attributes.name("Balance") override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -660,6 +665,7 @@ final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T require(inputPorts > 1, "A Concat must have more than 1 input ports") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Concat.in" + i)) val out: Outlet[T] = Outlet[T]("Concat.out") + override def initialAttributes = Attributes.name("Concat") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { @@ -694,6 +700,8 @@ final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T override def onPull() = pull(in(activeStream)) }) } + + override def toString: String = s"Concat($inputPorts)" } object GraphDSL extends GraphApply { @@ -760,9 +768,9 @@ object GraphDSL extends GraphApply { * @return The outlet that will emit the materialized value. */ def materializedValue: Outlet[M @uncheckedVariance] = { - val module = new MaterializedValueSource[Any] - moduleInProgress = moduleInProgress.compose(module) - module.shape.outlet.asInstanceOf[Outlet[M]] + val source = new MaterializedValueSource[M](moduleInProgress.materializedValueComputation) + moduleInProgress = moduleInProgress.composeNoMat(source.module) + source.out } private[stream] def deprecatedAndThen(port: OutPort, op: StageModule): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala index 61cbb3df0c..a198d39389 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala @@ -31,6 +31,8 @@ object One2OneBidiFlow { val inOut = Outlet[I]("inOut") val outIn = Inlet[O]("outIn") val outOut = Outlet[O]("outOut") + + override def initialAttributes = Attributes.name("One2OneBidi") val shape = BidiShape(inIn, inOut, outIn, outOut) override def toString = "One2OneBidi" diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 9b96bb8d00..8d0d943dc3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -8,18 +8,20 @@ import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.fusing.GraphStages.TickSource +import akka.stream.impl.fusing.GraphStages +import akka.stream.impl.fusing.GraphStages._ import akka.stream.impl.io.{ OutputStreamSourceStage, InputStreamSource, FileSource } import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.{ Outlet, SourceShape, _ } import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec +import scala.annotation.unchecked.uncheckedVariance +import scala.language.higherKinds import scala.collection.immutable import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ Future, Promise } -import scala.language.higherKinds -import scala.annotation.unchecked.uncheckedVariance +import akka.stream.impl.fusing.Buffer /** * A `Source` is a set of stream processing steps that has one open output. It can comprise @@ -41,7 +43,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { - if (flow.module eq Stages.identityGraph.module) this.asInstanceOf[Source[T, Mat3]] + if (flow.module eq GraphStages.Identity.module) this.asInstanceOf[Source[T, Mat3]] else { val flowCopy = flow.module.carbonCopy new Source( @@ -191,7 +193,7 @@ object Source { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = - Source.single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) + single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -200,11 +202,7 @@ object Source { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - new Source( - new PublisherSource( - SingleElementPublisher(future, "FutureSource"), - DefaultAttributes.futureSource, - shape("FutureSource"))).mapAsyncUnordered(1)(ConstantFun.scalaIdentityFunction) + single(future).mapAsyncUnordered(1)(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.futureSource) /** * Elements are emitted periodically with the specified interval. @@ -221,28 +219,19 @@ object Source { * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - new Source( - new PublisherSource( - SingleElementPublisher(element, "SingleSource"), - DefaultAttributes.singleSource, - shape("SingleSource"))) + fromGraph(new GraphStages.SingleSource(element).withAttributes(DefaultAttributes.singleSource)) /** * Create a `Source` that will continually emit the given element. */ - def repeat[T](element: T): Source[T, Unit] = { - ReactiveStreamsCompliance.requireNonNullElement(element) - new Source( - new PublisherSource( - SingleElementPublisher( - new immutable.Iterable[T] { - override val iterator: Iterator[T] = Iterator.continually(element) + def repeat[T](element: T): Source[T, Unit] = + single(new immutable.Iterable[T] { + override val iterator: Iterator[T] = Iterator.continually(element) - override def toString: String = "repeat(" + element + ")" - }, "RepeatSource"), - DefaultAttributes.repeat, - shape("RepeatSource"))).mapConcat(ConstantFun.scalaIdentityFunction) - } + override def toString: String = "repeat(" + element + ")" + }) + .mapConcat(ConstantFun.scalaIdentityFunction) + .withAttributes(DefaultAttributes.repeat) /** * Create a `Source` that will unfold a value of type `S` into @@ -296,7 +285,7 @@ object Source { val init = Source.single(s) init ~> cnct ~> uzip.in - cnct <~ uzip.out0 + cnct <~ Flow[S].buffer(2, OverflowStrategy.backpressure) <~ uzip.out0 SourceShape(uzip.out1) }).withAttributes(DefaultAttributes.unfoldInf) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 0a182d4828..79726c97f1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -9,7 +9,7 @@ import akka.actor._ import akka.io.Inet.SocketOption import akka.io.{ IO, Tcp ⇒ IoTcp } import akka.stream._ -import akka.stream.impl.fusing.GraphStages.Detacher +import akka.stream.impl.fusing.GraphStages.detacher import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage } import akka.util.ByteString @@ -166,7 +166,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { localAddress, options, halfClose, - connectTimeout)).via(new Detacher[ByteString]) // must read ahead for proper completions + connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions idleTimeout match { case d: FiniteDuration ⇒ tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d)) @@ -182,4 +182,3 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = outgoingConnection(new InetSocketAddress(host, port)) } - diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index e326e6f057..a6f4c5854b 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -3,7 +3,8 @@ */ package akka.stream.stage -import java.util.concurrent.atomic.AtomicReference +import java.util +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference } import akka.actor._ import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch } @@ -11,7 +12,7 @@ import akka.event.LoggingAdapter import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly -import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule } +import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule, GraphStageModule } import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName } import scala.annotation.tailrec @@ -23,16 +24,10 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) - final override private[stream] lazy val module: Module = - GraphModule( - GraphAssembly(shape.inlets, shape.outlets, this), - shape, - Attributes.none) + protected def initialAttributes: Attributes = Attributes.none + + final override private[stream] lazy val module: Module = GraphStageModule(shape, initialAttributes, this) - /** - * This method throws an [[UnsupportedOperationException]] by default. The subclass can override this method - * and provide a correct implementation that creates an exact copy of the stage with the provided new attributes. - */ final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { override def shape = GraphStageWithMaterializedValue.this.shape override private[stream] def module = GraphStageWithMaterializedValue.this.module.withAttributes(attr) diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index c2c7a49da5..0de0e40974 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -167,16 +167,16 @@ private[stream] object AbstractStage { extends GraphStageWithMaterializedValue[FlowShape[In, Out], Mat] { val name = stageAttributes.nameOrDefault() + override def initialAttributes = stageAttributes val shape = FlowShape(Inlet[In](name + ".in"), Outlet[Out](name + ".out")) override def toString = name override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Mat) = { - val effectiveAttributes = inheritedAttributes and stageAttributes - val stageAndMat = factory(effectiveAttributes) + val stageAndMat = factory(inheritedAttributes) val stage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stageAndMat._1.asInstanceOf[AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext]] - (new PushPullGraphLogic(shape, effectiveAttributes, stage), stageAndMat._2) + (new PushPullGraphLogic(shape, inheritedAttributes, stage), stageAndMat._2) } }