diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 90a3a05991..fbbfa9f9b3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -96,7 +96,7 @@ public class FlowGraphTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - final Zip zip = Zip.create(); + final ZipWith> zip = Zip.create(); final KeyedSink, Future> out = Sink .foreach(new Procedure>() { @Override diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 8d1dceb6fb..0b99164127 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -11,11 +11,15 @@ import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; +import org.reactivestreams.Publisher; +import scala.runtime.BoxedUnit; + import org.junit.ClassRule; import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.duration.Duration; import java.util.*; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -205,6 +209,127 @@ public class FlowTest extends StreamTest { } + public Creator> op() { + return new akka.stream.javadsl.japi.Creator>() { + @Override + public PushPullStage create() throws Exception { + return new PushPullStage() { + @Override + public Directive onPush(T element, Context ctx) { + return ctx.push(element); + } + + @Override + public Directive onPull(Context ctx) { + return ctx.pull(); + } + }; + } + }; + } + + @Test + public void mustBeAbleToUseMerge() throws Exception { + final Flow f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); + final Flow f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); + final Flow f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); + + final Source in1 = Source.from(Arrays.asList("a", "b", "c")); + final Source in2 = Source.from(Arrays.asList("d", "e", "f")); + + final KeyedSink> publisher = Sink.publisher(); + + // this is red in intellij, but actually valid, scalac generates bridge methods for Java, so inference *works* + final Merge merge = Merge. create(); + MaterializedMap m = FlowGraph.builder().addEdge(in1, f1, merge).addEdge(in2, f2, merge) + .addEdge(merge, f3, publisher).build().run(materializer); + + // collecting + final Publisher pub = m.get(publisher); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); + + final List result = Await.result(all, Duration.apply(200, TimeUnit.MILLISECONDS)); + assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); + } + + @Test + public void mustBeAbleToUseZip() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList(1, 2, 3); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + final ZipWith> zip = Zip.create(); + final KeyedSink, Future> out = Sink + .foreach(new Procedure>() { + @Override + public void apply(Pair param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph.builder().addEdge(in1, zip.left()).addEdge(in2, zip.right()).addEdge(zip.out(), out).run(materializer); + + List output = Arrays.asList(probe.receiveN(3)); + @SuppressWarnings("unchecked") + List> expected = Arrays.asList(new Pair("A", 1), new Pair( + "B", 2), new Pair("C", 3)); + assertEquals(expected, output); + } + + @Test + public void mustBeAbleToUseUnzip() { + final JavaTestKit probe1 = new JavaTestKit(system); + final JavaTestKit probe2 = new JavaTestKit(system); + + @SuppressWarnings("unchecked") + final List> input = Arrays.asList(new Pair("A", 1), + new Pair("B", 2), new Pair("C", 3)); + + final Iterable expected1 = Arrays.asList("A", "B", "C"); + final Iterable expected2 = Arrays.asList(1, 2, 3); + + final Source> in = Source.from(input); + final Unzip unzip = Unzip.create(); + + final KeyedSink> out1 = Sink.foreach(new Procedure() { + @Override + public void apply(String param) throws Exception { + probe1.getRef().tell(param, ActorRef.noSender()); + } + }); + final KeyedSink> out2 = Sink.foreach(new Procedure() { + @Override + public void apply(Integer param) throws Exception { + probe2.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph.builder().addEdge(in, unzip.in()).addEdge(unzip.left(), out1).addEdge(unzip.right(), out2) + .run(materializer); + + List output1 = Arrays.asList(probe1.receiveN(3)); + List output2 = Arrays.asList(probe2.receiveN(3)); + assertEquals(expected1, output1); + assertEquals(expected2, output2); + } + @Test public void mustBeAbleToUseConcat() { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala new file mode 100644 index 0000000000..370de31234 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -0,0 +1,104 @@ +package akka.stream.scaladsl + +import akka.stream.scaladsl.FlowGraphImplicits._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.TwoStreamsSetup +import scala.concurrent.duration._ + +class GraphZipWithSpec extends TwoStreamsSetup { + + override type Outputs = Int + val op = ZipWith((_: Int) + (_: Int)) + override def operationUnderTestLeft() = op.left + override def operationUnderTestRight() = op.right + + "ZipWith" must { + + "work in the happy case" in { + val probe = StreamTestKit.SubscriberProbe[Outputs]() + + FlowGraph { implicit b ⇒ + val zip = ZipWith((_: Int) + (_: Int)) + + Source(1 to 4) ~> zip.left + Source(10 to 40 by 10) ~> zip.right + + zip.out ~> Sink(probe) + }.run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(11) + probe.expectNext(22) + + subscription.request(1) + probe.expectNext(33) + subscription.request(1) + probe.expectNext(44) + + probe.expectComplete() + } + + "work in the sad case" in { + val probe = StreamTestKit.SubscriberProbe[Outputs]() + + FlowGraph { implicit b ⇒ + val zip = ZipWith[Int, Int, Int]((_: Int) / (_: Int)) + + Source(1 to 4) ~> zip.left + Source(-2 to 2) ~> zip.right + + zip.out ~> Sink(probe) + }.run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(1 / -2) + probe.expectNext(2 / -1) + + subscription.request(2) + probe.expectError() match { + case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") + } + probe.expectNoMsg(200.millis) + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + } + + "work with one delayed completed and one nonempty publisher" in { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index f02d78da1a..6851c0c73e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -16,7 +16,6 @@ import akka.actor._ import akka.stream.{ FlowMaterializer, MaterializerSettings, OverflowStrategy, TimerTransformer } import akka.stream.MaterializationException import akka.stream.actor.ActorSubscriber -import akka.stream.impl.Zip.ZipAs import akka.stream.scaladsl._ import akka.stream.stage._ import akka.pattern.ask @@ -31,7 +30,7 @@ private[akka] object Ast { def attributes: OperationAttributes def withAttributes(attributes: OperationAttributes): AstNode } - + // FIXME Fix the name `Defaults` is waaaay too opaque. How about "Names"? object Defaults { val timerTransform = name("timerTransform") val stageFactory = name("stageFactory") @@ -72,13 +71,11 @@ private[akka] object Ast { import Defaults._ final case class TimerTransform(mkStage: () ⇒ TimerTransformer[Any, Any], attributes: OperationAttributes = timerTransform) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class StageFactory(mkStage: () ⇒ Stage[_, _], attributes: OperationAttributes = stageFactory) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } object Fused { @@ -86,107 +83,88 @@ private[akka] object Ast { Fused(ops, name(ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) //FIXME change to something more performant for name } final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: OperationAttributes = fused) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Map(f: Any ⇒ Any, attributes: OperationAttributes = map) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Filter(p: Any ⇒ Boolean, attributes: OperationAttributes = filter) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Collect(pf: PartialFunction[Any, Any], attributes: OperationAttributes = collect) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } // FIXME Replace with OperateAsync final case class MapAsync(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsync) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } //FIXME Should be OperateUnorderedAsync final case class MapAsyncUnordered(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Grouped(n: Int, attributes: OperationAttributes = grouped) extends AstNode { require(n > 0, "n must be greater than 0") - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } //FIXME should be `n: Long` final case class Take(n: Int, attributes: OperationAttributes = take) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } //FIXME should be `n: Long` final case class Drop(n: Int, attributes: OperationAttributes = drop) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Scan(zero: Any, f: (Any, Any) ⇒ Any, attributes: OperationAttributes = scan) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: OperationAttributes = buffer) extends AstNode { require(size > 0, s"Buffer size must be larger than zero but was [$size]") - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any, attributes: OperationAttributes = conflate) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any), attributes: OperationAttributes = expand) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class MapConcat(f: Any ⇒ immutable.Seq[Any], attributes: OperationAttributes = mapConcat) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class GroupBy(f: Any ⇒ Any, attributes: OperationAttributes = groupBy) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class PrefixAndTail(n: Int, attributes: OperationAttributes = prefixAndTail) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class SplitWhen(p: Any ⇒ Boolean, attributes: OperationAttributes = splitWhen) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } final case class ConcatAll(attributes: OperationAttributes = concatAll) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } case class DirectProcessor(p: () ⇒ Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } + case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode { - def withAttributes(attributes: OperationAttributes) = - copy(attributes = attributes) + def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } sealed trait JunctionAstNode { @@ -197,6 +175,8 @@ private[akka] object Ast { sealed trait FanInAstNode extends JunctionAstNode sealed trait FanOutAstNode extends JunctionAstNode + final case class ZipWith(f: (Any, Any) ⇒ Any, attributes: OperationAttributes) extends FanInAstNode + // FIXME Why do we need this? case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode @@ -206,7 +186,6 @@ private[akka] object Ast { final case class Broadcast(attributes: OperationAttributes) extends FanOutAstNode final case class Balance(waitForAllDownstreams: Boolean, attributes: OperationAttributes) extends FanOutAstNode - final case class Zip(as: ZipAs, attributes: OperationAttributes) extends FanInAstNode final case class Unzip(attributes: OperationAttributes) extends FanOutAstNode final case class Concat(attributes: OperationAttributes) extends FanInAstNode @@ -476,7 +455,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val props = fanin match { case Ast.Merge(_) ⇒ FairMerge.props(transformedSettings, inputCount) case Ast.MergePreferred(_) ⇒ UnfairMerge.props(transformedSettings, inputCount) - case Ast.Zip(as, _) ⇒ Zip.props(transformedSettings, as) + case Ast.ZipWith(f, _) ⇒ ZipWith.props(transformedSettings, f) case Ast.Concat(_) ⇒ Concat.props(transformedSettings) case Ast.FlexiMergeNode(merger, _) ⇒ FlexiMergeImpl.props(transformedSettings, inputCount, merger.createMergeLogic()) } @@ -484,7 +463,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val publisher = new ActorPublisher[Out](impl) impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]]) - val subscribers = Vector.tabulate(inputCount)(FanIn.SubInput[In](impl, _)) + val subscribers = Vector.tabulate(inputCount)(FanIn.SubInput[In](impl, _)) // FIXME switch to List.tabulate for inputCount < 8? (subscribers, List(publisher)) case fanout: Ast.FanOutAstNode ⇒ @@ -496,7 +475,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } val impl = actorOf(props, actorName, fanout) - val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) { + val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) { // FIXME switch to List.tabulate for inputCount < 8? override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) }) impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index afe6d56199..cfc1bb3e2f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -7,7 +7,6 @@ import akka.actor.{ ActorRef, ActorLogging, Actor } import akka.actor.Props import akka.stream.MaterializerSettings import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } -import akka.stream.impl.Zip.ZipAs import org.reactivestreams.{ Subscription, Subscriber } /** @@ -245,7 +244,7 @@ private[akka] object FairMerge { /** * INTERNAL API */ -private[akka] class FairMerge(_settings: MaterializerSettings, _inputPorts: Int) extends FanIn(_settings, _inputPorts) { +private[akka] final class FairMerge(_settings: MaterializerSettings, _inputPorts: Int) extends FanIn(_settings, _inputPorts) { inputBunch.markAllInputs() nextPhase(TransferPhase(inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ @@ -268,7 +267,9 @@ private[akka] object UnfairMerge { /** * INTERNAL API */ -private[akka] class UnfairMerge(_settings: MaterializerSettings, _inputPorts: Int, val preferred: Int) extends FanIn(_settings, _inputPorts) { +private[akka] final class UnfairMerge(_settings: MaterializerSettings, + _inputPorts: Int, + val preferred: Int) extends FanIn(_settings, _inputPorts) { inputBunch.markAllInputs() nextPhase(TransferPhase(inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ @@ -280,42 +281,20 @@ private[akka] class UnfairMerge(_settings: MaterializerSettings, _inputPorts: In /** * INTERNAL API */ -private[akka] object Zip { - def props(settings: MaterializerSettings, zipAs: ZipAs): Props = - Props(new Zip(settings, zipAs)) - - /** - * INTERNAL API - * - * Allows to zip to different tuple implementations (e.g. Scala's `Tuple2` or Java's `Pair`), - * while sharing the same `Zip` implementation. - */ - private[akka] sealed trait ZipAs { - type Zipped[A, B] - def apply[A, B](first: A, second: B): Zipped[A, B] - } - /** INTERNAL API */ - private[akka] object AsJavaPair extends ZipAs { - override type Zipped[A, B] = akka.japi.Pair[A, B] - override def apply[A, B](first: A, second: B) = akka.japi.Pair(first, second) - } - /** INTERNAL API */ - private[akka] object AsScalaTuple2 extends ZipAs { - override type Zipped[A, B] = (A, B) - override def apply[A, B](first: A, second: B) = first → second - } +private[akka] object ZipWith { + def props(settings: MaterializerSettings, f: (Any, Any) ⇒ Any): Props = Props(new ZipWith(settings, f)) } /** * INTERNAL API */ -private[akka] class Zip(_settings: MaterializerSettings, zip: ZipAs) extends FanIn(_settings, inputPorts = 2) { +private[akka] final class ZipWith(_settings: MaterializerSettings, f: (Any, Any) ⇒ Any) extends FanIn(_settings, inputPorts = 2) { inputBunch.markAllInputs() nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ val elem0 = inputBunch.dequeue(0) val elem1 = inputBunch.dequeue(1) - primaryOutputs.enqueueOutputElement(zip(elem0, elem1)) + primaryOutputs.enqueueOutputElement(f(elem0, elem1)) }) } @@ -323,14 +302,13 @@ private[akka] class Zip(_settings: MaterializerSettings, zip: ZipAs) extends Fan * INTERNAL API */ private[akka] object Concat { - def props(settings: MaterializerSettings): Props = - Props(new Concat(settings)) + def props(settings: MaterializerSettings): Props = Props(new Concat(settings)) } /** * INTERNAL API */ -private[akka] class Concat(_settings: MaterializerSettings) extends FanIn(_settings, inputPorts = 2) { +private[akka] final class Concat(_settings: MaterializerSettings) extends FanIn(_settings, inputPorts = 2) { val First = 0 val Second = 1 diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index b787daf453..ad75da7e91 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -218,54 +218,44 @@ class Balance[T](delegate: scaladsl.Balance[T]) extends javadsl.Junction[T] { } object Zip { - + import akka.japi.{ Pair, Function2 } /** - * Create a new anonymous `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) - * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* - */ - def create[A, B](): Zip[A, B] = create(name = null) - - /** - * Create a new anonymous `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) + * Create a new anonymous `ZipWith` vertex with the specified input types and zipping-function + * which creates `akka.japi.Pair`s. + * Note that a ZipWith` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def create[A, B](left: Class[A], right: Class[B]): Zip[A, B] = create[A, B]() + def create[A, B]: ZipWith[A, B, A Pair B] = + ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) + + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = + new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } +} + +object ZipWith { /** - * Create a named `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) + * Creates a new anonymous `ZipWith` vertex with the specified input types and zipping-function `f`. + * Note that a `ZipWith` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ - def create[A, B](name: String): Zip[A, B] = - new Zip(new scaladsl.Zip[A, B](OperationAttributes.name(name).asScala) { - override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl.Zip.AsJavaPair, attributes) - }) + def create[A, B, C](f: akka.japi.Function2[A, B, C]): ZipWith[A, B, C] = + create(name = null, f = f) /** - * Create a named `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) + * Creates a new named `ZipWith` vertex with the specified input types and zipping-function `f`. + * Note that a `ZipWith` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ - def create[A, B](name: String, left: Class[A], right: Class[A]): Zip[A, B] = - create[A, B](name) + def create[A, B, C](name: String, f: akka.japi.Function2[A, B, C]): ZipWith[A, B, C] = + new ZipWith(new scaladsl.ZipWith[A, B, C](OperationAttributes.name(name).asScala, f.apply _)) - class Left[A, B](private val zip: Zip[A, B]) extends JunctionInPort[A] { - override def asScala: scaladsl.JunctionInPort[A] = zip.asScala.left - } - class Right[A, B](private val zip: Zip[A, B]) extends JunctionInPort[B] { - override def asScala: scaladsl.JunctionInPort[B] = zip.asScala.right - } - class Out[A, B](private val zip: Zip[A, B]) extends JunctionOutPort[akka.japi.Pair[A, B]] { - // this cast is safe thanks to using `ZipAs` in the Ast element, Zip will emit the expected type (Pair) - override def asScala: scaladsl.JunctionOutPort[akka.japi.Pair[A, B]] = - zip.asScala.out.asInstanceOf[scaladsl.JunctionOutPort[akka.japi.Pair[A, B]]] - } + final class Left[A, B, C](override val asScala: scaladsl.ZipWith.Left[A, B, C]) extends JunctionInPort[A] + final class Right[A, B, C](override val asScala: scaladsl.ZipWith.Right[A, B, C]) extends JunctionInPort[B] + final class Out[A, B, C](override val asScala: scaladsl.ZipWith.Out[A, B, C]) extends JunctionOutPort[C] } /** @@ -273,14 +263,10 @@ object Zip { * by combining corresponding elements in pairs. If one of the two streams is * longer than the other, its remaining elements are ignored. */ -final class Zip[A, B] private (delegate: scaladsl.Zip[A, B]) { - - /** Convert this element to it's `scaladsl` equivalent. */ - def asScala = delegate - - val left = new Zip.Left(this) - val right = new Zip.Right(this) - val out = new Zip.Out(this) +final class ZipWith[A, B, C] private (val asScala: scaladsl.ZipWith[A, B, C]) { + val left = new ZipWith.Left[A, B, C](asScala.left) + val right = new ZipWith.Right[A, B, C](asScala.right) + val out = new ZipWith.Out[A, B, C](asScala.out) } object Unzip { @@ -341,7 +327,7 @@ object Concat { * Create a named `Concat` vertex with the specified input types. * Note that a `Concat` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ def create[T](name: String): Concat[T] = new Concat(scaladsl.Concat[T](name)) @@ -349,7 +335,7 @@ object Concat { * Create a named `Concat` vertex with the specified input types. * Note that a `Concat` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ def create[T](name: String, clazz: Class[T]): Concat[T] = create(name) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala index 03617bdc28..5d135802ac 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala @@ -29,9 +29,12 @@ object OperationAttributes { /** * Specifies the name of the operation. */ - def name(name: String): OperationAttributes = new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.name(name) - } + def name(name: String): OperationAttributes = + if (name eq null) none + else + new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.name(name) + } /** * Specifies the initial and maximum size of the input buffer. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala index 18c25eeb2a..2024476d39 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/japi/Util.scala @@ -8,7 +8,7 @@ import scala.collection.immutable object Util { import collection.JavaConverters._ - + // FIXME this does not make something an immutable iterable!! def immutableIterable[T](iterable: java.lang.Iterable[T]): immutable.Iterable[T] = new immutable.Iterable[T] { override def iterator: Iterator[T] = iterable.iterator().asScala diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 158832a6f0..8011998518 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -57,8 +57,8 @@ private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOu } private[akka] object Identity { - private val id = new AtomicInteger(1) - def getId: Int = id.getAndIncrement + private val id = new AtomicInteger(1) // FIXME This looks extremely shady, why an Int, and why here? + def getId: Int = id.getAndIncrement // FIXME this should be `createId()` } private[akka] final class Identity[T](override val attributes: OperationAttributes = OperationAttributes.none) extends FlowGraphInternal.InternalVertex with Junction[T] { @@ -91,7 +91,7 @@ object Merge { * Note that a `Merge` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): Merge[T] = new Merge[T](OperationAttributes.name(name)) def apply[T](attributes: OperationAttributes): Merge[T] = new Merge[T](attributes) @@ -134,7 +134,7 @@ object MergePreferred { * Note that a `MergePreferred` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): MergePreferred[T] = new MergePreferred[T](OperationAttributes.name(name)) def apply[T](attributes: OperationAttributes): MergePreferred[T] = new MergePreferred[T](attributes) @@ -180,7 +180,7 @@ object Broadcast { * Note that a `Broadcast` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): Broadcast[T] = new Broadcast[T](OperationAttributes.name(name)) def apply[T](attributes: OperationAttributes): Broadcast[T] = new Broadcast[T](attributes) @@ -219,7 +219,7 @@ object Balance { * * If you use `waitForAllDownstreams = true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance[T](waitForAllDownstreams, OperationAttributes.name(name)) /** @@ -255,56 +255,72 @@ final class Balance[T](val waitForAllDownstreams: Boolean, override val attribut object Zip { /** - * Create a new anonymous `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) + * Create a new anonymous `ZipWith` vertex with the specified input types and zipping-function + * which creates `Tuple2`s. + * Note that a ZipWith` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ - def apply[A, B]: Zip[A, B] = new Zip[A, B](OperationAttributes.none) + def apply[A, B]: ZipWith[A, B, (A, B)] = + apply(OperationAttributes.none) + def apply[A, B](attributes: OperationAttributes): ZipWith[A, B, (A, B)] = + new ZipWith(attributes, _toTuple.asInstanceOf[(A, B) ⇒ (A, B)]) + + private[this] final val _toTuple: (Any, Any) ⇒ (Any, Any) = (a, b) ⇒ (a, b) +} + +object ZipWith { /** - * Create a named `Zip` vertex with the specified input types. - * Note that a `Zip` instance can only be used at one place (one vertex) + * Create a new anonymous `ZipWith` vertex with the specified input types. + * Note that a `ZipWith` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ - def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](OperationAttributes.name(name)) + def apply[A, B, C](f: (A, B) ⇒ C): ZipWith[A, B, C] = new ZipWith[A, B, C](OperationAttributes.none, f) + /** + * Create a named `ZipWith` vertex with the specified input types. + * Note that a `ZipWith` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? + def apply[A, B, C](name: String, f: (A, B) ⇒ C): ZipWith[A, B, C] = + new ZipWith[A, B, C](OperationAttributes.name(name), f) - def apply[A, B](attr: OperationAttributes): Zip[A, B] = new Zip[A, B](attr) - - class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] { + final class Left[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[A] { + type NextT = C override private[akka] def port = 0 - type NextT = (A, B) override private[akka] def next = vertex.out } - class Right[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[B] { + + final class Right[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[B] { + type NextT = C override private[akka] def port = 1 - type NextT = (A, B) override private[akka] def next = vertex.out } - class Out[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionOutPort[(A, B)] + + final class Out[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionOutPort[C] } /** * Takes two streams and outputs an output stream formed from the two input streams - * by combining corresponding elements in pairs. If one of the two streams is - * longer than the other, its remaining elements are ignored. + * by combining corresponding elements using the supplied function. + * If one of the two streams is longer than the other, its remaining elements are ignored. */ -private[akka] class Zip[A, B](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { - import akka.stream.impl.Zip.AsScalaTuple2 - - val left = new Zip.Left(this) - val right = new Zip.Right(this) - val out = new Zip.Out(this) +private[akka] final class ZipWith[A, B, C](override val attributes: OperationAttributes, f: (A, B) ⇒ C) extends FlowGraphInternal.InternalVertex { + val left = new ZipWith.Left(this) + val right = new ZipWith.Right(this) + val out = new ZipWith.Out(this) override def minimumInputCount: Int = 2 override def maximumInputCount: Int = 2 override def minimumOutputCount: Int = 1 override def maximumOutputCount: Int = 1 - override private[akka] def astNode: FanInAstNode = Ast.Zip(AsScalaTuple2, zip and attributes) + // FIXME cache + private[akka] override def astNode: FanInAstNode = Ast.ZipWith(f.asInstanceOf[(Any, Any) ⇒ Any], zip and attributes) - final override private[scaladsl] def newInstance() = new Zip[A, B](attributes.withoutName) + private[scaladsl] final override def newInstance() = new ZipWith[A, B, C](attributes.withoutName, f = f) } object Unzip { @@ -312,7 +328,7 @@ object Unzip { * Create a new anonymous `Unzip` vertex with the specified output types. * Note that a `Unzip` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ def apply[A, B]: Unzip[A, B] = new Unzip[A, B](OperationAttributes.none) @@ -320,22 +336,23 @@ object Unzip { * Create a named `Unzip` vertex with the specified output types. * Note that a `Unzip` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* - */ + * is called and those instances are not `equal`. + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[A, B](name: String): Unzip[A, B] = new Unzip[A, B](OperationAttributes.name(name)) def apply[A, B](attributes: OperationAttributes): Unzip[A, B] = new Unzip[A, B](attributes) - class In[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionInPort[(A, B)] { + final class In[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionInPort[(A, B)] { override type NextT = Nothing - override private[akka] def next = NoNext + private[akka] override def next = NoNext } - class Left[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionOutPort[A] { - override private[akka] def port = 0 + final class Left[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionOutPort[A] { + private[akka] override def port = 0 } - class Right[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionOutPort[B] { - override private[akka] def port = 1 + + final class Right[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionOutPort[B] { + private[akka] override def port = 1 } } @@ -362,7 +379,7 @@ object Concat { * Create a new anonymous `Concat` vertex with the specified input types. * Note that a `Concat` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* + * is called and those instances are not `equal`. */ def apply[T]: Concat[T] = new Concat[T](OperationAttributes.none) @@ -370,18 +387,19 @@ object Concat { * Create a named `Concat` vertex with the specified input types. * Note that a `Concat` instance can only be used at one place (one vertex) * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`.* - */ + * is called and those instances are not `equal`. + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): Concat[T] = new Concat[T](OperationAttributes.name(name)) def apply[T](attributes: OperationAttributes): Concat[T] = new Concat[T](attributes) - class First[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] { + final class First[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] { override val port = 0 type NextT = T override def next = vertex.out } - class Second[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] { + + final class Second[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] { override val port = 1 type NextT = T override def next = vertex.out @@ -422,7 +440,7 @@ object UndefinedSink { * Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](OperationAttributes.name(name)) } /** @@ -455,7 +473,7 @@ object UndefinedSource { * Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. - */ + */ // FIXME why do we have these when we can simply tell the user to use apply(OperationAttributes.name(name))? def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](OperationAttributes.name(name)) } /** @@ -489,7 +507,7 @@ private[akka] object FlowGraphInternal { private[scaladsl] def newInstance(): Vertex } - case class SourceVertex(source: Source[_]) extends Vertex { + final case class SourceVertex(source: Source[_]) extends Vertex { override def toString = source.toString /** @@ -511,7 +529,7 @@ private[akka] object FlowGraphInternal { final override private[scaladsl] def newInstance() = this.copy() } - case class SinkVertex(sink: Sink[_]) extends Vertex { + final case class SinkVertex(sink: Sink[_]) extends Vertex { override def toString = sink.toString /** @@ -563,13 +581,12 @@ private[akka] object FlowGraphInternal { } // flow not part of equals/hashCode - case class EdgeLabel(qualifier: Int)( + final case class EdgeLabel(qualifier: Int)( val pipe: Pipe[Any, Nothing], val inputPort: Int, val outputPort: Int) { override def toString: String = pipe.toString - } /** @@ -582,7 +599,7 @@ private[akka] object FlowGraphInternal { * be copied into another graph then the SourceVertex/SinkVertex would still point to the same instance * of the IdentityProcessor. */ - class IdentityProcessor extends Processor[Any, Any] { + final class IdentityProcessor extends Processor[Any, Any] { import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index 6c59a14e70..c93369f68e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -18,13 +18,15 @@ case class OperationAttributes private (private val attributes: List[OperationAt * Adds given attributes to the end of these attributes. */ def and(other: OperationAttributes): OperationAttributes = { + // FIXME should return `this` if other.attributes is empty + // FIXME should return `other` if this is `none` OperationAttributes(attributes ::: other.attributes) } private[akka] def nameLifted: Option[String] = attributes.collect { case Name(name) ⇒ name - }.reduceOption(_ + "-" + _) + }.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead private[akka] def name: String = nameLifted match { case Some(name) ⇒ name @@ -36,7 +38,7 @@ case class OperationAttributes private (private val attributes: List[OperationAt case InputBuffer(initial, max) ⇒ (s: MaterializerSettings) ⇒ s.withInputBuffer(initial, max) case FanOutBuffer(initial, max) ⇒ (s: MaterializerSettings) ⇒ s.withFanOutBuffer(initial, max) case Dispatcher(dispatcher) ⇒ (s: MaterializerSettings) ⇒ s.withDispatcher(dispatcher) - }.reduceOption(_ andThen _).getOrElse(identity) + }.reduceOption(_ andThen _).getOrElse(identity) // FIXME is this the optimal way of encoding this? private[akka] def transform(node: AstNode): AstNode = if ((this eq OperationAttributes.none) || (this eq node.attributes)) node @@ -49,8 +51,8 @@ case class OperationAttributes private (private val attributes: List[OperationAt * * https://github.com/akka/akka/issues/16392 */ - private[akka] def withoutName = this.copy( - attributes = attributes.filterNot { + private[akka] def withoutName = this.copy( // FIXME should return OperationAttributes.none if empty + attributes = attributes.filterNot { // FIXME should return the same instance if didn't have any Name case attr: Name ⇒ true }) }