!str make Inlet/Outlet invariant and add Java variance

This necessitates the removal of method overloading in the Java Graph
DSL: the to() and via() methods were not otherwise resolved correctly by
javac, leading to incomprehensible error messages. The new approach is
to offer just one way of doing things which is a bit more verbose but
should be easier to read and learn. In this vein auto-importing while
using the DSL is also gone for Java—not sure about Scala yet.
This commit is contained in:
Roland Kuhn 2015-10-08 12:12:35 +02:00
parent c6a5864e25
commit dc07fd250c
23 changed files with 111 additions and 1649 deletions

View file

@ -6,8 +6,8 @@ package docs.http.scaladsl.server
package directives
import akka.event.Logging
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.directives.{DebuggingDirectives, LogEntry, LoggingMagnet}
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.server.directives.{ DebuggingDirectives, LogEntry, LoggingMagnet }
class DebuggingDirectivesExamplesSpec extends RoutingSpec {
"logRequest-0" in {

View file

@ -30,7 +30,6 @@ class RangeDirectivesExamplesSpec extends RoutingSpec {
responseAs[String] shouldEqual "DE"
}
// we set "akka.http.routing.range-coalescing-threshold = 2"
// above to make sure we get two BodyParts
Get() ~> addHeader(Range(ByteRange(0, 1), ByteRange(1, 2), ByteRange(6, 7))) ~> route ~> check {

View file

@ -50,7 +50,7 @@ object BidiFlowDocSpec {
// construct and add the bottom flow, going inbound
val inbound = b.add(Flow[ByteString].map(fromBytes))
// fuse them together into a BidiShape
BidiShape(outbound, inbound)
BidiShape.fromFlows(outbound, inbound)
}
// this is the same as the above
@ -112,18 +112,18 @@ object BidiFlowDocSpec {
val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].transform(() => new FrameParser))
BidiShape(outbound, inbound)
BidiShape.fromFlows(outbound, inbound)
}
//#framing
val chopUp = BidiFlow() { b =>
val f = Flow[ByteString].mapConcat(_.map(ByteString(_)))
BidiShape(b.add(f), b.add(f))
BidiShape.fromFlows(b.add(f), b.add(f))
}
val accumulate = BidiFlow() { b =>
val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _))
BidiShape(b.add(f), b.add(f))
BidiShape.fromFlows(b.add(f), b.add(f))
}
}

View file

@ -1,291 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.Attributes
object FlexiDocSpec {
//#fleximerge-zip-states
//#fleximerge-zip-readall
import akka.stream.FanInShape._
class ZipPorts[A, B](_init: Init[(A, B)] = Name("Zip"))
extends FanInShape[(A, B)](_init) {
val left = newInlet[A]("left")
val right = newInlet[B]("right")
protected override def construct(i: Init[(A, B)]) = new ZipPorts(i)
}
//#fleximerge-zip-readall
//#fleximerge-zip-states
}
class FlexiDocSpec extends AkkaSpec {
import FlexiDocSpec._
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
"implement zip using readall" in {
//#fleximerge-zip-readall
class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]](
new ZipPorts, Attributes.name("Zip1State")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] {
override def initialState =
State(ReadAll(p.left, p.right)) { (ctx, _, inputs) =>
val a = inputs(p.left)
val b = inputs(p.right)
ctx.emit((a, b))
SameState
}
override def initialCompletionHandling = eagerClose
}
}
//#fleximerge-zip-readall
//format: OFF
val res =
//#fleximerge-zip-connecting
FlowGraph.closed(Sink.head[(Int, String)]) { implicit b =>
o =>
import FlowGraph.Implicits._
val zip = b.add(new Zip[Int, String])
Source.single(1) ~> zip.left
Source.single("1") ~> zip.right
zip.out ~> o.inlet
}
//#fleximerge-zip-connecting
.run()
//format: ON
Await.result(res, 300.millis) should equal((1, "1"))
}
"implement zip using two states" in {
//#fleximerge-zip-states
class Zip[A, B] extends FlexiMerge[(A, B), ZipPorts[A, B]](
new ZipPorts, Attributes.name("Zip2State")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[(A, B)] {
var lastInA: A = _
val readA: State[A] = State[A](Read(p.left)) { (ctx, input, element) =>
lastInA = element
readB
}
val readB: State[B] = State[B](Read(p.right)) { (ctx, input, element) =>
ctx.emit((lastInA, element))
readA
}
override def initialState: State[_] = readA
override def initialCompletionHandling = eagerClose
}
}
//#fleximerge-zip-states
val res = FlowGraph.closed(Sink.head[(Int, String)]) { implicit b =>
o =>
import FlowGraph.Implicits._
val zip = b.add(new Zip[Int, String])
Source(1 to 2) ~> zip.left
Source((1 to 2).map(_.toString)) ~> zip.right
zip.out ~> o.inlet
}.run()
Await.result(res, 300.millis) should equal((1, "1"))
}
"fleximerge completion handling" in {
import FanInShape._
//#fleximerge-completion
class ImportantWithBackupShape[A](_init: Init[A] = Name("Zip"))
extends FanInShape[A](_init) {
val important = newInlet[A]("important")
val replica1 = newInlet[A]("replica1")
val replica2 = newInlet[A]("replica2")
protected override def construct(i: Init[A]) =
new ImportantWithBackupShape(i)
}
class ImportantWithBackups[A] extends FlexiMerge[A, ImportantWithBackupShape[A]](
new ImportantWithBackupShape, Attributes.name("ImportantWithBackups")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[A] {
import p.important
override def initialCompletionHandling =
CompletionHandling(
onUpstreamFinish = (ctx, input) => input match {
case `important` =>
log.info("Important input completed, shutting down.")
ctx.finish()
SameState
case replica =>
log.info("Replica {} completed, " +
"no more replicas available, " +
"applying eagerClose completion handling.", replica)
ctx.changeCompletionHandling(eagerClose)
SameState
},
onUpstreamFailure = (ctx, input, cause) => input match {
case `important` =>
ctx.fail(cause)
SameState
case replica =>
log.error(cause, "Replica {} failed, " +
"no more replicas available, " +
"applying eagerClose completion handling.", replica)
ctx.changeCompletionHandling(eagerClose)
SameState
})
override def initialState =
State[A](ReadAny(p.important, p.replica1, p.replica2)) {
(ctx, input, element) =>
ctx.emit(element)
SameState
}
}
}
//#fleximerge-completion
FlowGraph.closed() { implicit b =>
import FlowGraph.Implicits._
val importantWithBackups = b.add(new ImportantWithBackups[Int])
Source.single(1) ~> importantWithBackups.important
Source.single(2) ~> importantWithBackups.replica1
Source.failed[Int](new Exception("Boom!") with NoStackTrace) ~> importantWithBackups.replica2
importantWithBackups.out ~> Sink.ignore
}.run()
}
"flexi preferring merge" in {
import FanInShape._
//#flexi-preferring-merge-ports
class PreferringMergeShape[A](_init: Init[A] = Name("PreferringMerge"))
extends FanInShape[A](_init) {
val preferred = newInlet[A]("preferred")
val secondary1 = newInlet[A]("secondary1")
val secondary2 = newInlet[A]("secondary2")
protected override def construct(i: Init[A]) = new PreferringMergeShape(i)
}
//#flexi-preferring-merge-ports
//#flexi-preferring-merge
class PreferringMerge extends FlexiMerge[Int, PreferringMergeShape[Int]](
new PreferringMergeShape, Attributes.name("ImportantWithBackups")) {
import akka.stream.scaladsl.FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[Int] {
override def initialState =
State[Int](ReadPreferred(p.preferred, p.secondary1, p.secondary2)) {
(ctx, input, element) =>
ctx.emit(element)
SameState
}
}
}
//#flexi-preferring-merge
}
"flexi route" in {
//#flexiroute-unzip
import FanOutShape._
class UnzipShape[A, B](_init: Init[(A, B)] = Name[(A, B)]("Unzip"))
extends FanOutShape[(A, B)](_init) {
val outA = newOutlet[A]("outA")
val outB = newOutlet[B]("outB")
protected override def construct(i: Init[(A, B)]) = new UnzipShape(i)
}
class Unzip[A, B] extends FlexiRoute[(A, B), UnzipShape[A, B]](
new UnzipShape, Attributes.name("Unzip")) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] {
override def initialState =
State[Any](DemandFromAll(p.outA, p.outB)) {
(ctx, _, element) =>
val (a, b) = element
ctx.emit(p.outA)(a)
ctx.emit(p.outB)(b)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
//#flexiroute-unzip
}
"flexi route completion handling" in {
import FanOutShape._
//#flexiroute-completion
class ImportantRouteShape[A](_init: Init[A] = Name[A]("ImportantRoute"))
extends FanOutShape[A](_init) {
val important = newOutlet[A]("important")
val additional1 = newOutlet[A]("additional1")
val additional2 = newOutlet[A]("additional2")
protected override def construct(i: Init[A]) = new ImportantRouteShape(i)
}
class ImportantRoute[A] extends FlexiRoute[A, ImportantRouteShape[A]](
new ImportantRouteShape, Attributes.name("ImportantRoute")) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[A] {
import p.important
private val select = (p.important | p.additional1 | p.additional2)
override def initialCompletionHandling =
CompletionHandling(
// upstream:
onUpstreamFinish = (ctx) => (),
onUpstreamFailure = (ctx, thr) => (),
// downstream:
onDownstreamFinish = (ctx, output) => output match {
case `important` =>
// finish all downstreams, and cancel the upstream
ctx.finish()
SameState
case _ =>
SameState
})
override def initialState =
State(DemandFromAny(p.important, p.additional1, p.additional2)) {
(ctx, output, element) =>
ctx.emit(select(output))(element)
SameState
}
}
}
//#flexiroute-completion
FlowGraph.closed() { implicit b =>
import FlowGraph.Implicits._
val route = b.add(new ImportantRoute[Int])
Source.single(1) ~> route.in
route.important ~> Sink.ignore
route.additional1 ~> Sink.ignore
route.additional2 ~> Sink.ignore
}.run()
}
}

View file

@ -142,8 +142,7 @@ class FlowGraphDocSpec extends AkkaSpec {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)
// This is why order matters when overriding inlets and outlets.
// The "[Nothing, Any]" is equivalent to casting the Inlets/Outlets.
PriorityWorkerPoolShape[Nothing, Any](inlets(0), inlets(1), outlets(0))
PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out])
}
}
//#flow-graph-components-shape

View file

@ -117,7 +117,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val sendRmotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast(_))
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)
//#sink-combine

View file

@ -96,7 +96,7 @@ private[http] object OutgoingConnectionBlueprint {
responseParsingMerge.out ~> responsePrep ~> terminationFanout.in
terminationFanout.out(0) ~> terminationMerge.in1
BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse](
BidiShape(
methodBypassFanout.in,
wrapTls.outlet,
unwrapTls.inlet,

View file

@ -94,7 +94,7 @@ public class BidiFlowTest extends StreamTest {
@Override
public BidiShape<Integer, Long, ByteString, String> apply(Builder<Future<Integer>> b, SinkShape<Integer> sink)
throws Exception {
b.from(Source.single(42)).to(sink);
b.from(b.graph(Source.single(42))).to(sink);
final FlowShape<Integer, Long> top = b.graph(Flow
.<Integer> empty().map(new Function<Integer, Long>() {
@Override
@ -136,9 +136,9 @@ public class BidiFlowTest extends StreamTest {
SinkShape<String> sb) throws Exception {
final BidiShape<Integer, Long, ByteString, String> s = b
.graph(bidi);
b.from(Source.single(1)).to(s.in1());
b.from(b.graph(Source.single(1))).toInlet(s.in1());
b.from(s.out1()).to(st);
b.from(Source.single(bytes)).to(s.in2());
b.from(b.graph(Source.single(bytes))).toInlet(s.in2());
b.from(s.out2()).to(sb);
}
}).run(materializer);
@ -217,8 +217,8 @@ public class BidiFlowTest extends StreamTest {
return ByteString.fromString("Hello " + arg);
}
}));
b.from(shape.out2()).via(left).to(shape.in1())
.from(shape.out1()).via(right).to(shape.in2());
b.from(shape.out2()).via(left).toInlet(shape.in1())
.from(shape.out1()).via(right).toInlet(shape.in2());
}
}).run(materializer);
assertEquals((Integer) 42, Await.result(f, oneSec));
@ -241,8 +241,8 @@ public class BidiFlowTest extends StreamTest {
}
}));
b.from(bcast).to(sink)
.from(Source.single(1)).via(bcast).to(merge)
.from(flow).to(merge);
.from(b.graph(Source.single(1))).viaFanOut(bcast).toFanIn(merge)
.from(flow).toFanIn(merge);
return new Pair<Inlet<String>, Outlet<Integer>>(flow.inlet(), merge.out());
}
});

View file

@ -287,12 +287,12 @@ public class FlowGraphTest extends StreamTest {
final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder<Future<Integer>>, SinkShape<Integer>>() {
@Override
public void apply(Builder<Future<Integer>> b, SinkShape<Integer> out) throws Exception {
b.from(Source.single(1)).to(out);
b.from(b.materializedValue()).to(Sink.foreach(new Procedure<Future<Integer>>(){
b.from(b.graph(Source.single(1))).to(out);
b.from(b.materializedValue()).to(b.graph(Sink.foreach(new Procedure<Future<Integer>>(){
public void apply(Future<Integer> mat) throws Exception {
Patterns.pipe(mat, system.dispatcher()).to(probe.ref());
}
}));
})));
}
}).run(materializer);

View file

@ -646,8 +646,8 @@ public class FlowTest extends StreamTest {
public Inlet<String> apply(Builder<BoxedUnit> b) throws Exception {
final UniformFanOutShape<String, String> broadcast = b.graph(Broadcast.<String>create(2, true));
b.from(broadcast.out(0)).to(out1);
b.from(broadcast.out(1)).to(out2);
b.from(broadcast.out(0)).to(b.graph(out1));
b.from(broadcast.out(1)).to(b.graph(out2));
return broadcast.in();
}
});

View file

@ -381,9 +381,9 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
"emit an error if the TLS handshake fails certificate checks" in assertAllStagesStopped {
val getError = Flow[SslTlsInbound]
.map[Either[SslTlsInbound, SSLException]](i => Left(i))
.recover { case e: SSLException => Right(e) }
.collect { case Right(e) => e }.toMat(Sink.head)(Keep.right)
.map[Either[SslTlsInbound, SSLException]](i Left(i))
.recover { case e: SSLException Right(e) }
.collect { case Right(e) e }.toMat(Sink.head)(Keep.right)
val simple = Flow.wrap(getError, Source.lazyEmpty[SslTlsOutbound])(Keep.left)
@ -399,8 +399,8 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
val clientErr = simple.join(badClientTls(IgnoreBoth))
.join(Tcp().outgoingConnection(Await.result(server, 1.second).localAddress)).run()
Await.result(serverErr.flatMap(identity), 1.second).getMessage should include ("certificate_unknown")
Await.result(clientErr, 1.second).getMessage should equal ("General SSLEngine problem")
Await.result(serverErr.flatMap(identity), 1.second).getMessage should include("certificate_unknown")
Await.result(clientErr, 1.second).getMessage should equal("General SSLEngine problem")
}
"reliably cancel subscriptions when TransportIn fails early" in assertAllStagesStopped {

View file

@ -1,765 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.FlexiMerge._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._
import org.reactivestreams.Publisher
import akka.stream._
import scala.util.control.NoStackTrace
import scala.collection.immutable
import akka.actor.ActorRef
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._
object GraphFlexiMergeSpec {
class Fair[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("FairMerge")) {
def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] {
override def initialState = State[T](ReadAny(p.in(0), p.in(1))) { (ctx, input, element)
ctx.emit(element)
SameState
}
}
}
class StrictRoundRobin[T] extends FlexiMerge[T, UniformFanInShape[T, T]](new UniformFanInShape(2), Attributes.name("RoundRobinMerge")) {
def createMergeLogic(p: PortT): MergeLogic[T] = new MergeLogic[T] {
val emitOtherOnClose = CompletionHandling(
onUpstreamFinish = { (ctx, input)
ctx.changeCompletionHandling(defaultCompletionHandling)
readRemaining(other(input))
},
onUpstreamFailure = { (ctx, _, cause)
ctx.fail(cause)
SameState
})
def other(input: InPort): Inlet[T] = if (input eq p.in(0)) p.in(1) else p.in(0)
val read1: State[T] = State(Read(p.in(0))) { (ctx, input, element)
ctx.emit(element)
read2
}
val read2: State[T] = State(Read(p.in(1))) { (ctx, input, element)
ctx.emit(element)
read1
}
def readRemaining(input: Inlet[T]) = State(Read(input)) { (ctx, input, element)
ctx.emit(element)
SameState
}
override def initialState = read1
override def initialCompletionHandling = emitOtherOnClose
}
}
class StartStopTest(lifecycleProbe: ActorRef)
extends FlexiMerge[String, FanInShape2[String, String, String]](new FanInShape2("StartStopTest"), Attributes.name("StartStopTest")) {
def createMergeLogic(p: PortT) = new MergeLogic[String] {
override def preStart(): Unit = lifecycleProbe ! "preStart"
override def postStop(): Unit = lifecycleProbe ! "postStop"
override def initialState = State(ReadAny(p.in0, p.in1)) {
(ctx, port, element)
lifecycleProbe ! element
if (element == "fail") throw new IllegalStateException("test failure")
ctx.emit(element)
SameState
}
}
}
class MyZip[A, B] extends FlexiMerge[(A, B), FanInShape2[A, B, (A, B)]](new FanInShape2("MyZip"), Attributes.name("MyZip")) {
def createMergeLogic(p: PortT): MergeLogic[(A, B)] = new MergeLogic[(A, B)] {
var lastInA: A = _
val readA: State[A] = State[A](Read(p.in0)) { (ctx, input, element)
lastInA = element
readB
}
val readB: State[B] = State[B](Read(p.in1)) { (ctx, input, element)
ctx.emit((lastInA, element))
readA
}
override def initialCompletionHandling = eagerClose
override def initialState: State[_] = readA
}
}
class TripleCancellingZip[A, B, C](var cancelAfter: Int = Int.MaxValue, defVal: Option[A] = None)
extends FlexiMerge[(A, B, C), FanInShape3[A, B, C, (A, B, C)]](new FanInShape3("TripleCancellingZip"), Attributes.name("TripleCancellingZip")) {
def createMergeLogic(p: PortT) = new MergeLogic[(A, B, C)] {
override def initialState = State(ReadAll(p.in0, p.in1, p.in2)) {
case (ctx, input, inputs)
val a = inputs.getOrElse(p.in0, defVal.get)
val b = inputs(p.in1)
val c = inputs(p.in2)
ctx.emit((a, b, c))
if (cancelAfter == 0)
ctx.cancel(p.in0)
cancelAfter -= 1
SameState
}
override def initialCompletionHandling = eagerClose
}
}
object PreferringMerge extends FlexiMerge[Int, UniformFanInShape[Int, Int]](new UniformFanInShape(3), Attributes.name("PreferringMerge")) {
def createMergeLogic(p: PortT) = new MergeLogic[Int] {
override def initialState = State(Read(p.in(0))) {
(ctx, input, element)
ctx.emit(element)
running
}
val running = State(ReadPreferred(p.in(0), p.in(1), p.in(2))) {
(ctx, input, element)
ctx.emit(element)
SameState
}
}
}
class TestMerge(completionProbe: ActorRef)
extends FlexiMerge[String, UniformFanInShape[String, String]](new UniformFanInShape(3), Attributes.name("TestMerge")) {
def createMergeLogic(p: PortT) = new MergeLogic[String] {
var throwFromOnComplete = false
override def initialState = State(ReadAny(p.inSeq: _*)) {
(ctx, input, element)
if (element == "cancel")
ctx.cancel(input)
else if (element == "err")
ctx.fail(new RuntimeException("err") with NoStackTrace)
else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace
else if (element == "complete")
ctx.finish()
else if (element == "onUpstreamFinish-exc")
throwFromOnComplete = true
else
ctx.emit("onInput: " + element)
SameState
}
override def initialCompletionHandling = CompletionHandling(
onUpstreamFinish = { (ctx, input)
if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
completionProbe ! input.toString
SameState
},
onUpstreamFailure = { (ctx, input, cause)
cause match {
case _: IllegalArgumentException // swallow
case _ ctx.fail(cause)
}
SameState
})
}
}
}
class GraphFlexiMergeSpec extends AkkaSpec {
import GraphFlexiMergeSpec._
import FlowGraph.Implicits._
implicit val materializer = ActorMaterializer()
val in1 = Source(List("a", "b", "c", "d"))
val in2 = Source(List("e", "f"))
val out = Sink.publisher[String]
val fairString = new Fair[String]
"FlexiMerge" must {
"build simple fair merge" in assertAllStagesStopped {
FlowGraph.closed(TestSink.probe[String]) { implicit b
o
val merge = b.add(fairString)
in1 ~> merge.in(0)
in2 ~> merge.in(1)
merge.out ~> o.inlet
}.run()
.request(10)
.expectNextUnordered("a", "b", "c", "d", "e", "f")
.expectComplete()
}
"be able to have two fleximerges in a graph" in assertAllStagesStopped {
FlowGraph.closed(in1, in2, TestSink.probe[String])((i1, i2, o) o) { implicit b
(in1, in2, o)
val m1 = b.add(fairString)
val m2 = b.add(fairString)
// format: OFF
in1.outlet ~> m1.in(0)
in2.outlet ~> m1.in(1)
Source(List("A", "B", "C", "D", "E", "F")) ~> m2.in(0)
m1.out ~> m2.in(1)
m2.out ~> o.inlet
// format: ON
}.run()
.request(20)
.expectNextUnordered("a", "b", "c", "d", "e", "f", "A", "B", "C", "D", "E", "F")
.expectComplete()
}
"allow reuse" in {
val flow = Flow() { implicit b
val merge = b.add(new Fair[String])
Source(() Iterator.continually("+")) ~> merge.in(0)
merge.in(1) merge.out
}
val g = FlowGraph.closed(out) { implicit b
o
val zip = b add Zip[String, String]()
in1 ~> flow ~> Flow[String].map { of of } ~> zip.in0
in2 ~> flow ~> Flow[String].map { tf tf } ~> zip.in1
zip.out.map { x x.toString } ~> o.inlet
}
val p = g.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(1000)
val received = for (_ 1 to 1000) yield s.expectNext()
val first = received.map(_.charAt(1))
first.toSet should ===(Set('a', 'b', 'c', 'd', '+'))
first.filter(_ != '+') should ===(Seq('a', 'b', 'c', 'd'))
val second = received.map(_.charAt(3))
second.toSet should ===(Set('e', 'f', '+'))
second.filter(_ != '+') should ===(Seq('e', 'f'))
sub.cancel()
}
"allow zip reuse" in {
val flow = Flow() { implicit b
val zip = b.add(new MyZip[String, String])
Source(() Iterator.continually("+")) ~> zip.in0
(zip.in1, zip.out)
}
FlowGraph.closed(TestSink.probe[String]) { implicit b
o
val zip = b.add(Zip[String, String]())
in1 ~> flow.map(_.toString()) ~> zip.in0
in2 ~> zip.in1
zip.out.map(_.toString()) ~> o.inlet
}.run()
.request(100)
.expectNextUnordered("((+,b),f)", "((+,a),e)")
.expectComplete()
}
"build simple round robin merge" in {
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new StrictRoundRobin[String])
in1 ~> merge.in(0)
in2 ~> merge.in(1)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext("a")
s.expectNext("e")
s.expectNext("b")
s.expectNext("f")
s.expectNext("c")
s.expectNext("d")
s.expectComplete()
}
"build simple zip merge" in {
val p = FlowGraph.closed(Sink.publisher[(Int, String)]) { implicit b
o
val merge = b.add(new MyZip[Int, String])
Source(List(1, 2, 3, 4)) ~> merge.in0
Source(List("a", "b", "c")) ~> merge.in1
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[(Int, String)]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext(1 -> "a")
s.expectNext(2 -> "b")
s.expectNext(3 -> "c")
s.expectComplete()
}
"build simple triple-zip merge using ReadAll" in {
val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b
o
val merge = b.add(new TripleCancellingZip[Long, Int, String])
// format: OFF
Source(List(1L, 2L )) ~> merge.in0
Source(List(1, 2, 3, 4)) ~> merge.in1
Source(List("a", "b", "c" )) ~> merge.in2
merge.out ~> o.inlet
// format: ON
}.run()
val s = TestSubscriber.manualProbe[(Long, Int, String)]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext((1L, 1, "a"))
s.expectNext((2L, 2, "b"))
s.expectComplete()
}
"build simple triple-zip merge using ReadAll, and continue with provided value for cancelled input" in {
val p = FlowGraph.closed(Sink.publisher[(Long, Int, String)]) { implicit b
o
val merge = b.add(new TripleCancellingZip[Long, Int, String](1, Some(0L)))
// format: OFF
Source(List(1L, 2L, 3L, 4L, 5L)) ~> merge.in0
Source(List(1, 2, 3, 4 )) ~> merge.in1
Source(List("a", "b", "c" )) ~> merge.in2
merge.out ~> o.inlet
// format: ON
}.run()
val s = TestSubscriber.manualProbe[(Long, Int, String)]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext((1L, 1, "a"))
s.expectNext((2L, 2, "b"))
// soonCancelledInput is now canceled and continues with default (null) value
s.expectNext((0L, 3, "c"))
s.expectComplete()
}
"build perferring merge" in {
val output = Sink.publisher[Int]
val p = FlowGraph.closed(output) { implicit b
o
val merge = b.add(PreferringMerge)
Source(List(1, 2, 3)) ~> merge.in(0)
Source(List(11, 12, 13)) ~> merge.in(1)
Source(List(14, 15, 16)) ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[Int]
p.subscribe(s)
val sub = s.expectSubscription()
def expect(i: Int): Unit = {
sub.request(1)
s.expectNext(i)
}
def expectNext(): Int = {
sub.request(1)
s.expectNext()
}
expect(1)
expect(2)
expect(3)
val secondaries = expectNext() ::
expectNext() ::
expectNext() ::
expectNext() ::
expectNext() ::
expectNext() :: Nil
secondaries.toSet should equal(Set(11, 12, 13, 14, 15, 16))
s.expectComplete()
}
"build perferring merge, manually driven" in {
val output = Sink.publisher[Int]
val preferredDriver = TestPublisher.manualProbe[Int]()
val otherDriver1 = TestPublisher.manualProbe[Int]()
val otherDriver2 = TestPublisher.manualProbe[Int]()
val p = FlowGraph.closed(output) { implicit b
o
val merge = b.add(PreferringMerge)
Source(preferredDriver) ~> merge.in(0)
Source(otherDriver1) ~> merge.in(1)
Source(otherDriver2) ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[Int]
p.subscribe(s)
val sub = s.expectSubscription()
val p1 = preferredDriver.expectSubscription()
val s1 = otherDriver1.expectSubscription()
val s2 = otherDriver2.expectSubscription()
// just consume the preferred
p1.sendNext(1)
sub.request(1)
s.expectNext(1)
// pick preferred over any of the secondaries
p1.sendNext(2)
s1.sendNext(10)
s2.sendNext(20)
sub.request(1)
s.expectNext(2)
sub.request(2)
Set(s.expectNext(), s.expectNext()) should ===(Set(10, 20))
p1.sendComplete()
// continue with just secondaries when preferred has completed
s1.sendNext(11)
s2.sendNext(21)
sub.request(2)
Set(s.expectNext(), s.expectNext()) should ===(Set(11, 21))
// continue with just one secondary
s1.sendComplete()
s2.sendNext(4)
sub.request(1)
s.expectNext(4)
s2.sendComplete()
// finish when all inputs have completed
s.expectComplete()
}
"support cancel of input" in assertAllStagesStopped {
val autoPublisher = TestPublisher.probe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(autoPublisher) ~> merge.in(0)
Source(List("b", "c", "d")) ~> merge.in(1)
Source(List("e", "f")) ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
autoPublisher.sendNext("a")
autoPublisher.sendNext("cancel")
val sub = s.expectSubscription()
sub.request(10)
val outputs =
for (_ 1 to 6) yield {
val next = s.expectNext()
if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12)
}
val one = Seq("a")
val two = Seq("b", "c", "d")
val three = Seq("e", "f")
outputs.filter(one.contains) should ===(one)
outputs.filter(two.contains) should ===(two)
outputs.filter(three.contains) should ===(three)
completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2")
autoPublisher.sendNext("x")
s.expectComplete()
}
"finish when all inputs cancelled" in assertAllStagesStopped {
val autoPublisher1 = TestPublisher.probe[String]()
val autoPublisher2 = TestPublisher.probe[String]()
val autoPublisher3 = TestPublisher.probe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(autoPublisher1) ~> merge.in(0)
Source(autoPublisher2) ~> merge.in(1)
Source(autoPublisher3) ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
autoPublisher1.sendNext("a")
autoPublisher1.sendNext("cancel")
s.expectNext("onInput: a")
autoPublisher2.sendNext("b")
autoPublisher2.sendNext("cancel")
s.expectNext("onInput: b")
autoPublisher3.sendNext("c")
autoPublisher3.sendNext("cancel")
s.expectNext("onInput: c")
s.expectComplete()
}
"handle failure" in assertAllStagesStopped {
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source.failed[String](new IllegalArgumentException("ERROR") with NoStackTrace) ~> merge.in(0)
Source(List("a", "b")) ~> merge.in(1)
Source(List("c")) ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
// IllegalArgumentException is swallowed by the CompletionHandler
val outputs =
for (_ 1 to 3) yield {
val next = s.expectNext()
if (next.startsWith("onInput: ")) next.substring(9) else next.substring(12)
}
val one = Seq("a", "b")
val two = Seq("c")
completionProbe.expectMsgAllOf("UniformFanIn.in1", "UniformFanIn.in2")
outputs.filter(one.contains) should ===(one)
outputs.filter(two.contains) should ===(two)
s.expectComplete()
}
"propagate failure" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(publisher) ~> merge.in(0)
Source.failed[String](new IllegalStateException("ERROR") with NoStackTrace) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
s.expectSubscriptionAndError().getMessage should be("ERROR")
}
"emit failure" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(List("err")) ~> merge.in(0)
Source(publisher) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectError().getMessage should be("err")
}
"emit failure for user thrown exception" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(List("exc")) ~> merge.in(0)
Source(publisher) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectError().getMessage should be("exc")
}
"emit failure for user thrown exception in onComplete" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(List("onUpstreamFinish-exc")) ~> merge.in(0)
Source(publisher) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectError().getMessage should be("onUpstreamFinish-exc")
}
"emit failure for user thrown exception in onUpstreamFinish 2" in assertAllStagesStopped {
val autoPublisher = TestPublisher.probe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source.empty[String] ~> merge.in(0)
Source(autoPublisher) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
autoPublisher.sendNext("onUpstreamFinish-exc")
autoPublisher.sendNext("a")
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(1)
s.expectNext("onInput: a")
autoPublisher.sendComplete()
s.expectError().getMessage should be("onUpstreamFinish-exc")
}
"support finish from onInput" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[String]()
val completionProbe = TestProbe()
val p = FlowGraph.closed(out) { implicit b
o
val merge = b.add(new TestMerge(completionProbe.ref))
Source(List("a", "complete")) ~> merge.in(0)
Source(publisher) ~> merge.in(1)
Source.empty[String] ~> merge.in(2)
merge.out ~> o.inlet
}.run()
val s = TestSubscriber.manualProbe[String]
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(10)
s.expectNext("onInput: a")
s.expectComplete()
}
"have the correct value for input in ReadPreffered" in {
import akka.stream.FanInShape._
class MShape[T](_init: Init[T] = Name("mshape")) extends FanInShape[T](_init) {
val priority = newInlet[T]("priority")
val second = newInlet[T]("second")
protected override def construct(i: Init[T]) = new MShape(i)
}
class MyMerge[T] extends FlexiMerge[T, MShape[T]](
new MShape, Attributes.name("cmerge")) {
import akka.stream.scaladsl.FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[T] {
override def initialState =
State[T](ReadPreferred(p.priority, p.second)) {
(ctx, input, element)
if (element == 1) assert(input == p.priority)
if (element == 2) assert(input == p.second)
ctx.emit(element)
SameState
}
}
}
val sink = Sink.fold[Int, Int](0)(_ + _)
val graph = FlowGraph.closed(sink) { implicit b
sink
import FlowGraph.Implicits._
val merge = b.add(new MyMerge[Int]())
Source.single(1) ~> merge.priority
Source.single(2) ~> merge.second
merge.out ~> sink.inlet
}
Await.result(graph.run(), 1.second) should equal(3)
}
"handle preStart and postStop" in assertAllStagesStopped {
val p = TestProbe()
FlowGraph.closed() { implicit b
val m = b.add(new StartStopTest(p.ref))
Source(List("1", "2", "3")) ~> m.in0
Source.empty ~> m.in1
m.out ~> Sink.ignore
}.run()
p.expectMsg("preStart")
p.expectMsg("1")
p.expectMsg("2")
p.expectMsg("3")
p.expectMsg("postStop")
}
"invoke postStop after error" in assertAllStagesStopped {
val p = TestProbe()
FlowGraph.closed() { implicit b
val m = b.add(new StartStopTest(p.ref))
Source(List("1", "fail", "2", "3")) ~> m.in0
Source.empty ~> m.in1
m.out ~> Sink.ignore
}.run()
p.expectMsg("preStart")
p.expectMsg("1")
p.expectMsg("fail")
p.expectMsg("postStop")
}
}
}

View file

@ -1,484 +0,0 @@
package akka.stream.scaladsl
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import FlowGraph.Implicits._
import akka.stream.ActorMaterializer
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._
import akka.actor.ActorSystem
import akka.stream._
import akka.actor.ActorRef
import akka.testkit.TestProbe
object GraphFlexiRouteSpec {
/**
* This is fair in that sense that after enqueueing to an output it yields to other output if
* they are have requested elements. Or in other words, if all outputs have demand available at the same
* time then in finite steps all elements are enqueued to them.
*/
class Fair[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("FairBalance")) {
import FlexiRoute._
override def createRouteLogic(p: PortT): RouteLogic[T] = new RouteLogic[T] {
val select = p.out(0) | p.out(1)
val emitToAnyWithDemand = State(DemandFromAny(p)) { (ctx, out, element)
ctx.emit(select(out))(element)
SameState
}
// initally, wait for demand from all
override def initialState = State(DemandFromAll(p)) { (ctx, _, element)
ctx.emit(p.out(0))(element)
emitToAnyWithDemand
}
}
}
/**
* It never skips an output while cycling but waits on it instead (closed outputs are skipped though).
* The fair route above is a non-strict round-robin (skips currently unavailable outputs).
*/
class StrictRoundRobin[T] extends FlexiRoute[T, UniformFanOutShape[T, T]](new UniformFanOutShape(2), Attributes.name("RoundRobinBalance")) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[T] {
val toOutput1: State[Outlet[T]] = State(DemandFrom(p.out(0))) { (ctx, out, element)
ctx.emit(out)(element)
toOutput2
}
val toOutput2 = State(DemandFrom(p.out(1))) { (ctx, out, element)
ctx.emit(out)(element)
toOutput1
}
override def initialState = toOutput1
}
}
class Unzip[A, B] extends FlexiRoute[(A, B), FanOutShape2[(A, B), A, B]](new FanOutShape2("Unzip"), Attributes.name("Unzip")) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[(A, B)] {
override def initialState = State(DemandFromAll(p)) { (ctx, _, element)
val (a, b) = element
ctx.emit(p.out0)(a)
ctx.emit(p.out1)(b)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
class StartStopTestRoute(lifecycleProbe: ActorRef)
extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("StartStopTest"), Attributes.name("StartStopTest")) {
import FlexiRoute._
def createRouteLogic(p: PortT) = new RouteLogic[String] {
val select = p.out0 | p.out1
override def preStart(): Unit = lifecycleProbe ! "preStart"
override def postStop(): Unit = lifecycleProbe ! "postStop"
override def initialState = State(DemandFromAny(p)) {
(ctx, port, element)
lifecycleProbe ! element
if (element == "fail") throw new IllegalStateException("test failure")
ctx.emit(select(port))(element)
SameState
}
}
}
class TestRoute(completionProbe: ActorRef)
extends FlexiRoute[String, FanOutShape2[String, String, String]](new FanOutShape2("TestRoute"), Attributes.name("TestRoute")) {
import FlexiRoute._
var throwFromOnComplete = false
def createRouteLogic(p: PortT): RouteLogic[String] = new RouteLogic[String] {
val select = p.out0 | p.out1
override def initialState = State(DemandFromAny(p)) {
(ctx, preferred, element)
if (element == "err")
ctx.fail(new RuntimeException("err") with NoStackTrace)
else if (element == "err-output1")
ctx.fail(p.out0, new RuntimeException("err-1") with NoStackTrace)
else if (element == "exc")
throw new RuntimeException("exc") with NoStackTrace
else if (element == "onUpstreamFinish-exc")
throwFromOnComplete = true
else if (element == "finish")
ctx.finish()
else
ctx.emit(select(preferred))("onInput: " + element)
SameState
}
override def initialCompletionHandling = CompletionHandling(
onUpstreamFinish = { ctx
if (throwFromOnComplete)
throw new RuntimeException("onUpstreamFinish-exc") with NoStackTrace
completionProbe ! "onUpstreamFinish"
},
onUpstreamFailure = { (ctx, cause)
cause match {
case _: IllegalArgumentException // swallow
case _
completionProbe ! "onError"
}
},
onDownstreamFinish = { (ctx, cancelledOutput)
completionProbe ! "onDownstreamFinish: " + cancelledOutput
SameState
})
}
}
class TestFixture(implicit val system: ActorSystem, implicit val materializer: ActorMaterializer) {
val autoPublisher = TestPublisher.probe[String]()
val s1 = TestSubscriber.manualProbe[String]
val s2 = TestSubscriber.manualProbe[String]
val completionProbe = TestProbe()
FlowGraph.closed() { implicit b
val route = b.add(new TestRoute(completionProbe.ref))
Source(autoPublisher) ~> route.in
route.out0 ~> Sink(s1)
route.out1 ~> Sink(s2)
}.run()
autoPublisher.sendNext("a")
autoPublisher.sendNext("b")
val sub1 = s1.expectSubscription()
val sub2 = s2.expectSubscription()
}
}
class GraphFlexiRouteSpec extends AkkaSpec {
import GraphFlexiRouteSpec._
implicit val materializer = ActorMaterializer()
val in = Source(List("a", "b", "c", "d", "e"))
val out1 = Sink.publisher[String]
val out2 = Sink.publisher[String]
"FlexiRoute" must {
"build simple fair route" in assertAllStagesStopped {
// we can't know exactly which elements that go to each output, because if subscription/request
// from one of the downstream is delayed the elements will be pushed to the other output
FlowGraph.closed(TestSink.probe[String]) { implicit b
out
val merge = b.add(Merge[String](2))
val route = b.add(new Fair[String])
in ~> route.in
route.out(0) ~> merge.in(0)
route.out(1) ~> merge.in(1)
merge.out ~> out
}.run()
.request(10)
.expectNextUnordered("a", "b", "c", "d", "e")
.expectComplete()
}
"build simple round-robin route" in {
val (p1, p2) = FlowGraph.closed(out1, out2)(Keep.both) { implicit b
(o1, o2)
val route = b.add(new StrictRoundRobin[String])
in ~> route.in
route.out(0) ~> o1.inlet
route.out(1) ~> o2.inlet
}.run()
val s1 = TestSubscriber.manualProbe[String]
p1.subscribe(s1)
val sub1 = s1.expectSubscription()
val s2 = TestSubscriber.manualProbe[String]
p2.subscribe(s2)
val sub2 = s2.expectSubscription()
sub1.request(10)
sub2.request(10)
s1.expectNext("a")
s2.expectNext("b")
s1.expectNext("c")
s2.expectNext("d")
s1.expectNext("e")
s1.expectComplete()
s2.expectComplete()
}
"build simple unzip route" in {
val outA = Sink.publisher[Int]
val outB = Sink.publisher[String]
val (p1, p2) = FlowGraph.closed(outA, outB)(Keep.both) { implicit b
(oa, ob)
val route = b.add(new Unzip[Int, String])
Source(List(1 -> "A", 2 -> "B", 3 -> "C", 4 -> "D")) ~> route.in
route.out0 ~> oa.inlet
route.out1 ~> ob.inlet
}.run()
val s1 = TestSubscriber.manualProbe[Int]
p1.subscribe(s1)
val sub1 = s1.expectSubscription()
val s2 = TestSubscriber.manualProbe[String]
p2.subscribe(s2)
val sub2 = s2.expectSubscription()
sub1.request(3)
sub2.request(4)
s1.expectNext(1)
s2.expectNext("A")
s1.expectNext(2)
s2.expectNext("B")
s1.expectNext(3)
s2.expectNext("C")
sub1.cancel()
s2.expectComplete()
}
"support finish of downstreams and cancel of upstream" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
autoPublisher.sendNext("finish")
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(2)
s2.expectNext("onInput: b")
s1.expectComplete()
s2.expectComplete()
}
"support error of outputs" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
autoPublisher.sendNext("err")
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(2)
s2.expectNext("onInput: b")
s1.expectError().getMessage should be("err")
s2.expectError().getMessage should be("err")
autoPublisher.expectCancellation()
}
"support error of a specific output" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(5)
sub2.request(5)
autoPublisher.sendNext("err-output1")
autoPublisher.sendNext("c")
s2.expectNext("onInput: c")
s1.expectError().getMessage should be("err-1")
autoPublisher.sendComplete()
completionProbe.expectMsg("onUpstreamFinish")
s2.expectComplete()
}
"emit error for user thrown exception" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(5)
sub2.request(5)
autoPublisher.sendNext("exc")
s1.expectError().getMessage should be("exc")
s2.expectError().getMessage should be("exc")
autoPublisher.expectCancellation()
}
"emit error for user thrown exception in onUpstreamFinish" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(5)
sub2.request(5)
autoPublisher.sendNext("onUpstreamFinish-exc")
autoPublisher.sendComplete()
s1.expectError().getMessage should be("onUpstreamFinish-exc")
s2.expectError().getMessage should be("onUpstreamFinish-exc")
}
"handle cancel from output" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(2)
sub2.request(2)
sub1.cancel()
completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0")
s1.expectNoMsg(200.millis)
autoPublisher.sendNext("c")
s2.expectNext("onInput: c")
autoPublisher.sendComplete()
s2.expectComplete()
}
"handle finish from upstream input" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(2)
sub2.request(2)
autoPublisher.sendComplete()
completionProbe.expectMsg("onUpstreamFinish")
s1.expectComplete()
s2.expectComplete()
}
"handle error from upstream input" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(2)
sub2.request(2)
autoPublisher.sendError(new RuntimeException("test err") with NoStackTrace)
completionProbe.expectMsg("onError")
s1.expectError().getMessage should be("test err")
s2.expectError().getMessage should be("test err")
}
"cancel upstream input when all outputs cancelled" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(2)
sub2.request(2)
sub1.cancel()
completionProbe.expectMsg("onDownstreamFinish: TestRoute.out0")
sub2.cancel()
autoPublisher.expectCancellation()
}
"cancel upstream input when all outputs completed" in assertAllStagesStopped {
val fixture = new TestFixture
import fixture._
sub1.request(1)
s1.expectNext("onInput: a")
sub2.request(1)
s2.expectNext("onInput: b")
sub1.request(2)
sub2.request(2)
autoPublisher.sendNext("finish")
s1.expectComplete()
s2.expectComplete()
autoPublisher.expectCancellation()
}
"handle preStart and postStop" in assertAllStagesStopped {
val p = TestProbe()
FlowGraph.closed() { implicit b
val r = b.add(new StartStopTestRoute(p.ref))
Source(List("1", "2", "3")) ~> r.in
r.out0 ~> Sink.ignore
r.out1 ~> Sink.ignore
}.run()
p.expectMsg("preStart")
p.expectMsg("1")
p.expectMsg("2")
p.expectMsg("3")
p.expectMsg("postStop")
}
"invoke postStop after error" in assertAllStagesStopped {
val p = TestProbe()
FlowGraph.closed() { implicit b
val r = b.add(new StartStopTestRoute(p.ref))
Source(List("1", "fail", "2", "3")) ~> r.in
r.out0 ~> Sink.ignore
r.out1 ~> Sink.ignore
}.run()
p.expectMsg("preStart")
p.expectMsg("1")
p.expectMsg("fail")
p.expectMsg("postStop")
}
}
}

View file

@ -93,7 +93,7 @@ class SinkSpec extends AkkaSpec {
"combine to many outputs with simplified API" in {
val probes = Seq.fill(3)(TestSubscriber.manualProbe[Int]())
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast(_))
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)
@ -111,7 +111,7 @@ class SinkSpec extends AkkaSpec {
"combine to two sinks with simplified API" in {
val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]())
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast(_))
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)

View file

@ -5,15 +5,16 @@ package akka.stream
import scala.collection.immutable
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
object FanInShape {
sealed trait Init[+O] {
sealed trait Init[O] {
def outlet: Outlet[O]
def inlets: immutable.Seq[Inlet[_]]
def name: String
}
final case class Name(override val name: String) extends Init[Nothing] {
override def outlet: Outlet[Nothing] = Outlet(s"$name.out")
final case class Name[O](override val name: String) extends Init[O] {
override def outlet: Outlet[O] = Outlet(s"$name.out")
override def inlets: immutable.Seq[Inlet[_]] = Nil
}
final case class Ports[O](override val outlet: Outlet[O], override val inlets: immutable.Seq[Inlet[_]]) extends Init[O] {
@ -21,12 +22,12 @@ object FanInShape {
}
}
abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inlet[_]], _name: String) extends Shape {
abstract class FanInShape[+O] private (_out: Outlet[O @uncheckedVariance], _registered: Iterator[Inlet[_]], _name: String) extends Shape {
import FanInShape._
def this(init: FanInShape.Init[O]) = this(init.outlet, init.inlets.iterator, init.name)
final def out: Outlet[O] = _out
final def out: Outlet[O @uncheckedVariance] = _out
final override def outlets: immutable.Seq[Outlet[_]] = _out :: Nil
final override def inlets: immutable.Seq[Inlet[_]] = _inlets
@ -37,7 +38,7 @@ abstract class FanInShape[O] private (_out: Outlet[O], _registered: Iterator[Inl
p
}
protected def construct(init: Init[O]): FanInShape[O]
protected def construct(init: Init[O @uncheckedVariance]): FanInShape[O]
def deepCopy(): FanInShape[O] = construct(Ports[O](_out.carbonCopy(), inlets.map(_.carbonCopy())))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanInShape[O] = {
@ -52,39 +53,39 @@ object UniformFanInShape {
new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList))
}
class UniformFanInShape[-T, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(n: Int) = this(n, FanInShape.Name("UniformFanIn"))
def this(n: Int, name: String) = this(n, FanInShape.Name(name))
class UniformFanInShape[-T, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(n: Int) = this(n, FanInShape.Name[O]("UniformFanIn"))
def this(n: Int, name: String) = this(n, FanInShape.Name[O](name))
def this(outlet: Outlet[O], inlets: Array[Inlet[T]]) = this(inlets.length, FanInShape.Ports(outlet, inlets.toList))
override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new UniformFanInShape(n, init)
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new UniformFanInShape(n, init)
override def deepCopy(): UniformFanInShape[T, O] = super.deepCopy().asInstanceOf[UniformFanInShape[T, O]]
val inSeq: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(n)(i => newInlet[T](s"in$i"))
def in(n: Int): Inlet[T] = inSeq(n)
val inSeq: immutable.IndexedSeq[Inlet[T @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T](s"in$i"))
def in(n: Int): Inlet[T @uncheckedVariance] = inSeq(n)
}
class FanInShape1N[T0, T1, O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(n: Int) = this(n, FanInShape.Name("FanInShape1N"))
def this(n: Int, name: String) = this(n, FanInShape.Name(name))
def this(outlet: Outlet[O], in0: Inlet[T0], inlets1: Array[Inlet[T1]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList))
override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1N(n, init)
class FanInShape1N[-T0, -T1, +O](val n: Int, _init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(n: Int) = this(n, FanInShape.Name[O]("FanInShape1N"))
def this(n: Int, name: String) = this(n, FanInShape.Name[O](name))
def this(outlet: Outlet[O @uncheckedVariance], in0: Inlet[T0 @uncheckedVariance], inlets1: Array[Inlet[T1 @uncheckedVariance]]) = this(inlets1.length, FanInShape.Ports(outlet, in0 :: inlets1.toList))
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1N(n, init)
override def deepCopy(): FanInShape1N[T0, T1, O] = super.deepCopy().asInstanceOf[FanInShape1N[T0, T1, O]]
val in0 = newInlet[T0]("in0")
val in1Seq: immutable.IndexedSeq[Inlet[T1]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}"))
def in(n: Int): Inlet[T1] = {
val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")
val in1Seq: immutable.IndexedSeq[Inlet[T1 @uncheckedVariance]] = Vector.tabulate(n)(i => newInlet[T1](s"in${i+1}"))
def in(n: Int): Inlet[T1 @uncheckedVariance] = {
require(n > 0, "n must be > 0")
in1Seq(n - 1)
}
}
[2..#class FanInShape1[[#T0#], O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(name: String) = this(FanInShape.Name(name))
[2..#class FanInShape1[[#-T0#], +O](_init: FanInShape.Init[O]) extends FanInShape[O](_init) {
def this(name: String) = this(FanInShape.Name[O](name))
def this([#in0: Inlet[T0]#], out: Outlet[O]) = this(FanInShape.Ports(out, [#in0# :: ] :: Nil))
override protected def construct(init: FanInShape.Init[O]): FanInShape[O] = new FanInShape1(init)
override protected def construct(init: FanInShape.Init[O @uncheckedVariance]): FanInShape[O] = new FanInShape1(init)
override def deepCopy(): FanInShape1[[#T0#], O] = super.deepCopy().asInstanceOf[FanInShape1[[#T0#], O]]
[#val in0 = newInlet[T0]("in0")#
[#val in0: Inlet[T0 @uncheckedVariance] = newInlet[T0]("in0")#
]
}#

View file

@ -4,6 +4,7 @@
package akka.stream
import scala.collection.immutable
import scala.annotation.unchecked.uncheckedVariance
object FanOutShape {
sealed trait Init[I] {
@ -20,12 +21,12 @@ object FanOutShape {
}
}
abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outlet[_]], _name: String) extends Shape {
abstract class FanOutShape[-I] private (_in: Inlet[I @uncheckedVariance], _registered: Iterator[Outlet[_]], _name: String) extends Shape {
import FanOutShape._
def this(init: FanOutShape.Init[I]) = this(init.inlet, init.outlets.iterator, init.name)
final def in: Inlet[I] = _in
final def in: Inlet[I @uncheckedVariance] = _in
final override def outlets: immutable.Seq[Outlet[_]] = _outlets
final override def inlets: immutable.Seq[Inlet[_]] = in :: Nil
@ -36,7 +37,7 @@ abstract class FanOutShape[I] private (_in: Inlet[I], _registered: Iterator[Outl
p
}
protected def construct(init: Init[I]): FanOutShape[I]
protected def construct(init: Init[I @uncheckedVariance]): FanOutShape[I]
def deepCopy(): FanOutShape[I] = construct(Ports[I](_in.carbonCopy(), outlets.map(_.carbonCopy())))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanOutShape[I] = {
@ -51,24 +52,24 @@ object UniformFanOutShape {
new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList))
}
class UniformFanOutShape[I, O](n: Int, _init: FanOutShape.Init[I]) extends FanOutShape[I](_init) {
class UniformFanOutShape[-I, +O](n: Int, _init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) {
def this(n: Int) = this(n, FanOutShape.Name[I]("UniformFanOut"))
def this(n: Int, name: String) = this(n, FanOutShape.Name[I](name))
def this(inlet: Inlet[I], outlets: Array[Outlet[O]]) = this(outlets.size, FanOutShape.Ports(inlet, outlets.toList))
override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new UniformFanOutShape(n, init)
override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new UniformFanOutShape(n, init)
override def deepCopy(): UniformFanOutShape[I, O] = super.deepCopy().asInstanceOf[UniformFanOutShape[I, O]]
val outArray: Array[Outlet[O]] = Array.tabulate(n)(i => newOutlet[O](s"out$i"))
def out(n: Int): Outlet[O] = outArray(n)
val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(n)(i => newOutlet[O](s"out$i"))
def out(n: Int): Outlet[O @uncheckedVariance] = outArray(n)
}
[2..#class FanOutShape1[I, [#O0#]](_init: FanOutShape.Init[I]) extends FanOutShape[I](_init) {
[2..#class FanOutShape1[-I, [#+O0#]](_init: FanOutShape.Init[I @uncheckedVariance]) extends FanOutShape[I](_init) {
def this(name: String) = this(FanOutShape.Name[I](name))
def this(in: Inlet[I], [#out0: Outlet[O0]#]) = this(FanOutShape.Ports(in, [#out0# :: ] :: Nil))
override protected def construct(init: FanOutShape.Init[I]): FanOutShape[I] = new FanOutShape1(init)
override protected def construct(init: FanOutShape.Init[I @uncheckedVariance]): FanOutShape[I] = new FanOutShape1(init)
override def deepCopy(): FanOutShape1[I, [#O0#]] = super.deepCopy().asInstanceOf[FanOutShape1[I, [#O0#]]]
[#val out0 = newOutlet[O0]("out0")#
[#val out0: Outlet[O0 @uncheckedVariance] = newOutlet[O0]("out0")#
]
}#

View file

@ -4,9 +4,9 @@
package akka.stream
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
/**
* An input port of a StreamLayout.Module. This type logically belongs
@ -48,8 +48,12 @@ object Inlet {
def apply[T](toString: String): Inlet[T] = new Inlet[T](toString)
}
final class Inlet[-T] private (override val toString: String) extends InPort {
final class Inlet[T] private (override val toString: String) extends InPort {
def carbonCopy(): Inlet[T] = Inlet(toString)
/**
* INTERNAL API.
*/
def as[U]: Inlet[U] = this.asInstanceOf[Inlet[U]]
}
/**
@ -61,8 +65,12 @@ object Outlet {
def apply[T](toString: String): Outlet[T] = new Outlet[T](toString)
}
final class Outlet[+T] private (override val toString: String) extends OutPort {
final class Outlet[T] private (override val toString: String) extends OutPort {
def carbonCopy(): Outlet[T] = Outlet(toString)
/**
* INTERNAL API.
*/
def as[U]: Outlet[U] = this.asInstanceOf[Outlet[U]]
}
/**
@ -187,7 +195,7 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se
* A Source [[Shape]] has exactly one output and no inputs, it models a source
* of data.
*/
final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
final case class SourceShape[+T](outlet: Outlet[T @uncheckedVariance]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
@ -204,7 +212,7 @@ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
* outside like a pipe (but it can be a complex topology of streams within of
* course).
*/
final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends Shape {
final case class FlowShape[-I, +O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
@ -219,7 +227,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S
/**
* A Sink [[Shape]] has exactly one input and no outputs, it models a data sink.
*/
final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
final case class SinkShape[-T](inlet: Inlet[T @uncheckedVariance]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq
@ -244,10 +252,10 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
* +------+
* }}}
*/
final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1],
out1: Outlet[Out1],
in2: Inlet[In2],
out2: Outlet[Out2]) extends Shape {
final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVariance],
out1: Outlet[Out1 @uncheckedVariance],
in2: Inlet[In2 @uncheckedVariance],
out2: Outlet[Out2 @uncheckedVariance]) extends Shape {
//#implementation-details-elided
override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2)
override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2)
@ -270,6 +278,6 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1],
//#bidi-shape
object BidiShape {
def apply[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
}

View file

@ -425,10 +425,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo
initialPhase(2, bidirectional)
protected def fail(e: Throwable, closeTransport: Boolean=true): Unit = {
protected def fail(e: Throwable, closeTransport: Boolean = true): Unit = {
if (tracing) log.debug("fail {} due to: {}", self, e.getMessage)
inputBunch.cancel()
if(closeTransport) {
if (closeTransport) {
log.debug("closing output")
outputBunch.error(TransportOut, e)
}

View file

@ -160,7 +160,7 @@ object SslTlsPlacebo {
val session = SSLContext.getDefault.createSSLEngine.getSession
val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) bytes })
val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _)))
BidiShape(top, bottom)
BidiShape.fromFlows(top, bottom)
}
val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, Unit] =
new javadsl.BidiFlow(forScala)

View file

@ -948,7 +948,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
val f = new function.Function2[FlowGraph.Builder[M], SourceShape[T], Inlet[Out]Pair Outlet[Out Pair T]] {
override def apply(b: FlowGraph.Builder[M], s: SourceShape[T]): Inlet[Out] Pair Outlet[Out Pair T] = {
val zip = b.graph(Zip.create[Out, T])
b.from(s).to(zip.in1)
b.from(s).toInlet(zip.in1)
new Pair(zip.in0, zip.out)
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.javadsl
import akka.stream._
import akka.japi.Pair
import scala.annotation.unchecked.uncheckedVariance
/**
* Merge several streams, taking elements as they arrive from input streams
@ -277,49 +278,41 @@ object FlowGraph {
*
* @return The outlet that will emit the materialized value.
*/
def materializedValue: Outlet[Mat] = delegate.materializedValue
def materializedValue: Outlet[Mat @uncheckedVariance] = delegate.materializedValue
def run(mat: Materializer): Unit = delegate.buildRunnable().run()(mat)
def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out)
def from[T, M](src: Graph[SourceShape[T], M]): ForwardOps[T] = new ForwardOps(delegate.add(src).outlet)
def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet)
def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet)
def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out)
def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0))
def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in)
def to[T, M](dst: Graph[SinkShape[T], M]): ReverseOps[T] = new ReverseOps(delegate.add(dst).inlet)
def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet)
def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet)
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in)
final class ForwardOps[T](out: Outlet[T]) {
def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self }
def to[M](dst: Graph[SinkShape[T], M]): Builder[Mat] = { out ~> dst; self }
def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self }
def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self }
def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self }
def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self }
def via[U, M](f: Graph[FlowShape[T, U], M]): ForwardOps[U] = from((out ~> f).outlet)
def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet)
def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { out ~> in; self }
def to(dst: SinkShape[_ >: T]): Builder[Mat] = { out ~> dst; self }
def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { out ~> j; self }
def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { out ~> j; self }
def via[U, M](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((out ~> f).outlet)
def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet)
def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet)
def out(): Outlet[T] = out
}
final class ReverseOps[T](out: Inlet[T]) {
def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self }
def from[M](dst: Graph[SourceShape[T], M]): Builder[Mat] = { out <~ dst; self }
def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self }
def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self }
def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self }
def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self }
def via[U, M](f: Graph[FlowShape[U, T], M]): ReverseOps[U] = to((out <~ f).inlet)
def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet)
def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)
def via[U](j: UniformFanOutShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)
def fromOutlet(dst: Outlet[_ <: T]): Builder[Mat] = { out <~ dst; self }
def from(dst: SourceShape[_ <: T]): Builder[Mat] = { out <~ dst; self }
def fromFanIn[U](j: UniformFanInShape[U, _ <: T]): Builder[Mat] = { out <~ j; self }
def fromFanOut[U](j: UniformFanOutShape[U, _ <: T]): Builder[Mat] = { out <~ j; self }
def via[U](f: FlowShape[U, _ <: T]): ReverseOps[U] = to((out <~ f).inlet)
def viaFanIn[U](j: UniformFanInShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet)
def viaFanOut[U](j: UniformFanOutShape[U, _ <: T]): ReverseOps[U] = to((out <~ j).inlet)
}
}
}

View file

@ -8,6 +8,7 @@ import akka.stream.{ Outlet, Shape, OutPort, Graph, Attributes }
import scala.collection.immutable
import akka.stream.impl.Junctions.FlexiRouteModule
import akka.stream.impl.Stages.DefaultAttributes
import scala.annotation.unchecked.uncheckedVariance
object FlexiRoute {
@ -24,7 +25,7 @@ object FlexiRoute {
* has been completed. `IllegalArgumentException` is thrown if
* that is not obeyed.
*/
final case class DemandFrom[+T](output: Outlet[T]) extends DemandCondition[Outlet[T]]
final case class DemandFrom[+T](output: Outlet[T @uncheckedVariance]) extends DemandCondition[Outlet[T @uncheckedVariance]]
object DemandFromAny {
def apply(outputs: OutPort*): DemandFromAny = new DemandFromAny(outputs.to[immutable.Seq])

View file

@ -102,7 +102,7 @@ class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]]
object MergePreferred {
import FanInShape._
final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T]) extends UniformFanInShape[T, T](secondaryPorts, _init) {
def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name(name))
def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name[T](name))
override protected def construct(init: Init[T]): FanInShape[T] = new MergePreferredShape(secondaryPorts, init)
override def deepCopy(): MergePreferredShape[T] = super.deepCopy().asInstanceOf[MergePreferredShape[T]]
@ -562,7 +562,7 @@ object FlowGraph extends GraphApply {
class Builder[+M] private[stream] () {
private var moduleInProgress: Module = EmptyModule
def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = {
def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = {
val flowCopy = via.module.carbonCopy
moduleInProgress =
moduleInProgress
@ -571,7 +571,7 @@ object FlowGraph extends GraphApply {
.wire(flowCopy.shape.outlets.head, to)
}
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = {
def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit = {
moduleInProgress = moduleInProgress.wire(from, to)
}
@ -630,7 +630,7 @@ object FlowGraph extends GraphApply {
*
* @return The outlet that will emit the materialized value.
*/
def materializedValue: Outlet[M] = {
def materializedValue: Outlet[M @uncheckedVariance] = {
val module = new MaterializedValueSource[Any]
moduleInProgress = moduleInProgress.compose(module)
module.shape.outlet.asInstanceOf[Outlet[M]]
@ -724,7 +724,7 @@ object FlowGraph extends GraphApply {
trait CombinerBase[T] extends Any {
def importAndGetPort(b: Builder[_]): Outlet[T]
def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = {
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), to)
}
@ -770,7 +770,7 @@ object FlowGraph extends GraphApply {
trait ReverseCombinerBase[T] extends Any {
def importAndGetPortReverse(b: Builder[_]): Inlet[T]
def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = {
def <~[U <: T](from: Outlet[U])(implicit b: Builder[_]): Unit = {
b.addEdge(from, importAndGetPortReverse(b))
}