diff --git a/akka-http-core/src/main/scala/akka/http/model/japi/Http.scala b/akka-http-core/src/main/scala/akka/http/model/japi/Http.scala index f74c111f2e..85fb3759d3 100644 --- a/akka-http-core/src/main/scala/akka/http/model/japi/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/model/japi/Http.scala @@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable } import akka.japi.Util._ import akka.japi.{ Option ⇒ JOption } -import akka.stream.javadsl.japi.Function +import akka.japi.function.Function import scala.concurrent.Future @@ -164,4 +164,4 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[model.HttpResponse]], interface, port, backlog, immutableSeq(options), settings, log)(fm) .map(new ServerBinding(_))(ec) -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/model/japi/IncomingConnection.scala b/akka-http-core/src/main/scala/akka/http/model/japi/IncomingConnection.scala index ef25e30c98..0e834c9cf7 100644 --- a/akka-http-core/src/main/scala/akka/http/model/japi/IncomingConnection.scala +++ b/akka-http-core/src/main/scala/akka/http/model/japi/IncomingConnection.scala @@ -6,7 +6,7 @@ package akka.http.model.japi import java.net.InetSocketAddress -import akka.stream.javadsl.japi.Function +import akka.japi.function.Function import scala.concurrent.Future @@ -56,4 +56,4 @@ class IncomingConnection private[http] (delegate: http.Http.IncomingConnection) */ def handleWithAsyncHandler(handler: Function[HttpRequest, Future[HttpResponse]], materializer: FlowMaterializer): Unit = delegate.handleWithAsyncHandler(handler.apply(_).asInstanceOf[Future[model.HttpResponse]])(materializer) -} \ No newline at end of file +} diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 0e24caeb93..abfb53bc13 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -5,13 +5,13 @@ package akka.http.model.japi; import akka.actor.ActorSystem; +import akka.japi.function.Function; +import akka.japi.function.Procedure; import akka.http.engine.server.ServerSettings; import akka.stream.ActorFlowMaterializer; import akka.stream.FlowMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import akka.stream.javadsl.japi.Function; -import akka.stream.javadsl.japi.Procedure; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index df4e100f54..b6a6fdf0a1 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -43,7 +43,7 @@ public class ActorPublisherTest extends StreamTest { .actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher")); final Publisher publisher = UntypedActorPublisher.create(ref); Source.from(publisher) - .runForeach(new akka.stream.javadsl.japi.Procedure() { + .runForeach(new akka.japi.function.Procedure() { @Override public void apply(Integer elem) throws Exception { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 57630b0a09..dbd337b858 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -21,6 +21,7 @@ import akka.stream.*; import akka.stream.testkit.AkkaSpec; import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.javadsl.japi.*; +import akka.japi.function.*; import akka.util.ByteString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 94de608d4b..9ce455f5f9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -7,12 +7,13 @@ import akka.actor.ActorRef; import akka.japi.*; import akka.stream.*; import akka.stream.javadsl.FlowGraph.Builder; -import akka.stream.javadsl.japi.Creator; -import akka.stream.javadsl.japi.Function; -import akka.stream.javadsl.japi.Function2; -import akka.stream.javadsl.japi.Procedure; -import akka.stream.stage.*; import akka.stream.javadsl.japi.*; +import akka.japi.function.Creator; +import akka.japi.function.Function; +import akka.japi.function.Function2; +import akka.japi.function.Procedure; +import akka.stream.stage.*; +import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; @@ -39,7 +40,7 @@ public class FlowGraphTest extends StreamTest { @SuppressWarnings("serial") public Creator> op() { - return new akka.stream.javadsl.japi.Creator>() { + return new akka.japi.function.Creator>() { @Override public PushPullStage create() throws Exception { return new PushPullStage() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 06e67bccd8..d5acdf98fd 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -13,6 +13,7 @@ import akka.stream.StreamTest; import akka.stream.stage.*; import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.javadsl.japi.*; +import akka.japi.function.*; import akka.stream.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -218,7 +219,7 @@ public class FlowTest extends StreamTest { } public Creator> op() { - return new akka.stream.javadsl.japi.Creator>() { + return new akka.japi.function.Creator>() { @Override public PushPullStage create() throws Exception { return new PushPullStage() { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 17743cc706..104db76021 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -15,7 +15,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import akka.stream.StreamTest; -import akka.stream.javadsl.japi.Function2; +import akka.japi.function.Function2; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 2926bb3120..c98ccb51cf 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -12,7 +12,7 @@ import akka.japi.Pair; import akka.stream.OverflowStrategy; import akka.stream.StreamTest; import akka.stream.stage.*; -import akka.stream.javadsl.japi.*; +import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -253,7 +253,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseCallableInput() { final JavaTestKit probe = new JavaTestKit(system); final Iterable input1 = Arrays.asList(4, 3, 2, 1, 0); - final akka.stream.javadsl.japi.Creator> input = new akka.stream.javadsl.japi.Creator>() { + final Creator> input = new Creator>() { @Override public Iterator create() { return input1.iterator(); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java index e5f745c1dc..19b9851ad3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java @@ -18,7 +18,7 @@ import scala.runtime.BoxedUnit; import akka.stream.*; import akka.stream.javadsl.StreamTcp.*; -import akka.stream.javadsl.japi.*; +import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.TestUtils; import akka.util.ByteString; diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index a0d33f7f9b..d4b94865c4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -29,12 +29,12 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { val `scala -> java types` = (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: - (classOf[scala.Function0[_]], classOf[akka.stream.javadsl.japi.Creator[_]]) :: + (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: - (classOf[scala.Function1[_, Unit]], classOf[akka.stream.javadsl.japi.Procedure[_]]) :: - (classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Function[_, _]]) :: - (classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) :: - (classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) :: + (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: + (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: + (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Creator[_]]) :: + (classOf[scala.Function2[_, _, _]], classOf[akka.japi.function.Function2[_, _, _]]) :: (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template index b5cfc26e04..999fe11dd8 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template @@ -15,7 +15,7 @@ trait BidiFlowCreate { /** * Creates a BidiFlow by applying a [[FlowGraph.Builder]] to the given create function. */ - def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = + def create[I1, O1, I2, O2](block: akka.japi.function.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) }) /** @@ -23,14 +23,14 @@ trait BidiFlowCreate { * The given graph will be imported (using `builder.graph()`) and the resulting [[Shape]] will be passed to the create function along with the builder. */ def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], - block: japi.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = + block: akka.japi.function.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) }) /** * Creates a BidiFlow by applying a [[FlowGraph.Builder]] to the given create function. * The given graphs will be imported (using `builder.graph()`) and the resulting [[Shape]]s will be passed to the create function along with the builder. */ - def create[I1, O1, I2, O2, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def create[I1, O1, I2, O2, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Function3[FlowGraph.Builder[M], S1, S2, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = new BidiFlow(scaladsl.BidiFlow(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }) diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template index b0a5a8ea5b..ca160188cf 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template @@ -17,7 +17,7 @@ trait FlowCreate { * * The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports. */ - def create[I, O](block: japi.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] = + def create[I, O](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] = new Flow(scaladsl.Flow() { b ⇒ block.apply(b.asJava) }) /** @@ -26,7 +26,7 @@ trait FlowCreate { * * The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports. */ - def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = + def create[I, O, S <: Shape, M](g1: Graph[S, M], block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) }) /** @@ -35,7 +35,7 @@ trait FlowCreate { * * The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports. */ - def create[I, O, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def create[I, O, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Function3[FlowGraph.Builder[M], S1, S2, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }) diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index f48738c808..d4cba10242 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -17,7 +17,7 @@ trait GraphCreate { * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ @throws(classOf[IllegalArgumentException]) - def closed(block: japi.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] = + def closed(block: akka.japi.function.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] = scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) } /** @@ -25,7 +25,7 @@ trait GraphCreate { * * Partial graphs are allowed to have unconnected ports. */ - def partial[S <: Shape](block: japi.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] = + def partial[S <: Shape](block: akka.japi.function.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] = scaladsl.FlowGraph.partial() { b ⇒ block.apply(b.asJava) } /** @@ -46,7 +46,7 @@ trait GraphCreate { * Partial graphs are allowed to have unconnected ports. */ def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], - block: japi.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] = + block: akka.japi.function.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] = scaladsl.FlowGraph.partial(g1) { b ⇒ s => block.apply(b.asJava, s) } /** @@ -56,7 +56,7 @@ trait GraphCreate { * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ @throws(classOf[IllegalArgumentException]) - def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableFlow[M] = scaladsl.FlowGraph.closed(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } @@ -66,7 +66,7 @@ trait GraphCreate { * * Partial graphs are allowed to have unconnected ports. */ - def partial[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def partial[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Function3[FlowGraph.Builder[M], S1, S2, S]): Graph[S, M] = scaladsl.FlowGraph.partial(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template index 116ddf6cd6..96865b74d9 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template @@ -12,7 +12,7 @@ trait SinkCreate { * Creates a `Sink` by using a `FlowGraph.Builder[Unit]` on a block that expects * a [[FlowGraph.Builder]] and returns an [[Inlet]]. */ - def create[T](block: japi.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] = + def create[T](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] = new Sink(scaladsl.Sink() { b ⇒ block.apply(b.asJava) }) /** @@ -21,7 +21,7 @@ trait SinkCreate { * The create function is expected to return the created Sink's [[Inlet]]. */ def create[T, S <: Shape, M](g1: Graph[S, M], - block: japi.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] = + block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) }) /** @@ -29,7 +29,7 @@ trait SinkCreate { * with the `FlowGraph.Builder[M]` and the [[Shape]]s resulting from importing the graphs. * The create function is expected to return the created Sink's [[Inlet]]. */ - def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Function3[FlowGraph.Builder[M], S1, S2, Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }) diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template index 05e0493dbe..01f233f6e3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template @@ -12,7 +12,7 @@ trait SourceCreate { * Creates a `Source` by using a `FlowGraph.Builder[Unit]` on a block that expects * a [[FlowGraph.Builder]] and returns an [[Outlet]]. */ - def create[T](block: japi.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] = + def create[T](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] = new Source(scaladsl.Source() { b ⇒ block.apply(b.asJava) }) /** @@ -22,7 +22,7 @@ trait SourceCreate { * will be passed into the create block. */ def create[T, S <: Shape, M](g1: Graph[S, M], - block: japi.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] = + block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] = new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) }) /** @@ -31,7 +31,7 @@ trait SourceCreate { * The graphs will be imported (using `Builder.graph()`) and the resulting shapes * will be passed into the create block. */ - def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M], + def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M], block: japi.Function3[FlowGraph.Builder[M], S1, S2, Outlet[T]]): Source[T, M] = new Source(scaladsl.Source(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }) diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template index b62e7d2b50..8bd0dfa28e 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template @@ -14,7 +14,7 @@ object ZipWith { * @param f zipping-function from the input values to the output value * @param attributes optional attributes for this vertex */ - def create[A, B, Out](f: japi.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], Unit] = + def create[A, B, Out](f: akka.japi.function.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], Unit] = scaladsl.ZipWith(f.apply _) [3..20#/** Create a new `ZipWith` specialized for 1 inputs. diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 07c26e1544..87dcf61b6c 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -8,10 +8,10 @@ import java.util.concurrent.TimeUnit import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.stream.impl._ -import akka.stream.javadsl.japi import com.typesafe.config.Config import scala.concurrent.duration._ +import akka.japi.{ function ⇒ japi } object ActorFlowMaterializer { diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index d24d4b24cf..fd7e89e5a0 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -4,7 +4,7 @@ package akka.stream import scala.collection.immutable -import akka.stream.javadsl.japi +import akka.japi.{ function ⇒ japi } import akka.stream.impl.Stages.StageModule /** diff --git a/akka-stream/src/main/scala/akka/stream/Supervision.scala b/akka-stream/src/main/scala/akka/stream/Supervision.scala index e2e45bf372..c02a71f946 100644 --- a/akka-stream/src/main/scala/akka/stream/Supervision.scala +++ b/akka-stream/src/main/scala/akka/stream/Supervision.scala @@ -4,7 +4,7 @@ package akka.stream import scala.util.control.NonFatal -import akka.stream.javadsl.japi +import akka.japi.{ function ⇒ japi } object Supervision { sealed trait Directive diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala index 415d2398b3..3c81fa40b0 100644 --- a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala @@ -5,6 +5,7 @@ package akka.stream.io import java.io.InputStream +import akka.japi.function.Creator import akka.stream.io.impl.InputStreamSource import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source._ @@ -35,7 +36,7 @@ object InputStreamSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(createInputStream: javadsl.japi.Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] = + def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] = create(createInputStream, DefaultChunkSize) /** @@ -46,7 +47,7 @@ object InputStreamSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(createInputStream: javadsl.japi.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = + def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = apply(() ⇒ createInputStream.create(), chunkSize).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala index 896d69e9f9..436bf5dbde 100644 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala @@ -5,6 +5,7 @@ package akka.stream.io import java.io.OutputStream +import akka.japi.function.Creator import akka.stream.io.impl.OutputStreamSink import akka.stream.scaladsl.Sink import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl } @@ -37,7 +38,7 @@ object OutputStreamSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. */ - def create(f: javadsl.japi.Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] = + def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] = apply(() ⇒ f.create()).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 84ea594fed..162aa2ba0b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -3,6 +3,7 @@ */ package akka.stream.javadsl +import akka.japi.function import akka.stream.scaladsl import akka.stream.Graph import akka.stream.BidiShape @@ -22,7 +23,7 @@ object BidiFlow { * Create a BidiFlow where the top and bottom flows are just one simple mapping * stage each, expressed by the two functions. */ - def fromFunctions[I1, O1, I2, O2](top: japi.Function[I1, O1], bottom: japi.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = + def fromFunctions[I1, O1, I2, O2](top: function.Function[I1, O1], bottom: function.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = new BidiFlow(scaladsl.BidiFlow(top.apply _, bottom.apply _)) } @@ -73,7 +74,7 @@ class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting BidiFlow. */ - def atop[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): BidiFlow[I1, OO1, II2, O2, M] = + def atop[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2], combine: function.Function2[Mat, Mat2, M]): BidiFlow[I1, OO1, II2, O2, M] = new BidiFlow(delegate.atopMat(bidi.asScala)(combinerToScala(combine))) /** @@ -116,7 +117,7 @@ class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting [[Flow]]. */ - def join[Mat2, M](flow: Flow[O1, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I1, O2, M] = + def join[Mat2, M](flow: Flow[O1, I2, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I1, O2, M] = new Flow(delegate.joinMat(flow.asScala)(combinerToScala(combine))) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 506dc7f42c..dbe5ad8c08 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import akka.stream._ import akka.japi.{ Util, Pair } +import akka.japi.function import akka.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Future @@ -52,7 +53,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform only the materialized value of this Flow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Flow[In, Out, Mat2] = + def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] = new Flow(delegate.mapMaterialized(f.apply _)) /** @@ -64,7 +65,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform this [[Flow]] by appending the given processing steps. */ - def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: japi.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow.asScala)(combinerToScala(combine))) /** @@ -76,7 +77,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = + def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = new Sink(delegate.toMat(sink.asScala)(combinerToScala(combine))) /** @@ -88,7 +89,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] */ - def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine))) /** @@ -127,7 +128,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * The `combine` function is used to compose the materialized values of this flow and that * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. */ - def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = + def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine))) /** @@ -148,7 +149,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T, Mat] = + def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.map(f.apply)) /** @@ -158,7 +159,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * The returned list MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. */ - def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] = + def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) /** @@ -178,7 +179,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * * @see [[#mapAsyncUnordered]] */ - def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = + def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapAsync(parallelism, f.apply)) /** @@ -199,13 +200,13 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * * @see [[#mapAsync]] */ - def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = + def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapAsyncUnordered(parallelism, f.apply)) /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out, Mat] = + def filter(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.filter(p.test)) /** @@ -235,7 +236,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * [[akka.stream.Supervision#restart]] current value starts at `zero` again * the stream will continue. */ - def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = + def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scan(zero)(f.apply)) /** @@ -299,7 +300,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = new Flow(delegate.conflate(seed.apply)(aggregate.apply)) /** @@ -318,7 +319,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] = + def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] = new Flow(delegate.expand(seed(_))(s ⇒ { val p = extrapolate(s) (p.first, p.second) @@ -340,7 +341,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * This operator makes it possible to extend the `Flow` API when there is no specialized * operator that performs the transformation. */ - def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] = + def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] = new Flow(delegate.transform(() ⇒ mkStage.create())) /** @@ -370,7 +371,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]] * the element is dropped and the stream and substreams continue. */ - def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = + def groupBy[K](f: function.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** @@ -394,7 +395,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]] * the element is dropped and the stream and substreams continue. */ - def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] = + def splitWhen(p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] = new Flow(delegate.splitWhen(p.test).map(_.asJava)) /** @@ -431,14 +432,14 @@ trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { /** * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] + def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] } /** INTERNAL API */ private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] { def shape = ClosedShape def module = runnable.module - override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] = + override def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] = new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _)) override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 0ecf25315b..27b8a3a18b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -124,7 +124,7 @@ object Balance { } object Zip { - import akka.stream.javadsl.japi.Function2 + import akka.japi.function.Function2 import akka.japi.Pair /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala index 17aba3ff54..2983e2ae40 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala @@ -3,15 +3,16 @@ */ package akka.stream.javadsl +import akka.japi.function import akka.stream.scaladsl import akka.japi.Pair object Keep { - private val _left = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l } - private val _right = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r } - private val _both = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) } + private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l } + private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r } + private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) } - def left[L, R]: japi.Function2[L, R, L] = _left.asInstanceOf[japi.Function2[L, R, L]] - def right[L, R]: japi.Function2[L, R, R] = _right.asInstanceOf[japi.Function2[L, R, R]] - def both[L, R]: japi.Function2[L, R, L Pair R] = _both.asInstanceOf[japi.Function2[L, R, L Pair R]] + def left[L, R]: function.Function2[L, R, L] = _left.asInstanceOf[function.Function2[L, R, L]] + def right[L, R]: function.Function2[L, R, R] = _right.asInstanceOf[function.Function2[L, R, R]] + def both[L, R]: function.Function2[L, R, L Pair R] = _both.asInstanceOf[function.Function2[L, R, L Pair R]] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 3bca710d64..4abb4e9fef 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import akka.actor.{ ActorRef, Props } +import akka.japi.function import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -27,7 +28,7 @@ object Sink { * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure is signaled in the stream. */ - def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.Sink[In, Future[U]] = + def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, Future[U]] = new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply)) /** @@ -61,7 +62,7 @@ object Sink { * normal end of the stream, or completed with `Failure` if there is a failure is signaled in * the stream.. */ - def foreach[T](f: japi.Procedure[T]): Sink[T, Future[Unit]] = + def foreach[T](f: function.Procedure[T]): Sink[T, Future[Unit]] = new Sink(scaladsl.Sink.foreach(f.apply)) /** @@ -76,7 +77,7 @@ object Sink { * completion, apply the provided function with [[scala.util.Success]] * or [[scala.util.Failure]]. */ - def onComplete[In](callback: japi.Procedure[Try[Unit]]): Sink[In, Unit] = + def onComplete[In](callback: function.Procedure[Try[Unit]]): Sink[In, Unit] = new Sink(scaladsl.Sink.onComplete[In](x ⇒ callback.apply(x))) /** @@ -142,7 +143,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ /** * Transform only the materialized value of this Sink, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Sink[In, Mat2] = + def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterialized(f.apply _)) override def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 955f81cc79..d706af778b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import java.io.File +import akka.japi.function import scala.collection.immutable import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } @@ -81,7 +82,7 @@ object Source { * in accordance with the demand coming from the downstream transformation * steps. */ - def fromIterator[O](f: japi.Creator[java.util.Iterator[O]]): javadsl.Source[O, Unit] = + def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, Unit] = new Source(scaladsl.Source(() ⇒ f.create().asScala)) /** @@ -233,7 +234,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Transform only the materialized value of this Source, leaving all other properties as they were. */ - def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Source[Out, Mat2] = + def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] = new Source(delegate.mapMaterialized(f.apply _)) /** @@ -245,7 +246,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Transform this [[Source]] by appending the given processing stages. */ - def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: japi.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow.asScala)(combinerToScala(combine))) /** @@ -257,7 +258,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = new RunnableFlowAdapter(delegate.toMat(sink.asScala)(combinerToScala(combine))) /** @@ -275,7 +276,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure is signaled in the stream. */ - def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = + def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = runWith(Sink.fold(zero, f), materializer) /** @@ -293,7 +294,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * normal end of the stream, or completed with `Failure` if there is a failure is signaled in * the stream. */ - def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = + def runForeach(f: function.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = runWith(Sink.foreach(f), materializer) // COMMON OPS // @@ -302,7 +303,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: japi.Function[Out, T]): javadsl.Source[T, Mat] = + def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.map(f.apply)) /** @@ -312,7 +313,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * The returned list MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. */ - def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Source[T, Mat] = + def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) /** @@ -324,7 +325,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * * @see [[#mapAsyncUnordered]] */ - def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] = + def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapAsync(parallelism, f.apply)) /** @@ -337,13 +338,13 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * * @see [[#mapAsync]] */ - def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] = + def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapAsyncUnordered(parallelism, f.apply)) /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: japi.Predicate[Out]): javadsl.Source[Out, Mat] = + def filter(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.filter(p.test)) /** @@ -369,7 +370,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * applies the current and next value to the given function `f`, * yielding the next current value. */ - def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Source[T, Mat] = + def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.scan(zero)(f.apply)) /** @@ -431,7 +432,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Source[S, Mat] = + def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = new Source(delegate.conflate(seed.apply)(aggregate.apply)) /** @@ -447,7 +448,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U, Mat] = + def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U, Mat] = new Source(delegate.expand(seed(_))(s ⇒ { val p = extrapolate(s) (p.first, p.second) @@ -469,7 +470,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * This operator makes it possible to extend the `Flow` API when there is no specialized * operator that performs the transformation. */ - def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] = + def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] = new Source(delegate.transform(() ⇒ mkStage.create())) /** @@ -491,7 +492,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. */ - def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = + def groupBy[K](f: function.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] = new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step /** @@ -507,7 +508,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * true, false, false // elements go into third substream * }}} */ - def splitWhen(p: japi.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] = + def splitWhen(p: function.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] = new Source(delegate.splitWhen(p.test).map(_.asJava)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala deleted file mode 100644 index 5b31a67ac1..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl.japi - -// TODO Same SAM-classes as in akka.japi, but with variance annotations -// TODO Remove these in favour of using akka.japi with added variance - -/** - * A Function interface. Used to create first-class-functions is Java. - */ -@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! -trait Function[-T, +R] { - @throws(classOf[Exception]) - def apply(param: T): R -} - -/** - * A Function interface. Used to create 2-arg first-class-functions is Java. - */ -@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! -trait Function2[-T1, -T2, +R] { - @throws(classOf[Exception]) - def apply(arg1: T1, arg2: T2): R -} - -/** - * A constructor/factory, takes no parameters but creates a new value of type T every call. - */ -@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! -trait Creator[+T] extends Serializable { - /** - * This method must return a different instance upon every call. - */ - @throws(classOf[Exception]) - def create(): T -} - -/** - * A Procedure is like a Function, but it doesn't produce a return value. - */ -@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! -trait Procedure[-T] { - @throws(classOf[Exception]) - def apply(param: T): Unit -} - -/** - * Java API: Defines a criteria and determines whether the parameter meets this criteria. - */ -@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi! -trait Predicate[-T] { - def test(param: T): Boolean -} - diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala index a70df99122..d13bc2105e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala @@ -5,10 +5,10 @@ package akka.stream package object javadsl { - def combinerToScala[M1, M2, M](f: japi.Function2[M1, M2, M]): (M1, M2) ⇒ M = + def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) ⇒ M = f match { case s: Function2[_, _, _] ⇒ s.asInstanceOf[(M1, M2) ⇒ M] case other ⇒ other.apply _ } -} \ No newline at end of file +}