diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala index c90fbd8936..7d0ba017ca 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala @@ -6,8 +6,8 @@ package docs.http.scaladsl.server package directives import akka.event.Logging -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.http.scaladsl.server.directives.{DebuggingDirectives, LogEntry, LoggingMagnet} +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.http.scaladsl.server.directives.{ DebuggingDirectives, LogEntry, LoggingMagnet } class DebuggingDirectivesExamplesSpec extends RoutingSpec { "logRequest-0" in { diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala index 9d5a35e15d..c2933e208c 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/RangeDirectivesExamplesSpec.scala @@ -30,7 +30,6 @@ class RangeDirectivesExamplesSpec extends RoutingSpec { responseAs[String] shouldEqual "DE" } - // we set "akka.http.routing.range-coalescing-threshold = 2" // above to make sure we get two BodyParts Get() ~> addHeader(Range(ByteRange(0, 1), ByteRange(1, 2), ByteRange(6, 7))) ~> route ~> check { diff --git a/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala index eeed87ed8a..9a10eddb48 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala @@ -50,7 +50,7 @@ object BidiFlowDocSpec { // construct and add the bottom flow, going inbound val inbound = b.add(Flow[ByteString].map(fromBytes)) // fuse them together into a BidiShape - BidiShape(outbound, inbound) + BidiShape.fromFlows(outbound, inbound) } // this is the same as the above @@ -112,18 +112,18 @@ object BidiFlowDocSpec { val outbound = b.add(Flow[ByteString].map(addLengthHeader)) val inbound = b.add(Flow[ByteString].transform(() => new FrameParser)) - BidiShape(outbound, inbound) + BidiShape.fromFlows(outbound, inbound) } //#framing val chopUp = BidiFlow() { b => val f = Flow[ByteString].mapConcat(_.map(ByteString(_))) - BidiShape(b.add(f), b.add(f)) + BidiShape.fromFlows(b.add(f), b.add(f)) } val accumulate = BidiFlow() { b => val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _)) - BidiShape(b.add(f), b.add(f)) + BidiShape.fromFlows(b.add(f), b.add(f)) } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala deleted file mode 100644 index 64da45ea57..0000000000 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package docs.stream - -import akka.stream._ -import akka.stream.scaladsl._ -import akka.stream.testkit.AkkaSpec -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.Attributes - -object FlexiDocSpec { - //#fleximerge-zip-states - //#fleximerge-zip-readall - import akka.stream.FanInShape._ - class ZipPorts[A, B](_init: Init[(A, B)] = Name("Zip")) - extends FanInShape[(A, B)](_init) { - val left = newInlet[A]("left") - val right = newInlet[B]("right") - protected override def construct(i: Init[(A, B)]) = new ZipPorts(i) - } - //#fleximerge-zip-readall - //#fleximerge-zip-states -} - -class FlexiDocSpec extends AkkaSpec { - import FlexiDocSpec._ - - implicit val ec = system.dispatcher - implicit val mat = ActorMaterializer() - - "implement zip using readall" in { - //#fleximerge-zip-readall - class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]]( - new ZipPorts, Attributes.name("Zip1State")) { - import FlexiMerge._ - override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] { - override def initialState = - State(ReadAll(p.left, p.right)) { (ctx, _, inputs) => - val a = inputs(p.left) - val b = inputs(p.right) - ctx.emit((a, b)) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - //#fleximerge-zip-readall - - //format: OFF - val res = - //#fleximerge-zip-connecting - FlowGraph.closed(Sink.head[(Int, String)]) { implicit b => - o => - import FlowGraph.Implicits._ - - val zip = b.add(new Zip[Int, String]) - - Source.single(1) ~> zip.left - Source.single("1") ~> zip.right - zip.out ~> o.inlet - } - //#fleximerge-zip-connecting - .run() - //format: ON - - Await.result(res, 300.millis) should equal((1, "1")) - } - - "implement zip using two states" in { - //#fleximerge-zip-states - class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]]( - new ZipPorts, Attributes.name("Zip2State")) { - import FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] { - var lastInA: A = _ - - val readA: State[A] = State[A](Read(p.left)) { (ctx, input, element) => - lastInA = element - readB - } - - val readB: State[B] = State[B](Read(p.right)) { (ctx, input, element) => - ctx.emit((lastInA, element)) - readA - } - - override def initialState: State[_] = readA - - override def initialCompletionHandling = eagerClose - } - } - //#fleximerge-zip-states - - val res = FlowGraph.closed(Sink.head[(Int, String)]) { implicit b => - o => - import FlowGraph.Implicits._ - - val zip = b.add(new Zip[Int, String]) - - Source(1 to 2) ~> zip.left - Source((1 to 2).map(_.toString)) ~> zip.right - zip.out ~> o.inlet - }.run() - - Await.result(res, 300.millis) should equal((1, "1")) - } - - "fleximerge completion handling" in { - import FanInShape._ - //#fleximerge-completion - class ImportantWithBackupShape[A](_init: Init[A] = Name("Zip")) - extends FanInShape[A](_init) { - val important = newInlet[A]("important") - val replica1 = newInlet[A]("replica1") - val replica2 = newInlet[A]("replica2") - protected override def construct(i: Init[A]) = - new ImportantWithBackupShape(i) - } - class ImportantWithBackups[A] extends FlexiMerge[A, ImportantWithBackupShape[A]]( - new ImportantWithBackupShape, Attributes.name("ImportantWithBackups")) { - import FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[A] { - import p.important - override def initialCompletionHandling = - CompletionHandling( - onUpstreamFinish = (ctx, input) => input match { - case `important` => - log.info("Important input completed, shutting down.") - ctx.finish() - SameState - - case replica => - log.info("Replica {} completed, " + - "no more replicas available, " + - "applying eagerClose completion handling.", replica) - - ctx.changeCompletionHandling(eagerClose) - SameState - }, - onUpstreamFailure = (ctx, input, cause) => input match { - case `important` => - ctx.fail(cause) - SameState - - case replica => - log.error(cause, "Replica {} failed, " + - "no more replicas available, " + - "applying eagerClose completion handling.", replica) - - ctx.changeCompletionHandling(eagerClose) - SameState - }) - - override def initialState = - State[A](ReadAny(p.important, p.replica1, p.replica2)) { - (ctx, input, element) => - ctx.emit(element) - SameState - } - } - } - //#fleximerge-completion - - FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ - val importantWithBackups = b.add(new ImportantWithBackups[Int]) - Source.single(1) ~> importantWithBackups.important - Source.single(2) ~> importantWithBackups.replica1 - Source.failed[Int](new Exception("Boom!") with NoStackTrace) ~> importantWithBackups.replica2 - importantWithBackups.out ~> Sink.ignore - }.run() - } - - "flexi preferring merge" in { - import FanInShape._ - //#flexi-preferring-merge-ports - class PreferringMergeShape[A](_init: Init[A] = Name("PreferringMerge")) - extends FanInShape[A](_init) { - val preferred = newInlet[A]("preferred") - val secondary1 = newInlet[A]("secondary1") - val secondary2 = newInlet[A]("secondary2") - protected override def construct(i: Init[A]) = new PreferringMergeShape(i) - } - //#flexi-preferring-merge-ports - - //#flexi-preferring-merge - - class PreferringMerge extends FlexiMerge[Int, PreferringMergeShape[Int]]( - new PreferringMergeShape, Attributes.name("ImportantWithBackups")) { - import akka.stream.scaladsl.FlexiMerge._ - - override def createMergeLogic(p: PortT) = new MergeLogic[Int] { - override def initialState = - State[Int](ReadPreferred(p.preferred, p.secondary1, p.secondary2)) { - (ctx, input, element) => - ctx.emit(element) - SameState - } - } - } - //#flexi-preferring-merge - } - - "flexi route" in { - //#flexiroute-unzip - import FanOutShape._ - class UnzipShape[A, B](_init: Init[(A, B)] = Name[(A, B)]("Unzip")) - extends FanOutShape[(A, B)](_init) { - val outA = newOutlet[A]("outA") - val outB = newOutlet[B]("outB") - protected override def construct(i: Init[(A, B)]) = new UnzipShape(i) - } - class Unzip[A, B] extends FlexiRoute[(A, B), UnzipShape[A, B]]( - new UnzipShape, Attributes.name("Unzip")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] { - override def initialState = - State[Any](DemandFromAll(p.outA, p.outB)) { - (ctx, _, element) => - val (a, b) = element - ctx.emit(p.outA)(a) - ctx.emit(p.outB)(b) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - //#flexiroute-unzip - } - - "flexi route completion handling" in { - import FanOutShape._ - //#flexiroute-completion - class ImportantRouteShape[A](_init: Init[A] = Name[A]("ImportantRoute")) - extends FanOutShape[A](_init) { - val important = newOutlet[A]("important") - val additional1 = newOutlet[A]("additional1") - val additional2 = newOutlet[A]("additional2") - protected override def construct(i: Init[A]) = new ImportantRouteShape(i) - } - class ImportantRoute[A] extends FlexiRoute[A, ImportantRouteShape[A]]( - new ImportantRouteShape, Attributes.name("ImportantRoute")) { - import FlexiRoute._ - override def createRouteLogic(p: PortT) = new RouteLogic[A] { - import p.important - private val select = (p.important | p.additional1 | p.additional2) - - override def initialCompletionHandling = - CompletionHandling( - // upstream: - onUpstreamFinish = (ctx) => (), - onUpstreamFailure = (ctx, thr) => (), - // downstream: - onDownstreamFinish = (ctx, output) => output match { - case `important` => - // finish all downstreams, and cancel the upstream - ctx.finish() - SameState - case _ => - SameState - }) - - override def initialState = - State(DemandFromAny(p.important, p.additional1, p.additional2)) { - (ctx, output, element) => - ctx.emit(select(output))(element) - SameState - } - } - } - //#flexiroute-completion - - FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ - val route = b.add(new ImportantRoute[Int]) - Source.single(1) ~> route.in - route.important ~> Sink.ignore - route.additional1 ~> Sink.ignore - route.additional2 ~> Sink.ignore - }.run() - } - -} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 80258df307..0ea408237d 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -142,8 +142,7 @@ class FlowGraphDocSpec extends AkkaSpec { assert(inlets.size == this.inlets.size) assert(outlets.size == this.outlets.size) // This is why order matters when overriding inlets and outlets. - // The "[Nothing, Any]" is equivalent to casting the Inlets/Outlets. - PriorityWorkerPoolShape[Nothing, Any](inlets(0), inlets(1), outlets(0)) + PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out]) } } //#flow-graph-components-shape diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 3826ba0a00..5e228210d2 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -117,7 +117,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { val sendRmotely = Sink.actorRef(actorRef, "Done") val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ()) - val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast(_)) + val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) //#sink-combine diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 3e9631d5b0..2a31f46198 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -96,7 +96,7 @@ private[http] object OutgoingConnectionBlueprint { responseParsingMerge.out ~> responsePrep ~> terminationFanout.in terminationFanout.out(0) ~> terminationMerge.in1 - BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse]( + BidiShape( methodBypassFanout.in, wrapTls.outlet, unwrapTls.inlet, diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 5dc2b093b8..ed105ff4e8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -94,7 +94,7 @@ public class BidiFlowTest extends StreamTest { @Override public BidiShape apply(Builder> b, SinkShape sink) throws Exception { - b.from(Source.single(42)).to(sink); + b.from(b.graph(Source.single(42))).to(sink); final FlowShape top = b.graph(Flow . empty().map(new Function() { @Override @@ -136,9 +136,9 @@ public class BidiFlowTest extends StreamTest { SinkShape sb) throws Exception { final BidiShape s = b .graph(bidi); - b.from(Source.single(1)).to(s.in1()); + b.from(b.graph(Source.single(1))).toInlet(s.in1()); b.from(s.out1()).to(st); - b.from(Source.single(bytes)).to(s.in2()); + b.from(b.graph(Source.single(bytes))).toInlet(s.in2()); b.from(s.out2()).to(sb); } }).run(materializer); @@ -217,8 +217,8 @@ public class BidiFlowTest extends StreamTest { return ByteString.fromString("Hello " + arg); } })); - b.from(shape.out2()).via(left).to(shape.in1()) - .from(shape.out1()).via(right).to(shape.in2()); + b.from(shape.out2()).via(left).toInlet(shape.in1()) + .from(shape.out1()).via(right).toInlet(shape.in2()); } }).run(materializer); assertEquals((Integer) 42, Await.result(f, oneSec)); @@ -241,8 +241,8 @@ public class BidiFlowTest extends StreamTest { } })); b.from(bcast).to(sink) - .from(Source.single(1)).via(bcast).to(merge) - .from(flow).to(merge); + .from(b.graph(Source.single(1))).viaFanOut(bcast).toFanIn(merge) + .from(flow).toFanIn(merge); return new Pair, Outlet>(flow.inlet(), merge.out()); } }); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 72d1a1ae87..94b792b27a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -287,12 +287,12 @@ public class FlowGraphTest extends StreamTest { final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>, SinkShape>() { @Override public void apply(Builder> b, SinkShape out) throws Exception { - b.from(Source.single(1)).to(out); - b.from(b.materializedValue()).to(Sink.foreach(new Procedure>(){ + b.from(b.graph(Source.single(1))).to(out); + b.from(b.materializedValue()).to(b.graph(Sink.foreach(new Procedure>(){ public void apply(Future mat) throws Exception { Patterns.pipe(mat, system.dispatcher()).to(probe.ref()); } - })); + }))); } }).run(materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 5112edcede..d2a5879fa0 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -646,8 +646,8 @@ public class FlowTest extends StreamTest { public Inlet apply(Builder b) throws Exception { final UniformFanOutShape broadcast = b.graph(Broadcast.create(2, true)); - b.from(broadcast.out(0)).to(out1); - b.from(broadcast.out(1)).to(out2); + b.from(broadcast.out(0)).to(b.graph(out1)); + b.from(broadcast.out(1)).to(b.graph(out2)); return broadcast.in(); } }); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 3d6bcf510d..85cdc41f0d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -381,9 +381,9 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off "emit an error if the TLS handshake fails certificate checks" in assertAllStagesStopped { val getError = Flow[SslTlsInbound] - .map[Either[SslTlsInbound, SSLException]](i => Left(i)) - .recover { case e: SSLException => Right(e) } - .collect { case Right(e) => e }.toMat(Sink.head)(Keep.right) + .map[Either[SslTlsInbound, SSLException]](i ⇒ Left(i)) + .recover { case e: SSLException ⇒ Right(e) } + .collect { case Right(e) ⇒ e }.toMat(Sink.head)(Keep.right) val simple = Flow.wrap(getError, Source.lazyEmpty[SslTlsOutbound])(Keep.left) @@ -399,8 +399,8 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off val clientErr = simple.join(badClientTls(IgnoreBoth)) .join(Tcp().outgoingConnection(Await.result(server, 1.second).localAddress)).run() - Await.result(serverErr.flatMap(identity), 1.second).getMessage should include ("certificate_unknown") - Await.result(clientErr, 1.second).getMessage should equal ("General SSLEngine problem") + Await.result(serverErr.flatMap(identity), 1.second).getMessage should include("certificate_unknown") + Await.result(clientErr, 1.second).getMessage should equal("General SSLEngine problem") } "reliably cancel subscriptions when TransportIn fails early" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala deleted file mode 100644 index c92fb93119..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ /dev/null @@ -1,765 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.FlexiMerge._ -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl._ -import akka.stream.testkit.Utils._ -import org.reactivestreams.Publisher -import akka.stream._ -import scala.util.control.NoStackTrace -import scala.collection.immutable -import akka.actor.ActorRef -import akka.testkit.TestProbe -import scala.concurrent.Await -import scala.concurrent.duration._ - -object GraphFlexiMergeSpec { - - class Fair[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("FairMerge")) { - def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] { - override def initialState = State[T](ReadAny(p.in(0), p.in(1))) { (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - } - } - - class StrictRoundRobin[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("RoundRobinMerge")) { - def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] { - val emitOtherOnClose = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - ctx.changeCompletionHandling(defaultCompletionHandling) - readRemaining(other(input)) - }, - onUpstreamFailure = { (ctx, _, cause) ⇒ - ctx.fail(cause) - SameState - }) - - def other(input: InPort): Inlet[T] = if (input eq p.in(0)) p.in(1) else p.in(0) - - val read1: State[T] = State(Read(p.in(0))) { (ctx, input, element) ⇒ - ctx.emit(element) - read2 - } - - val read2: State[T] = State(Read(p.in(1))) { (ctx, input, element) ⇒ - ctx.emit(element) - read1 - } - - def readRemaining(input: Inlet[T]) = State(Read(input)) { (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - - override def initialState = read1 - - override def initialCompletionHandling = emitOtherOnClose - } - } - - class StartStopTest(lifecycleProbe: ActorRef) - extends FlexiMerge[String, FanInShape2[String, String, String]](new FanInShape2("StartStopTest"), Attributes.name("StartStopTest")) { - - def createMergeLogic(p: PortT) = new MergeLogic[String] { - - override def preStart(): Unit = lifecycleProbe ! "preStart" - override def postStop(): Unit = lifecycleProbe ! "postStop" - - override def initialState = State(ReadAny(p.in0, p.in1)) { - (ctx, port, element) ⇒ - lifecycleProbe ! element - if (element == "fail") throw new IllegalStateException("test failure") - - ctx.emit(element) - SameState - } - } - } - - class MyZip[A, B] extends FlexiMerge[(A, B), FanInShape2[A, B, (A, B)]](new FanInShape2("MyZip"), Attributes.name("MyZip")) { - def createMergeLogic(p: PortT): MergeLogic[(A, B)] = new MergeLogic[(A, B)] { - var lastInA: A = _ - - val readA: State[A] = State[A](Read(p.in0)) { (ctx, input, element) ⇒ - lastInA = element - readB - } - - val readB: State[B] = State[B](Read(p.in1)) { (ctx, input, element) ⇒ - ctx.emit((lastInA, element)) - readA - } - - override def initialCompletionHandling = eagerClose - - override def initialState: State[_] = readA - } - } - - class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue, defVal: Option[A] = None) - extends FlexiMerge[(A, B, C), FanInShape3[A, B, C, (A, B, C)]](new FanInShape3("TripleCancellingZip"), Attributes.name("TripleCancellingZip")) { - def createMergeLogic(p: PortT) = new MergeLogic[(A, B, C)] { - override def initialState = State(ReadAll(p.in0, p.in1, p.in2)) { - case (ctx, input, inputs) ⇒ - val a = inputs.getOrElse(p.in0, defVal.get) - val b = inputs(p.in1) - val c = inputs(p.in2) - - ctx.emit((a, b, c)) - if (cancelAfter == 0) - ctx.cancel(p.in0) - cancelAfter -= 1 - - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - - object PreferringMerge extends FlexiMerge[Int, UniformFanInShape[Int, Int]](new UniformFanInShape(3), Attributes.name("PreferringMerge")) { - def createMergeLogic(p: PortT) = new MergeLogic[Int] { - override def initialState = State(Read(p.in(0))) { - (ctx, input, element) ⇒ - ctx.emit(element) - running - } - val running = State(ReadPreferred(p.in(0), p.in(1), p.in(2))) { - (ctx, input, element) ⇒ - ctx.emit(element) - SameState - } - } - } - - class TestMerge(completionProbe: ActorRef) - extends FlexiMerge[String, UniformFanInShape[String, String]](new UniformFanInShape(3), Attributes.name("TestMerge")) { - - def createMergeLogic(p: PortT) = new MergeLogic[String] { - var throwFromOnComplete = false - - override def initialState = State(ReadAny(p.inSeq: _*)) { - (ctx, input, element) ⇒ - if (element == "cancel") - ctx.cancel(input) - else if (element == "err") - ctx.fail(new RuntimeException("err") with NoStackTrace) - else if (element == "exc") - throw new RuntimeException("exc") with NoStackTrace - else if (element == "complete") - ctx.finish() - else if (element == "onUpstreamFinish-exc") - throwFromOnComplete = true - else - ctx.emit("onInput: " + element) - - SameState - } - - override def initialCompletionHandling = CompletionHandling( - onUpstreamFinish = { (ctx, input) ⇒ - if (throwFromOnComplete) - throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - completionProbe ! input.toString - SameState - }, - onUpstreamFailure = { (ctx, input, cause) ⇒ - cause match { - case _: IllegalArgumentException ⇒ // swallow - case _ ⇒ ctx.fail(cause) - } - SameState - }) - } - } - -} - -class GraphFlexiMergeSpec extends AkkaSpec { - import GraphFlexiMergeSpec._ - import FlowGraph.Implicits._ - - implicit val materializer = ActorMaterializer() - - val in1 = Source(List("a", "b", "c", "d")) - val in2 = Source(List("e", "f")) - - val out = Sink.publisher[String] - - val fairString = new Fair[String] - - "FlexiMerge" must { - - "build simple fair merge" in assertAllStagesStopped { - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - o ⇒ - val merge = b.add(fairString) - - in1 ~> merge.in(0) - in2 ~> merge.in(1) - merge.out ~> o.inlet - }.run() - .request(10) - .expectNextUnordered("a", "b", "c", "d", "e", "f") - .expectComplete() - } - - "be able to have two fleximerges in a graph" in assertAllStagesStopped { - FlowGraph.closed(in1, in2, TestSink.probe[String])((i1, i2, o) ⇒ o) { implicit b ⇒ - (in1, in2, o) ⇒ - val m1 = b.add(fairString) - val m2 = b.add(fairString) - - // format: OFF - in1.outlet ~> m1.in(0) - in2.outlet ~> m1.in(1) - - Source(List("A", "B", "C", "D", "E", "F")) ~> m2.in(0) - m1.out ~> m2.in(1) - m2.out ~> o.inlet - // format: ON - }.run() - .request(20) - .expectNextUnordered("a", "b", "c", "d", "e", "f", "A", "B", "C", "D", "E", "F") - .expectComplete() - } - - "allow reuse" in { - val flow = Flow() { implicit b ⇒ - val merge = b.add(new Fair[String]) - - Source(() ⇒ Iterator.continually("+")) ~> merge.in(0) - - merge.in(1) → merge.out - } - - val g = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val zip = b add Zip[String, String]() - in1 ~> flow ~> Flow[String].map { of ⇒ of } ~> zip.in0 - in2 ~> flow ~> Flow[String].map { tf ⇒ tf } ~> zip.in1 - zip.out.map { x ⇒ x.toString } ~> o.inlet - } - - val p = g.run() - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(1000) - val received = for (_ ← 1 to 1000) yield s.expectNext() - val first = received.map(_.charAt(1)) - first.toSet should ===(Set('a', 'b', 'c', 'd', '+')) - first.filter(_ != '+') should ===(Seq('a', 'b', 'c', 'd')) - val second = received.map(_.charAt(3)) - second.toSet should ===(Set('e', 'f', '+')) - second.filter(_ != '+') should ===(Seq('e', 'f')) - sub.cancel() - } - - "allow zip reuse" in { - val flow = Flow() { implicit b ⇒ - val zip = b.add(new MyZip[String, String]) - - Source(() ⇒ Iterator.continually("+")) ~> zip.in0 - - (zip.in1, zip.out) - } - - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - o ⇒ - val zip = b.add(Zip[String, String]()) - - in1 ~> flow.map(_.toString()) ~> zip.in0 - in2 ~> zip.in1 - - zip.out.map(_.toString()) ~> o.inlet - }.run() - .request(100) - .expectNextUnordered("((+,b),f)", "((+,a),e)") - .expectComplete() - } - - "build simple round robin merge" in { - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new StrictRoundRobin[String]) - in1 ~> merge.in(0) - in2 ~> merge.in(1) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext("a") - s.expectNext("e") - s.expectNext("b") - s.expectNext("f") - s.expectNext("c") - s.expectNext("d") - s.expectComplete() - } - - "build simple zip merge" in { - val p = FlowGraph.closed(Sink.publisher[(Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new MyZip[Int, String]) - Source(List(1, 2, 3, 4)) ~> merge.in0 - Source(List("a", "b", "c")) ~> merge.in1 - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[(Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext(1 -> "a") - s.expectNext(2 -> "b") - s.expectNext(3 -> "c") - s.expectComplete() - } - - "build simple triple-zip merge using ReadAll" in { - val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new TripleCancellingZip[Long, Int, String]) - // format: OFF - Source(List(1L, 2L )) ~> merge.in0 - Source(List(1, 2, 3, 4)) ~> merge.in1 - Source(List("a", "b", "c" )) ~> merge.in2 - merge.out ~> o.inlet - // format: ON - }.run() - - val s = TestSubscriber.manualProbe[(Long, Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - - sub.request(10) - s.expectNext((1L, 1, "a")) - s.expectNext((2L, 2, "b")) - s.expectComplete() - } - - "build simple triple-zip merge using ReadAll, and continue with provided value for cancelled input" in { - val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b ⇒ - o ⇒ - val merge = b.add(new TripleCancellingZip[Long, Int, String](1, Some(0L))) - // format: OFF - Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.in0 - Source(List(1, 2, 3, 4 )) ~> merge.in1 - Source(List("a", "b", "c" )) ~> merge.in2 - merge.out ~> o.inlet - // format: ON - }.run() - - val s = TestSubscriber.manualProbe[(Long, Int, String)] - p.subscribe(s) - val sub = s.expectSubscription() - - sub.request(10) - s.expectNext((1L, 1, "a")) - s.expectNext((2L, 2, "b")) - // soonCancelledInput is now canceled and continues with default (null) value - s.expectNext((0L, 3, "c")) - s.expectComplete() - } - - "build perferring merge" in { - val output = Sink.publisher[Int] - val p = FlowGraph.closed(output) { implicit b ⇒ - o ⇒ - val merge = b.add(PreferringMerge) - Source(List(1, 2, 3)) ~> merge.in(0) - Source(List(11, 12, 13)) ~> merge.in(1) - Source(List(14, 15, 16)) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[Int] - p.subscribe(s) - val sub = s.expectSubscription() - - def expect(i: Int): Unit = { - sub.request(1) - s.expectNext(i) - } - def expectNext(): Int = { - sub.request(1) - s.expectNext() - } - - expect(1) - expect(2) - expect(3) - val secondaries = expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: - expectNext() :: Nil - - secondaries.toSet should equal(Set(11, 12, 13, 14, 15, 16)) - s.expectComplete() - } - - "build perferring merge, manually driven" in { - val output = Sink.publisher[Int] - val preferredDriver = TestPublisher.manualProbe[Int]() - val otherDriver1 = TestPublisher.manualProbe[Int]() - val otherDriver2 = TestPublisher.manualProbe[Int]() - - val p = FlowGraph.closed(output) { implicit b ⇒ - o ⇒ - val merge = b.add(PreferringMerge) - Source(preferredDriver) ~> merge.in(0) - Source(otherDriver1) ~> merge.in(1) - Source(otherDriver2) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[Int] - p.subscribe(s) - - val sub = s.expectSubscription() - val p1 = preferredDriver.expectSubscription() - val s1 = otherDriver1.expectSubscription() - val s2 = otherDriver2.expectSubscription() - - // just consume the preferred - p1.sendNext(1) - sub.request(1) - s.expectNext(1) - - // pick preferred over any of the secondaries - p1.sendNext(2) - s1.sendNext(10) - s2.sendNext(20) - sub.request(1) - s.expectNext(2) - - sub.request(2) - Set(s.expectNext(), s.expectNext()) should ===(Set(10, 20)) - - p1.sendComplete() - - // continue with just secondaries when preferred has completed - s1.sendNext(11) - s2.sendNext(21) - sub.request(2) - Set(s.expectNext(), s.expectNext()) should ===(Set(11, 21)) - - // continue with just one secondary - s1.sendComplete() - s2.sendNext(4) - sub.request(1) - s.expectNext(4) - s2.sendComplete() - - // finish when all inputs have completed - s.expectComplete() - } - - "support cancel of input" in assertAllStagesStopped { - val autoPublisher = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(autoPublisher) ~> merge.in(0) - Source(List("b", "c", "d")) ~> merge.in(1) - Source(List("e", "f")) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - - autoPublisher.sendNext("a") - autoPublisher.sendNext("cancel") - - val sub = s.expectSubscription() - sub.request(10) - val outputs = - for (_ ← 1 to 6) yield { - val next = s.expectNext() - if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12) - } - val one = Seq("a") - val two = Seq("b", "c", "d") - val three = Seq("e", "f") - outputs.filter(one.contains) should ===(one) - outputs.filter(two.contains) should ===(two) - outputs.filter(three.contains) should ===(three) - completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2") - - autoPublisher.sendNext("x") - - s.expectComplete() - } - - "finish when all inputs cancelled" in assertAllStagesStopped { - val autoPublisher1 = TestPublisher.probe[String]() - val autoPublisher2 = TestPublisher.probe[String]() - val autoPublisher3 = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(autoPublisher1) ~> merge.in(0) - Source(autoPublisher2) ~> merge.in(1) - Source(autoPublisher3) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - - autoPublisher1.sendNext("a") - autoPublisher1.sendNext("cancel") - s.expectNext("onInput: a") - - autoPublisher2.sendNext("b") - autoPublisher2.sendNext("cancel") - s.expectNext("onInput: b") - - autoPublisher3.sendNext("c") - autoPublisher3.sendNext("cancel") - s.expectNext("onInput: c") - - s.expectComplete() - } - - "handle failure" in assertAllStagesStopped { - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.in(0) - Source(List("a", "b")) ~> merge.in(1) - Source(List("c")) ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - // IllegalArgumentException is swallowed by the CompletionHandler - val outputs = - for (_ ← 1 to 3) yield { - val next = s.expectNext() - if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12) - } - val one = Seq("a", "b") - val two = Seq("c") - completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2") - outputs.filter(one.contains) should ===(one) - outputs.filter(two.contains) should ===(two) - - s.expectComplete() - } - - "propagate failure" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(publisher) ~> merge.in(0) - Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - s.expectSubscriptionAndError().getMessage should be("ERROR") - } - - "emit failure" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("err")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - - s.expectError().getMessage should be("err") - } - - "emit failure for user thrown exception" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("exc")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectError().getMessage should be("exc") - } - - "emit failure for user thrown exception in onComplete" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("onUpstreamFinish-exc")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "emit failure for user thrown exception in onUpstreamFinish 2" in assertAllStagesStopped { - val autoPublisher = TestPublisher.probe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source.empty[String] ~> merge.in(0) - Source(autoPublisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - autoPublisher.sendNext("onUpstreamFinish-exc") - autoPublisher.sendNext("a") - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(1) - s.expectNext("onInput: a") - - autoPublisher.sendComplete() - s.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "support finish from onInput" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[String]() - val completionProbe = TestProbe() - val p = FlowGraph.closed(out) { implicit b ⇒ - o ⇒ - val merge = b.add(new TestMerge(completionProbe.ref)) - Source(List("a", "complete")) ~> merge.in(0) - Source(publisher) ~> merge.in(1) - Source.empty[String] ~> merge.in(2) - merge.out ~> o.inlet - }.run() - - val s = TestSubscriber.manualProbe[String] - p.subscribe(s) - val sub = s.expectSubscription() - sub.request(10) - s.expectNext("onInput: a") - s.expectComplete() - } - - "have the correct value for input in ReadPreffered" in { - import akka.stream.FanInShape._ - class MShape[T](_init: Init[T] = Name("mshape")) extends FanInShape[T](_init) { - val priority = newInlet[T]("priority") - val second = newInlet[T]("second") - protected override def construct(i: Init[T]) = new MShape(i) - } - class MyMerge[T] extends FlexiMerge[T, MShape[T]]( - new MShape, Attributes.name("cmerge")) { - import akka.stream.scaladsl.FlexiMerge._ - override def createMergeLogic(p: PortT) = new MergeLogic[T] { - override def initialState = - State[T](ReadPreferred(p.priority, p.second)) { - (ctx, input, element) ⇒ - if (element == 1) assert(input == p.priority) - if (element == 2) assert(input == p.second) - ctx.emit(element) - SameState - } - } - } - - val sink = Sink.fold[Int, Int](0)(_ + _) - val graph = FlowGraph.closed(sink) { implicit b ⇒ - sink ⇒ - import FlowGraph.Implicits._ - - val merge = b.add(new MyMerge[Int]()) - - Source.single(1) ~> merge.priority - Source.single(2) ~> merge.second - - merge.out ~> sink.inlet - } - Await.result(graph.run(), 1.second) should equal(3) - } - - "handle preStart and postStop" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val m = b.add(new StartStopTest(p.ref)) - - Source(List("1", "2", "3")) ~> m.in0 - Source.empty ~> m.in1 - m.out ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("2") - p.expectMsg("3") - p.expectMsg("postStop") - } - - "invoke postStop after error" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val m = b.add(new StartStopTest(p.ref)) - - Source(List("1", "fail", "2", "3")) ~> m.in0 - Source.empty ~> m.in1 - m.out ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("fail") - p.expectMsg("postStop") - } - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala deleted file mode 100644 index 1001e0b82c..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ /dev/null @@ -1,484 +0,0 @@ -package akka.stream.scaladsl - -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import FlowGraph.Implicits._ -import akka.stream.ActorMaterializer -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl._ -import akka.stream.testkit.Utils._ -import akka.actor.ActorSystem -import akka.stream._ -import akka.actor.ActorRef -import akka.testkit.TestProbe - -object GraphFlexiRouteSpec { - - /** - * This is fair in that sense that after enqueueing to an output it yields to other output if - * they are have requested elements. Or in other words, if all outputs have demand available at the same - * time then in finite steps all elements are enqueued to them. - */ - class Fair[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("FairBalance")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT): RouteLogic[T] = new RouteLogic[T] { - val select = p.out(0) | p.out(1) - - val emitToAnyWithDemand = State(DemandFromAny(p)) { (ctx, out, element) ⇒ - ctx.emit(select(out))(element) - SameState - } - - // initally, wait for demand from all - override def initialState = State(DemandFromAll(p)) { (ctx, _, element) ⇒ - ctx.emit(p.out(0))(element) - emitToAnyWithDemand - } - } - } - - /** - * It never skips an output while cycling but waits on it instead (closed outputs are skipped though). - * The fair route above is a non-strict round-robin (skips currently unavailable outputs). - */ - class StrictRoundRobin[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("RoundRobinBalance")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[T] { - - val toOutput1: State[Outlet[T]] = State(DemandFrom(p.out(0))) { (ctx, out, element) ⇒ - ctx.emit(out)(element) - toOutput2 - } - - val toOutput2 = State(DemandFrom(p.out(1))) { (ctx, out, element) ⇒ - ctx.emit(out)(element) - toOutput1 - } - - override def initialState = toOutput1 - } - } - - class Unzip[A, B] extends FlexiRoute[(A, B), FanOutShape2[(A, B), A, B]](new FanOutShape2("Unzip"), Attributes.name("Unzip")) { - import FlexiRoute._ - - override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] { - - override def initialState = State(DemandFromAll(p)) { (ctx, _, element) ⇒ - val (a, b) = element - ctx.emit(p.out0)(a) - ctx.emit(p.out1)(b) - SameState - } - - override def initialCompletionHandling = eagerClose - } - } - - class StartStopTestRoute(lifecycleProbe: ActorRef) - extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("StartStopTest"), Attributes.name("StartStopTest")) { - import FlexiRoute._ - - def createRouteLogic(p: PortT) = new RouteLogic[String] { - val select = p.out0 | p.out1 - - override def preStart(): Unit = lifecycleProbe ! "preStart" - override def postStop(): Unit = lifecycleProbe ! "postStop" - - override def initialState = State(DemandFromAny(p)) { - (ctx, port, element) ⇒ - lifecycleProbe ! element - if (element == "fail") throw new IllegalStateException("test failure") - ctx.emit(select(port))(element) - - SameState - } - } - - } - - class TestRoute(completionProbe: ActorRef) - extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("TestRoute"), Attributes.name("TestRoute")) { - import FlexiRoute._ - - var throwFromOnComplete = false - - def createRouteLogic(p: PortT): RouteLogic[String] = new RouteLogic[String] { - val select = p.out0 | p.out1 - - override def initialState = State(DemandFromAny(p)) { - (ctx, preferred, element) ⇒ - if (element == "err") - ctx.fail(new RuntimeException("err") with NoStackTrace) - else if (element == "err-output1") - ctx.fail(p.out0, new RuntimeException("err-1") with NoStackTrace) - else if (element == "exc") - throw new RuntimeException("exc") with NoStackTrace - else if (element == "onUpstreamFinish-exc") - throwFromOnComplete = true - else if (element == "finish") - ctx.finish() - else - ctx.emit(select(preferred))("onInput: " + element) - - SameState - } - - override def initialCompletionHandling = CompletionHandling( - onUpstreamFinish = { ctx ⇒ - if (throwFromOnComplete) - throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace - completionProbe ! "onUpstreamFinish" - }, - onUpstreamFailure = { (ctx, cause) ⇒ - cause match { - case _: IllegalArgumentException ⇒ // swallow - case _ ⇒ - completionProbe ! "onError" - } - }, - onDownstreamFinish = { (ctx, cancelledOutput) ⇒ - completionProbe ! "onDownstreamFinish: " + cancelledOutput - SameState - }) - } - } - - class TestFixture(implicit val system: ActorSystem, implicit val materializer: ActorMaterializer) { - val autoPublisher = TestPublisher.probe[String]() - val s1 = TestSubscriber.manualProbe[String] - val s2 = TestSubscriber.manualProbe[String] - val completionProbe = TestProbe() - FlowGraph.closed() { implicit b ⇒ - val route = b.add(new TestRoute(completionProbe.ref)) - Source(autoPublisher) ~> route.in - route.out0 ~> Sink(s1) - route.out1 ~> Sink(s2) - }.run() - - autoPublisher.sendNext("a") - autoPublisher.sendNext("b") - - val sub1 = s1.expectSubscription() - val sub2 = s2.expectSubscription() - } - -} - -class GraphFlexiRouteSpec extends AkkaSpec { - import GraphFlexiRouteSpec._ - - implicit val materializer = ActorMaterializer() - - val in = Source(List("a", "b", "c", "d", "e")) - - val out1 = Sink.publisher[String] - val out2 = Sink.publisher[String] - - "FlexiRoute" must { - - "build simple fair route" in assertAllStagesStopped { - // we can't know exactly which elements that go to each output, because if subscription/request - // from one of the downstream is delayed the elements will be pushed to the other output - FlowGraph.closed(TestSink.probe[String]) { implicit b ⇒ - out ⇒ - val merge = b.add(Merge[String](2)) - val route = b.add(new Fair[String]) - in ~> route.in - route.out(0) ~> merge.in(0) - route.out(1) ~> merge.in(1) - merge.out ~> out - }.run() - .request(10) - .expectNextUnordered("a", "b", "c", "d", "e") - .expectComplete() - } - - "build simple round-robin route" in { - val (p1, p2) = FlowGraph.closed(out1, out2)(Keep.both) { implicit b ⇒ - (o1, o2) ⇒ - val route = b.add(new StrictRoundRobin[String]) - in ~> route.in - route.out(0) ~> o1.inlet - route.out(1) ~> o2.inlet - }.run() - - val s1 = TestSubscriber.manualProbe[String] - p1.subscribe(s1) - val sub1 = s1.expectSubscription() - val s2 = TestSubscriber.manualProbe[String] - p2.subscribe(s2) - val sub2 = s2.expectSubscription() - - sub1.request(10) - sub2.request(10) - - s1.expectNext("a") - s2.expectNext("b") - s1.expectNext("c") - s2.expectNext("d") - s1.expectNext("e") - - s1.expectComplete() - s2.expectComplete() - } - - "build simple unzip route" in { - val outA = Sink.publisher[Int] - val outB = Sink.publisher[String] - - val (p1, p2) = FlowGraph.closed(outA, outB)(Keep.both) { implicit b ⇒ - (oa, ob) ⇒ - val route = b.add(new Unzip[Int, String]) - Source(List(1 -> "A", 2 -> "B", 3 -> "C", 4 -> "D")) ~> route.in - route.out0 ~> oa.inlet - route.out1 ~> ob.inlet - }.run() - - val s1 = TestSubscriber.manualProbe[Int] - p1.subscribe(s1) - val sub1 = s1.expectSubscription() - val s2 = TestSubscriber.manualProbe[String] - p2.subscribe(s2) - val sub2 = s2.expectSubscription() - - sub1.request(3) - sub2.request(4) - - s1.expectNext(1) - s2.expectNext("A") - s1.expectNext(2) - s2.expectNext("B") - s1.expectNext(3) - s2.expectNext("C") - sub1.cancel() - - s2.expectComplete() - } - - "support finish of downstreams and cancel of upstream" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - autoPublisher.sendNext("finish") - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(2) - s2.expectNext("onInput: b") - - s1.expectComplete() - s2.expectComplete() - } - - "support error of outputs" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - autoPublisher.sendNext("err") - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(2) - s2.expectNext("onInput: b") - - s1.expectError().getMessage should be("err") - s2.expectError().getMessage should be("err") - autoPublisher.expectCancellation() - } - - "support error of a specific output" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("err-output1") - autoPublisher.sendNext("c") - - s2.expectNext("onInput: c") - s1.expectError().getMessage should be("err-1") - - autoPublisher.sendComplete() - completionProbe.expectMsg("onUpstreamFinish") - s2.expectComplete() - } - - "emit error for user thrown exception" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("exc") - - s1.expectError().getMessage should be("exc") - s2.expectError().getMessage should be("exc") - - autoPublisher.expectCancellation() - } - - "emit error for user thrown exception in onUpstreamFinish" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(5) - sub2.request(5) - autoPublisher.sendNext("onUpstreamFinish-exc") - autoPublisher.sendComplete() - - s1.expectError().getMessage should be("onUpstreamFinish-exc") - s2.expectError().getMessage should be("onUpstreamFinish-exc") - } - - "handle cancel from output" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - sub1.cancel() - - completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0") - s1.expectNoMsg(200.millis) - - autoPublisher.sendNext("c") - s2.expectNext("onInput: c") - - autoPublisher.sendComplete() - s2.expectComplete() - } - - "handle finish from upstream input" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendComplete() - - completionProbe.expectMsg("onUpstreamFinish") - - s1.expectComplete() - s2.expectComplete() - } - - "handle error from upstream input" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace) - - completionProbe.expectMsg("onError") - - s1.expectError().getMessage should be("test err") - s2.expectError().getMessage should be("test err") - } - - "cancel upstream input when all outputs cancelled" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - sub1.cancel() - - completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0") - sub2.cancel() - - autoPublisher.expectCancellation() - } - - "cancel upstream input when all outputs completed" in assertAllStagesStopped { - val fixture = new TestFixture - import fixture._ - - sub1.request(1) - s1.expectNext("onInput: a") - sub2.request(1) - s2.expectNext("onInput: b") - - sub1.request(2) - sub2.request(2) - autoPublisher.sendNext("finish") - s1.expectComplete() - s2.expectComplete() - autoPublisher.expectCancellation() - } - - "handle preStart and postStop" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val r = b.add(new StartStopTestRoute(p.ref)) - - Source(List("1", "2", "3")) ~> r.in - r.out0 ~> Sink.ignore - r.out1 ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("2") - p.expectMsg("3") - p.expectMsg("postStop") - } - - "invoke postStop after error" in assertAllStagesStopped { - val p = TestProbe() - - FlowGraph.closed() { implicit b ⇒ - val r = b.add(new StartStopTestRoute(p.ref)) - - Source(List("1", "fail", "2", "3")) ~> r.in - r.out0 ~> Sink.ignore - r.out1 ~> Sink.ignore - }.run() - - p.expectMsg("preStart") - p.expectMsg("1") - p.expectMsg("fail") - p.expectMsg("postStop") - } - - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index a9d25df84f..922ed5f262 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -93,7 +93,7 @@ class SinkSpec extends AkkaSpec { "combine to many outputs with simplified API" in { val probes = Seq.fill(3)(TestSubscriber.manualProbe[Int]()) - val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast(_)) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) @@ -111,7 +111,7 @@ class SinkSpec extends AkkaSpec { "combine to two sinks with simplified API" in { val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]()) - val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast(_)) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast[Int](_)) Source(List(0, 1, 2)).runWith(sink) diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template index 9e73c459a6..e9f9fb1b85 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanInShape.scala.template @@ -5,15 +5,16 @@ package akka.stream import scala.collection.immutable import scala.annotation.varargs +import scala.annotation.unchecked.uncheckedVariance object FanInShape { - sealed trait Init[+O] { + sealed trait Init[O] { def outlet: Outlet[O] def inlets: immutable.Seq[Inlet[_]] def name: String } - final case class Name(override val name: String) extends Init[Nothing] { - override def outlet: Outlet[Nothing] = Outlet(s"$name.out") + final case class Name[O](override val name: String) extends Init[O] { + override def outlet: Outlet[O] = Outlet(s"$name.out") override def inlets: immutable.Seq[Inlet[_]] = Nil } final case class Ports[O](override val outlet: Outlet[O], override val inlets: immutable.Seq[Inlet[_]]) extends Init[O] { @@ -21,12 +22,12 @@ object FanInShape { } } -abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inlet[_]], _name: String) extends Shape { +abstract class FanInShape[+O] private (_out: Outlet[O @uncheckedVariance], _registered: Iterator[Inlet[_]], _name: String) extends Shape { import FanInShape._ def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name) - final def out: Outlet[O] = _out + final def out: Outlet[O @uncheckedVariance] = _out final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil final override def inlets: immutable.Seq[Inlet[_]] = _inlets @@ -37,7 +38,7 @@ abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inl p } - protected def construct(init: Init[O]): FanInShape[O] + protected def construct(init: Init[O @uncheckedVariance]): FanInShape[O] def deepCopy(): FanInShape[O] = construct(Ports[O](_out.carbonCopy(), inlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = { @@ -52,39 +53,39 @@ object UniformFanInShape { new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList)) } -class UniformFanInShape[-T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(n: Int) = this(n, FanInShape.Name("UniformFanIn")) - def this(n: Int, name: String) = this(n, FanInShape.Name(name)) +class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(n: Int) = this(n, FanInShape.Name[O]("UniformFanIn")) + def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new UniformFanInShape(n, init) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new UniformFanInShape(n, init) override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]] - val inSeq: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) - def in(n: Int): Inlet[T] = inSeq(n) + val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T](s"in$i")) + def in(n: Int): Inlet[T @uncheckedVariance] = inSeq(n) } -class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(n: Int) = this(n, FanInShape.Name("FanInShape1N")) - def this(n: Int, name: String) = this(n, FanInShape.Name(name)) - def this(outlet: Outlet[O], in0: Inlet[T0], inlets1: Array[Inlet[T1]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1N(n, init) +class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N")) + def this(n: Int, name: String) = this(n, FanInShape.Name[O](name)) + def this(outlet: Outlet[O @uncheckedVariance], in0: Inlet[T0 @uncheckedVariance], inlets1: Array[Inlet[T1 @uncheckedVariance]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList)) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1N(n, init) override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]] - val in0 = newInlet[T0]("in0") - val in1Seq: immutable.IndexedSeq[Inlet[T1]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}")) - def in(n: Int): Inlet[T1] = { + val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0") + val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}")) + def in(n: Int): Inlet[T1 @uncheckedVariance] = { require(n > 0, "n must be > 0") in1Seq(n - 1) } } -[2..#class FanInShape1[[#T0#], O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) { - def this(name: String) = this(FanInShape.Name(name)) +[2..#class FanInShape1[[#-T0#], +O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) { + def this(name: String) = this(FanInShape.Name[O](name)) def this([#in0: Inlet[T0]#], out: Outlet[O]) = this(FanInShape.Ports(out, [#in0# :: ] :: Nil)) - override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1(init) + override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1(init) override def deepCopy(): FanInShape1[[#T0#], O] = super.deepCopy().asInstanceOf[FanInShape1[[#T0#], O]] - [#val in0 = newInlet[T0]("in0")# + [#val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")# ] }# diff --git a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template index 0bd0cee686..1497e00b62 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/FanOutShape.scala.template @@ -4,6 +4,7 @@ package akka.stream import scala.collection.immutable +import scala.annotation.unchecked.uncheckedVariance object FanOutShape { sealed trait Init[I] { @@ -20,12 +21,12 @@ object FanOutShape { } } -abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outlet[_]], _name: String) extends Shape { +abstract class FanOutShape[-I] private (_in: Inlet[I @uncheckedVariance], _registered: Iterator[Outlet[_]], _name: String) extends Shape { import FanOutShape._ def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name) - final def in: Inlet[I] = _in + final def in: Inlet[I @uncheckedVariance] = _in final override def outlets: immutable.Seq[Outlet[_]] = _outlets final override def inlets: immutable.Seq[Inlet[_]] = in :: Nil @@ -36,7 +37,7 @@ abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outl p } - protected def construct(init: Init[I]): FanOutShape[I] + protected def construct(init: Init[I @uncheckedVariance]): FanOutShape[I] def deepCopy(): FanOutShape[I] = construct(Ports[I](_in.carbonCopy(), outlets.map(_.carbonCopy()))) final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanOutShape[I] = { @@ -51,24 +52,24 @@ object UniformFanOutShape { new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) } -class UniformFanOutShape[I, O](n: Int, _init: FanOutShape.Init[I]) extends FanOutShape[I](_init) { +class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { def this(n: Int) = this(n, FanOutShape.Name[I]("UniformFanOut")) def this(n: Int, name: String) = this(n, FanOutShape.Name[I](name)) def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) - override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new UniformFanOutShape(n, init) + override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new UniformFanOutShape(n, init) override def deepCopy(): UniformFanOutShape[I, O] = super.deepCopy().asInstanceOf[UniformFanOutShape[I, O]] - val outArray: Array[Outlet[O]] = Array.tabulate(n)(i => newOutlet[O](s"out$i")) - def out(n: Int): Outlet[O] = outArray(n) + val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(n)(i => newOutlet[O](s"out$i")) + def out(n: Int): Outlet[O @uncheckedVariance] = outArray(n) } -[2..#class FanOutShape1[I, [#O0#]](_init: FanOutShape.Init[I]) extends FanOutShape[I](_init) { +[2..#class FanOutShape1[-I, [#+O0#]](_init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) { def this(name: String) = this(FanOutShape.Name[I](name)) def this(in: Inlet[I], [#out0: Outlet[O0]#]) = this(FanOutShape.Ports(in, [#out0# :: ] :: Nil)) - override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new FanOutShape1(init) + override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new FanOutShape1(init) override def deepCopy(): FanOutShape1[I, [#O0#]] = super.deepCopy().asInstanceOf[FanOutShape1[I, [#O0#]]] - [#val out0 = newOutlet[O0]("out0")# + [#val out0: Outlet[O0 @uncheckedVariance] = newOutlet[O0]("out0")# ] }# diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index a7ee5ffeee..336b14a423 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -4,9 +4,9 @@ package akka.stream import akka.util.Collections.EmptyImmutableSeq - import scala.collection.immutable import scala.collection.JavaConverters._ +import scala.annotation.unchecked.uncheckedVariance /** * An input port of a StreamLayout.Module. This type logically belongs @@ -48,8 +48,12 @@ object Inlet { def apply[T](toString: String): Inlet[T] = new Inlet[T](toString) } -final class Inlet[-T] private (override val toString: String) extends InPort { +final class Inlet[T] private (override val toString: String) extends InPort { def carbonCopy(): Inlet[T] = Inlet(toString) + /** + * INTERNAL API. + */ + def as[U]: Inlet[U] = this.asInstanceOf[Inlet[U]] } /** @@ -61,8 +65,12 @@ object Outlet { def apply[T](toString: String): Outlet[T] = new Outlet[T](toString) } -final class Outlet[+T] private (override val toString: String) extends OutPort { +final class Outlet[T] private (override val toString: String) extends OutPort { def carbonCopy(): Outlet[T] = Outlet(toString) + /** + * INTERNAL API. + */ + def as[U]: Outlet[U] = this.asInstanceOf[Outlet[U]] } /** @@ -187,7 +195,7 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se * A Source [[Shape]] has exactly one output and no inputs, it models a source * of data. */ -final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { +final case class SourceShape[+T](outlet: Outlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -204,7 +212,7 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { * outside like a pipe (but it can be a complex topology of streams within of * course). */ -final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape { +final case class FlowShape[-I, +O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = List(outlet) @@ -219,7 +227,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S /** * A Sink [[Shape]] has exactly one input and no outputs, it models a data sink. */ -final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { +final case class SinkShape[-T](inlet: Inlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq @@ -244,10 +252,10 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { * +------+ * }}} */ -final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], - out1: Outlet[Out1], - in2: Inlet[In2], - out2: Outlet[Out2]) extends Shape { +final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVariance], + out1: Outlet[Out1 @uncheckedVariance], + in2: Inlet[In2 @uncheckedVariance], + out2: Outlet[Out2 @uncheckedVariance]) extends Shape { //#implementation-details-elided override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2) override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2) @@ -270,6 +278,6 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], //#bidi-shape object BidiShape { - def apply[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = + def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala index 7ff5762e40..dd305cebb7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala @@ -425,10 +425,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo initialPhase(2, bidirectional) - protected def fail(e: Throwable, closeTransport: Boolean=true): Unit = { + protected def fail(e: Throwable, closeTransport: Boolean = true): Unit = { if (tracing) log.debug("fail {} due to: {}", self, e.getMessage) inputBunch.cancel() - if(closeTransport) { + if (closeTransport) { log.debug("closing output") outputBunch.error(TransportOut, e) } diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index e2f7552ca1..37d9c9a097 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -160,7 +160,7 @@ object SslTlsPlacebo { val session = SSLContext.getDefault.createSSLEngine.getSession val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes }) val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) - BidiShape(top, bottom) + BidiShape.fromFlows(top, bottom) } val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, Unit] = new javadsl.BidiFlow(forScala) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 58fdf9b49d..fe92630eaf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -948,7 +948,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph val f = new function.Function2[FlowGraph.Builder[M], SourceShape[T], Inlet[Out]Pair Outlet[Out Pair T]] { override def apply(b: FlowGraph.Builder[M], s: SourceShape[T]): Inlet[Out] Pair Outlet[Out Pair T] = { val zip = b.graph(Zip.create[Out, T]) - b.from(s).to(zip.in1) + b.from(s).toInlet(zip.in1) new Pair(zip.in0, zip.out) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 1dccd09c33..b88e5cf427 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import akka.stream._ import akka.japi.Pair +import scala.annotation.unchecked.uncheckedVariance /** * Merge several streams, taking elements as they arrive from input streams @@ -277,49 +278,41 @@ object FlowGraph { * * @return The outlet that will emit the materialized value. */ - def materializedValue: Outlet[Mat] = delegate.materializedValue + def materializedValue: Outlet[Mat @uncheckedVariance] = delegate.materializedValue def run(mat: Materializer): Unit = delegate.buildRunnable().run()(mat) def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) - def from[T, M](src: Graph[SourceShape[T], M]): ForwardOps[T] = new ForwardOps(delegate.add(src).outlet) def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet) def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet) def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out) def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0)) def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in) - def to[T, M](dst: Graph[SinkShape[T], M]): ReverseOps[T] = new ReverseOps(delegate.add(dst).inlet) def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet) def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet) def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) final class ForwardOps[T](out: Outlet[T]) { - def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self } - def to[M](dst: Graph[SinkShape[T], M]): Builder[Mat] = { out ~> dst; self } - def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self } - def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self } - def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self } - def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self } - def via[U, M](f: Graph[FlowShape[T, U], M]): ForwardOps[U] = from((out ~> f).outlet) - def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) - def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) - def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) + def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { out ~> in; self } + def to(dst: SinkShape[_ >: T]): Builder[Mat] = { out ~> dst; self } + def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } + def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { out ~> j; self } + def via[U, M](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((out ~> f).outlet) + def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) + def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet) def out(): Outlet[T] = out } final class ReverseOps[T](out: Inlet[T]) { - def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self } - def from[M](dst: Graph[SourceShape[T], M]): Builder[Mat] = { out <~ dst; self } - def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self } - def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self } - def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self } - def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self } - def via[U, M](f: Graph[FlowShape[U, T], M]): ReverseOps[U] = to((out <~ f).inlet) - def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) - def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) - def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) + def fromOutlet(dst: Outlet[_ <: T]): Builder[Mat] = { out <~ dst; self } + def from(dst: SourceShape[_ <: T]): Builder[Mat] = { out <~ dst; self } + def fromFanIn[U](j: UniformFanInShape[U, _ <: T]): Builder[Mat] = { out <~ j; self } + def fromFanOut[U](j: UniformFanOutShape[U, _ <: T]): Builder[Mat] = { out <~ j; self } + def via[U](f: FlowShape[U, _ <: T]): ReverseOps[U] = to((out <~ f).inlet) + def viaFanIn[U](j: UniformFanInShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet) + def viaFanOut[U](j: UniformFanOutShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet) } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 09a96a02a8..05b2dcbfe6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -8,6 +8,7 @@ import akka.stream.{ Outlet, Shape, OutPort, Graph, Attributes } import scala.collection.immutable import akka.stream.impl.Junctions.FlexiRouteModule import akka.stream.impl.Stages.DefaultAttributes +import scala.annotation.unchecked.uncheckedVariance object FlexiRoute { @@ -24,7 +25,7 @@ object FlexiRoute { * has been completed. `IllegalArgumentException` is thrown if * that is not obeyed. */ - final case class DemandFrom[+T](output: Outlet[T]) extends DemandCondition[Outlet[T]] + final case class DemandFrom[+T](output: Outlet[T @uncheckedVariance]) extends DemandCondition[Outlet[T @uncheckedVariance]] object DemandFromAny { def apply(outputs: OutPort*): DemandFromAny = new DemandFromAny(outputs.to[immutable.Seq]) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 5eec534865..a283d1ef8d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -102,7 +102,7 @@ class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] object MergePreferred { import FanInShape._ final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T]) extends UniformFanInShape[T, T](secondaryPorts, _init) { - def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name(name)) + def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name[T](name)) override protected def construct(init: Init[T]): FanInShape[T] = new MergePreferredShape(secondaryPorts, init) override def deepCopy(): MergePreferredShape[T] = super.deepCopy().asInstanceOf[MergePreferredShape[T]] @@ -562,7 +562,7 @@ object FlowGraph extends GraphApply { class Builder[+M] private[stream] () { private var moduleInProgress: Module = EmptyModule - def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = { + def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = { val flowCopy = via.module.carbonCopy moduleInProgress = moduleInProgress @@ -571,7 +571,7 @@ object FlowGraph extends GraphApply { .wire(flowCopy.shape.outlets.head, to) } - def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = { + def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit = { moduleInProgress = moduleInProgress.wire(from, to) } @@ -630,7 +630,7 @@ object FlowGraph extends GraphApply { * * @return The outlet that will emit the materialized value. */ - def materializedValue: Outlet[M] = { + def materializedValue: Outlet[M @uncheckedVariance] = { val module = new MaterializedValueSource[Any] moduleInProgress = moduleInProgress.compose(module) module.shape.outlet.asInstanceOf[Outlet[M]] @@ -724,7 +724,7 @@ object FlowGraph extends GraphApply { trait CombinerBase[T] extends Any { def importAndGetPort(b: Builder[_]): Outlet[T] - def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = { + def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit = { b.addEdge(importAndGetPort(b), to) } @@ -770,7 +770,7 @@ object FlowGraph extends GraphApply { trait ReverseCombinerBase[T] extends Any { def importAndGetPortReverse(b: Builder[_]): Inlet[T] - def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = { + def <~[U <: T](from: Outlet[U])(implicit b: Builder[_]): Unit = { b.addEdge(from, importAndGetPortReverse(b)) }