Merge pull request #18300 from agolubev/agolubev-#18275_Java_DSL_flow_of_via

+str #18275 Java DSL: Flow.of(MyClass.class).via(otherFlow) throws CCE
This commit is contained in:
drewhk 2015-08-31 17:42:43 +02:00
commit 59a1099848
3 changed files with 70 additions and 18 deletions

View file

@ -556,4 +556,37 @@ public class FlowTest extends StreamTest {
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Flow<String,String,?> otherFlow = Flow.of(String.class);
Flow<String,String,?> myFlow = Flow.of(String.class).via(otherFlow);
Source.from(input).via(myFlow).runWith(Sink.foreach(new Procedure<String>() { // Scala Future
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}), materializer);
probe.expectMsgAllOf("A","B","C");
}
@Test
public void mustBeAbleToMaterializeIdentityToJavaSink() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Flow<String,String,?> otherFlow = Flow.of(String.class);
Sink<String,BoxedUnit> sink = Flow.of(String.class).to(otherFlow.to(Sink.foreach(new Procedure<String>() { // Scala Future
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
})));
Source.from(input).to(sink).run(materializer);
probe.expectMsgAllOf("A","B","C");
}
}

View file

@ -182,5 +182,21 @@ class GraphConcatSpec extends TwoStreamsSetup {
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
"work with Flow DSL2" in {
val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val sink = testFlow.concat(Source(1 to 5)).toMat(Sink.ignore)(Keep.left).mapMaterializedValue[String] {
case ((m1, m2), m3)
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
"boo"
}
Source(10 to 15).runWith(sink) should be("boo")
}
}
}

View file

@ -3,29 +3,22 @@
*/
package akka.stream.scaladsl
import scala.language.higherKinds
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ Recover, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream._
import akka.stream.Attributes._
import akka.stream.stage._
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream._
import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.stage._
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import org.reactivestreams.Processor
import scala.annotation.implicitNotFound
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.Future
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.language.higherKinds
import akka.stream.stage._
import akka.stream.impl.{ Stages, StreamLayout, FlowModule }
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@ -75,8 +68,13 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat2]].mapMaterializedValue(combine(().asInstanceOf[Mat], _))
else {
if (this.isIdentity) {
val flowInstance: Flow[In, T, Mat2] = if (flow.isInstanceOf[javadsl.Flow[In, T, Mat2]])
flow.asInstanceOf[javadsl.Flow[In, T, Mat2]].asScala
else
flow.asInstanceOf[Flow[In, T, Mat2]]
flowInstance.mapMaterializedValue(combine(().asInstanceOf[Mat], _))
} else {
val flowCopy = flow.module.carbonCopy
new Flow(
module
@ -121,8 +119,13 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* Sink into the materialized value of the resulting Sink.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = {
if (isIdentity) sink.asInstanceOf[Sink[In, Mat3]]
else {
if (isIdentity) {
val sinkInstance: Sink[In, Mat2] = if (sink.isInstanceOf[javadsl.Sink[In, Mat2]])
sink.asInstanceOf[javadsl.Sink[In, Mat2]].asScala
else
sink.asInstanceOf[Sink[In, Mat2]]
sinkInstance.mapMaterializedValue(combine(().asInstanceOf[Mat], _))
} else {
val sinkCopy = sink.module.carbonCopy
new Sink(
module
@ -375,8 +378,8 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module)
* Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only.
*/
trait FlowOps[+Out, +Mat] {
import akka.stream.impl.Stages._
import FlowOps._
import akka.stream.impl.Stages._
type Repr[+O, +M] <: FlowOps[O, M]
private final val _identity = (x: Any) x