From dc07fd250cd8872b74f145c64439448f42238cd3 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 8 Oct 2015 12:12:35 +0200 Subject: [PATCH] !str make Inlet/Outlet invariant and add Java variance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This necessitates the removal of method overloading in the Java Graph DSL: the to() and via() methods were not otherwise resolved correctly by javac, leading to incomprehensible error messages. The new approach is to offer just one way of doing things which is a bit more verbose but should be easier to read and learn. In this vein auto-importing while using the DSL is also gone for Java—not sure about Scala yet. --- .../DebuggingDirectivesExamplesSpec.scala | 4 +- .../RangeDirectivesExamplesSpec.scala | 1 - .../code/docs/stream/BidiFlowDocSpec.scala | 8 +- .../scala/code/docs/stream/FlexiDocSpec.scala | 291 ------- .../code/docs/stream/FlowGraphDocSpec.scala | 3 +- .../StreamPartialFlowGraphDocSpec.scala | 2 +- .../client/OutgoingConnectionBlueprint.scala | 2 +- .../akka/stream/javadsl/BidiFlowTest.java | 14 +- .../akka/stream/javadsl/FlowGraphTest.java | 6 +- .../java/akka/stream/javadsl/FlowTest.java | 4 +- .../test/scala/akka/stream/io/TlsSpec.scala | 10 +- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 765 ------------------ .../stream/scaladsl/GraphFlexiRouteSpec.scala | 484 ----------- .../scala/akka/stream/scaladsl/SinkSpec.scala | 4 +- .../akka/stream/FanInShape.scala.template | 49 +- .../akka/stream/FanOutShape.scala.template | 21 +- .../src/main/scala/akka/stream/Shape.scala | 30 +- .../stream/impl/io/SslTlsCipherActor.scala | 4 +- .../main/scala/akka/stream/io/SslTls.scala | 2 +- .../main/scala/akka/stream/javadsl/Flow.scala | 2 +- .../scala/akka/stream/javadsl/Graph.scala | 39 +- .../akka/stream/scaladsl/FlexiRoute.scala | 3 +- .../scala/akka/stream/scaladsl/Graph.scala | 12 +- 23 files changed, 111 insertions(+), 1649 deletions(-) delete mode 100644 akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala index c90fbd8936..7d0ba017ca 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala @@ -6,8 +6,8 @@ package docs.http.scaladsl.server package directives import akka.event.Logging -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.http.scaladsl.server.directives.{DebuggingDirectives, LogEntry, LoggingMagnet} +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.http.scaladsl.server.directives.{ DebuggingDirectives, LogEntry, LoggingMagnet } class DebuggingDirectivesExamplesSpec extends RoutingSpec { "logRequest-0" in { diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala index 9d5a35e15d..c2933e208c 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala @@ -30,7 +30,6 @@ class RangeDirectivesExamplesSpec extends RoutingSpec { responseAs[String] shouldEqual "DE" } - // we set "akka.http.routing.range-coalescing-threshold = 2" // above to make sure we get two BodyParts Get() ~> addHeader(Range(ByteRange(0, 1), ByteRange(1, 2), ByteRange(6, 7))) ~> route ~> check { diff --git a/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala index eeed87ed8a..9a10eddb48 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala @@ -50,7 +50,7 @@ object BidiFlowDocSpec { // construct and add the bottom flow, going inbound val inbound = b.add(Flow[ByteString].map(fromBytes)) // fuse them together into a BidiShape - BidiShape(outbound, inbound) + BidiShape.fromFlows(outbound, inbound) } // this is the same as the above @@ -112,18 +112,18 @@ object BidiFlowDocSpec { val outbound = b.add(Flow[ByteString].map(addLengthHeader)) val inbound = b.add(Flow[ByteString].transform(() => new FrameParser)) - BidiShape(outbound, inbound) + BidiShape.fromFlows(outbound, inbound) } //#framing val chopUp = BidiFlow() { b => val f = Flow[ByteString].mapConcat(_.map(ByteString(_))) - BidiShape(b.add(f), b.add(f)) + BidiShape.fromFlows(b.add(f), b.add(f)) } val accumulate = BidiFlow() { b => val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _)) - BidiShape(b.add(f), b.add(f)) + BidiShape.fromFlows(b.add(f), b.add(f)) } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala deleted file mode 100644 index 64da45ea57..0000000000 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package docs.stream - -import akka.stream._ -import akka.stream.scaladsl._ -import akka.stream.testkit.AkkaSpec -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.Attributes - -object FlexiDocSpec { - //#fleximerge-zip-states - //#fleximerge-zip-readall - import akka.stream.FanInShape._ - class ZipPorts[A, B](_init: Init[(A, B)] = Name("Zip")) - extends FanInShape[(A, B)](_init) { - val left = newInlet[A]("left") - val right = newInlet[B]("right") - protected override def construct(i: Init[(A, B)]) = new ZipPorts(i) - } - //#fleximerge-zip-readall - //#fleximerge-zip-states -} - -class FlexiDocSpec extends AkkaSpec { - import FlexiDocSpec._ - - implicit val ec = system.dispatcher - implicit val mat = ActorMaterializer() - - "implement zip using readall" in { - //#fleximerge-zip-readall - class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]]( - new ZipPorts, Attributes.name("Zip1State")) { - import FlexiMerge._ - override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] { - override def initialState = - State(ReadAll(p.left, p.right)) { (ctx, _, inputs) => - val a = inputs(p.left) - val b = inputs(p.right) - ctx.emit((a, b)) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - //#fleximerge-zip-readall - - //format: OFF - val res = - //#fleximerge-zip-connecting - FlowGraph.closed(Sink.head[(Int, String)]) { implicit b => - o => - import FlowGraph.Implicits._ - - val zip = b.add(new Zip[Int, String]) - - Source.single(1) ~> zip.left - Source.single("1") ~> zip.right - zip.out ~> o.inlet - } - //#fleximerge-zip-connecting - .run() - //format: ON - - Await.result(res, 300.millis) should equal((1, "1")) - } - - "implement zip using two states" in { - //#fleximerge-zip-states - class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]]( - new ZipPorts, Attributes.name("Zip2State")) { - import FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] { - var lastInA: A = _ - - val readA: State[A] = State[A](Read(p.left)) { (ctx, input, element) => - lastInA = element - readB - } - - val readB: State[B] = State[B](Read(p.right)) { (ctx, input, element) => - ctx.emit((lastInA, element)) - readA - } - - override def initialState: State[_] = readA - - override def initialCompletionHandling = eagerClose - } - } - //#fleximerge-zip-states - - val res = FlowGraph.closed(Sink.head[(Int, String)]) { implicit b => - o => - import FlowGraph.Implicits._ - - val zip = b.add(new Zip[Int, String]) - - Source(1 to 2) ~> zip.left - Source((1 to 2).map(_.toString)) ~> zip.right - zip.out ~> o.inlet - }.run() - - Await.result(res, 300.millis) should equal((1, "1")) - } - - "fleximerge completion handling" in { - import FanInShape._ - //#fleximerge-completion - class ImportantWithBackupShape[A](_init: Init[A] = Name("Zip")) - extends FanInShape[A](_init) { - val important = newInlet[A]("important") - val replica1 = newInlet[A]("replica1") - val replica2 = newInlet[A]("replica2") - protected override def construct(i: Init[A]) = - new ImportantWithBackupShape(i) - } - class ImportantWithBackups[A] extends FlexiMerge[A, ImportantWithBackupShape[A]]( - new ImportantWithBackupShape, Attributes.name("ImportantWithBackups")) { - import FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[A] { - import p.important - override def initialCompletionHandling = - CompletionHandling( - onUpstreamFinish = (ctx, input) => input match { - case `important` => - log.info("Important input completed, shutting down.") - ctx.finish() - SameState - - case replica => - log.info("Replica {} completed, " + - "no more replicas available, " + - "applying eagerClose completion handling.", replica) - - ctx.changeCompletionHandling(eagerClose) - SameState - }, - onUpstreamFailure = (ctx, input, cause) => input match { - case `important` => - ctx.fail(cause) - SameState - - case replica => - log.error(cause, "Replica {} failed, " + - "no more replicas available, " + - "applying eagerClose completion handling.", replica) - - ctx.changeCompletionHandling(eagerClose) - SameState - }) - - override def initialState = - State[A](ReadAny(p.important, p.replica1, p.replica2)) { - (ctx, input, element) => - ctx.emit(element) - SameState - } - } - } - //#fleximerge-completion - - FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ - val importantWithBackups = b.add(new ImportantWithBackups[Int]) - Source.single(1) ~> importantWithBackups.important - Source.single(2) ~> importantWithBackups.replica1 - Source.failed[Int](new Exception("Boom!") with NoStackTrace) ~> importantWithBackups.replica2 - importantWithBackups.out ~> Sink.ignore - }.run() - } - - "flexi preferring merge" in { - import FanInShape._ - //#flexi-preferring-merge-ports - class PreferringMergeShape[A](_init: Init[A] = Name("PreferringMerge")) - extends FanInShape[A](_init) { - val preferred = newInlet[A]("preferred") - val secondary1 = newInlet[A]("secondary1") - val secondary2 = newInlet[A]("secondary2") - protected override def construct(i: Init[A]) = new PreferringMergeShape(i) - } - //#flexi-preferring-merge-ports - - //#flexi-preferring-merge - - class PreferringMerge extends FlexiMerge[Int, PreferringMergeShape[Int]]( - new PreferringMergeShape, Attributes.name("ImportantWithBackups")) { - import akka.stream.scaladsl.FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[Int] { - override def initialState = - State[Int](ReadPreferred(p.preferred, p.secondary1, p.secondary2)) { - (ctx, input, element) => - ctx.emit(element) - SameState - } - } - } - //#flexi-preferring-merge - } - - "flexi route" in { - //#flexiroute-unzip - import FanOutShape._ - class UnzipShape[A, B](_init: Init[(A, B)] = Name[(A, B)]("Unzip")) - extends FanOutShape[(A, B)](_init) { - val outA = newOutlet[A]("outA") - val outB = newOutlet[B]("outB") - protected override def construct(i: Init[(A, B)]) = new UnzipShape(i) - } - class Unzip[A, B] extends FlexiRoute[(A, B), UnzipShape[A, B]]( - new UnzipShape, Attributes.name("Unzip")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] { - override def initialState = - State[Any](DemandFromAll(p.outA, p.outB)) { - (ctx, _, element) => - val (a, b) = element - ctx.emit(p.outA)(a) - ctx.emit(p.outB)(b) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - //#flexiroute-unzip - } - - "flexi route completion handling" in { - import FanOutShape._ - //#flexiroute-completion - class ImportantRouteShape[A](_init: Init[A] = Name[A]("ImportantRoute")) - extends FanOutShape[A](_init) { - val important = newOutlet[A]("important") - val additional1 = newOutlet[A]("additional1") - val additional2 = newOutlet[A]("additional2") - protected override def construct(i: Init[A]) = new ImportantRouteShape(i) - } - class ImportantRoute[A] extends FlexiRoute[A, ImportantRouteShape[A]]( - new ImportantRouteShape, Attributes.name("ImportantRoute")) { - import FlexiRoute._ - override def createRouteLogic(p: PortT) = new RouteLogic[A] { - import p.important - private val select = (p.important | p.additional1 | p.additional2) - - override def initialCompletionHandling = - CompletionHandling( - // upstream: - onUpstreamFinish = (ctx) => (), - onUpstreamFailure = (ctx, thr) => (), - // downstream: - onDownstreamFinish = (ctx, output) => output match { - case `important` => - // finish all downstreams, and cancel the upstream - ctx.finish() - SameState - case _ => - SameState - }) - - override def initialState = - State(DemandFromAny(p.important, p.additional1, p.additional2)) { - (ctx, output, element) => - ctx.emit(select(output))(element) - SameState - } - } - } - //#flexiroute-completion - - FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ - val route = b.add(new ImportantRoute[Int]) - Source.single(1) ~> route.in - route.important ~> Sink.ignore - route.additional1 ~> Sink.ignore - route.additional2 ~> Sink.ignore - }.run() - } - -} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 80258df307..0ea408237d 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -142,8 +142,7 @@ class FlowGraphDocSpec extends AkkaSpec { assert(inlets.size == this.inlets.size) assert(outlets.size == this.outlets.size) // This is why order matters when overriding inlets and outlets. - // The "[Nothing, Any]" is equivalent to casting the Inlets/Outlets. - PriorityWorkerPoolShape[Nothing, Any](inlets(0), inlets(1), outlets(0)) + PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out]) } } //#flow-graph-components-shape diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 3826ba0a00..5e228210d2 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -117,7 +117,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { val sendRmotely = Sink.actorRef(actorRef, "Done") val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ()) - val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast(_)) + val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) //#sink-combine diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 3e9631d5b0..2a31f46198 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -96,7 +96,7 @@ private[http] object OutgoingConnectionBlueprint { responseParsingMerge.out ~> responsePrep ~> terminationFanout.in terminationFanout.out(0) ~> terminationMerge.in1 - BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse]( + BidiShape( methodBypassFanout.in, wrapTls.outlet, unwrapTls.inlet, diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 5dc2b093b8..ed105ff4e8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -94,7 +94,7 @@ public class BidiFlowTest extends StreamTest { @Override public BidiShape apply(Builder> b, SinkShape sink) throws Exception { - b.from(Source.single(42)).to(sink); + b.from(b.graph(Source.single(42))).to(sink); final FlowShape top = b.graph(Flow . empty().map(new Function() { @Override @@ -136,9 +136,9 @@ public class BidiFlowTest extends StreamTest { SinkShape sb) throws Exception { final BidiShape s = b .graph(bidi); - b.from(Source.single(1)).to(s.in1()); + b.from(b.graph(Source.single(1))).toInlet(s.in1()); b.from(s.out1()).to(st); - b.from(Source.single(bytes)).to(s.in2()); + b.from(b.graph(Source.single(bytes))).toInlet(s.in2()); b.from(s.out2()).to(sb); } }).run(materializer); @@ -217,8 +217,8 @@ public class BidiFlowTest extends StreamTest { return ByteString.fromString("Hello " + arg); } })); - b.from(shape.out2()).via(left).to(shape.in1()) - .from(shape.out1()).via(right).to(shape.in2()); + b.from(shape.out2()).via(left).toInlet(shape.in1()) + .from(shape.out1()).via(right).toInlet(shape.in2()); } }).run(materializer); assertEquals((Integer) 42, Await.result(f, oneSec)); @@ -241,8 +241,8 @@ public class BidiFlowTest extends StreamTest { } })); b.from(bcast).to(sink) - .from(Source.single(1)).via(bcast).to(merge) - .from(flow).to(merge); + .from(b.graph(Source.single(1))).viaFanOut(bcast).toFanIn(merge) + .from(flow).toFanIn(merge); return new Pair, Outlet>(flow.inlet(), merge.out()); } }); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 72d1a1ae87..94b792b27a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -287,12 +287,12 @@ public class FlowGraphTest extends StreamTest { final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>, SinkShape>() { @Override public void apply(Builder> b, SinkShape out) throws Exception { - b.from(Source.single(1)).to(out); - b.from(b.materializedValue()).to(Sink.foreach(new Procedure>(){ + b.from(b.graph(Source.single(1))).to(out); + b.from(b.materializedValue()).to(b.graph(Sink.foreach(new Procedure>(){ public void apply(Future mat) throws Exception { Patterns.pipe(mat, system.dispatcher()).to(probe.ref()); } - })); + }))); } }).run(materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 5112edcede..d2a5879fa0 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -646,8 +646,8 @@ public class FlowTest extends StreamTest { public Inlet apply(Builder b) throws Exception { final UniformFanOutShape broadcast = b.graph(Broadcast.create(2, true)); - b.from(broadcast.out(0)).to(out1); - b.from(broadcast.out(1)).to(out2); + b.from(broadcast.out(0)).to(b.graph(out1)); + b.from(broadcast.out(1)).to(b.graph(out2)); return broadcast.in(); } }); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 3d6bcf510d..85cdc41f0d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -381,9 +381,9 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off "emit an error if the TLS handshake fails certificate checks" in assertAllStagesStopped { val getError = Flow[SslTlsInbound] - .map[Either[SslTlsInbound, SSLException]](i => Left(i)) - .recover { case e: SSLException => Right(e) } - .collect { case Right(e) => e }.toMat(Sink.head)(Keep.right) + .map[Either[SslTlsInbound, SSLException]](i ⇒ Left(i)) + .recover { case e: SSLException ⇒ Right(e) } + .collect { case Right(e) ⇒ e }.toMat(Sink.head)(Keep.right) val simple = Flow.wrap(getError, Source.lazyEmpty[SslTlsOutbound])(Keep.left) @@ -399,8 +399,8 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off val clientErr = simple.join(badClientTls(IgnoreBoth)) .join(Tcp().outgoingConnection(Await.result(server, 1.second).localAddress)).run() - Await.result(serverErr.flatMap(identity), 1.second).getMessage should include ("certificate_unknown") - Await.result(clientErr, 1.second).getMessage should equal ("General SSLEngine problem") + Await.result(serverErr.flatMap(identity), 1.second).getMessage should include("certificate_unknown") + Await.result(clientErr, 1.second).getMessage should equal("General SSLEngine problem") } "reliably cancel subscriptions when TransportIn fails early" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala deleted file mode 100644 index c92fb93119..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ /dev/null @@ -1,765 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.FlexiMerge._ -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl._ -import akka.stream.testkit.Utils._ -import org.reactivestreams.Publisher -import akka.stream._ -import scala.util.control.NoStackTrace -import scala.collection.immutable -import akka.actor.ActorRef -import akka.testkit.TestProbe -import scala.concurrent.Await -import scala.concurrent.duration._ - -object GraphFlexiMergeSpec { - - class Fair[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("FairMerge")) { - def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] { - override def initialState = State[T](ReadAny(p.in(0), p.in(1))) { (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - } - } - - class StrictRoundRobin[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("RoundRobinMerge")) { - def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] { - val emitOtherOnClose = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - ctx.changeCompletionHandling(defaultCompletionHandling) - readRemaining(other(input)) - }, - onUpstreamFailure = { (ctx, _, cause) ⇒ - ctx.fail(cause) - SameState - }) - - def other(input: InPort): Inlet[T] = if (input eq p.in(0)) p.in(1) else p.in(0) - - val read1: State[T] = State(Read(p.in(0))) { (ctx, input, element) ⇒ - ctx.emit(element) - read2 - } - - val read2: State[T] = State(Read(p.in(1))) { (ctx, input, element) ⇒ - ctx.emit(element) - read1 - } - - def readRemaining(input: Inlet[T]) = State(Read(input)) { (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - - override def initialState = read1 - - override def initialCompletionHandling = emitOtherOnClose - } - } - - class StartStopTest(lifecycleProbe: ActorRef) - extends FlexiMerge[String, FanInShape2[String, String, String]](new FanInShape2("StartStopTest"), Attributes.name("StartStopTest")) { - - def createMergeLogic(p: PortT) = new MergeLogic[String] { - - override def preStart(): Unit = lifecycleProbe ! "preStart" - override def postStop(): Unit = lifecycleProbe ! "postStop" - - override def initialState = State(ReadAny(p.in0, p.in1)) { - (ctx, port, element) ⇒ - lifecycleProbe ! element - if (element == "fail") throw new IllegalStateException("test failure") - - ctx.emit(element) - SameState - } - } - } - - class MyZip[A, B] extends FlexiMerge[(A, B), FanInShape2[A, B, (A, B)]](new FanInShape2("MyZip"), Attributes.name("MyZip")) { - def createMergeLogic(p: PortT): MergeLogic[(A, B)] = new MergeLogic[(A, B)] { - var lastInA: A = _ - - val readA: State[A] = State[A](Read(p.in0)) { (ctx, input, element) ⇒ - lastInA = element - readB - } - - val readB: State[B] = State[B](Read(p.in1)) { (ctx, input, element) ⇒ - ctx.emit((lastInA, element)) - readA - } - - override def initialCompletionHandling = eagerClose - - override def initialState: State[_] = readA - } - } - - class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue, defVal: Option[A] = None) - extends FlexiMerge[(A, B, C), FanInShape3[A, B, C, (A, B, C)]](new FanInShape3("TripleCancellingZip"), Attributes.name("TripleCancellingZip")) { - def createMergeLogic(p: PortT) = new MergeLogic[(A, B, C)] { - override def initialState = State(ReadAll(p.in0, p.in1, p.in2)) { - case (ctx, input, inputs) ⇒ - val a = inputs.getOrElse(p.in0, defVal.get) - val b = inputs(p.in1) - val c = inputs(p.in2) - - ctx.emit((a, b, c)) - if (cancelAfter == 0) - ctx.cancel(p.in0) - cancelAfter -= 1 - - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - - object PreferringMerge extends FlexiMerge[Int, UniformFanInShape[Int, Int]](new UniformFanInShape(3), Attributes.name("PreferringMerge")) { - def createMergeLogic(p: PortT) = new MergeLogic[Int] { - override def initialState = State(Read(p.in(0))) { - (ctx, input, element) ⇒ - ctx.emit(element) - running - } - val running = State(ReadPreferred(p.in(0), p.in(1), p.in(2))) { - (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - } - } - - class TestMerge(completionProbe: ActorRef) - extends FlexiMerge[String, UniformFanInShape[String, String]](new UniformFanInShape(3), Attributes.name("TestMerge")) { - - def createMergeLogic(p: PortT) = new MergeLogic[String] { - var throwFromOnComplete = false - - override def initialState = State(ReadAny(p.inSeq: _*)) { - (ctx, input, element) ⇒ - if (element == "cancel") - ctx.cancel(input) - else if (element == "err") - ctx.fail(new RuntimeException("err") with NoStackTrace) - else if (element == "exc") - throw new RuntimeException("exc") with NoStackTrace - else if (element == "complete") - ctx.finish() - else if (element == "onUpstreamFinish-exc") - throwFromOnComplete = true - else - ctx.emit("onInput: " + element) - - SameState - } - - override def initialCompletionHandling = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - if (throwFromOnComplete) - throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - completionProbe ! input.toString - SameState - }, - onUpstreamFailure = { (ctx, input, cause) ⇒ - cause match { - case _: IllegalArgumentException ⇒ // swallow - case _ ⇒ ctx.fail(cause) - } - SameState - }) - } - } - -} - -class GraphFlexiMergeSpec extends AkkaSpec { - import GraphFlexiMergeSpec._ - import FlowGraph.Implicits._ - - implicit val materializer = ActorMaterializer() - - val in1 = Source(List("a", "b", "c", "d")) - val in2 = Source(List("e", "f")) - - val out = Sink.publisher[String] - - val fairString = new Fair[String] - - "FlexiMerge" must { - - "build simple fair merge" in assertAllStagesStopped { - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - o ⇒ - val merge = b.add(fairString) - - in1 ~> merge.in(0) - in2 ~> merge.in(1) - merge.out ~> o.inlet - }.run() - .request(10) - .expectNextUnordered("a", "b", "c", "d", "e", "f") - .expectComplete() - } - - "be able to have two fleximerges in a graph" in assertAllStagesStopped { - FlowGraph.closed(in1, in2, TestSink.probe[String])((i1, i2, o) ⇒ o) { implicit b ⇒ - (in1, in2, o) ⇒ - val m1 = b.add(fairString) - val m2 = b.add(fairString) - - // format: OFF - in1.outlet ~> m1.in(0) - in2.outlet ~> m1.in(1) - - Source(List("A", "B", "C", "D", "E", "F")) ~> m2.in(0) - m1.out ~> m2.in(1) - m2.out ~> o.inlet - // format: ON - }.run() - .request(20) - .expectNextUnordered("a", "b", "c", "d", "e", "f", "A", "B", "C", "D", "E", "F") - .expectComplete() - } - - "allow reuse" in { - val flow = Flow() { implicit b ⇒ - val merge = b.add(new Fair[String]) - - Source(() ⇒ Iterator.continually("+")) ~> merge.in(0) - - merge.in(1) → merge.out - } - - val g = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val zip = b add Zip[String, String]() - in1 ~> flow ~> Flow[String].map { of ⇒ of } ~> zip.in0 - in2 ~> flow ~> Flow[String].map { tf ⇒ tf } ~> zip.in1 - zip.out.map { x ⇒ x.toString } ~> o.inlet - } - - val p = g.run() - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(1000) - val received = for (_ ← 1 to 1000) yield s.expectNext() - val first = received.map(_.charAt(1)) - first.toSet should ===(Set('a', 'b', 'c', 'd', '+')) - first.filter(_ != '+') should ===(Seq('a', 'b', 'c', 'd')) - val second = received.map(_.charAt(3)) - second.toSet should ===(Set('e', 'f', '+')) - second.filter(_ != '+') should ===(Seq('e', 'f')) - sub.cancel() - } - - "allow zip reuse" in { - val flow = Flow() { implicit b ⇒ - val zip = b.add(new MyZip[String, String]) - - Source(() ⇒ Iterator.continually("+")) ~> zip.in0 - - (zip.in1, zip.out) - } - - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - o ⇒ - val zip = b.add(Zip[String, String]()) - - in1 ~> flow.map(_.toString()) ~> zip.in0 - in2 ~> zip.in1 - - zip.out.map(_.toString()) ~> o.inlet - }.run() - .request(100) - .expectNextUnordered("((+,b),f)", "((+,a),e)") - .expectComplete() - } - - "build simple round robin merge" in { - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new StrictRoundRobin[String]) - in1 ~> merge.in(0) - in2 ~> merge.in(1) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext("a") - s.expectNext("e") - s.expectNext("b") - s.expectNext("f") - s.expectNext("c") - s.expectNext("d") - s.expectComplete() - } - - "build simple zip merge" in { - val p = FlowGraph.closed(Sink.publisher[(Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new MyZip[Int, String]) - Source(List(1, 2, 3, 4)) ~> merge.in0 - Source(List("a", "b", "c")) ~> merge.in1 - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[(Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext(1 -> "a") - s.expectNext(2 -> "b") - s.expectNext(3 -> "c") - s.expectComplete() - } - - "build simple triple-zip merge using ReadAll" in { - val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new TripleCancellingZip[Long, Int, String]) - // format: OFF - Source(List(1L, 2L )) ~> merge.in0 - Source(List(1, 2, 3, 4)) ~> merge.in1 - Source(List("a", "b", "c" )) ~> merge.in2 - merge.out ~> o.inlet - // format: ON - }.run() - - val s = TestSubscriber.manualProbe[(Long, Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - - sub.request(10) - s.expectNext((1L, 1, "a")) - s.expectNext((2L, 2, "b")) - s.expectComplete() - } - - "build simple triple-zip merge using ReadAll, and continue with provided value for cancelled input" in { - val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new TripleCancellingZip[Long, Int, String](1, Some(0L))) - // format: OFF - Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.in0 - Source(List(1, 2, 3, 4 )) ~> merge.in1 - Source(List("a", "b", "c" )) ~> merge.in2 - merge.out ~> o.inlet - // format: ON - }.run() - - val s = TestSubscriber.manualProbe[(Long, Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - - sub.request(10) - s.expectNext((1L, 1, "a")) - s.expectNext((2L, 2, "b")) - // soonCancelledInput is now canceled and continues with default (null) value - s.expectNext((0L, 3, "c")) - s.expectComplete() - } - - "build perferring merge" in { - val output = Sink.publisher[Int] - val p = FlowGraph.closed(output) { implicit b ⇒ - o ⇒ - val merge = b.add(PreferringMerge) - Source(List(1, 2, 3)) ~> merge.in(0) - Source(List(11, 12, 13)) ~> merge.in(1) - Source(List(14, 15, 16)) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[Int] - p.subscribe(s) - val sub = s.expectSubscription() - - def expect(i: Int): Unit = { - sub.request(1) - s.expectNext(i) - } - def expectNext(): Int = { - sub.request(1) - s.expectNext() - } - - expect(1) - expect(2) - expect(3) - val secondaries = expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: Nil - - secondaries.toSet should equal(Set(11, 12, 13, 14, 15, 16)) - s.expectComplete() - } - - "build perferring merge, manually driven" in { - val output = Sink.publisher[Int] - val preferredDriver = TestPublisher.manualProbe[Int]() - val otherDriver1 = TestPublisher.manualProbe[Int]() - val otherDriver2 = TestPublisher.manualProbe[Int]() - - val p = FlowGraph.closed(output) { implicit b ⇒ - o ⇒ - val merge = b.add(PreferringMerge) - Source(preferredDriver) ~> merge.in(0) - Source(otherDriver1) ~> merge.in(1) - Source(otherDriver2) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[Int] - p.subscribe(s) - - val sub = s.expectSubscription() - val p1 = preferredDriver.expectSubscription() - val s1 = otherDriver1.expectSubscription() - val s2 = otherDriver2.expectSubscription() - - // just consume the preferred - p1.sendNext(1) - sub.request(1) - s.expectNext(1) - - // pick preferred over any of the secondaries - p1.sendNext(2) - s1.sendNext(10) - s2.sendNext(20) - sub.request(1) - s.expectNext(2) - - sub.request(2) - Set(s.expectNext(), s.expectNext()) should ===(Set(10, 20)) - - p1.sendComplete() - - // continue with just secondaries when preferred has completed - s1.sendNext(11) - s2.sendNext(21) - sub.request(2) - Set(s.expectNext(), s.expectNext()) should ===(Set(11, 21)) - - // continue with just one secondary - s1.sendComplete() - s2.sendNext(4) - sub.request(1) - s.expectNext(4) - s2.sendComplete() - - // finish when all inputs have completed - s.expectComplete() - } - - "support cancel of input" in assertAllStagesStopped { - val autoPublisher = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(autoPublisher) ~> merge.in(0) - Source(List("b", "c", "d")) ~> merge.in(1) - Source(List("e", "f")) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - - autoPublisher.sendNext("a") - autoPublisher.sendNext("cancel") - - val sub = s.expectSubscription() - sub.request(10) - val outputs = - for (_ ← 1 to 6) yield { - val next = s.expectNext() - if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12) - } - val one = Seq("a") - val two = Seq("b", "c", "d") - val three = Seq("e", "f") - outputs.filter(one.contains) should ===(one) - outputs.filter(two.contains) should ===(two) - outputs.filter(three.contains) should ===(three) - completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2") - - autoPublisher.sendNext("x") - - s.expectComplete() - } - - "finish when all inputs cancelled" in assertAllStagesStopped { - val autoPublisher1 = TestPublisher.probe[String]() - val autoPublisher2 = TestPublisher.probe[String]() - val autoPublisher3 = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(autoPublisher1) ~> merge.in(0) - Source(autoPublisher2) ~> merge.in(1) - Source(autoPublisher3) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - - autoPublisher1.sendNext("a") - autoPublisher1.sendNext("cancel") - s.expectNext("onInput: a") - - autoPublisher2.sendNext("b") - autoPublisher2.sendNext("cancel") - s.expectNext("onInput: b") - - autoPublisher3.sendNext("c") - autoPublisher3.sendNext("cancel") - s.expectNext("onInput: c") - - s.expectComplete() - } - - "handle failure" in assertAllStagesStopped { - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.in(0) - Source(List("a", "b")) ~> merge.in(1) - Source(List("c")) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - // IllegalArgumentException is swallowed by the CompletionHandler - val outputs = - for (_ ← 1 to 3) yield { - val next = s.expectNext() - if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12) - } - val one = Seq("a", "b") - val two = Seq("c") - completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2") - outputs.filter(one.contains) should ===(one) - outputs.filter(two.contains) should ===(two) - - s.expectComplete() - } - - "propagate failure" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(publisher) ~> merge.in(0) - Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - s.expectSubscriptionAndError().getMessage should be("ERROR") - } - - "emit failure" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("err")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - - s.expectError().getMessage should be("err") - } - - "emit failure for user thrown exception" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("exc")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectError().getMessage should be("exc") - } - - "emit failure for user thrown exception in onComplete" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("onUpstreamFinish-exc")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "emit failure for user thrown exception in onUpstreamFinish 2" in assertAllStagesStopped { - val autoPublisher = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source.empty[String] ~> merge.in(0) - Source(autoPublisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - autoPublisher.sendNext("onUpstreamFinish-exc") - autoPublisher.sendNext("a") - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(1) - s.expectNext("onInput: a") - - autoPublisher.sendComplete() - s.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "support finish from onInput" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("a", "complete")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext("onInput: a") - s.expectComplete() - } - - "have the correct value for input in ReadPreffered" in { - import akka.stream.FanInShape._ - class MShape[T](_init: Init[T] = Name("mshape")) extends FanInShape[T](_init) { - val priority = newInlet[T]("priority") - val second = newInlet[T]("second") - protected override def construct(i: Init[T]) = new MShape(i) - } - class MyMerge[T] extends FlexiMerge[T, MShape[T]]( - new MShape, Attributes.name("cmerge")) { - import akka.stream.scaladsl.FlexiMerge._ - override def createMergeLogic(p: PortT) = new MergeLogic[T] { - override def initialState = - State[T](ReadPreferred(p.priority, p.second)) { - (ctx, input, element) ⇒ - if (element == 1) assert(input == p.priority) - if (element == 2) assert(input == p.second) - ctx.emit(element) - SameState - } - } - } - - val sink = Sink.fold[Int, Int](0)(_ + _) - val graph = FlowGraph.closed(sink) { implicit b ⇒ - sink ⇒ - import FlowGraph.Implicits._ - - val merge = b.add(new MyMerge[Int]()) - - Source.single(1) ~> merge.priority - Source.single(2) ~> merge.second - - merge.out ~> sink.inlet - } - Await.result(graph.run(), 1.second) should equal(3) - } - - "handle preStart and postStop" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val m = b.add(new StartStopTest(p.ref)) - - Source(List("1", "2", "3")) ~> m.in0 - Source.empty ~> m.in1 - m.out ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("2") - p.expectMsg("3") - p.expectMsg("postStop") - } - - "invoke postStop after error" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val m = b.add(new StartStopTest(p.ref)) - - Source(List("1", "fail", "2", "3")) ~> m.in0 - Source.empty ~> m.in1 - m.out ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("fail") - p.expectMsg("postStop") - } - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala deleted file mode 100644 index 1001e0b82c..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ /dev/null @@ -1,484 +0,0 @@ -package akka.stream.scaladsl - -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import FlowGraph.Implicits._ -import akka.stream.ActorMaterializer -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl._ -import akka.stream.testkit.Utils._ -import akka.actor.ActorSystem -import akka.stream._ -import akka.actor.ActorRef -import akka.testkit.TestProbe - -object GraphFlexiRouteSpec { - - /** - * This is fair in that sense that after enqueueing to an output it yields to other output if - * they are have requested elements. Or in other words, if all outputs have demand available at the same - * time then in finite steps all elements are enqueued to them. - */ - class Fair[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("FairBalance")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT): RouteLogic[T] = new RouteLogic[T] { - val select = p.out(0) | p.out(1) - - val emitToAnyWithDemand = State(DemandFromAny(p)) { (ctx, out, element) ⇒ - ctx.emit(select(out))(element) - SameState - } - - // initally, wait for demand from all - override def initialState = State(DemandFromAll(p)) { (ctx, _, element) ⇒ - ctx.emit(p.out(0))(element) - emitToAnyWithDemand - } - } - } - - /** - * It never skips an output while cycling but waits on it instead (closed outputs are skipped though). - * The fair route above is a non-strict round-robin (skips currently unavailable outputs). - */ - class StrictRoundRobin[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("RoundRobinBalance")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[T] { - - val toOutput1: State[Outlet[T]] = State(DemandFrom(p.out(0))) { (ctx, out, element) ⇒ - ctx.emit(out)(element) - toOutput2 - } - - val toOutput2 = State(DemandFrom(p.out(1))) { (ctx, out, element) ⇒ - ctx.emit(out)(element) - toOutput1 - } - - override def initialState = toOutput1 - } - } - - class Unzip[A, B] extends FlexiRoute[(A, B), FanOutShape2[(A, B), A, B]](new FanOutShape2("Unzip"), Attributes.name("Unzip")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] { - - override def initialState = State(DemandFromAll(p)) { (ctx, _, element) ⇒ - val (a, b) = element - ctx.emit(p.out0)(a) - ctx.emit(p.out1)(b) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - - class StartStopTestRoute(lifecycleProbe: ActorRef) - extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("StartStopTest"), Attributes.name("StartStopTest")) { - import FlexiRoute._ - - def createRouteLogic(p: PortT) = new RouteLogic[String] { - val select = p.out0 | p.out1 - - override def preStart(): Unit = lifecycleProbe ! "preStart" - override def postStop(): Unit = lifecycleProbe ! "postStop" - - override def initialState = State(DemandFromAny(p)) { - (ctx, port, element) ⇒ - lifecycleProbe ! element - if (element == "fail") throw new IllegalStateException("test failure") - ctx.emit(select(port))(element) - - SameState - } - } - - } - - class TestRoute(completionProbe: ActorRef) - extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("TestRoute"), Attributes.name("TestRoute")) { - import FlexiRoute._ - - var throwFromOnComplete = false - - def createRouteLogic(p: PortT): RouteLogic[String] = new RouteLogic[String] { - val select = p.out0 | p.out1 - - override def initialState = State(DemandFromAny(p)) { - (ctx, preferred, element) ⇒ - if (element == "err") - ctx.fail(new RuntimeException("err") with NoStackTrace) - else if (element == "err-output1") - ctx.fail(p.out0, new RuntimeException("err-1") with NoStackTrace) - else if (element == "exc") - throw new RuntimeException("exc") with NoStackTrace - else if (element == "onUpstreamFinish-exc") - throwFromOnComplete = true - else if (element == "finish") - ctx.finish() - else - ctx.emit(select(preferred))("onInput: " + element) - - SameState - } - - override def initialCompletionHandling = CompletionHandling( - onUpstreamFinish = { ctx ⇒ - if (throwFromOnComplete) - throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - completionProbe ! "onUpstreamFinish" - }, - onUpstreamFailure = { (ctx, cause) ⇒ - cause match { - case _: IllegalArgumentException ⇒ // swallow - case _ ⇒ - completionProbe ! "onError" - } - }, - onDownstreamFinish = { (ctx, cancelledOutput) ⇒ - completionProbe ! "onDownstreamFinish: " + cancelledOutput - SameState - }) - } - } - - class TestFixture(implicit val system: ActorSystem, implicit val materializer: ActorMaterializer) { - val autoPublisher = TestPublisher.probe[String]() - val s1 = TestSubscriber.manualProbe[String] - val s2 = TestSubscriber.manualProbe[String] - val completionProbe = TestProbe() - FlowGraph.closed() { implicit b ⇒ - val route = b.add(new TestRoute(completionProbe.ref)) - Source(autoPublisher) ~> route.in - route.out0 ~> Sink(s1) - route.out1 ~> Sink(s2) - }.run() - - autoPublisher.sendNext("a") - autoPublisher.sendNext("b") - - val sub1 = s1.expectSubscription() - val sub2 = s2.expectSubscription() - } - -} - -class GraphFlexiRouteSpec extends AkkaSpec { - import GraphFlexiRouteSpec._ - - implicit val materializer = ActorMaterializer() - - val in = Source(List("a", "b", "c", "d", "e")) - - val out1 = Sink.publisher[String] - val out2 = Sink.publisher[String] - - "FlexiRoute" must { - - "build simple fair route" in assertAllStagesStopped { - // we can't know exactly which elements that go to each output, because if subscription/request - // from one of the downstream is delayed the elements will be pushed to the other output - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - out ⇒ - val merge = b.add(Merge[String](2)) - val route = b.add(new Fair[String]) - in ~> route.in - route.out(0) ~> merge.in(0) - route.out(1) ~> merge.in(1) - merge.out ~> out - }.run() - .request(10) - .expectNextUnordered("a", "b", "c", "d", "e") - .expectComplete() - } - - "build simple round-robin route" in { - val (p1, p2) = FlowGraph.closed(out1, out2)(Keep.both) { implicit b ⇒ - (o1, o2) ⇒ - val route = b.add(new StrictRoundRobin[String]) - in ~> route.in - route.out(0) ~> o1.inlet - route.out(1) ~> o2.inlet - }.run() - - val s1 = TestSubscriber.manualProbe[String] - p1.subscribe(s1) - val sub1 = s1.expectSubscription() - val s2 = TestSubscriber.manualProbe[String] - p2.subscribe(s2) - val sub2 = s2.expectSubscription() - - sub1.request(10) - sub2.request(10) - - s1.expectNext("a") - s2.expectNext("b") - s1.expectNext("c") - s2.expectNext("d") - s1.expectNext("e") - - s1.expectComplete() - s2.expectComplete() - } - - "build simple unzip route" in { - val outA = Sink.publisher[Int] - val outB = Sink.publisher[String] - - val (p1, p2) = FlowGraph.closed(outA, outB)(Keep.both) { implicit b ⇒ - (oa, ob) ⇒ - val route = b.add(new Unzip[Int, String]) - Source(List(1 -> "A", 2 -> "B", 3 -> "C", 4 -> "D")) ~> route.in - route.out0 ~> oa.inlet - route.out1 ~> ob.inlet - }.run() - - val s1 = TestSubscriber.manualProbe[Int] - p1.subscribe(s1) - val sub1 = s1.expectSubscription() - val s2 = TestSubscriber.manualProbe[String] - p2.subscribe(s2) - val sub2 = s2.expectSubscription() - - sub1.request(3) - sub2.request(4) - - s1.expectNext(1) - s2.expectNext("A") - s1.expectNext(2) - s2.expectNext("B") - s1.expectNext(3) - s2.expectNext("C") - sub1.cancel() - - s2.expectComplete() - } - - "support finish of downstreams and cancel of upstream" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - autoPublisher.sendNext("finish") - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(2) - s2.expectNext("onInput: b") - - s1.expectComplete() - s2.expectComplete() - } - - "support error of outputs" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - autoPublisher.sendNext("err") - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(2) - s2.expectNext("onInput: b") - - s1.expectError().getMessage should be("err") - s2.expectError().getMessage should be("err") - autoPublisher.expectCancellation() - } - - "support error of a specific output" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("err-output1") - autoPublisher.sendNext("c") - - s2.expectNext("onInput: c") - s1.expectError().getMessage should be("err-1") - - autoPublisher.sendComplete() - completionProbe.expectMsg("onUpstreamFinish") - s2.expectComplete() - } - - "emit error for user thrown exception" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("exc") - - s1.expectError().getMessage should be("exc") - s2.expectError().getMessage should be("exc") - - autoPublisher.expectCancellation() - } - - "emit error for user thrown exception in onUpstreamFinish" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("onUpstreamFinish-exc") - autoPublisher.sendComplete() - - s1.expectError().getMessage should be("onUpstreamFinish-exc") - s2.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "handle cancel from output" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - sub1.cancel() - - completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0") - s1.expectNoMsg(200.millis) - - autoPublisher.sendNext("c") - s2.expectNext("onInput: c") - - autoPublisher.sendComplete() - s2.expectComplete() - } - - "handle finish from upstream input" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendComplete() - - completionProbe.expectMsg("onUpstreamFinish") - - s1.expectComplete() - s2.expectComplete() - } - - "handle error from upstream input" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace) - - completionProbe.expectMsg("onError") - - s1.expectError().getMessage should be("test err") - s2.expectError().getMessage should be("test err") - } - - "cancel upstream input when all outputs cancelled" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - sub1.cancel() - - completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0") - sub2.cancel() - - autoPublisher.expectCancellation() - } - - "cancel upstream input when all outputs completed" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendNext("finish") - s1.expectComplete() - s2.expectComplete() - autoPublisher.expectCancellation() - } - - "handle preStart and postStop" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val r = b.add(new StartStopTestRoute(p.ref)) - - Source(List("1", "2", "3")) ~> r.in - r.out0 ~> Sink.ignore - r.out1 ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("2") - p.expectMsg("3") - p.expectMsg("postStop") - } - - "invoke postStop after error" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val r = b.add(new StartStopTestRoute(p.ref)) - - Source(List("1", "fail", "2", "3")) ~> r.in - r.out0 ~> Sink.ignore - r.out1 ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("fail") - p.expectMsg("postStop") - } - - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index a9d25df84f..922ed5f262 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -93,7 +93,7 @@ class SinkSpec extends AkkaSpec { "combine to many outputs with simplified API" in { val probes = Seq.fill(3)(TestSubscriber.manualProbe[Int]()) - val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast(_)) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) @@ -111,7 +111,7 @@ class SinkSpec extends AkkaSpec { "combine to two sinks with simplified API" in { val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]()) - val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast(_)) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template index 9e73c459a6..e9f9fb1b85 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template @@ -5,15 +5,16 @@ package akka.stream import scala.collection.immutable import scala.annotation.varargs +import scala.annotation.unchecked.uncheckedVariance object FanInShape { - sealed trait Init[+O] { + sealed trait Init[O] { def outlet: Outlet[O] def inlets: immutable.Seq[Inlet[_]] def name: String } - final case class Name(override val name: String) extends Init[Nothing] { - override def outlet: Outlet[Nothing] = Outlet(s"$name.out") + final case class Name[O](override val name: String) extends Init[O] { + override def outlet: Outlet[O] = Outlet(s"$name.out") override def inlets: immutable.Seq[Inlet[_]] = Nil } final case class Ports[O](override val outlet: Outlet[O], override val inlets: immutable.Seq[Inlet[_]]) extends Init[O] { @@ -21,12 +22,12 @@ object FanInShape { } } -abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inlet[_]], _name: String) extends Shape { +abstract class FanInShape[+O] private (_out: Outlet[O @uncheckedVariance], _registered: Iterator[Inlet[_]], _name: String) extends Shape { import FanInShape._ def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name) - final def out: Outlet[O] = _out + final def out: Outlet[O @uncheckedVariance] = _out final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil final override def inlets: immutable.Seq[Inlet[_]] = _inlets @@ -37,7 +38,7 @@ abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inl p } - protected def construct(init: Init[O]): FanInShape[O] + protected def construct(init: Init[O @uncheckedVariance]): FanInShape[O] def deepCopy(): FanInShape[O] = construct(Ports[O](_out.carbonCopy(), inlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = { @@ -52,39 +53,39 @@ object UniformFanInShape { new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList)) } -class UniformFanInShape[-T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(n: Int) = this(n, FanInShape.Name("UniformFanIn")) - def this(n: Int, name: String) = this(n, FanInShape.Name(name)) +class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(n: Int) = this(n, FanInShape.Name[O]("UniformFanIn")) + def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new UniformFanInShape(n, init) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new UniformFanInShape(n, init) override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]] - val inSeq: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) - def in(n: Int): Inlet[T] = inSeq(n) + val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) + def in(n: Int): Inlet[T @uncheckedVariance] = inSeq(n) } -class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(n: Int) = this(n, FanInShape.Name("FanInShape1N")) - def this(n: Int, name: String) = this(n, FanInShape.Name(name)) - def this(outlet: Outlet[O], in0: Inlet[T0], inlets1: Array[Inlet[T1]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1N(n, init) +class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N")) + def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) + def this(outlet: Outlet[O @uncheckedVariance], in0: Inlet[T0 @uncheckedVariance], inlets1: Array[Inlet[T1 @uncheckedVariance]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1N(n, init) override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]] - val in0 = newInlet[T0]("in0") - val in1Seq: immutable.IndexedSeq[Inlet[T1]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}")) - def in(n: Int): Inlet[T1] = { + val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0") + val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}")) + def in(n: Int): Inlet[T1 @uncheckedVariance] = { require(n > 0, "n must be > 0") in1Seq(n - 1) } } -[2..#class FanInShape1[[#T0#], O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(name: String) = this(FanInShape.Name(name)) +[2..#class FanInShape1[[#-T0#], +O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(name: String) = this(FanInShape.Name[O](name)) def this([#in0: Inlet[T0]#], out: Outlet[O]) = this(FanInShape.Ports(out, [#in0# :: ] :: Nil)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1(init) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1(init) override def deepCopy(): FanInShape1[[#T0#], O] = super.deepCopy().asInstanceOf[FanInShape1[[#T0#], O]] - [#val in0 = newInlet[T0]("in0")# + [#val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")# ] }# diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template index 0bd0cee686..1497e00b62 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template @@ -4,6 +4,7 @@ package akka.stream import scala.collection.immutable +import scala.annotation.unchecked.uncheckedVariance object FanOutShape { sealed trait Init[I] { @@ -20,12 +21,12 @@ object FanOutShape { } } -abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outlet[_]], _name: String) extends Shape { +abstract class FanOutShape[-I] private (_in: Inlet[I @uncheckedVariance], _registered: Iterator[Outlet[_]], _name: String) extends Shape { import FanOutShape._ def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name) - final def in: Inlet[I] = _in + final def in: Inlet[I @uncheckedVariance] = _in final override def outlets: immutable.Seq[Outlet[_]] = _outlets final override def inlets: immutable.Seq[Inlet[_]] = in :: Nil @@ -36,7 +37,7 @@ abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outl p } - protected def construct(init: Init[I]): FanOutShape[I] + protected def construct(init: Init[I @uncheckedVariance]): FanOutShape[I] def deepCopy(): FanOutShape[I] = construct(Ports[I](_in.carbonCopy(), outlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanOutShape[I] = { @@ -51,24 +52,24 @@ object UniformFanOutShape { new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) } -class UniformFanOutShape[I, O](n: Int, _init: FanOutShape.Init[I]) extends FanOutShape[I](_init) { +class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { def this(n: Int) = this(n, FanOutShape.Name[I]("UniformFanOut")) def this(n: Int, name: String) = this(n, FanOutShape.Name[I](name)) def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) - override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new UniformFanOutShape(n, init) + override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new UniformFanOutShape(n, init) override def deepCopy(): UniformFanOutShape[I, O] = super.deepCopy().asInstanceOf[UniformFanOutShape[I, O]] - val outArray: Array[Outlet[O]] = Array.tabulate(n)(i => newOutlet[O](s"out$i")) - def out(n: Int): Outlet[O] = outArray(n) + val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(n)(i => newOutlet[O](s"out$i")) + def out(n: Int): Outlet[O @uncheckedVariance] = outArray(n) } -[2..#class FanOutShape1[I, [#O0#]](_init: FanOutShape.Init[I]) extends FanOutShape[I](_init) { +[2..#class FanOutShape1[-I, [#+O0#]](_init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { def this(name: String) = this(FanOutShape.Name[I](name)) def this(in: Inlet[I], [#out0: Outlet[O0]#]) = this(FanOutShape.Ports(in, [#out0# :: ] :: Nil)) - override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new FanOutShape1(init) + override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new FanOutShape1(init) override def deepCopy(): FanOutShape1[I, [#O0#]] = super.deepCopy().asInstanceOf[FanOutShape1[I, [#O0#]]] - [#val out0 = newOutlet[O0]("out0")# + [#val out0: Outlet[O0 @uncheckedVariance] = newOutlet[O0]("out0")# ] }# diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index a7ee5ffeee..336b14a423 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -4,9 +4,9 @@ package akka.stream import akka.util.Collections.EmptyImmutableSeq - import scala.collection.immutable import scala.collection.JavaConverters._ +import scala.annotation.unchecked.uncheckedVariance /** * An input port of a StreamLayout.Module. This type logically belongs @@ -48,8 +48,12 @@ object Inlet { def apply[T](toString: String): Inlet[T] = new Inlet[T](toString) } -final class Inlet[-T] private (override val toString: String) extends InPort { +final class Inlet[T] private (override val toString: String) extends InPort { def carbonCopy(): Inlet[T] = Inlet(toString) + /** + * INTERNAL API. + */ + def as[U]: Inlet[U] = this.asInstanceOf[Inlet[U]] } /** @@ -61,8 +65,12 @@ object Outlet { def apply[T](toString: String): Outlet[T] = new Outlet[T](toString) } -final class Outlet[+T] private (override val toString: String) extends OutPort { +final class Outlet[T] private (override val toString: String) extends OutPort { def carbonCopy(): Outlet[T] = Outlet(toString) + /** + * INTERNAL API. + */ + def as[U]: Outlet[U] = this.asInstanceOf[Outlet[U]] } /** @@ -187,7 +195,7 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se * A Source [[Shape]] has exactly one output and no inputs, it models a source * of data. */ -final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { +final case class SourceShape[+T](outlet: Outlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -204,7 +212,7 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { * outside like a pipe (but it can be a complex topology of streams within of * course). */ -final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { +final case class FlowShape[-I, +O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -219,7 +227,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S /** * A Sink [[Shape]] has exactly one input and no outputs, it models a data sink. */ -final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { +final case class SinkShape[-T](inlet: Inlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq @@ -244,10 +252,10 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { * +------+ * }}} */ -final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], - out1: Outlet[Out1], - in2: Inlet[In2], - out2: Outlet[Out2]) extends Shape { +final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVariance], + out1: Outlet[Out1 @uncheckedVariance], + in2: Inlet[In2 @uncheckedVariance], + out2: Outlet[Out2 @uncheckedVariance]) extends Shape { //#implementation-details-elided override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2) override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2) @@ -270,6 +278,6 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], //#bidi-shape object BidiShape { - def apply[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = + def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala index 7ff5762e40..dd305cebb7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala @@ -425,10 +425,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo initialPhase(2, bidirectional) - protected def fail(e: Throwable, closeTransport: Boolean=true): Unit = { + protected def fail(e: Throwable, closeTransport: Boolean = true): Unit = { if (tracing) log.debug("fail {} due to: {}", self, e.getMessage) inputBunch.cancel() - if(closeTransport) { + if (closeTransport) { log.debug("closing output") outputBunch.error(TransportOut, e) } diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index e2f7552ca1..37d9c9a097 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -160,7 +160,7 @@ object SslTlsPlacebo { val session = SSLContext.getDefault.createSSLEngine.getSession val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes }) val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) - BidiShape(top, bottom) + BidiShape.fromFlows(top, bottom) } val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, Unit] = new javadsl.BidiFlow(forScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 58fdf9b49d..fe92630eaf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -948,7 +948,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph val f = new function.Function2[FlowGraph.Builder[M], SourceShape[T], Inlet[Out]Pair Outlet[Out Pair T]] { override def apply(b: FlowGraph.Builder[M], s: SourceShape[T]): Inlet[Out] Pair Outlet[Out Pair T] = { val zip = b.graph(Zip.create[Out, T]) - b.from(s).to(zip.in1) + b.from(s).toInlet(zip.in1) new Pair(zip.in0, zip.out) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 1dccd09c33..b88e5cf427 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import akka.stream._ import akka.japi.Pair +import scala.annotation.unchecked.uncheckedVariance /** * Merge several streams, taking elements as they arrive from input streams @@ -277,49 +278,41 @@ object FlowGraph { * * @return The outlet that will emit the materialized value. */ - def materializedValue: Outlet[Mat] = delegate.materializedValue + def materializedValue: Outlet[Mat @uncheckedVariance] = delegate.materializedValue def run(mat: Materializer): Unit = delegate.buildRunnable().run()(mat) def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) - def from[T, M](src: Graph[SourceShape[T], M]): ForwardOps[T] = new ForwardOps(delegate.add(src).outlet) def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet) def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet) def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out) def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0)) def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in) - def to[T, M](dst: Graph[SinkShape[T], M]): ReverseOps[T] = new ReverseOps(delegate.add(dst).inlet) def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet) def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet) def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) final class ForwardOps[T](out: Outlet[T]) { - def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self } - def to[M](dst: Graph[SinkShape[T], M]): Builder[Mat] = { out ~> dst; self } - def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self } - def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self } - def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self } - def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self } - def via[U, M](f: Graph[FlowShape[T, U], M]): ForwardOps[U] = from((out ~> f).outlet) - def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) - def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) - def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) + def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { out ~> in; self } + def to(dst: SinkShape[_ >: T]): Builder[Mat] = { out ~> dst; self } + def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } + def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } + def via[U, M](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((out ~> f).outlet) + def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) + def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) def out(): Outlet[T] = out } final class ReverseOps[T](out: Inlet[T]) { - def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self } - def from[M](dst: Graph[SourceShape[T], M]): Builder[Mat] = { out <~ dst; self } - def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self } - def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self } - def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self } - def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self } - def via[U, M](f: Graph[FlowShape[U, T], M]): ReverseOps[U] = to((out <~ f).inlet) - def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) - def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) - def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) + def fromOutlet(dst: Outlet[_ <: T]): Builder[Mat] = { out <~ dst; self } + def from(dst: SourceShape[_ <: T]): Builder[Mat] = { out <~ dst; self } + def fromFanIn[U](j: UniformFanInShape[U, _ <: T]): Builder[Mat] = { out <~ j; self } + def fromFanOut[U](j: UniformFanOutShape[U, _ <: T]): Builder[Mat] = { out <~ j; self } + def via[U](f: FlowShape[U, _ <: T]): ReverseOps[U] = to((out <~ f).inlet) + def viaFanIn[U](j: UniformFanInShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet) + def viaFanOut[U](j: UniformFanOutShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet) } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 09a96a02a8..05b2dcbfe6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -8,6 +8,7 @@ import akka.stream.{ Outlet, Shape, OutPort, Graph, Attributes } import scala.collection.immutable import akka.stream.impl.Junctions.FlexiRouteModule import akka.stream.impl.Stages.DefaultAttributes +import scala.annotation.unchecked.uncheckedVariance object FlexiRoute { @@ -24,7 +25,7 @@ object FlexiRoute { * has been completed. `IllegalArgumentException` is thrown if * that is not obeyed. */ - final case class DemandFrom[+T](output: Outlet[T]) extends DemandCondition[Outlet[T]] + final case class DemandFrom[+T](output: Outlet[T @uncheckedVariance]) extends DemandCondition[Outlet[T @uncheckedVariance]] object DemandFromAny { def apply(outputs: OutPort*): DemandFromAny = new DemandFromAny(outputs.to[immutable.Seq]) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 5eec534865..a283d1ef8d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -102,7 +102,7 @@ class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] object MergePreferred { import FanInShape._ final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T]) extends UniformFanInShape[T, T](secondaryPorts, _init) { - def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name(name)) + def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name[T](name)) override protected def construct(init: Init[T]): FanInShape[T] = new MergePreferredShape(secondaryPorts, init) override def deepCopy(): MergePreferredShape[T] = super.deepCopy().asInstanceOf[MergePreferredShape[T]] @@ -562,7 +562,7 @@ object FlowGraph extends GraphApply { class Builder[+M] private[stream] () { private var moduleInProgress: Module = EmptyModule - def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = { + def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = { val flowCopy = via.module.carbonCopy moduleInProgress = moduleInProgress @@ -571,7 +571,7 @@ object FlowGraph extends GraphApply { .wire(flowCopy.shape.outlets.head, to) } - def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = { + def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit = { moduleInProgress = moduleInProgress.wire(from, to) } @@ -630,7 +630,7 @@ object FlowGraph extends GraphApply { * * @return The outlet that will emit the materialized value. */ - def materializedValue: Outlet[M] = { + def materializedValue: Outlet[M @uncheckedVariance] = { val module = new MaterializedValueSource[Any] moduleInProgress = moduleInProgress.compose(module) module.shape.outlet.asInstanceOf[Outlet[M]] @@ -724,7 +724,7 @@ object FlowGraph extends GraphApply { trait CombinerBase[T] extends Any { def importAndGetPort(b: Builder[_]): Outlet[T] - def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = { + def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit = { b.addEdge(importAndGetPort(b), to) } @@ -770,7 +770,7 @@ object FlowGraph extends GraphApply { trait ReverseCombinerBase[T] extends Any { def importAndGetPortReverse(b: Builder[_]): Inlet[T] - def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = { + def <~[U <: T](from: Outlet[U])(implicit b: Builder[_]): Unit = { b.addEdge(from, importAndGetPortReverse(b)) }