!str #17277 remove akka.stream.javadsl.japi.WithVariance
sadly we forgot to also move the boilerplate-generated FunctionN and ProcedureN into akka-actor, so they are still in akka.stream.javadsl.japi.
This commit is contained in:
parent
3626f3ee0d
commit
354a8e3a41
30 changed files with 108 additions and 153 deletions
|
|
@ -8,7 +8,7 @@ import java.lang.{ Iterable ⇒ JIterable }
|
|||
|
||||
import akka.japi.Util._
|
||||
import akka.japi.{ Option ⇒ JOption }
|
||||
import akka.stream.javadsl.japi.Function
|
||||
import akka.japi.function.Function
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.http.model.japi
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.stream.javadsl.japi.Function
|
||||
import akka.japi.function.Function
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
|
|||
|
|
@ -5,13 +5,13 @@
|
|||
package akka.http.model.japi;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.function.Function;
|
||||
import akka.japi.function.Procedure;
|
||||
import akka.http.engine.server.ServerSettings;
|
||||
import akka.stream.ActorFlowMaterializer;
|
||||
import akka.stream.FlowMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.javadsl.japi.Function;
|
||||
import akka.stream.javadsl.japi.Procedure;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class ActorPublisherTest extends StreamTest {
|
|||
.actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher"));
|
||||
final Publisher<Integer> publisher = UntypedActorPublisher.create(ref);
|
||||
Source.from(publisher)
|
||||
.runForeach(new akka.stream.javadsl.japi.Procedure<Integer>() {
|
||||
.runForeach(new akka.japi.function.Procedure<Integer>() {
|
||||
@Override
|
||||
public void apply(Integer elem) throws Exception {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import akka.stream.*;
|
|||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.stream.javadsl.FlowGraph.Builder;
|
||||
import akka.stream.javadsl.japi.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.util.ByteString;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
|
|
|
|||
|
|
@ -7,12 +7,13 @@ import akka.actor.ActorRef;
|
|||
import akka.japi.*;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.FlowGraph.Builder;
|
||||
import akka.stream.javadsl.japi.Creator;
|
||||
import akka.stream.javadsl.japi.Function;
|
||||
import akka.stream.javadsl.japi.Function2;
|
||||
import akka.stream.javadsl.japi.Procedure;
|
||||
import akka.stream.stage.*;
|
||||
import akka.stream.javadsl.japi.*;
|
||||
import akka.japi.function.Creator;
|
||||
import akka.japi.function.Function;
|
||||
import akka.japi.function.Function2;
|
||||
import akka.japi.function.Procedure;
|
||||
import akka.stream.stage.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.testkit.TestProbe;
|
||||
|
|
@ -39,7 +40,7 @@ public class FlowGraphTest extends StreamTest {
|
|||
|
||||
@SuppressWarnings("serial")
|
||||
public <T> Creator<Stage<T, T>> op() {
|
||||
return new akka.stream.javadsl.japi.Creator<Stage<T, T>>() {
|
||||
return new akka.japi.function.Creator<Stage<T, T>>() {
|
||||
@Override
|
||||
public PushPullStage<T, T> create() throws Exception {
|
||||
return new PushPullStage<T, T>() {
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.stream.StreamTest;
|
|||
import akka.stream.stage.*;
|
||||
import akka.stream.javadsl.FlowGraph.Builder;
|
||||
import akka.stream.javadsl.japi.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.*;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
|
@ -218,7 +219,7 @@ public class FlowTest extends StreamTest {
|
|||
}
|
||||
|
||||
public <T> Creator<Stage<T, T>> op() {
|
||||
return new akka.stream.javadsl.japi.Creator<Stage<T, T>>() {
|
||||
return new akka.japi.function.Creator<Stage<T, T>>() {
|
||||
@Override
|
||||
public PushPullStage<T, T> create() throws Exception {
|
||||
return new PushPullStage<T, T>() {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.Await;
|
|||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.stream.StreamTest;
|
||||
import akka.stream.javadsl.japi.Function2;
|
||||
import akka.japi.function.Function2;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.japi.Pair;
|
|||
import akka.stream.OverflowStrategy;
|
||||
import akka.stream.StreamTest;
|
||||
import akka.stream.stage.*;
|
||||
import akka.stream.javadsl.japi.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
||||
|
|
@ -253,7 +253,7 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseCallableInput() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final Iterable<Integer> input1 = Arrays.asList(4, 3, 2, 1, 0);
|
||||
final akka.stream.javadsl.japi.Creator<Iterator<Integer>> input = new akka.stream.javadsl.japi.Creator<Iterator<Integer>>() {
|
||||
final Creator<Iterator<Integer>> input = new Creator<Iterator<Integer>>() {
|
||||
@Override
|
||||
public Iterator<Integer> create() {
|
||||
return input1.iterator();
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import scala.runtime.BoxedUnit;
|
|||
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.StreamTcp.*;
|
||||
import akka.stream.javadsl.japi.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.stream.testkit.TestUtils;
|
||||
import akka.util.ByteString;
|
||||
|
|
|
|||
|
|
@ -29,12 +29,12 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
|||
val `scala -> java types` =
|
||||
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
|
||||
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
|
||||
(classOf[scala.Function1[_, Unit]], classOf[akka.stream.javadsl.japi.Procedure[_]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Function[_, _]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.stream.javadsl.japi.Creator[_]]) ::
|
||||
(classOf[scala.Function2[_, _, _]], classOf[akka.stream.javadsl.japi.Function2[_, _, _]]) ::
|
||||
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
(classOf[scala.Function2[_, _, _]], classOf[akka.japi.function.Function2[_, _, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) ::
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ trait BidiFlowCreate {
|
|||
/**
|
||||
* Creates a BidiFlow by applying a [[FlowGraph.Builder]] to the given create function.
|
||||
*/
|
||||
def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] =
|
||||
def create[I1, O1, I2, O2](block: akka.japi.function.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] =
|
||||
new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) })
|
||||
|
||||
/**
|
||||
|
|
@ -23,14 +23,14 @@ trait BidiFlowCreate {
|
|||
* The given graph will be imported (using `builder.graph()`) and the resulting [[Shape]] will be passed to the create function along with the builder.
|
||||
*/
|
||||
def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M],
|
||||
block: japi.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] =
|
||||
block: akka.japi.function.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] =
|
||||
new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) })
|
||||
|
||||
/**
|
||||
* Creates a BidiFlow by applying a [[FlowGraph.Builder]] to the given create function.
|
||||
* The given graphs will be imported (using `builder.graph()`) and the resulting [[Shape]]s will be passed to the create function along with the builder.
|
||||
*/
|
||||
def create[I1, O1, I2, O2, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def create[I1, O1, I2, O2, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Function3[FlowGraph.Builder[M], S1, S2, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] =
|
||||
new BidiFlow(scaladsl.BidiFlow(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) })
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ trait FlowCreate {
|
|||
*
|
||||
* The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports.
|
||||
*/
|
||||
def create[I, O](block: japi.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] =
|
||||
def create[I, O](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] =
|
||||
new Flow(scaladsl.Flow() { b ⇒ block.apply(b.asJava) })
|
||||
|
||||
/**
|
||||
|
|
@ -26,7 +26,7 @@ trait FlowCreate {
|
|||
*
|
||||
* The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports.
|
||||
*/
|
||||
def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] =
|
||||
def create[I, O, S <: Shape, M](g1: Graph[S, M], block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] =
|
||||
new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) })
|
||||
|
||||
/**
|
||||
|
|
@ -35,7 +35,7 @@ trait FlowCreate {
|
|||
*
|
||||
* The create function is expected to return a pair of [[Inlet]] and [[Outlet]] which correspond to the created Flows input and output ports.
|
||||
*/
|
||||
def create[I, O, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def create[I, O, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Function3[FlowGraph.Builder[M], S1, S2, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] =
|
||||
new Flow(scaladsl.Flow(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) })
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ trait GraphCreate {
|
|||
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
|
||||
*/
|
||||
@throws(classOf[IllegalArgumentException])
|
||||
def closed(block: japi.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] =
|
||||
def closed(block: akka.japi.function.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] =
|
||||
scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) }
|
||||
|
||||
/**
|
||||
|
|
@ -25,7 +25,7 @@ trait GraphCreate {
|
|||
*
|
||||
* Partial graphs are allowed to have unconnected ports.
|
||||
*/
|
||||
def partial[S <: Shape](block: japi.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] =
|
||||
def partial[S <: Shape](block: akka.japi.function.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] =
|
||||
scaladsl.FlowGraph.partial() { b ⇒ block.apply(b.asJava) }
|
||||
|
||||
/**
|
||||
|
|
@ -46,7 +46,7 @@ trait GraphCreate {
|
|||
* Partial graphs are allowed to have unconnected ports.
|
||||
*/
|
||||
def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M],
|
||||
block: japi.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] =
|
||||
block: akka.japi.function.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] =
|
||||
scaladsl.FlowGraph.partial(g1) { b ⇒ s => block.apply(b.asJava, s) }
|
||||
|
||||
/**
|
||||
|
|
@ -56,7 +56,7 @@ trait GraphCreate {
|
|||
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
|
||||
*/
|
||||
@throws(classOf[IllegalArgumentException])
|
||||
def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableFlow[M] =
|
||||
scaladsl.FlowGraph.closed(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ trait GraphCreate {
|
|||
*
|
||||
* Partial graphs are allowed to have unconnected ports.
|
||||
*/
|
||||
def partial[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def partial[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Function3[FlowGraph.Builder[M], S1, S2, S]): Graph[S, M] =
|
||||
scaladsl.FlowGraph.partial(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ trait SinkCreate {
|
|||
* Creates a `Sink` by using a `FlowGraph.Builder[Unit]` on a block that expects
|
||||
* a [[FlowGraph.Builder]] and returns an [[Inlet]].
|
||||
*/
|
||||
def create[T](block: japi.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] =
|
||||
def create[T](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] =
|
||||
new Sink(scaladsl.Sink() { b ⇒ block.apply(b.asJava) })
|
||||
|
||||
/**
|
||||
|
|
@ -21,7 +21,7 @@ trait SinkCreate {
|
|||
* The create function is expected to return the created Sink's [[Inlet]].
|
||||
*/
|
||||
def create[T, S <: Shape, M](g1: Graph[S, M],
|
||||
block: japi.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] =
|
||||
block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] =
|
||||
new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) })
|
||||
|
||||
/**
|
||||
|
|
@ -29,7 +29,7 @@ trait SinkCreate {
|
|||
* with the `FlowGraph.Builder[M]` and the [[Shape]]s resulting from importing the graphs.
|
||||
* The create function is expected to return the created Sink's [[Inlet]].
|
||||
*/
|
||||
def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Function3[FlowGraph.Builder[M], S1, S2, Inlet[T]]): Sink[T, M] =
|
||||
new Sink(scaladsl.Sink(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) })
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ trait SourceCreate {
|
|||
* Creates a `Source` by using a `FlowGraph.Builder[Unit]` on a block that expects
|
||||
* a [[FlowGraph.Builder]] and returns an [[Outlet]].
|
||||
*/
|
||||
def create[T](block: japi.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] =
|
||||
def create[T](block: akka.japi.function.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] =
|
||||
new Source(scaladsl.Source() { b ⇒ block.apply(b.asJava) })
|
||||
|
||||
/**
|
||||
|
|
@ -22,7 +22,7 @@ trait SourceCreate {
|
|||
* will be passed into the create block.
|
||||
*/
|
||||
def create[T, S <: Shape, M](g1: Graph[S, M],
|
||||
block: japi.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] =
|
||||
block: akka.japi.function.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] =
|
||||
new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) })
|
||||
|
||||
/**
|
||||
|
|
@ -31,7 +31,7 @@ trait SourceCreate {
|
|||
* The graphs will be imported (using `Builder.graph()`) and the resulting shapes
|
||||
* will be passed into the create block.
|
||||
*/
|
||||
def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: japi.Function2[M1, M2, M],
|
||||
def create[T, S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: akka.japi.function.Function2[M1, M2, M],
|
||||
block: japi.Function3[FlowGraph.Builder[M], S1, S2, Outlet[T]]): Source[T, M] =
|
||||
new Source(scaladsl.Source(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) })
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ object ZipWith {
|
|||
* @param f zipping-function from the input values to the output value
|
||||
* @param attributes optional attributes for this vertex
|
||||
*/
|
||||
def create[A, B, Out](f: japi.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], Unit] =
|
||||
def create[A, B, Out](f: akka.japi.function.Function2[A, B, Out]): Graph[FanInShape2[A, B, Out], Unit] =
|
||||
scaladsl.ZipWith(f.apply _)
|
||||
|
||||
[3..20#/** Create a new `ZipWith` specialized for 1 inputs.
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ import java.util.concurrent.TimeUnit
|
|||
|
||||
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
|
||||
import akka.stream.impl._
|
||||
import akka.stream.javadsl.japi
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.japi.{ function ⇒ japi }
|
||||
|
||||
object ActorFlowMaterializer {
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.stream
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.javadsl.japi
|
||||
import akka.japi.{ function ⇒ japi }
|
||||
import akka.stream.impl.Stages.StageModule
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.stream
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream.javadsl.japi
|
||||
import akka.japi.{ function ⇒ japi }
|
||||
|
||||
object Supervision {
|
||||
sealed trait Directive
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.stream.io
|
|||
|
||||
import java.io.InputStream
|
||||
|
||||
import akka.japi.function.Creator
|
||||
import akka.stream.io.impl.InputStreamSource
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.Source._
|
||||
|
|
@ -35,7 +36,7 @@ object InputStreamSource {
|
|||
*
|
||||
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
|
||||
*/
|
||||
def create(createInputStream: javadsl.japi.Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] =
|
||||
def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] =
|
||||
create(createInputStream, DefaultChunkSize)
|
||||
|
||||
/**
|
||||
|
|
@ -46,7 +47,7 @@ object InputStreamSource {
|
|||
*
|
||||
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
|
||||
*/
|
||||
def create(createInputStream: javadsl.japi.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] =
|
||||
def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] =
|
||||
apply(() ⇒ createInputStream.create(), chunkSize).asJava
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.stream.io
|
|||
|
||||
import java.io.OutputStream
|
||||
|
||||
import akka.japi.function.Creator
|
||||
import akka.stream.io.impl.OutputStreamSink
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl }
|
||||
|
|
@ -37,7 +38,7 @@ object OutputStreamSink {
|
|||
*
|
||||
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
|
||||
*/
|
||||
def create(f: javadsl.japi.Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] =
|
||||
def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] =
|
||||
apply(() ⇒ f.create()).asJava
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import akka.japi.function
|
||||
import akka.stream.scaladsl
|
||||
import akka.stream.Graph
|
||||
import akka.stream.BidiShape
|
||||
|
|
@ -22,7 +23,7 @@ object BidiFlow {
|
|||
* Create a BidiFlow where the top and bottom flows are just one simple mapping
|
||||
* stage each, expressed by the two functions.
|
||||
*/
|
||||
def fromFunctions[I1, O1, I2, O2](top: japi.Function[I1, O1], bottom: japi.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] =
|
||||
def fromFunctions[I1, O1, I2, O2](top: function.Function[I1, O1], bottom: function.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] =
|
||||
new BidiFlow(scaladsl.BidiFlow(top.apply _, bottom.apply _))
|
||||
|
||||
}
|
||||
|
|
@ -73,7 +74,7 @@ class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2,
|
|||
* The `combine` function is used to compose the materialized values of this flow and that
|
||||
* flow into the materialized value of the resulting BidiFlow.
|
||||
*/
|
||||
def atop[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): BidiFlow[I1, OO1, II2, O2, M] =
|
||||
def atop[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2], combine: function.Function2[Mat, Mat2, M]): BidiFlow[I1, OO1, II2, O2, M] =
|
||||
new BidiFlow(delegate.atopMat(bidi.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -116,7 +117,7 @@ class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2,
|
|||
* The `combine` function is used to compose the materialized values of this flow and that
|
||||
* flow into the materialized value of the resulting [[Flow]].
|
||||
*/
|
||||
def join[Mat2, M](flow: Flow[O1, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I1, O2, M] =
|
||||
def join[Mat2, M](flow: Flow[O1, I2, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I1, O2, M] =
|
||||
new Flow(delegate.joinMat(flow.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.stream.javadsl
|
|||
|
||||
import akka.stream._
|
||||
import akka.japi.{ Util, Pair }
|
||||
import akka.japi.function
|
||||
import akka.stream.scaladsl
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.concurrent.Future
|
||||
|
|
@ -52,7 +53,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
/**
|
||||
* Transform only the materialized value of this Flow, leaving all other properties as they were.
|
||||
*/
|
||||
def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
|
||||
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
|
||||
new Flow(delegate.mapMaterialized(f.apply _))
|
||||
|
||||
/**
|
||||
|
|
@ -64,7 +65,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
/**
|
||||
* Transform this [[Flow]] by appending the given processing steps.
|
||||
*/
|
||||
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: japi.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
|
||||
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
|
||||
new Flow(delegate.viaMat(flow.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -76,7 +77,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
/**
|
||||
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
|
||||
*/
|
||||
def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
|
||||
def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
|
||||
new Sink(delegate.toMat(sink.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -88,7 +89,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
/**
|
||||
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
|
||||
*/
|
||||
def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
|
||||
def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
|
||||
new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -127,7 +128,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* The `combine` function is used to compose the materialized values of this flow and that
|
||||
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
|
||||
*/
|
||||
def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
|
||||
def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
|
||||
new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -148,7 +149,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step.
|
||||
*/
|
||||
def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T, Mat] =
|
||||
def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.map(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -158,7 +159,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* The returned list MUST NOT contain `null` values,
|
||||
* as they are illegal as stream elements - according to the Reactive Streams specification.
|
||||
*/
|
||||
def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] =
|
||||
def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))
|
||||
|
||||
/**
|
||||
|
|
@ -178,7 +179,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
*
|
||||
* @see [[#mapAsyncUnordered]]
|
||||
*/
|
||||
def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
|
||||
def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.mapAsync(parallelism, f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -199,13 +200,13 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
*
|
||||
* @see [[#mapAsync]]
|
||||
*/
|
||||
def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
|
||||
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.mapAsyncUnordered(parallelism, f.apply))
|
||||
|
||||
/**
|
||||
* Only pass on those elements that satisfy the given predicate.
|
||||
*/
|
||||
def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
|
||||
def filter(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
|
||||
new Flow(delegate.filter(p.test))
|
||||
|
||||
/**
|
||||
|
|
@ -235,7 +236,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* [[akka.stream.Supervision#restart]] current value starts at `zero` again
|
||||
* the stream will continue.
|
||||
*/
|
||||
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
|
||||
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
|
||||
new Flow(delegate.scan(zero)(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -299,7 +300,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
|
||||
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
|
||||
*/
|
||||
def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
|
||||
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
|
||||
new Flow(delegate.conflate(seed.apply)(aggregate.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -318,7 +319,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
|
||||
* state.
|
||||
*/
|
||||
def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] =
|
||||
def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] =
|
||||
new Flow(delegate.expand(seed(_))(s ⇒ {
|
||||
val p = extrapolate(s)
|
||||
(p.first, p.second)
|
||||
|
|
@ -340,7 +341,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* This operator makes it possible to extend the `Flow` API when there is no specialized
|
||||
* operator that performs the transformation.
|
||||
*/
|
||||
def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] =
|
||||
def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] =
|
||||
new Flow(delegate.transform(() ⇒ mkStage.create()))
|
||||
|
||||
/**
|
||||
|
|
@ -370,7 +371,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*/
|
||||
def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
def groupBy[K](f: function.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
|
|
@ -394,7 +395,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
|
||||
* the element is dropped and the stream and substreams continue.
|
||||
*/
|
||||
def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
|
||||
def splitWhen(p: function.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
|
||||
new Flow(delegate.splitWhen(p.test).map(_.asJava))
|
||||
|
||||
/**
|
||||
|
|
@ -431,14 +432,14 @@ trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] {
|
|||
/**
|
||||
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.
|
||||
*/
|
||||
def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2]
|
||||
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2]
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] {
|
||||
def shape = ClosedShape
|
||||
def module = runnable.module
|
||||
override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] =
|
||||
override def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] =
|
||||
new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _))
|
||||
override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer)
|
||||
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ object Balance {
|
|||
}
|
||||
|
||||
object Zip {
|
||||
import akka.stream.javadsl.japi.Function2
|
||||
import akka.japi.function.Function2
|
||||
import akka.japi.Pair
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -3,15 +3,16 @@
|
|||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import akka.japi.function
|
||||
import akka.stream.scaladsl
|
||||
import akka.japi.Pair
|
||||
|
||||
object Keep {
|
||||
private val _left = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l }
|
||||
private val _right = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r }
|
||||
private val _both = new japi.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) }
|
||||
private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l }
|
||||
private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r }
|
||||
private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) }
|
||||
|
||||
def left[L, R]: japi.Function2[L, R, L] = _left.asInstanceOf[japi.Function2[L, R, L]]
|
||||
def right[L, R]: japi.Function2[L, R, R] = _right.asInstanceOf[japi.Function2[L, R, R]]
|
||||
def both[L, R]: japi.Function2[L, R, L Pair R] = _both.asInstanceOf[japi.Function2[L, R, L Pair R]]
|
||||
def left[L, R]: function.Function2[L, R, L] = _left.asInstanceOf[function.Function2[L, R, L]]
|
||||
def right[L, R]: function.Function2[L, R, R] = _right.asInstanceOf[function.Function2[L, R, R]]
|
||||
def both[L, R]: function.Function2[L, R, L Pair R] = _both.asInstanceOf[function.Function2[L, R, L Pair R]]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.stream.javadsl
|
||||
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.japi.function
|
||||
import akka.stream.impl.StreamLayout
|
||||
import akka.stream.{ javadsl, scaladsl, _ }
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
|
@ -27,7 +28,7 @@ object Sink {
|
|||
* function evaluation when the input stream ends, or completed with `Failure`
|
||||
* if there is a failure is signaled in the stream.
|
||||
*/
|
||||
def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.Sink[In, Future[U]] =
|
||||
def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, Future[U]] =
|
||||
new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -61,7 +62,7 @@ object Sink {
|
|||
* normal end of the stream, or completed with `Failure` if there is a failure is signaled in
|
||||
* the stream..
|
||||
*/
|
||||
def foreach[T](f: japi.Procedure[T]): Sink[T, Future[Unit]] =
|
||||
def foreach[T](f: function.Procedure[T]): Sink[T, Future[Unit]] =
|
||||
new Sink(scaladsl.Sink.foreach(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -76,7 +77,7 @@ object Sink {
|
|||
* completion, apply the provided function with [[scala.util.Success]]
|
||||
* or [[scala.util.Failure]].
|
||||
*/
|
||||
def onComplete[In](callback: japi.Procedure[Try[Unit]]): Sink[In, Unit] =
|
||||
def onComplete[In](callback: function.Procedure[Try[Unit]]): Sink[In, Unit] =
|
||||
new Sink(scaladsl.Sink.onComplete[In](x ⇒ callback.apply(x)))
|
||||
|
||||
/**
|
||||
|
|
@ -142,7 +143,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[
|
|||
/**
|
||||
* Transform only the materialized value of this Sink, leaving all other properties as they were.
|
||||
*/
|
||||
def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Sink[In, Mat2] =
|
||||
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
|
||||
new Sink(delegate.mapMaterialized(f.apply _))
|
||||
|
||||
override def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] =
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.stream.javadsl
|
||||
|
||||
import java.io.File
|
||||
import akka.japi.function
|
||||
import scala.collection.immutable
|
||||
import java.util.concurrent.Callable
|
||||
import akka.actor.{ Cancellable, ActorRef, Props }
|
||||
|
|
@ -81,7 +82,7 @@ object Source {
|
|||
* in accordance with the demand coming from the downstream transformation
|
||||
* steps.
|
||||
*/
|
||||
def fromIterator[O](f: japi.Creator[java.util.Iterator[O]]): javadsl.Source[O, Unit] =
|
||||
def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, Unit] =
|
||||
new Source(scaladsl.Source(() ⇒ f.create().asScala))
|
||||
|
||||
/**
|
||||
|
|
@ -233,7 +234,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
/**
|
||||
* Transform only the materialized value of this Source, leaving all other properties as they were.
|
||||
*/
|
||||
def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Source[Out, Mat2] =
|
||||
def mapMaterialized[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
|
||||
new Source(delegate.mapMaterialized(f.apply _))
|
||||
|
||||
/**
|
||||
|
|
@ -245,7 +246,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
/**
|
||||
* Transform this [[Source]] by appending the given processing stages.
|
||||
*/
|
||||
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: japi.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
|
||||
def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
|
||||
new Source(delegate.viaMat(flow.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -257,7 +258,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
/**
|
||||
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
|
||||
*/
|
||||
def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
|
||||
def to[M, M2](sink: javadsl.Sink[Out, M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
|
||||
new RunnableFlowAdapter(delegate.toMat(sink.asScala)(combinerToScala(combine)))
|
||||
|
||||
/**
|
||||
|
|
@ -275,7 +276,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* function evaluation when the input stream ends, or completed with `Failure`
|
||||
* if there is a failure is signaled in the stream.
|
||||
*/
|
||||
def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
|
||||
def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
|
||||
runWith(Sink.fold(zero, f), materializer)
|
||||
|
||||
/**
|
||||
|
|
@ -293,7 +294,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* normal end of the stream, or completed with `Failure` if there is a failure is signaled in
|
||||
* the stream.
|
||||
*/
|
||||
def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =
|
||||
def runForeach(f: function.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =
|
||||
runWith(Sink.foreach(f), materializer)
|
||||
|
||||
// COMMON OPS //
|
||||
|
|
@ -302,7 +303,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step.
|
||||
*/
|
||||
def map[T](f: japi.Function[Out, T]): javadsl.Source[T, Mat] =
|
||||
def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.map(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -312,7 +313,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* The returned list MUST NOT contain `null` values,
|
||||
* as they are illegal as stream elements - according to the Reactive Streams specification.
|
||||
*/
|
||||
def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Source[T, Mat] =
|
||||
def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))
|
||||
|
||||
/**
|
||||
|
|
@ -324,7 +325,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
*
|
||||
* @see [[#mapAsyncUnordered]]
|
||||
*/
|
||||
def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
|
||||
def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.mapAsync(parallelism, f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -337,13 +338,13 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
*
|
||||
* @see [[#mapAsync]]
|
||||
*/
|
||||
def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
|
||||
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.mapAsyncUnordered(parallelism, f.apply))
|
||||
|
||||
/**
|
||||
* Only pass on those elements that satisfy the given predicate.
|
||||
*/
|
||||
def filter(p: japi.Predicate[Out]): javadsl.Source[Out, Mat] =
|
||||
def filter(p: function.Predicate[Out]): javadsl.Source[Out, Mat] =
|
||||
new Source(delegate.filter(p.test))
|
||||
|
||||
/**
|
||||
|
|
@ -369,7 +370,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* applies the current and next value to the given function `f`,
|
||||
* yielding the next current value.
|
||||
*/
|
||||
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Source[T, Mat] =
|
||||
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
|
||||
new Source(delegate.scan(zero)(f.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -431,7 +432,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
|
||||
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
|
||||
*/
|
||||
def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Source[S, Mat] =
|
||||
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] =
|
||||
new Source(delegate.conflate(seed.apply)(aggregate.apply))
|
||||
|
||||
/**
|
||||
|
|
@ -447,7 +448,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
|
||||
* state.
|
||||
*/
|
||||
def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U, Mat] =
|
||||
def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U, Mat] =
|
||||
new Source(delegate.expand(seed(_))(s ⇒ {
|
||||
val p = extrapolate(s)
|
||||
(p.first, p.second)
|
||||
|
|
@ -469,7 +470,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* This operator makes it possible to extend the `Flow` API when there is no specialized
|
||||
* operator that performs the transformation.
|
||||
*/
|
||||
def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] =
|
||||
def transform[U](mkStage: function.Creator[Stage[Out, U]]): javadsl.Source[U, Mat] =
|
||||
new Source(delegate.transform(() ⇒ mkStage.create()))
|
||||
|
||||
/**
|
||||
|
|
@ -491,7 +492,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* care to unblock (or cancel) all of the produced streams even if you want
|
||||
* to consume only one of them.
|
||||
*/
|
||||
def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
def groupBy[K](f: function.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
|
|
@ -507,7 +508,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* true, false, false // elements go into third substream
|
||||
* }}}
|
||||
*/
|
||||
def splitWhen(p: japi.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] =
|
||||
def splitWhen(p: function.Predicate[Out]): javadsl.Source[javadsl.Source[Out, Unit], Mat] =
|
||||
new Source(delegate.splitWhen(p.test).map(_.asJava))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1,55 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl.japi
|
||||
|
||||
// TODO Same SAM-classes as in akka.japi, but with variance annotations
|
||||
// TODO Remove these in favour of using akka.japi with added variance
|
||||
|
||||
/**
|
||||
* A Function interface. Used to create first-class-functions is Java.
|
||||
*/
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Function[-T, +R] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(param: T): R
|
||||
}
|
||||
|
||||
/**
|
||||
* A Function interface. Used to create 2-arg first-class-functions is Java.
|
||||
*/
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Function2[-T1, -T2, +R] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(arg1: T1, arg2: T2): R
|
||||
}
|
||||
|
||||
/**
|
||||
* A constructor/factory, takes no parameters but creates a new value of type T every call.
|
||||
*/
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Creator[+T] extends Serializable {
|
||||
/**
|
||||
* This method must return a different instance upon every call.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def create(): T
|
||||
}
|
||||
|
||||
/**
|
||||
* A Procedure is like a Function, but it doesn't produce a return value.
|
||||
*/
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Procedure[-T] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(param: T): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Defines a criteria and determines whether the parameter meets this criteria.
|
||||
*/
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Predicate[-T] {
|
||||
def test(param: T): Boolean
|
||||
}
|
||||
|
||||
|
|
@ -5,7 +5,7 @@ package akka.stream
|
|||
|
||||
package object javadsl {
|
||||
|
||||
def combinerToScala[M1, M2, M](f: japi.Function2[M1, M2, M]): (M1, M2) ⇒ M =
|
||||
def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) ⇒ M =
|
||||
f match {
|
||||
case s: Function2[_, _, _] ⇒ s.asInstanceOf[(M1, M2) ⇒ M]
|
||||
case other ⇒ other.apply _
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue