diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 7bea86b20d..0398dafed3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -556,4 +556,37 @@ public class FlowTest extends StreamTest { Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); } + @Test + public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + + Flow otherFlow = Flow.of(String.class); + + Flow myFlow = Flow.of(String.class).via(otherFlow); + Source.from(input).via(myFlow).runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgAllOf("A","B","C"); + } + + @Test + public void mustBeAbleToMaterializeIdentityToJavaSink() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Flow otherFlow = Flow.of(String.class); + + Sink sink = Flow.of(String.class).to(otherFlow.to(Sink.foreach(new Procedure() { // Scala Future + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }))); + + Source.from(input).to(sink).run(materializer); + probe.expectMsgAllOf("A","B","C"); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 6b582b9caa..f444c395e7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -182,5 +182,21 @@ class GraphConcatSpec extends TwoStreamsSetup { runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") } + + "work with Flow DSL2" in { + val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000) + Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val sink = testFlow.concat(Source(1 to 5)).toMat(Sink.ignore)(Keep.left).mapMaterializedValue[String] { + case ((m1, m2), m3) ⇒ + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + m3.isInstanceOf[Unit] should be(true) + "boo" + } + + Source(10 to 15).runWith(sink) should be("boo") + + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 61268a1ed7..aae536e59e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,29 +3,22 @@ */ package akka.stream.scaladsl -import scala.language.higherKinds - import akka.event.LoggingAdapter -import akka.stream.impl.Stages.{ Recover, MaterializingStageFactory, StageModule } -import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream._ import akka.stream.Attributes._ -import akka.stream.stage._ -import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream._ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } +import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream.stage._ import akka.util.Collections.EmptyImmutableSeq -import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } -import org.reactivestreams.Processor -import scala.annotation.implicitNotFound +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } + import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.Future +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.language.higherKinds -import akka.stream.stage._ -import akka.stream.impl.{ Stages, StreamLayout, FlowModule } /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -75,8 +68,13 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * flow into the materialized value of the resulting Flow. */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { - if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterializedValue(combine(().asInstanceOf[Mat], _)) - else { + if (this.isIdentity) { + val flowInstance: Flow[In, T, Mat2] = if (flow.isInstanceOf[javadsl.Flow[In, T, Mat2]]) + flow.asInstanceOf[javadsl.Flow[In, T, Mat2]].asScala + else + flow.asInstanceOf[Flow[In, T, Mat2]] + flowInstance.mapMaterializedValue(combine(().asInstanceOf[Mat], _)) + } else { val flowCopy = flow.module.carbonCopy new Flow( module @@ -121,8 +119,13 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * Sink into the materialized value of the resulting Sink. */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = { - if (isIdentity) sink.asInstanceOf[Sink[In, Mat3]] - else { + if (isIdentity) { + val sinkInstance: Sink[In, Mat2] = if (sink.isInstanceOf[javadsl.Sink[In, Mat2]]) + sink.asInstanceOf[javadsl.Sink[In, Mat2]].asScala + else + sink.asInstanceOf[Sink[In, Mat2]] + sinkInstance.mapMaterializedValue(combine(().asInstanceOf[Mat], _)) + } else { val sinkCopy = sink.module.carbonCopy new Sink( module @@ -375,8 +378,8 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only. */ trait FlowOps[+Out, +Mat] { - import akka.stream.impl.Stages._ import FlowOps._ + import akka.stream.impl.Stages._ type Repr[+O, +M] <: FlowOps[O, M] private final val _identity = (x: Any) ⇒ x