From 92810e39bccc958a970993b66eb3f084130845d6 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 20 Oct 2014 14:09:24 +0200 Subject: [PATCH] +str #16114 better consistency specs and missing methods added - javadsl --- .../java/akka/stream/javadsl/FlowTest.java | 1 - .../stream/DslFactoriesConsistencySpec.scala | 161 ++++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 22 ++- .../main/scala/akka/stream/javadsl/Sink.scala | 22 ++- .../scala/akka/stream/javadsl/Source.scala | 18 +- 5 files changed, 202 insertions(+), 22 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 342aa97bc5..29d63b7956 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -4,7 +4,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.Foreach; import akka.dispatch.Futures; -import akka.dispatch.OnComplete; import akka.dispatch.OnSuccess; import akka.japi.Pair; import akka.japi.Util; diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala new file mode 100644 index 0000000000..ea1dd75af7 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -0,0 +1,161 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import java.lang.reflect.Method + +import org.scalatest.Matchers +import org.scalatest.WordSpec + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class DslFactoriesConsistencySpec extends WordSpec with Matchers { + + // configuration // + + val scalaIgnore = + Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") + + val javaIgnore = + Set("adapt") // the scaladsl -> javadsl bridge + + val `scala -> java aliases` = + ("apply" → "create") :: + ("apply" → "of") :: + ("apply" → "from") :: + Nil + + // format: OFF + val `scala -> java types` = + (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: + (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: + (classOf[scala.Function0[_]], classOf[akka.stream.javadsl.japi.Creator[_]]) :: + (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: + (classOf[scala.Function1[_, Unit]], classOf[akka.stream.javadsl.japi.Procedure[_]]) :: + (classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Function[_, _]]) :: + (classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) :: + (classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) :: + (classOf[akka.stream.scaladsl2.Source[_]], classOf[akka.stream.javadsl.Source[_]]) :: + (classOf[akka.stream.scaladsl2.Sink[_]], classOf[akka.stream.javadsl.Sink[_]]) :: + (classOf[akka.stream.scaladsl2.Flow[_, _]], classOf[akka.stream.javadsl.Flow[_, _]]) :: + (classOf[akka.stream.scaladsl2.FlowGraph], classOf[akka.stream.javadsl.FlowGraph]) :: + (classOf[akka.stream.scaladsl2.PartialFlowGraph], classOf[akka.stream.javadsl.PartialFlowGraph]) :: + Nil + // format: ON + + "Java DSL" must provide { + "Source" which { + "allows creating the same Sources as Scala DSL" in { + val sClass = akka.stream.scaladsl2.Source.getClass + val jClass = akka.stream.javadsl.Source.getClass + + runSpec(sClass, jClass) + } + } + "Flow" which { + "allows creating the same Sources as Scala DSL" in { + val sClass = akka.stream.scaladsl2.Flow.getClass + val jClass = akka.stream.javadsl.Flow.getClass + + runSpec(sClass, jClass) + } + } + "Sink" which { + "allows creating the same Sources as Scala DSL" in { + val sClass = akka.stream.scaladsl2.Sink.getClass + val jClass = akka.stream.javadsl.Sink.getClass + + runSpec(sClass, jClass) + } + } + } + + // here be dragons... + + private def getJMethods(jClass: Class[_]) = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName) + private def getSMethods(sClass: Class[_]) = sClass.getMethods.filterNot(scalaIgnore contains _.getName) + + def runSpec(sClass: Class[_], jClass: Class[_]) { + val jMethods = getJMethods(jClass) + val sMethods = getSMethods(sClass) + + var warnings = 0 + + val results = for { + s ← sMethods + j ← jMethods + result = delegationCheck(s, j) + } yield { + result + } + + for { + row ← results.groupBy(_.s) + matches = row._2.filter(_.matches) + } { + if (matches.length == 0) { + warnings += 1 + alert("No match for " + row._1) + row._2 foreach { m ⇒ alert(" > " + m.toString) } + } else if (matches.length == 1) { + info("Matched: Scala:" + row._1.getName + "(" + row._1.getParameterTypes.map(_.getName).mkString(",") + ")" + + " == " + + "Java:" + matches.head.j.getName + "(" + matches.head.j.getParameterTypes.map(_.getName).mkString(",") + ")") + } else { + warnings += 1 + alert("Multiple matches for " + row._1 + "!") + matches foreach { m ⇒ alert(m.toString) } + } + } + + if (warnings > 0) { + jMethods foreach { m ⇒ info(" java: " + m) } + sMethods foreach { m ⇒ info(" scala: " + m) } + fail("Warnings were issued! Fix name / type mappings or delegation code!") + } + } + + sealed trait MatchResult { + def j: Method + def s: Method + def matches: Boolean + } + case class MatchFailure(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = false } + case class Match(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = true } + + def delegationCheck(s: Method, j: Method): MatchResult = { + if (nameMatch(s.getName, j.getName)) { + if (s.getParameterTypes.length == j.getParameterTypes.length) + if (typeMatch(s.getParameterTypes, j.getParameterTypes)) + Match(s, j) + else + MatchFailure(s, j, "Types of parameters don't match!") + else + MatchFailure(s, j, "Same name, but different number of parameters!") + } else { + MatchFailure(s, j, "Names don't match!") + } + } + + def nameMatch(scalaName: String, javaName: String): Boolean = + (scalaName, javaName) match { + case (s, j) if s == j ⇒ true + case t if `scala -> java aliases` contains t ⇒ true + case t ⇒ false + } + + def typeMatch(scalaParams: Array[Class[_]], javaParams: Array[Class[_]]): Boolean = + (scalaParams.toList, javaParams.toList) match { + case (s, j) if s == j ⇒ true + case (s, j) if s.zip(j).forall(typeMatch) ⇒ true + case _ ⇒ false + } + + def typeMatch(p: (Class[_], Class[_])): Boolean = + if (p._1 == p._2) true + else if (`scala -> java types` contains p) true + else false + + private def provide = afterWord("provide") + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index bab8649d9e..1688723928 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -23,7 +23,11 @@ object Flow { /** Create a `Flow` which can process elements of type `T`. */ def create[T](): javadsl.Flow[T, T] = - new javadsl.Flow[T, T](scaladsl2.Pipe.empty[T]) + Flow.adapt[T, T](scaladsl2.Pipe.empty[T]) + + /** Create a `Flow` which can process elements of type `T`. */ + def of[T](clazz: Class[T]): javadsl.Flow[T, T] = + create[T]() /** * Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and @@ -41,7 +45,7 @@ object Flow { * Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects * a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`. */ - def apply[I, O](graph: PartialFlowGraph, block: japi.Function[javadsl.FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = { + def create[I, O](graph: PartialFlowGraph, block: japi.Function[javadsl.FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = { val sFlow = scaladsl2.Flow(graph.asScala) { b ⇒ val pair = block.apply(b.asJava) pair.first.asScala → pair.second.asScala @@ -49,13 +53,9 @@ object Flow { new Flow[I, O](sFlow) } - /** Create a `Flow` which can process elements of type `T`. */ - def of[T](clazz: Class[T]): javadsl.Flow[T, T] = - create[T]() - } -/** Java API */ +/** Create a `Flow` which can process elements of type `T`. */ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { import scala.collection.JavaConverters._ import akka.stream.scaladsl2.JavaConverters._ @@ -83,7 +83,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. * * The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`, - * e.g. the `Subscriber` of a `SubscriberSource` and `Publisher` of a `PublisherSink`. + * e.g. the `Subscriber` of a `Source.subscriber()` and `Publisher` of a `Sink.publisher()`. * * @tparam T materialized type of given KeyedSource * @tparam U materialized type of given KeyedSink @@ -96,8 +96,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { /** * Connect the `Source` to this `Flow` and then connect it to the `KeyedSink` and run it. * - * The returned value will contain the materialized value of the `KeyedSink`, - * e.g. `Publisher` of a `Sink.publisher()`. + * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a `Sink.publisher()`. * * @tparam T materialized type of given KeyedSink */ @@ -107,8 +106,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { /** * Connect the `KeyedSource` to this `Flow` and then connect it to the `Sink` and run it. * - * The returned value will contain the materialized value of the `KeyedSource`, - * e.g. `Subscriber` of a `Source.from(publisher)`. + * The returned value will contain the materialized value of the `KeyedSource`, e.g. `Subscriber` of a `Source.from(publisher)`. * * @tparam T materialized type of given KeyedSource */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 41a3ab6da7..11f7ba0153 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -3,9 +3,12 @@ */ package akka.stream.javadsl +import akka.actor.ActorRef +import akka.actor.Props import akka.stream.javadsl import akka.stream.scaladsl2 -import org.reactivestreams.{ Publisher, Subscriber } +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber import scaladsl2.FlowMaterializer import scala.concurrent.Future @@ -49,6 +52,14 @@ object Sink { def create[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSource[T]]): Sink[T] = new Sink[T](scaladsl2.Sink.apply(graph.asScala) { b ⇒ block.apply(b.asJava).asScala }) + /** + * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorSubscriber]]. + */ + def create[T](props: Props): KeyedSink[T, ActorRef] = + new KeyedSink(scaladsl2.Sink.apply(props)) + /** * A `Sink` that immediately cancels its upstream after materialization. */ @@ -126,9 +137,9 @@ class Sink[-In](delegate: scaladsl2.Sink[In]) { // RUN WITH // /** - * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSource` and run it. - * The returned tuple contains the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a - * [[akka.stream.scaladsl2.SubscriberSource]] and and `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + * Connect the `KeyedSource` to this `Sink` and run it. + * + * The returned value is the materialized value of the `KeyedSource`, e.g. the `Subscriber` of a `Source.subscriber()`. * * @tparam T materialized type of given Source */ @@ -136,8 +147,7 @@ class Sink[-In](delegate: scaladsl2.Sink[In]) { asScala.runWith(source.asScala)(materializer).asInstanceOf[T] /** - * Connect this `Source` to a `Source` and run it. The returned value is the materialized value - * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + * Connect this `Sink` to a `Source` and run it. */ def runWith(source: javadsl.Source[In], materializer: FlowMaterializer): Unit = asScala.runWith(source.asScala)(materializer) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index a9034c8b59..944a403b55 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -5,11 +5,14 @@ package akka.stream.javadsl import java.util.concurrent.Callable +import akka.actor.ActorRef +import akka.actor.Props import akka.japi.Util import akka.stream._ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scaladsl2.FlowMaterializer +import scaladsl2.PropsSource import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ @@ -123,6 +126,14 @@ object Source { def from[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = new Source(scaladsl2.Source(graph.asScala)(x ⇒ block.apply(x.asJava).asScala)) + /** + * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * be [[akka.stream.actor.ActorPublisher]]. + */ + def from[T](props: Props): KeyedSource[T, ActorRef] = + new KeyedSource(scaladsl2.Source.apply(props)) + /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. @@ -174,7 +185,7 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) { new Source(delegate.connect(flow.asScala)) /** - * Connect this source to a sink, concatenating the processing steps of both. + * Connect this `Source` to a `Sink`, concatenating the processing steps of both. */ def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = new RunnableFlowAdapter(delegate.connect(sink.asScala)) @@ -183,6 +194,7 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) { /** * Connect this `Source` to a `KeyedSink` and run it. + * * The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. * * @tparam S materialized type of the given Sink @@ -191,8 +203,8 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) { asScala.runWith(sink.asScala)(materializer).asInstanceOf[S] /** - * Connect this `Source` to a `Sink` and run it. - * The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. + * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value + * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. */ def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit = delegate.connect(sink.asScala).run()(materializer)