+str #16114 better consistency specs and missing methods added - javadsl

This commit is contained in:
Konrad 'ktoso' Malawski 2014-10-20 14:09:24 +02:00
parent cdfd739778
commit 92810e39bc
5 changed files with 202 additions and 22 deletions

View file

@ -4,7 +4,6 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.dispatch.Foreach; import akka.dispatch.Foreach;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.dispatch.OnSuccess; import akka.dispatch.OnSuccess;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.Util; import akka.japi.Util;

View file

@ -0,0 +1,161 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.lang.reflect.Method
import org.scalatest.Matchers
import org.scalatest.WordSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DslFactoriesConsistencySpec extends WordSpec with Matchers {
// configuration //
val scalaIgnore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass")
val javaIgnore =
Set("adapt") // the scaladsl -> javadsl bridge
val `scala -> java aliases` =
("apply" "create") ::
("apply" "of") ::
("apply" "from") ::
Nil
// format: OFF
val `scala -> java types` =
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
(classOf[scala.Function0[_]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
(classOf[scala.Function1[_, Unit]], classOf[akka.stream.javadsl.japi.Procedure[_]]) ::
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Function[_, _]]) ::
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
(classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) ::
(classOf[akka.stream.scaladsl2.Source[_]], classOf[akka.stream.javadsl.Source[_]]) ::
(classOf[akka.stream.scaladsl2.Sink[_]], classOf[akka.stream.javadsl.Sink[_]]) ::
(classOf[akka.stream.scaladsl2.Flow[_, _]], classOf[akka.stream.javadsl.Flow[_, _]]) ::
(classOf[akka.stream.scaladsl2.FlowGraph], classOf[akka.stream.javadsl.FlowGraph]) ::
(classOf[akka.stream.scaladsl2.PartialFlowGraph], classOf[akka.stream.javadsl.PartialFlowGraph]) ::
Nil
// format: ON
"Java DSL" must provide {
"Source" which {
"allows creating the same Sources as Scala DSL" in {
val sClass = akka.stream.scaladsl2.Source.getClass
val jClass = akka.stream.javadsl.Source.getClass
runSpec(sClass, jClass)
}
}
"Flow" which {
"allows creating the same Sources as Scala DSL" in {
val sClass = akka.stream.scaladsl2.Flow.getClass
val jClass = akka.stream.javadsl.Flow.getClass
runSpec(sClass, jClass)
}
}
"Sink" which {
"allows creating the same Sources as Scala DSL" in {
val sClass = akka.stream.scaladsl2.Sink.getClass
val jClass = akka.stream.javadsl.Sink.getClass
runSpec(sClass, jClass)
}
}
}
// here be dragons...
private def getJMethods(jClass: Class[_]) = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName)
private def getSMethods(sClass: Class[_]) = sClass.getMethods.filterNot(scalaIgnore contains _.getName)
def runSpec(sClass: Class[_], jClass: Class[_]) {
val jMethods = getJMethods(jClass)
val sMethods = getSMethods(sClass)
var warnings = 0
val results = for {
s sMethods
j jMethods
result = delegationCheck(s, j)
} yield {
result
}
for {
row results.groupBy(_.s)
matches = row._2.filter(_.matches)
} {
if (matches.length == 0) {
warnings += 1
alert("No match for " + row._1)
row._2 foreach { m alert(" > " + m.toString) }
} else if (matches.length == 1) {
info("Matched: Scala:" + row._1.getName + "(" + row._1.getParameterTypes.map(_.getName).mkString(",") + ")" +
" == " +
"Java:" + matches.head.j.getName + "(" + matches.head.j.getParameterTypes.map(_.getName).mkString(",") + ")")
} else {
warnings += 1
alert("Multiple matches for " + row._1 + "!")
matches foreach { m alert(m.toString) }
}
}
if (warnings > 0) {
jMethods foreach { m info(" java: " + m) }
sMethods foreach { m info(" scala: " + m) }
fail("Warnings were issued! Fix name / type mappings or delegation code!")
}
}
sealed trait MatchResult {
def j: Method
def s: Method
def matches: Boolean
}
case class MatchFailure(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = false }
case class Match(s: Method, j: Method, reason: String = "") extends MatchResult { val matches = true }
def delegationCheck(s: Method, j: Method): MatchResult = {
if (nameMatch(s.getName, j.getName)) {
if (s.getParameterTypes.length == j.getParameterTypes.length)
if (typeMatch(s.getParameterTypes, j.getParameterTypes))
Match(s, j)
else
MatchFailure(s, j, "Types of parameters don't match!")
else
MatchFailure(s, j, "Same name, but different number of parameters!")
} else {
MatchFailure(s, j, "Names don't match!")
}
}
def nameMatch(scalaName: String, javaName: String): Boolean =
(scalaName, javaName) match {
case (s, j) if s == j true
case t if `scala -> java aliases` contains t true
case t false
}
def typeMatch(scalaParams: Array[Class[_]], javaParams: Array[Class[_]]): Boolean =
(scalaParams.toList, javaParams.toList) match {
case (s, j) if s == j true
case (s, j) if s.zip(j).forall(typeMatch) true
case _ false
}
def typeMatch(p: (Class[_], Class[_])): Boolean =
if (p._1 == p._2) true
else if (`scala -> java types` contains p) true
else false
private def provide = afterWord("provide")
}

View file

@ -23,7 +23,11 @@ object Flow {
/** Create a `Flow` which can process elements of type `T`. */ /** Create a `Flow` which can process elements of type `T`. */
def create[T](): javadsl.Flow[T, T] = def create[T](): javadsl.Flow[T, T] =
new javadsl.Flow[T, T](scaladsl2.Pipe.empty[T]) Flow.adapt[T, T](scaladsl2.Pipe.empty[T])
/** Create a `Flow` which can process elements of type `T`. */
def of[T](clazz: Class[T]): javadsl.Flow[T, T] =
create[T]()
/** /**
* Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and * Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and
@ -41,7 +45,7 @@ object Flow {
* Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects * Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects
* a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`. * a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`.
*/ */
def apply[I, O](graph: PartialFlowGraph, block: japi.Function[javadsl.FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = { def create[I, O](graph: PartialFlowGraph, block: japi.Function[javadsl.FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = {
val sFlow = scaladsl2.Flow(graph.asScala) { b val sFlow = scaladsl2.Flow(graph.asScala) { b
val pair = block.apply(b.asJava) val pair = block.apply(b.asJava)
pair.first.asScala pair.second.asScala pair.first.asScala pair.second.asScala
@ -49,13 +53,9 @@ object Flow {
new Flow[I, O](sFlow) new Flow[I, O](sFlow)
} }
/** Create a `Flow` which can process elements of type `T`. */
def of[T](clazz: Class[T]): javadsl.Flow[T, T] =
create[T]()
} }
/** Java API */ /** Create a `Flow` which can process elements of type `T`. */
class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.stream.scaladsl2.JavaConverters._ import akka.stream.scaladsl2.JavaConverters._
@ -83,7 +83,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) {
* Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it.
* *
* The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`, * The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`,
* e.g. the `Subscriber` of a `SubscriberSource` and `Publisher` of a `PublisherSink`. * e.g. the `Subscriber` of a `Source.subscriber()` and `Publisher` of a `Sink.publisher()`.
* *
* @tparam T materialized type of given KeyedSource * @tparam T materialized type of given KeyedSource
* @tparam U materialized type of given KeyedSink * @tparam U materialized type of given KeyedSink
@ -96,8 +96,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) {
/** /**
* Connect the `Source` to this `Flow` and then connect it to the `KeyedSink` and run it. * Connect the `Source` to this `Flow` and then connect it to the `KeyedSink` and run it.
* *
* The returned value will contain the materialized value of the `KeyedSink`, * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a `Sink.publisher()`.
* e.g. `Publisher` of a `Sink.publisher()`.
* *
* @tparam T materialized type of given KeyedSink * @tparam T materialized type of given KeyedSink
*/ */
@ -107,8 +106,7 @@ class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) {
/** /**
* Connect the `KeyedSource` to this `Flow` and then connect it to the `Sink` and run it. * Connect the `KeyedSource` to this `Flow` and then connect it to the `Sink` and run it.
* *
* The returned value will contain the materialized value of the `KeyedSource`, * The returned value will contain the materialized value of the `KeyedSource`, e.g. `Subscriber` of a `Source.from(publisher)`.
* e.g. `Subscriber` of a `Source.from(publisher)`.
* *
* @tparam T materialized type of given KeyedSource * @tparam T materialized type of given KeyedSource
*/ */

View file

@ -3,9 +3,12 @@
*/ */
package akka.stream.javadsl package akka.stream.javadsl
import akka.actor.ActorRef
import akka.actor.Props
import akka.stream.javadsl import akka.stream.javadsl
import akka.stream.scaladsl2 import akka.stream.scaladsl2
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scaladsl2.FlowMaterializer import scaladsl2.FlowMaterializer
import scala.concurrent.Future import scala.concurrent.Future
@ -49,6 +52,14 @@ object Sink {
def create[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSource[T]]): Sink[T] = def create[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSource[T]]): Sink[T] =
new Sink[T](scaladsl2.Sink.apply(graph.asScala) { b block.apply(b.asJava).asScala }) new Sink[T](scaladsl2.Sink.apply(graph.asScala) { b block.apply(b.asJava).asScala })
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*/
def create[T](props: Props): KeyedSink[T, ActorRef] =
new KeyedSink(scaladsl2.Sink.apply(props))
/** /**
* A `Sink` that immediately cancels its upstream after materialization. * A `Sink` that immediately cancels its upstream after materialization.
*/ */
@ -126,9 +137,9 @@ class Sink[-In](delegate: scaladsl2.Sink[In]) {
// RUN WITH // // RUN WITH //
/** /**
* Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSource` and run it. * Connect the `KeyedSource` to this `Sink` and run it.
* The returned tuple contains the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a *
* [[akka.stream.scaladsl2.SubscriberSource]] and and `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. * The returned value is the materialized value of the `KeyedSource`, e.g. the `Subscriber` of a `Source.subscriber()`.
* *
* @tparam T materialized type of given Source * @tparam T materialized type of given Source
*/ */
@ -136,8 +147,7 @@ class Sink[-In](delegate: scaladsl2.Sink[In]) {
asScala.runWith(source.asScala)(materializer).asInstanceOf[T] asScala.runWith(source.asScala)(materializer).asInstanceOf[T]
/** /**
* Connect this `Source` to a `Source` and run it. The returned value is the materialized value * Connect this `Sink` to a `Source` and run it.
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]].
*/ */
def runWith(source: javadsl.Source[In], materializer: FlowMaterializer): Unit = def runWith(source: javadsl.Source[In], materializer: FlowMaterializer): Unit =
asScala.runWith(source.asScala)(materializer) asScala.runWith(source.asScala)(materializer)

View file

@ -5,11 +5,14 @@ package akka.stream.javadsl
import java.util.concurrent.Callable import java.util.concurrent.Callable
import akka.actor.ActorRef
import akka.actor.Props
import akka.japi.Util import akka.japi.Util
import akka.stream._ import akka.stream._
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import scaladsl2.FlowMaterializer import scaladsl2.FlowMaterializer
import scaladsl2.PropsSource
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -123,6 +126,14 @@ object Source {
def from[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = def from[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] =
new Source(scaladsl2.Source(graph.asScala)(x block.apply(x.asJava).asScala)) new Source(scaladsl2.Source(graph.asScala)(x block.apply(x.asJava).asScala))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*/
def from[T](props: Props): KeyedSource[T, ActorRef] =
new KeyedSource(scaladsl2.Source.apply(props))
/** /**
* Create a `Source` with one element. * Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element.
@ -174,7 +185,7 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) {
new Source(delegate.connect(flow.asScala)) new Source(delegate.connect(flow.asScala))
/** /**
* Connect this source to a sink, concatenating the processing steps of both. * Connect this `Source` to a `Sink`, concatenating the processing steps of both.
*/ */
def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow =
new RunnableFlowAdapter(delegate.connect(sink.asScala)) new RunnableFlowAdapter(delegate.connect(sink.asScala))
@ -183,6 +194,7 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) {
/** /**
* Connect this `Source` to a `KeyedSink` and run it. * Connect this `Source` to a `KeyedSink` and run it.
*
* The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. * The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`.
* *
* @tparam S materialized type of the given Sink * @tparam S materialized type of the given Sink
@ -191,8 +203,8 @@ class Source[+Out](delegate: scaladsl2.Source[Out]) {
asScala.runWith(sink.asScala)(materializer).asInstanceOf[S] asScala.runWith(sink.asScala)(materializer).asInstanceOf[S]
/** /**
* Connect this `Source` to a `Sink` and run it. * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`.
*/ */
def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit = def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit =
delegate.connect(sink.asScala).run()(materializer) delegate.connect(sink.asScala).run()(materializer)