diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index cf94e0aff6..4b02e69bf8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -3,8 +3,6 @@ */ package akka.stream -import java.lang.reflect.Method - import org.scalatest.Matchers import org.scalatest.WordSpec @@ -13,7 +11,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { // configuration // val scalaIgnore = - Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") + Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass", "shape") val javaIgnore = Set("adapt") // the scaladsl -> javadsl bridge @@ -23,6 +21,8 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { ("apply" → "of") :: ("apply" → "from") :: ("apply" -> "fromGraph") :: + ("apply" -> "fromIterator") :: + ("apply" -> "fromFunctions") :: Nil // format: OFF @@ -31,15 +31,14 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: + (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: - (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Creator[_]]) :: - (classOf[scala.Function2[_, _, _]], classOf[akka.japi.function.Function2[_, _, _]]) :: (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: - (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) :: - Nil + (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) :: + ((2 to 22) map { i => (Class.forName(s"scala.Function$i"), Class.forName(s"akka.japi.function.Function$i")) }).toList // format: ON val sSource = classOf[scaladsl.Source[_, _]] @@ -51,55 +50,99 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { val sFlow = classOf[scaladsl.Flow[_, _, _]] val jFlow = classOf[javadsl.Flow[_, _, _]] + val sRunnableGraph = classOf[scaladsl.RunnableGraph[_]] + val jRunnableGraph = classOf[javadsl.RunnableGraph[_]] + + val graph = classOf[Graph[_, _]] + + case class TestCase(name: String, sClass: Option[Class[_]], jClass: Option[Class[_]], jFactory: Option[Class[_]]) + object TestCase { + def apply(name: String, sClass: Class[_], jClass: Class[_], jFactory: Class[_]): TestCase = + TestCase(name, Some(sClass), Some(jClass), Some(jFactory)) + + def apply(name: String, sClass: Class[_], jClass: Class[_]): TestCase = + TestCase(name, Some(sClass), Some(jClass), None) + } + + val testCases = Seq( + TestCase("Source", scaladsl.Source.getClass, javadsl.Source.getClass, classOf[javadsl.SourceCreate]), + TestCase("Flow", scaladsl.Flow.getClass, javadsl.Flow.getClass, classOf[javadsl.FlowCreate]), + TestCase("Sink", scaladsl.Sink.getClass, javadsl.Sink.getClass, classOf[javadsl.SinkCreate]), + TestCase("FlowGraph", scaladsl.FlowGraph.getClass, javadsl.FlowGraph.getClass, classOf[javadsl.GraphCreate]), + TestCase("BidiFlow", scaladsl.BidiFlow.getClass, javadsl.BidiFlow.getClass, classOf[javadsl.BidiFlowCreate]), + TestCase("ZipWith", Some(scaladsl.ZipWith.getClass), None, Some(javadsl.ZipWith.getClass)), + TestCase("Merge", scaladsl.Merge.getClass, javadsl.Merge.getClass), + TestCase("MergePreferred", scaladsl.MergePreferred.getClass, javadsl.MergePreferred.getClass), + TestCase("Broadcast", scaladsl.Broadcast.getClass, javadsl.Broadcast.getClass), + TestCase("Balance", scaladsl.Balance.getClass, javadsl.Balance.getClass), + TestCase("Zip", scaladsl.Zip.getClass, javadsl.Zip.getClass), + TestCase("UnZip", scaladsl.Unzip.getClass, javadsl.Unzip.getClass), + TestCase("Concat", scaladsl.Concat.getClass, javadsl.Concat.getClass)) + "Java DSL" must provide { - "Source" which { - "allows creating the same Sources as Scala DSL" in { - pending - val sClass = akka.stream.scaladsl.Source.getClass - val jClass = akka.stream.javadsl.Source.getClass - - runSpec(sClass, jClass) - } - } - "Flow" which { - "allows creating the same Sources as Scala DSL" in { - pending - val sClass = akka.stream.scaladsl.Flow.getClass - val jClass = akka.stream.javadsl.Flow.getClass - - runSpec(sClass, jClass) - } - } - "Sink" which { - "allows creating the same Sources as Scala DSL" in { - pending - val sClass = akka.stream.scaladsl.Sink.getClass - val jClass = akka.stream.javadsl.Sink.getClass - - runSpec(sClass, jClass) - } + testCases foreach { + case TestCase(name, Some(sClass), jClass, jFactoryOption) ⇒ + name which { + s"allows creating the same ${name}s as Scala DSL" in { + runSpec( + getSMethods(sClass), + jClass.toList.flatMap(getJMethods) ++ + jFactoryOption.toList.flatMap(f ⇒ getJMethods(f).map(unspecializeName andThen curryLikeJava))) + } + } } } // here be dragons... - private def getJMethods(jClass: Class[_]): Array[Method] = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName).filter(include) - private def getSMethods(sClass: Class[_]): Array[Method] = sClass.getMethods.filterNot(scalaIgnore contains _.getName).filter(include) + private def getJMethods(jClass: Class[_]): List[Method] = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName).map(toMethod).filterNot(ignore).toList + private def getSMethods(sClass: Class[_]): List[Method] = sClass.getMethods.filterNot(scalaIgnore contains _.getName).map(toMethod).filterNot(ignore).toList - private def include(m: Method): Boolean = { - if (m.getDeclaringClass == akka.stream.scaladsl.Source.getClass - && m.getName == "apply" - && m.getParameterTypes.length == 1 - && m.getParameterTypes()(0) == classOf[scala.Function1[_, _]]) - false // conflict between two Source.apply(Function1) - else - true + private def toMethod(m: java.lang.reflect.Method): Method = + Method(m.getName, List(m.getParameterTypes: _*), m.getReturnType, m.getDeclaringClass) + + private case class Ignore(cls: Class[_] ⇒ Boolean, name: String ⇒ Boolean, parameters: Int ⇒ Boolean, paramTypes: List[Class[_]] ⇒ Boolean) + + private def ignore(m: Method): Boolean = { + val ignores = Seq( + // private scaladsl method + Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "apply", _ == 1, _ == List(classOf[akka.stream.impl.SourceModule[_, _]])), + // corresponding matches on java side would need to have Function23 + Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "apply", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.FlowGraph.getClass, _ == "closed", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.FlowGraph.getClass, _ == "partial", _ == 24, _ ⇒ true), + // all generated methods like scaladsl.Sink$.akka$stream$scaladsl$Sink$$newOnCompleteStage$1 + Ignore(_ ⇒ true, _.contains("$"), _ ⇒ true, _ ⇒ true)) + + ignores.foldLeft(false) { + case (acc, i) ⇒ + acc || (i.cls(m.declaringClass) && i.name(m.name) && i.parameters(m.parameterTypes.length) && i.paramTypes(m.parameterTypes)) + } } - def runSpec(sClass: Class[_], jClass: Class[_]) { - val jMethods = getJMethods(jClass) - val sMethods = getSMethods(sClass) + /** + * Rename + * createN => create + * closedN => closed + * partialN => partial + */ + private val unspecializeName: PartialFunction[Method, Method] = { + case m ⇒ m.copy(name = m.name.filter(Character.isLetter)) + } + /** + * Adapt java side non curried functions to scala side like + */ + private val curryLikeJava: PartialFunction[Method, Method] = { + case m if m.parameterTypes.size > 1 ⇒ + m.copy(name = m.name.filter(Character.isLetter), parameterTypes = m.parameterTypes.dropRight(1) :+ classOf[akka.japi.function.Function[_, _]]) + case m ⇒ m + } + + def runSpec(sMethods: List[Method], jMethods: List[Method]) { var warnings = 0 val results = for { @@ -117,15 +160,15 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { if (matches.length == 0) { warnings += 1 alert("No match for " + row._1) - row._2 foreach { m ⇒ alert(" > " + m.toString) } + row._2 foreach { m ⇒ alert(s" > ${m.j.toString}: ${m.reason}") } } else if (matches.length == 1) { - info("Matched: Scala:" + row._1.getName + "(" + row._1.getParameterTypes.map(_.getName).mkString(",") + "): " + returnTypeString(row._1) + + info("Matched: Scala:" + row._1.name + "(" + row._1.parameterTypes.map(_.getName).mkString(",") + "): " + returnTypeString(row._1) + " == " + - "Java:" + matches.head.j.getName + "(" + matches.head.j.getParameterTypes.map(_.getName).mkString(",") + "): " + returnTypeString(matches.head.j)) + "Java:" + matches.head.j.name + "(" + matches.head.j.parameterTypes.map(_.getName).mkString(",") + "): " + returnTypeString(matches.head.j)) } else { warnings += 1 alert("Multiple matches for " + row._1 + "!") - matches foreach { m ⇒ alert(m.toString) } + matches foreach { m ⇒ alert(s" > ${m.j.toString}") } } } @@ -137,24 +180,27 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { } def returnTypeString(m: Method): String = - m.getReturnType.getName.drop("akka.stream.".length) + m.returnType.getName.drop("akka.stream.".length) + + case class Method(name: String, parameterTypes: List[Class[_]], returnType: Class[_], declaringClass: Class[_]) sealed trait MatchResult { def j: Method def s: Method + def reason: String def matches: Boolean } case class MatchFailure(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = false } case class Match(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = true } def delegationCheck(s: Method, j: Method): MatchResult = { - if (nameMatch(s.getName, j.getName)) { - if (s.getParameterTypes.length == j.getParameterTypes.length) - if (typeMatch(s.getParameterTypes, j.getParameterTypes)) - if (returnTypeMatch(s.getReturnType, j.getReturnType)) + if (nameMatch(s.name, j.name)) { + if (s.parameterTypes.length == j.parameterTypes.length) + if (typeMatch(s.parameterTypes, j.parameterTypes)) + if (returnTypeMatch(s.returnType, j.returnType)) Match(s, j) else - MatchFailure(s, j, "Return types don't match! " + s.getReturnType + ", " + j.getReturnType) + MatchFailure(s, j, "Return types don't match! " + s.returnType + ", " + j.returnType) else MatchFailure(s, j, "Types of parameters don't match!") else @@ -178,9 +224,11 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { def returnTypeMatch(s: Class[_], j: Class[_]): Boolean = (sSource.isAssignableFrom(s) && jSource.isAssignableFrom(j)) || (sSink.isAssignableFrom(s) && jSink.isAssignableFrom(j)) || - (sFlow.isAssignableFrom(s) && jFlow.isAssignableFrom(j)) + (sFlow.isAssignableFrom(s) && jFlow.isAssignableFrom(j)) || + (sRunnableGraph.isAssignableFrom(s) && jRunnableGraph.isAssignableFrom(j)) || + (graph.isAssignableFrom(s) && graph.isAssignableFrom(j)) - def typeMatch(scalaParams: Array[Class[_]], javaParams: Array[Class[_]]): Boolean = + def typeMatch(scalaParams: List[Class[_]], javaParams: List[Class[_]]): Boolean = (scalaParams.toList, javaParams.toList) match { case (s, j) if s == j ⇒ true case (s, j) if s.zip(j).forall(typeMatch) ⇒ true diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 037d91ac5e..fdf25d28c7 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -18,8 +18,8 @@ trait ZipWithApply { def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = { val shape = new FanInShape1[[#A1#], O]("ZipWith1") new ZipWith1(shape, new ZipWith1Module(shape, zipper, Attributes.name("ZipWith1"))) - } - # + } + # ] diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 29ed4ccb9e..68627da322 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -31,13 +31,13 @@ object Merge { /** * Create a new `Merge` vertex with the specified output type. */ - def create[T](outputCount: Int): Graph[UniformFanInShape[T, T], Unit] = - scaladsl.Merge(outputCount) + def create[T](inputPorts: Int): Graph[UniformFanInShape[T, T], Unit] = + scaladsl.Merge(inputPorts) /** * Create a new `Merge` vertex with the specified output type. */ - def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanInShape[T, T], Unit] = create(outputCount) + def create[T](clazz: Class[T], inputPorts: Int): Graph[UniformFanInShape[T, T], Unit] = create(inputPorts) } @@ -66,13 +66,13 @@ object MergePreferred { /** * Create a new `MergePreferred` vertex with the specified output type. */ - def create[T](outputCount: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - scaladsl.MergePreferred(outputCount) + def create[T](secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = + scaladsl.MergePreferred(secondaryPorts) /** * Create a new `MergePreferred` vertex with the specified output type. */ - def create[T](clazz: Class[T], outputCount: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = create(outputCount) + def create[T](clazz: Class[T], secondaryPorts: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = create(secondaryPorts) } @@ -185,7 +185,7 @@ object Zip { import akka.japi.Pair /** - * Create a new `ZipWith` vertex with the specified input types and zipping-function + * Create a new `Zip` vertex with the specified input types and zipping-function * which creates `akka.japi.Pair`s. */ def create[A, B]: Graph[FanInShape2[A, B, A Pair B], Unit] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index fd0e3153fe..1cbf51ef5c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -213,6 +213,14 @@ object Source { def concat[T, M1, M2](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2]): Source[T, (M1, M2)] = new Source(scaladsl.Source.concat(first, second)) + /** + * Concatenates two sources so that the first element + * emitted by the second source is emitted after the last element of the first + * source. + */ + def concatMat[T, M1, M2, M3](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2], combine: function.Function2[M1, M2, M3]): Source[T, M3] = + new Source(scaladsl.Source.concatMat(first, second)(combinerToScala(combine))) + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type.