From 9dd3685a2a06185256993a5e3456d6621f0c8f85 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 20 Oct 2014 14:09:24 +0200 Subject: [PATCH] =str #16103 Flattened javadsl class structure - no adapters, no ops --- .../akka/http/model/japi/JavaTestServer.java | 5 +- .../akka/stream/actor/ActorPublisherTest.java | 1 - .../stream/actor/ActorSubscriberTest.java | 4 +- .../java/akka/stream/javadsl/FlowTest.java | 11 +- .../akka/stream/DslConsistencySpec.scala | 3 +- .../main/scala/akka/stream/impl/Extract.scala | 12 +- .../akka/stream/javadsl/ActorFlowSink.scala | 15 - .../akka/stream/javadsl/ActorFlowSource.scala | 14 - .../main/scala/akka/stream/javadsl/Flow.scala | 403 +++++++++--- .../scala/akka/stream/javadsl/FlowGraph.scala | 11 +- .../akka/stream/javadsl/MaterializedMap.scala | 17 +- .../main/scala/akka/stream/javadsl/Sink.scala | 136 +++- .../scala/akka/stream/javadsl/Source.scala | 599 ++++++++++++------ .../scala/akka/stream/javadsl/StreamOps.scala | 560 ---------------- .../stream/scaladsl2/JavaConverters.scala | 21 +- .../scala/akka/stream/scaladsl2/Pipe.scala | 2 +- 16 files changed, 862 insertions(+), 952 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSink.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSource.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/StreamOps.scala diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 5946831043..f4a7b117cc 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -9,9 +9,6 @@ import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.Foreach; -import akka.japi.Function; -import akka.japi.Procedure; -import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.scaladsl2.FlowMaterializer; @@ -46,7 +43,7 @@ public abstract class JavaTestServer { System.out.println("Handling request to " + request.getUri()); return JavaApiTestCases.handleRequest(request); } - }).runWith(Sink.subscriber(conn.getResponseSubscriber()), materializer); + }).runWith(Sink.create(conn.getResponseSubscriber()), materializer); } }, materializer); } diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index 95d9fdafb7..8054ab6284 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -16,7 +16,6 @@ import org.reactivestreams.Publisher; import static akka.stream.actor.ActorPublisherMessage.Request; -@Ignore public class ActorPublisherTest { @ClassRule diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index c33222c3e4..cadaf53c48 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -11,7 +11,6 @@ import akka.stream.scaladsl2.FlowMaterializer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.reactivestreams.Subscriber; @@ -20,7 +19,6 @@ import java.util.Arrays; import static akka.stream.actor.ActorSubscriberMessage.OnError; import static akka.stream.actor.ActorSubscriberMessage.OnNext; -@Ignore public class ActorSubscriberTest { @ClassRule @@ -69,7 +67,7 @@ public class ActorSubscriberTest { final Subscriber subscriber = UntypedActorSubscriber.create(ref); final java.util.Iterator input = Arrays.asList(1, 2, 3).iterator(); - Source.from(input).runWith(Sink.subscriber(subscriber), materializer); + Source.from(input).runWith(Sink.create(subscriber), materializer); ref.tell("run", null); probe.expectMsgEquals(1); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index d9f7398afe..1b0572a954 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -409,12 +409,11 @@ public class FlowTest { final java.lang.Iterable input = Arrays.asList("A", "B", "C"); Source.from(input) - .runWith(Sink.onComplete( - new OnComplete() { - @Override public void onComplete(Throwable failure, BoxedUnit success) throws Throwable { - probe.getRef().tell(success, ActorRef.noSender()); - } - }), + .runWith(Sink.onComplete(new Procedure() { + @Override public void apply(BoxedUnit param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }), materializer); probe.expectMsgClass(BoxedUnit.class); diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index ef03eabcb5..a8282c6fff 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -86,7 +86,8 @@ class DslConsistencySpec extends WordSpec with Matchers { for (c ← classes; op ← materializingOps) assertHasMethod(c, op) } - } + } } + } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Extract.scala b/akka-stream/src/main/scala/akka/stream/impl/Extract.scala index bde6679568..be3a766325 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Extract.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Extract.scala @@ -16,17 +16,17 @@ private[akka] object Extract { object Source { def unapply(a: Any): Option[scaladsl2.Source[Any]] = a match { - case s: scaladsl2.Source[Any] ⇒ Some(s) - case s: javadsl.SourceAdapter[Any] ⇒ Some(s.asScala) - case _ ⇒ None + case s: scaladsl2.Source[Any] ⇒ Some(s) + case s: javadsl.Source[Any] ⇒ Some(s.asScala) + case _ ⇒ None } } object Sink { def unapply(a: Any): Option[scaladsl2.Sink[Any]] = a match { - case s: scaladsl2.Sink[Any] ⇒ Some(s) - case s: javadsl.SinkAdapter[Any] ⇒ Some(s.asScala) - case _ ⇒ None + case s: scaladsl2.Sink[Any] ⇒ Some(s) + case s: javadsl.Sink[Any] ⇒ Some(s.asScala) + case _ ⇒ None } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSink.scala deleted file mode 100644 index dbcd61fdcf..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSink.scala +++ /dev/null @@ -1,15 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl - -import akka.stream.javadsl -import akka.stream.scaladsl2 - -final case class SimpleSink[-In](override val delegate: scaladsl2.Sink[In]) extends javadsl.SinkAdapter[In] { - override def asScala: scaladsl2.SimpleActorFlowSink[In] = super.asScala.asInstanceOf[scaladsl2.SimpleActorFlowSink[In]] -} - -final case class KeyedSink[-In, M](override val delegate: scaladsl2.Sink[In]) extends javadsl.SinkAdapter[In] { - override def asScala: scaladsl2.KeyedSink[In] = super.asScala.asInstanceOf[scaladsl2.KeyedSink[In]] -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSource.scala deleted file mode 100644 index 99acddfc91..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/ActorFlowSource.scala +++ /dev/null @@ -1,14 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl - -import akka.stream.scaladsl2 - -final case class SimpleSource[+Out](override val delegate: scaladsl2.Source[Out]) extends SourceAdapter[Out] { - override def asScala: scaladsl2.SimpleActorFlowSource[Out] = super.asScala.asInstanceOf[scaladsl2.SimpleActorFlowSource[Out]] -} - -final case class KeyedSource[+Out, T](override val delegate: scaladsl2.Source[Out]) extends SourceAdapter[Out] { - override def asScala: scaladsl2.KeyedActorFlowSource[Out] = super.asScala.asInstanceOf[scaladsl2.KeyedActorFlowSource[Out]] -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6594a17845..bab8649d9e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -5,11 +5,9 @@ package akka.stream.javadsl import akka.stream._ -import java.util - import akka.japi.Util -import akka.japi.Pair -import akka.stream.javadsl.japi.Function +import akka.stream.scaladsl2 +import scaladsl2.FlowMaterializer import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Future @@ -17,136 +15,368 @@ import scala.concurrent.duration.FiniteDuration object Flow { - /** Create a `Flow` which can process elements of type `T`. */ - def of[T](): javadsl.Flow[T, T] = new javadsl.FlowAdapter[T, T](scaladsl2.Pipe.empty[T]) + import akka.stream.scaladsl2.JavaConverters._ + + /** Adapt [[scaladsl2.Flow]] for use within Java DSL */ + def adapt[I, O](flow: scaladsl2.Flow[I, O]): javadsl.Flow[I, O] = + new Flow(flow) /** Create a `Flow` which can process elements of type `T`. */ - def of[T](clazz: Class[T]): javadsl.Flow[T, T] = of[T]() + def create[T](): javadsl.Flow[T, T] = + new javadsl.Flow[T, T](scaladsl2.Pipe.empty[T]) + + /** + * Creates a `Flow` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and + * returns the `UndefinedSource` and `UndefinedSink`. + */ + def apply[I, O](block: japi.Function[FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = { + val sFlow = scaladsl2.Flow() { b ⇒ + val pair = block.apply(b.asJava) + pair.first.asScala → pair.second.asScala + } + new javadsl.Flow[I, O](sFlow) + } + + /** + * Creates a `Flow` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSource` and `UndefinedSink`. + */ + def apply[I, O](graph: PartialFlowGraph, block: japi.Function[javadsl.FlowGraphBuilder, akka.japi.Pair[UndefinedSource[I], UndefinedSink[O]]]): Flow[I, O] = { + val sFlow = scaladsl2.Flow(graph.asScala) { b ⇒ + val pair = block.apply(b.asJava) + pair.first.asScala → pair.second.asScala + } + new Flow[I, O](sFlow) + } + + /** Create a `Flow` which can process elements of type `T`. */ + def of[T](clazz: Class[T]): javadsl.Flow[T, T] = + create[T]() } /** Java API */ -abstract class Flow[-In, +Out] extends FlowOps[In, Out] { - - /** - * Transform this flow by appending the given processing steps. - */ - def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] - - /** - * Connect this flow to a sink, concatenating the processing steps of both. - */ - def connect(sink: javadsl.Sink[Out]): javadsl.Sink[In] -} - -/** - * INTERNAL API - */ -private[akka] class FlowAdapter[-In, +Out](delegate: scaladsl2.Flow[In, Out]) extends javadsl.Flow[In, Out] { - +class Flow[-In, +Out](delegate: scaladsl2.Flow[In, Out]) { import scala.collection.JavaConverters._ import akka.stream.scaladsl2.JavaConverters._ /** Converts this Flow to it's Scala DSL counterpart */ def asScala: scaladsl2.Flow[In, Out] = delegate - // FLOW // + // CONNECT // /** * Transform this flow by appending the given processing steps. */ - override def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] = - new FlowAdapter(delegate.connect(flow.asScala)) + def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Flow[In, T] = + new Flow(delegate.connect(flow.asScala)) /** * Connect this flow to a sink, concatenating the processing steps of both. */ - override def connect(sink: javadsl.Sink[Out]): javadsl.Sink[In] = - SinkAdapter(delegate.connect(sink.asScala)) + def connect(sink: javadsl.Sink[Out]): javadsl.Sink[In] = + new Sink(delegate.connect(sink.asScala)) // RUN WITH // - def runWith[T, D](source: javadsl.KeyedSource[In, T], sink: javadsl.KeyedSink[Out, D], materializer: scaladsl2.FlowMaterializer): akka.japi.Pair[T, D] = { + /** + * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. + * + * The returned tuple contains the materialized values of the `KeyedSource` and `KeyedSink`, + * e.g. the `Subscriber` of a `SubscriberSource` and `Publisher` of a `PublisherSink`. + * + * @tparam T materialized type of given KeyedSource + * @tparam U materialized type of given KeyedSink + */ + def runWith[T, U](source: javadsl.KeyedSource[In, T], sink: javadsl.KeyedSink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = { val p = delegate.runWith(source.asScala, sink.asScala)(materializer) - akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[D]) + akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[U]) } - def runWith[D](source: javadsl.SimpleSource[In], sink: javadsl.KeyedSink[Out, D], materializer: scaladsl2.FlowMaterializer): D = - delegate.runWith(source.asScala, sink.asScala)(materializer).asInstanceOf[D] - - def runWith[T](source: javadsl.KeyedSource[In, T], sink: javadsl.SimpleSink[Out], materializer: scaladsl2.FlowMaterializer): T = + /** + * Connect the `Source` to this `Flow` and then connect it to the `KeyedSink` and run it. + * + * The returned value will contain the materialized value of the `KeyedSink`, + * e.g. `Publisher` of a `Sink.publisher()`. + * + * @tparam T materialized type of given KeyedSink + */ + def runWith[T](source: javadsl.Source[In], sink: javadsl.KeyedSink[Out, T], materializer: FlowMaterializer): T = delegate.runWith(source.asScala, sink.asScala)(materializer).asInstanceOf[T] - def runWith(source: javadsl.SimpleSource[In], sink: javadsl.SimpleSink[Out], materializer: scaladsl2.FlowMaterializer): Unit = + /** + * Connect the `KeyedSource` to this `Flow` and then connect it to the `Sink` and run it. + * + * The returned value will contain the materialized value of the `KeyedSource`, + * e.g. `Subscriber` of a `Source.from(publisher)`. + * + * @tparam T materialized type of given KeyedSource + */ + def runWith[T](source: javadsl.KeyedSource[In, T], sink: javadsl.Sink[Out], materializer: FlowMaterializer): T = + delegate.runWith(source.asScala, sink.asScala)(materializer).asInstanceOf[T] + + /** + * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. + * + * As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload. + */ + def runWith(source: javadsl.Source[In], sink: javadsl.Sink[Out], materializer: FlowMaterializer): Unit = delegate.runWith(source.asScala, sink.asScala)(materializer) // COMMON OPS // - override def map[T](f: Function[Out, T]): javadsl.Flow[In, T] = - new FlowAdapter(delegate.map(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. + */ + def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T] = + new Flow(delegate.map(f.apply)) - override def mapConcat[U](f: Function[Out, java.util.List[U]]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + /** + * Transform each input element into a sequence of output elements that is + * then flattened into the output stream. + */ + def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T] = + new Flow(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) - override def mapAsync[U](f: Function[Out, Future[U]]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.mapAsync(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. The function returns a `Future` of the + * element that will be emitted downstream. As many futures as requested elements by + * downstream may run in parallel and may complete in any order, but the elements that + * are emitted downstream are in the same order as from upstream. + * + * @see [[#mapAsyncUnordered]] + */ + def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] = + new Flow(delegate.mapAsync(f.apply)) - override def mapAsyncUnordered[T](f: Function[Out, Future[T]]): javadsl.Flow[In, T] = - new FlowAdapter(delegate.mapAsyncUnordered(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. The function returns a `Future` of the + * element that will be emitted downstream. As many futures as requested elements by + * downstream may run in parallel and each processed element will be emitted dowstream + * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream + * in the same order as from upstream. + * + * @see [[#mapAsync]] + */ + def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] = + new Flow(delegate.mapAsyncUnordered(f.apply)) - override def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out] = - new FlowAdapter(delegate.filter(p.test)) + /** + * Only pass on those elements that satisfy the given predicate. + */ + def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out] = + new Flow(delegate.filter(p.test)) - override def collect[U](pf: PartialFunction[Out, U]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T] = + new Flow(delegate.collect(pf)) - override def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] = - new FlowAdapter(delegate.grouped(n).map(_.asJava)).asInstanceOf[javadsl.Flow[In, java.util.List[Out @uncheckedVariance]]] // FIXME optimize to one step + /** + * Chunk up this stream into groups of the given size, with the last group + * possibly smaller than requested due to end-of-stream. + * + * `n` must be positive, otherwise IllegalArgumentException is thrown. + */ + def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] = + new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step - override def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, util.List[Out @uncheckedVariance]] = - new FlowAdapter(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] = + new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step - override def drop(n: Int): javadsl.Flow[In, Out] = - new FlowAdapter(delegate.drop(n)) + /** + * Discard the given number of elements at the beginning of the stream. + * No elements will be dropped if `n` is zero or negative. + */ + def drop(n: Int): javadsl.Flow[In, Out] = + new Flow(delegate.drop(n)) - override def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out] = - new FlowAdapter(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out] = + new Flow(delegate.dropWithin(d)) - override def take(n: Int): Flow[In, Out] = - new FlowAdapter(delegate.take(n)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * number of elements. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + */ + def take(n: Int): javadsl.Flow[In, Out] = + new Flow(delegate.take(n)) - override def takeWithin(d: FiniteDuration): Flow[In, Out] = - new FlowAdapter(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out] = + new Flow(delegate.takeWithin(d)) - override def transform[U](name: String, transformer: japi.Creator[Transformer[Out, U]]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.transform(name, () ⇒ transformer.create())) + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S] = + new Flow(delegate.conflate(seed.apply, aggregate.apply)) - override def timerTransform[U](name: String, transformer: japi.Creator[TimerTransformer[Out, U]]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.timerTransform(name, () ⇒ transformer.create())) - - override def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] = - new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) - - override def groupBy[K](f: Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] = - new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step - - override def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, javadsl.Source[Out]] = - new FlowAdapter(delegate.splitWhen(p.test).map(_.asJava)) - - override def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.flatten(strategy)) - - override def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out] = - new FlowAdapter(delegate.buffer(size, overflowStrategy)) - - override def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U] = - new FlowAdapter(delegate.expand(seed.apply, (s: S) ⇒ { + /** + * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the subscriber until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * subscriber. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U] = + new Flow(delegate.expand(seed.apply, (s: S) ⇒ { val p = extrapolate.apply(s) (p.first, p.second) })) - override def conflate[S](seed: Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S] = - new FlowAdapter(delegate.conflate(seed.apply, aggregate.apply)) + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out] = + new Flow(delegate.buffer(size, overflowStrategy)) + + /** + * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. + */ + def transform[U](name: String, mkTransformer: japi.Creator[Transformer[Out, U]]): javadsl.Flow[In, U] = + new Flow(delegate.transform(name, () ⇒ mkTransformer.create())) + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: japi.Creator[TimerTransformer[Out, U]]): javadsl.Flow[In, U] = + new Flow(delegate.timerTransform(name, () ⇒ mkTransformer.create())) + + /** + * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] = + new Flow(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) + + /** + * This operation demultiplexes the incoming stream into separate output + * streams, one for each element key. The key is computed for each element + * using the given function. When a new key is encountered for the first time + * it is emitted to the downstream subscriber together with a fresh + * flow that will eventually produce all the elements of the substream + * for that key. Not consuming the elements from the created streams will + * stop this processor from processing more elements, therefore you must take + * care to unblock (or cancel) all of the produced streams even if you want + * to consume only one of them. + */ + def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] = + new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. This means + * that for the following series of predicate values, three substreams will + * be produced with lengths 1, 2, and 3: + * + * {{{ + * false, // element goes into first substream + * true, false, // elements go into second substream + * true, false, false // elements go into third substream + * }}} + */ + def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out]] = + new Flow(delegate.splitWhen(p.test).map(_.asJava)) + + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Source]]. + */ + def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] = + new Flow(delegate.flatten(strategy)) } @@ -161,5 +391,6 @@ trait RunnableFlow { /** INTERNAL API */ private[akka] class RunnableFlowAdapter(runnable: scaladsl2.RunnableFlow) extends RunnableFlow { - override def run(materializer: scaladsl2.FlowMaterializer): MaterializedMap = new MaterializedMapAdapter(runnable.run()(materializer)) + override def run(materializer: scaladsl2.FlowMaterializer): MaterializedMap = + new MaterializedMap(runnable.run()(materializer)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index baadb27881..5cd4013d89 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -7,6 +7,8 @@ import akka.stream.javadsl import akka.stream.scaladsl2 import akka.stream._ +import akka.stream.scaladsl2 +import akka.stream.scaladsl2 // elements // @@ -325,6 +327,9 @@ class FlowGraphBuilder(b: scaladsl2.FlowGraphBuilder) { this(new scaladsl2.FlowGraphBuilder()) } + /** Converts this Java DSL element to it's Scala DSL counterpart. */ + def asScala: scaladsl2.FlowGraphBuilder = b + def addEdge[In, Out](source: javadsl.UndefinedSource[In], flow: javadsl.Flow[In, Out], junctionIn: javadsl.JunctionInPort[Out]) = { b.addEdge(source.asScala, flow.asScala, junctionIn.asScala) this @@ -426,7 +431,7 @@ class FlowGraphBuilder(b: scaladsl2.FlowGraphBuilder) { /** Build the [[FlowGraph]] and materialize it. */ def run(materializer: scaladsl2.FlowMaterializer): javadsl.MaterializedMap = - new MaterializedMapAdapter(b.build().run()(materializer)) + new MaterializedMap(b.build().run()(materializer)) } @@ -471,10 +476,8 @@ class FlowGraph(delegate: scaladsl2.FlowGraph) extends RunnableFlow { def asScala: scaladsl2.FlowGraph = delegate - // TODO IMPLEMENT - override def run(materializer: scaladsl2.FlowMaterializer): javadsl.MaterializedMap = - new MaterializedMapAdapter(delegate.run()(materializer)) + new MaterializedMap(delegate.run()(materializer)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala index 1a93284e3f..a964a835be 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/MaterializedMap.scala @@ -13,24 +13,17 @@ import akka.stream.scaladsl2 * accessor method to retrieve the materialized `Source` or `Sink`, e.g. * [[akka.stream.scaladsl2.SubscriberSource#subscriber]] or [[akka.stream.scaladsl2.PublisherSink#publisher]]. */ -trait MaterializedMap { +class MaterializedMap(delegate: scaladsl2.MaterializedMap) { /** * Retrieve a materialized `Source`, e.g. the `Subscriber` of a [[akka.stream.scaladsl2.SubscriberSource]]. */ - def get[T](key: javadsl.KeyedSource[_, T]): T + def get[T](key: javadsl.KeyedSource[_, T]): T = + delegate.get(key.asScala).asInstanceOf[T] /** * Retrieve a materialized `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. */ - def get[D](key: javadsl.KeyedSink[_, D]): D -} - -/** INTERNAL API */ -private[akka] class MaterializedMapAdapter(delegate: scaladsl2.MaterializedMap) extends MaterializedMap { - - override def get[T](key: javadsl.KeyedSource[_, T]): T = - delegate.get(key.asScala).asInstanceOf[T] - - override def get[D](key: javadsl.KeyedSink[_, D]): D = + def get[D](key: javadsl.KeyedSink[_, D]): D = delegate.get(key.asScala).asInstanceOf[D] + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 65cddb881a..41a3ab6da7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -6,16 +6,18 @@ package akka.stream.javadsl import akka.stream.javadsl import akka.stream.scaladsl2 import org.reactivestreams.{ Publisher, Subscriber } +import scaladsl2.FlowMaterializer import scala.concurrent.Future +/** Java API */ object Sink { - /** - * Java API - * - * Adapt [[scaladsl2.Sink]] for use within JavaDSL - */ - def adapt[O](sink: scaladsl2.Sink[O]): javadsl.Sink[O] = SinkAdapter(sink) + + import akka.stream.scaladsl2.JavaConverters._ + + /** Adapt [[scaladsl2.Sink]] for use within Java DSL */ + def adapt[O](sink: scaladsl2.Sink[O]): javadsl.Sink[O] = + new Sink(sink) /** * A `Sink` that will invoke the given function for every received element, giving it its previous @@ -24,7 +26,47 @@ object Sink { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.KeyedSink[In, Future[U]] = KeyedSink(scaladsl2.Sink.fold[U, In](zero)(f.apply)) + def fold[U, In](zero: U, f: japi.Function2[U, In, U]): javadsl.KeyedSink[In, Future[U]] = + new KeyedSink(scaladsl2.Sink.fold[U, In](zero)(f.apply)) + + /** + * Helper to create [[Sink]] from `Subscriber`. + */ + def create[In](subs: Subscriber[In]): Sink[In] = + new Sink[In](scaladsl2.Sink(subs)) + + /** + * Creates a `Sink` by using an empty [[FlowGraphBuilder]] on a block that expects a [[FlowGraphBuilder]] and + * returns the `UndefinedSource`. + */ + def create[T]()(block: japi.Function[FlowGraphBuilder, UndefinedSource[T]]): Sink[T] = + new Sink(scaladsl2.Sink.apply() { b ⇒ block.apply(b.asJava).asScala }) + + /** + * Creates a `Sink` by using a FlowGraphBuilder from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSource`. + */ + def create[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSource[T]]): Sink[T] = + new Sink[T](scaladsl2.Sink.apply(graph.asScala) { b ⇒ block.apply(b.asJava).asScala }) + + /** + * A `Sink` that immediately cancels its upstream after materialization. + */ + def cancelled[T]: Sink[T] = + new Sink(scaladsl2.Sink.cancelled) + + /** + * A `Sink` that will consume the stream and discard the elements. + */ + def ignore[T](): Sink[T] = + new Sink(scaladsl2.Sink.ignore) + + /** + * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. + * that can handle one [[org.reactivestreams.Subscriber]]. + */ + def publisher[In](): KeyedSink[In, Publisher[In]] = + new KeyedSink(scaladsl2.Sink.publisher) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized @@ -32,58 +74,82 @@ object Sink { * normal end of the stream, or completed with `Failure` if there is an error is signaled in * the stream.. */ - def foreach[In](f: japi.Procedure[In]): javadsl.KeyedSink[In, Future[Unit]] = KeyedSink(scaladsl2.Sink.foreach[In](x ⇒ f(x))) + def foreach[T](f: japi.Procedure[T]): KeyedSink[T, Future[Unit]] = + new KeyedSink(scaladsl2.Sink.foreach(f.apply)) /** - * Helper to create [[Sink]] from `Subscriber`. + * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] + * that can handle more than one [[org.reactivestreams.Subscriber]]. */ - def subscriber[In](subs: Subscriber[In]): KeyedSink[In, Subscriber[In]] = KeyedSink(scaladsl2.Sink(subs)) - - /** - * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. - * that can handle one [[org.reactivestreams.Subscriber]]. - */ - def publisher[In](): KeyedSink[In, Publisher[In]] = KeyedSink(scaladsl2.Sink.publisher) + def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): KeyedSink[Publisher[T], T] = + new KeyedSink(scaladsl2.Sink.fanoutPublisher(initialBufferSize, maximumBufferSize)) /** * A `Sink` that when the flow is completed, either through an error or normal * completion, apply the provided function with [[scala.util.Success]] * or [[scala.util.Failure]]. */ - def onComplete[In](onComplete: akka.dispatch.OnComplete[Unit]): SimpleSink[In] = - SimpleSink(scaladsl2.Sink.onComplete[In](x ⇒ onComplete.apply(x))) + def onComplete[In](onComplete: japi.Procedure[Unit]): Sink[In] = + new Sink(scaladsl2.Sink.onComplete[In](x ⇒ onComplete.apply(x))) /** * A `Sink` that materializes into a `Future` of the first value received. */ - def future[In]: KeyedSink[In, Future[In]] = KeyedSink(scaladsl2.Sink.future[In]) + def future[In]: KeyedSink[In, Future[In]] = + new KeyedSink(scaladsl2.Sink.future[In]) + + /** + * A `Sink` that will invoke the given function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is an error is signaled in the stream. + */ + def fold[U, T](zero: U, f: Function[akka.japi.Pair[U, T], U]): KeyedSink[T, U] = { + val sSink = scaladsl2.Sink.fold[U, T](zero) { case (a, b) ⇒ f.apply(akka.japi.Pair(a, b)) } + new KeyedSink(sSink) + } + } /** + * Java API + * * A `Sink` is a set of stream processing steps that has one open input and an attached output. * Can be used as a `Subscriber` */ -abstract class Sink[-In] extends javadsl.SinkOps[In] - -/** INTERNAL API */ -private[akka] object SinkAdapter { - def apply[In](sink: scaladsl2.Sink[In]) = new SinkAdapter[In] { def delegate = sink } -} - -/** INTERNAL API */ -private[akka] abstract class SinkAdapter[-In] extends Sink[In] { - - protected def delegate: scaladsl2.Sink[In] +class Sink[-In](delegate: scaladsl2.Sink[In]) { /** Converts this Sink to it's Scala DSL counterpart */ def asScala: scaladsl2.Sink[In] = delegate // RUN WITH // - def runWith[T](source: javadsl.KeyedSource[In, T], materializer: scaladsl2.FlowMaterializer): T = - delegate.runWith(source.asScala)(materializer).asInstanceOf[T] - - def runWith(source: javadsl.SimpleSource[In], materializer: scaladsl2.FlowMaterializer): Unit = - delegate.runWith(source.asScala)(materializer) + /** + * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSource` and run it. + * The returned tuple contains the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a + * [[akka.stream.scaladsl2.SubscriberSource]] and and `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + * + * @tparam T materialized type of given Source + */ + def runWith[T](source: javadsl.KeyedSource[In, T], materializer: FlowMaterializer): T = + asScala.runWith(source.asScala)(materializer).asInstanceOf[T] + /** + * Connect this `Source` to a `Source` and run it. The returned value is the materialized value + * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + */ + def runWith(source: javadsl.Source[In], materializer: FlowMaterializer): Unit = + asScala.runWith(source.asScala)(materializer) +} + +/** + * Java API + * + * A `Sink` that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this sink (could be a completion Future + * or a cancellation handle, etc.) + */ +final class KeyedSink[-In, M](delegate: scaladsl2.KeyedSink[In]) extends javadsl.Sink[In](delegate) { + override def asScala: scaladsl2.KeyedSink[In] = super.asScala.asInstanceOf[scaladsl2.KeyedSink[In]] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 880605a784..a9034c8b59 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,57 +3,201 @@ */ package akka.stream.javadsl -import akka.stream._ - -import java.util import java.util.concurrent.Callable import akka.japi.Util -import akka.stream.javadsl.japi.{ Predicate, Function2, Creator, Function } -import akka.stream.scaladsl2._ -import org.reactivestreams.{ Subscriber, Publisher } +import akka.stream._ +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber import scaladsl2.FlowMaterializer + import scala.annotation.unchecked.uncheckedVariance -import scala.collection.immutable import scala.collection.JavaConverters._ -import scala.collection.immutable.Seq import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration - import scala.language.higherKinds import scala.language.implicitConversions +/** Java API */ +object Source { + + import scaladsl2.JavaConverters._ + + /** Adapt [[scaladsl2.Source]] for use within JavaDSL */ + def adapt[O](source: scaladsl2.Source[O]): Source[O] = + new Source(source) + + /** Adapt [[scaladsl2.SourcePipe]] for use within JavaDSL */ + def adapt[O](source: scaladsl2.SourcePipe[O]): Source[O] = + new Source(source) + + /** + * Create a `Source` with no elements, i.e. an empty stream that is completed immediately + * for every connected `Sink`. + */ + def empty[O](): Source[O] = + new Source(scaladsl2.Source.empty()) + + /** + * Helper to create [[Source]] from `Publisher`. + * + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ + def from[O](publisher: Publisher[O]): javadsl.Source[O] = + new Source(scaladsl2.Source.apply(publisher)) + + /** + * Helper to create [[Source]] from `Iterator`. + * Example usage: + * + * {{{ + * List data = new ArrayList(); + * data.add(1); + * data.add(2); + * data.add(3); + * Source.from(data.iterator()); + * }}} + * + * Start a new `Source` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ + def from[O](iterator: java.util.Iterator[O]): javadsl.Source[O] = + new Source(scaladsl2.Source(iterator.asScala)) + + /** + * Helper to create [[Source]] from `Iterable`. + * Example usage: + * {{{ + * List data = new ArrayList(); + * data.add(1); + * data.add(2); + * data.add(3); + * Source.fom(data); + * }}} + * + * Starts a new `Source` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ + def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O] = + new Source(scaladsl2.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable))) + + /** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the closure returns a `None`. + * The stream ends exceptionally when an exception is thrown from the closure. + */ + def from[O](f: japi.Creator[akka.japi.Option[O]]): javadsl.Source[O] = + new Source(scaladsl2.Source(() ⇒ f.create().asScala)) + + /** + * Start a new `Source` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ + def from[O](future: Future[O]): javadsl.Source[O] = + new Source(scaladsl2.Source(future)) + + /** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] = + new Source(scaladsl2.Source(initialDelay, interval, () ⇒ tick.call())) + + /** + * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSink`. + */ + def from[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = + new Source(scaladsl2.Source(graph.asScala)(x ⇒ block.apply(x.asJava).asScala)) + + /** + * Create a `Source` with one element. + * Every connected `Sink` of this stream will see an individual stream consisting of one element. + */ + def singleton[T](element: T): Source[T] = + new Source(scaladsl2.Source.singleton(element)) + + /** + * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. + */ + def failed[T](cause: Throwable): Source[T] = + new Source(scaladsl2.Source.failed(cause)) + + /** + * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] + */ + def subscriber[T](): KeyedSource[Subscriber[T], T] = + new KeyedSource(scaladsl2.Source.subscriber) + + /** + * Concatenates two sources so that the first element + * emitted by the second source is emitted after the last element of the first + * source. + */ + def concat[T](first: Source[T], second: Source[T]): Source[T] = + new KeyedSource(scaladsl2.Source.concat(first.asScala, second.asScala)) +} + /** * Java API * * A `Source` is a set of stream processing steps that has one open output and an attached input. * Can be used as a `Publisher` */ -abstract class Source[+Out] extends javadsl.SourceOps[Out] { +class Source[+Out](delegate: scaladsl2.Source[Out]) { + import akka.stream.scaladsl2.JavaConverters._ + + import scala.collection.JavaConverters._ + + /** Converts this Java DSL element to it's Scala DSL counterpart. */ + def asScala: scaladsl2.Source[Out] = delegate + + // CONNECT // /** * Transform this source by appending the given processing stages. */ - def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] + def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] = + new Source(delegate.connect(flow.asScala)) /** * Connect this source to a sink, concatenating the processing steps of both. */ - def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow + def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = + new RunnableFlowAdapter(delegate.connect(sink.asScala)) + + // RUN WITH // /** - * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value - * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + * Connect this `Source` to a `KeyedSink` and run it. + * The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. * * @tparam S materialized type of the given Sink */ - def runWith[S](sink: KeyedSink[Out, S], materializer: FlowMaterializer): S + def runWith[S](sink: KeyedSink[Out, S], materializer: FlowMaterializer): S = + asScala.runWith(sink.asScala)(materializer).asInstanceOf[S] /** - * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value - * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. + * Connect this `Source` to a `Sink` and run it. + * The returned value is the materialized value of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. */ - def runWith(sink: SimpleSink[Out], materializer: FlowMaterializer): Unit + def runWith(sink: Sink[Out], materializer: FlowMaterializer): Unit = + delegate.connect(sink.asScala).run()(materializer) + + // OPS // /** * Shortcut for running this `Source` with a fold function. @@ -63,14 +207,16 @@ abstract class Source[+Out] extends javadsl.SourceOps[Out] { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] + def fold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = + runWith(Sink.fold(zero, f), materializer) /** * Concatenates a second source so that the first element * emitted by that source is emitted after the last element of this * source. */ - def concat[Out2 >: Out](second: Source[Out2]): Source[Out2] + def concat[Out2 >: Out](second: Source[Out2]): Source[Out2] = + delegate.concat(second.asScala).asJava /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked @@ -79,207 +225,270 @@ abstract class Source[+Out] extends javadsl.SourceOps[Out] { * normal end of the stream, or completed with `Failure` if there is an error is signaled in * the stream. */ - def foreach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] -} - -object Source { - - /** - * Java API - * - * Adapt [[scaladsl2.Source]] for use within JavaDSL - */ - def adapt[O](source: scaladsl2.Source[O]): Source[O] = SourceAdapter(source) - - /** - * Java API - * Adapt [[scaladsl2.SourcePipe]] for use within JavaDSL - */ - def adapt[O](source: scaladsl2.SourcePipe[O]): Source[O] = SourceAdapter(source) - - /** - * Java API - * - * Helper to create [[Source]] from `Publisher`. - * - * Construct a transformation starting with given publisher. The transformation steps - * are executed by a series of [[org.reactivestreams.Processor]] instances - * that mediate the flow of elements downstream and the propagation of - * back-pressure upstream. - */ - def from[O](publisher: Publisher[O]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source.apply(publisher)) - - /** - * Java API - * - * Helper to create [[Source]] from `Iterator`. - * Example usage: `Source(Seq(1,2,3).iterator)` - * - * Start a new `Source` from the given Iterator. The produced stream of elements - * will continue until the iterator runs empty or fails during evaluation of - * the `next()` method. Elements are pulled out of the iterator - * in accordance with the demand coming from the downstream transformation - * steps. - */ - def from[O](iterator: java.util.Iterator[O]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source(iterator.asScala)) - - /** - * Java API - * - * Helper to create [[Source]] from `Iterable`. - * Example usage: `Source.from(Seq(1,2,3))` - * - * Starts a new `Source` from the given `Iterable`. This is like starting from an - * Iterator, but every Subscriber directly attached to the Publisher of this - * stream will see an individual flow of elements (always starting from the - * beginning) regardless of when they subscribed. - */ - def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable))) - - /** - * Java API - * - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ - def from[O](f: japi.Creator[akka.japi.Option[O]]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source(() ⇒ f.create().asScala)) - - /** - * Java API - * - * Start a new `Source` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ - def from[O](future: Future[O]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source(future)) - - /** - * Java API - * - * Elements are produced from the tick closure periodically with the specified interval. - * The tick element will be delivered to downstream consumers that has requested any elements. - * If a consumer has not requested any elements at the point in time when the tick - * element is produced it will not receive that tick element later. It will - * receive new tick elements as soon as it has requested more elements. - */ - def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] = - SourceAdapter(scaladsl2.Source(initialDelay, interval, () ⇒ tick.call())) -} - -/** INTERNAL API */ -private[akka] object SourceAdapter { - - def apply[O](source: scaladsl2.Source[O]): javadsl.Source[O] = - new SourceAdapter[O] { def delegate = source } -} - -/** INTERNAL API */ -private[akka] abstract class SourceAdapter[+Out] extends Source[Out] { - - import scala.collection.JavaConverters._ - import akka.stream.scaladsl2.JavaConverters._ - - protected def delegate: scaladsl2.Source[Out] - - /** Converts this Source to it's Scala DSL counterpart */ - def asScala: scaladsl2.Source[Out] = delegate - - // SOURCE // - - override def connect[T](flow: javadsl.Flow[Out, T]): javadsl.Source[T] = - SourceAdapter(delegate.connect(flow.asScala)) - - override def connect(sink: javadsl.Sink[Out]): javadsl.RunnableFlow = - new RunnableFlowAdapter(delegate.connect(sink.asScala)) - - override def runWith[D](sink: KeyedSink[Out, D], materializer: FlowMaterializer): D = - asScala.runWith(sink.asScala)(materializer).asInstanceOf[D] - - override def runWith(sink: SimpleSink[Out], materializer: FlowMaterializer): Unit = - delegate.connect(sink.asScala).run()(materializer) - - override def fold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = - runWith(Sink.fold(zero, f), materializer) - - override def concat[Out2 >: Out](second: javadsl.Source[Out2]): javadsl.Source[Out2] = - delegate.concat(second.asScala).asJava - - override def foreach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = + def foreach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = runWith(Sink.foreach(f), materializer) // COMMON OPS // - override def map[T](f: Function[Out, T]): javadsl.Source[T] = - SourceAdapter(delegate.map(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. + */ + def map[T](f: japi.Function[Out, T]): javadsl.Source[T] = + new Source(delegate.map(f.apply)) - override def mapConcat[T](f: Function[Out, java.util.List[T]]): javadsl.Source[T] = - SourceAdapter(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + /** + * Transform each input element into a sequence of output elements that is + * then flattened into the output stream. + */ + def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Source[T] = + new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) - override def mapAsync[T](f: Function[Out, Future[T]]): javadsl.Source[T] = - SourceAdapter(delegate.mapAsync(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. The function returns a `Future` of the + * element that will be emitted downstream. As many futures as requested elements by + * downstream may run in parallel and may complete in any order, but the elements that + * are emitted downstream are in the same order as from upstream. + * + * @see [[#mapAsyncUnordered]] + */ + def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T] = + new Source(delegate.mapAsync(f.apply)) - override def mapAsyncUnordered[T](f: Function[Out, Future[T]]): javadsl.Source[T] = - SourceAdapter(delegate.mapAsyncUnordered(f.apply)) + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. The function returns a `Future` of the + * element that will be emitted downstream. As many futures as requested elements by + * downstream may run in parallel and each processed element will be emitted dowstream + * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream + * in the same order as from upstream. + * + * @see [[#mapAsync]] + */ + def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T] = + new Source(delegate.mapAsyncUnordered(f.apply)) - override def filter(p: Predicate[Out]): javadsl.Source[Out] = - SourceAdapter(delegate.filter(p.test)) + /** + * Only pass on those elements that satisfy the given predicate. + */ + def filter(p: japi.Predicate[Out]): javadsl.Source[Out] = + new Source(delegate.filter(p.test)) - override def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T] = - SourceAdapter(delegate.collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T] = + new Source(delegate.collect(pf)) - override def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance]] = - SourceAdapter(delegate.grouped(n).map(_.asJava)) + /** + * Chunk up this stream into groups of the given size, with the last group + * possibly smaller than requested due to end-of-stream. + * + * @param n must be positive, otherwise [[IllegalArgumentException]] is thrown. + */ + def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance]] = + new Source(delegate.grouped(n).map(_.asJava)) - override def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance]] = - SourceAdapter(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * @param n must be positive, and `d` must be greater than 0 seconds, otherwise [[IllegalArgumentException]] is thrown. + */ + def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance]] = + new Source(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step - override def drop(n: Int): javadsl.Source[Out] = - SourceAdapter(delegate.drop(n)) + /** + * Discard the given number of elements at the beginning of the stream. + * No elements will be dropped if `n` is zero or negative. + */ + def drop(n: Int): javadsl.Source[Out] = + new Source(delegate.drop(n)) - override def dropWithin(d: FiniteDuration): javadsl.Source[Out] = - SourceAdapter(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): javadsl.Source[Out] = + new Source(delegate.dropWithin(d)) - override def take(n: Int): javadsl.Source[Out] = - SourceAdapter(delegate.take(n)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * number of elements. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * @param n if `n` is zero or negative the stream will be completed without producing any elements. + */ + def take(n: Int): javadsl.Source[Out] = + new Source(delegate.take(n)) - override def takeWithin(d: FiniteDuration): javadsl.Source[Out] = - SourceAdapter(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): javadsl.Source[Out] = + new Source(delegate.takeWithin(d)) - override def conflate[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): javadsl.Source[S] = - SourceAdapter(delegate.conflate(seed.apply, aggregate.apply)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Source[S] = + new Source(delegate.conflate(seed.apply, aggregate.apply)) - override def expand[S, U](seed: Function[Out, S], extrapolate: Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U] = - SourceAdapter(delegate.expand(seed.apply, (s: S) ⇒ { + /** + * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the subscriber until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * subscriber. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U] = + new Source(delegate.expand(seed.apply, (s: S) ⇒ { val p = extrapolate.apply(s) (p.first, p.second) })) - override def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Source[Out] = - SourceAdapter(delegate.buffer(size, overflowStrategy)) + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Source[Out] = + new Source(delegate.buffer(size, overflowStrategy)) - override def transform[T](name: String, mkTransformer: japi.Creator[Transformer[Out, T]]): javadsl.Source[T] = - SourceAdapter(delegate.transform(name, () ⇒ mkTransformer.create())) + /** + * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. + */ + def transform[U](name: String, mkTransformer: japi.Creator[Transformer[Out, U]]): javadsl.Source[U] = + new Source(delegate.transform(name, () ⇒ mkTransformer.create())) - override def timerTransform[U](name: String, mkTransformer: Creator[TimerTransformer[Out, U]]): javadsl.Source[U] = - SourceAdapter(delegate.timerTransform(name, () ⇒ mkTransformer.create())) + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: japi.Creator[TimerTransformer[Out, U]]): javadsl.Source[U] = + new Source(delegate.timerTransform(name, () ⇒ mkTransformer.create())) - override def prefixAndTail(n: Int): javadsl.Source[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] = - SourceAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) + /** + * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): javadsl.Source[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] = + new Source(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) - override def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] = - SourceAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step + /** + * This operation demultiplexes the incoming stream into separate output + * streams, one for each element key. The key is computed for each element + * using the given function. When a new key is encountered for the first time + * it is emitted to the downstream subscriber together with a fresh + * flow that will eventually produce all the elements of the substream + * for that key. Not consuming the elements from the created streams will + * stop this processor from processing more elements, therefore you must take + * care to unblock (or cancel) all of the produced streams even if you want + * to consume only one of them. + */ + def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] = + new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step - override def splitWhen(p: japi.Predicate[Out]): javadsl.Source[javadsl.Source[Out]] = - SourceAdapter(delegate.splitWhen(p.test).map(_.asJava)) + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. This means + * that for the following series of predicate values, three substreams will + * be produced with lengths 1, 2, and 3: + * + * {{{ + * false, // element goes into first substream + * true, false, // elements go into second substream + * true, false, false // elements go into third substream + * }}} + */ + def splitWhen(p: japi.Predicate[Out]): javadsl.Source[javadsl.Source[Out]] = + new Source(delegate.splitWhen(p.test).map(_.asJava)) - override def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U] = - SourceAdapter(delegate.flatten(strategy)) + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Source]]. + */ + def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Source[U] = + new Source(delegate.flatten(strategy)) } + +/** + * Java API + * + * A `Source` that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this source (could be a Subscriber, a Future/Promise, etc.). + */ +final class KeyedSource[+Out, T](delegate: scaladsl2.Source[Out]) extends Source[Out](delegate) { + override def asScala: scaladsl2.KeyedActorFlowSource[Out] = super.asScala.asInstanceOf[scaladsl2.KeyedActorFlowSource[Out]] +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamOps.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamOps.scala deleted file mode 100644 index 9f30b87c92..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamOps.scala +++ /dev/null @@ -1,560 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl - -import akka.stream._ -import akka.stream.scaladsl2._ -import scaladsl2.FlowMaterializer - -import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration - -/** Java API */ -trait FlowOps[-In, +Out] { - - // RUN WITH // - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * The returned tuple contains the materialized values of the `Source` and `Sink`, - * e.g. the `Subscriber` of a [[SubscriberSource]] and `Publisher` of a [[PublisherSink]]. - * - * @tparam T materialized type of given Source - * @tparam U materialized type of given Sink - */ - def runWith[T, U](source: javadsl.KeyedSource[In, T], sink: javadsl.KeyedSink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] - - /** - * Connect the `Source` to this `Flow and then connect it to the `Sink` and run it. - * - * The returned value will contain the materialized value of the `KeyedSink`, e.g. `Publisher` of a [[PublisherSink]]. - * - * @tparam T materialized type of given Sink - */ - def runWith[T](source: javadsl.SimpleSource[In], sink: javadsl.KeyedSink[Out, T], materializer: FlowMaterializer): T - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * The returned value will contain the materialized value of the `KeyedSource`, e.g. `Subscriber` of a [[SubscriberSource]]. - * - * @tparam T materialized type of given Source - */ - def runWith[T](source: javadsl.KeyedSource[In, T], sink: javadsl.SimpleSink[Out], materializer: FlowMaterializer): T - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. - * - * As both `Source` and `Sink` are "simple", no value is returned from this `runWith` overload. - */ - def runWith(source: javadsl.SimpleSource[In], sink: javadsl.SimpleSink[Out], materializer: FlowMaterializer): Unit - - // COMMON OPS // - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. - */ - def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T] - - /** - * Transform each input element into a sequence of output elements that is - * then flattened into the output stream. - */ - def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T] - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by - * downstream may run in parallel and may complete in any order, but the elements that - * are emitted downstream are in the same order as from upstream. - * - * @see [[#mapAsyncUnordered]] - */ - def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by - * downstream may run in parallel and each processed element will be emitted dowstream - * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream - * in the same order as from upstream. - * - * @see [[#mapAsync]] - */ - def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] - - /** - * Only pass on those elements that satisfy the given predicate. - */ - def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out] - - /** - * Transform this stream by applying the given partial function to each of the elements - * on which the function is defined as they pass through this processing step. - * Non-matching elements are filtered out. - */ - def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T] - - /** - * Chunk up this stream into groups of the given size, with the last group - * possibly smaller than requested due to end-of-stream. - * - * `n` must be positive, otherwise IllegalArgumentException is thrown. - */ - def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] - - /** - * Chunk up this stream into groups of elements received within a time window, - * or limited by the given number of elements, whatever happens first. - * Empty groups will not be emitted if no elements are received from upstream. - * The last group before end-of-stream will contain the buffered elements - * since the previously emitted group. - * - * `n` must be positive, and `d` must be greater than 0 seconds, otherwise - * IllegalArgumentException is thrown. - */ - def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] - - /** - * Discard the given number of elements at the beginning of the stream. - * No elements will be dropped if `n` is zero or negative. - */ - def drop(n: Int): javadsl.Flow[In, Out] - - /** - * Discard the elements received within the given duration at beginning of the stream. - */ - def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out] - - /** - * Terminate processing (and cancel the upstream publisher) after the given - * number of elements. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. - * - * The stream will be completed without producing any elements if `n` is zero - * or negative. - */ - def take(n: Int): javadsl.Flow[In, Out] - - /** - * Terminate processing (and cancel the upstream publisher) after the given - * duration. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. - * - * Note that this can be combined with [[#take]] to limit the number of elements - * within the duration. - */ - def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out] - - /** - * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary - * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the - * upstream publisher is faster. - * - * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not - * duplicate elements. - * - * @param seed Provides the first state for a conflated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate - */ - def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S] - - /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older - * element until new element comes from the upstream. For example an expand step might repeat the last element for - * the subscriber until it receives an update from upstream. - * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. - * - * @param seed Provides the first state for extrapolation using the first unconsumed element - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. - */ - def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U] - - /** - * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. - * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no - * space available - * - * @param size The size of the buffer in element count - * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer - */ - def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out] - - /** - * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] - * function is invoked, expecting a (possibly empty) sequence of output elements - * to be produced. - * After handing off the elements produced from one input element to the downstream - * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end - * stream processing at this point; in that case the upstream subscription is - * canceled. Before signaling normal completion to the downstream subscribers, - * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) - * sequence of elements in response to the end-of-stream event. - * - * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. - * - * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. - * - * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with - * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. - */ - def transform[U](name: String, mkTransformer: japi.Creator[Transformer[Out, U]]): javadsl.Flow[In, U] - - /** - * Transformation of a stream, with additional support for scheduled events. - * - * For each element the [[akka.stream.Transformer#onNext]] - * function is invoked, expecting a (possibly empty) sequence of output elements - * to be produced. - * After handing off the elements produced from one input element to the downstream - * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end - * stream processing at this point; in that case the upstream subscription is - * canceled. Before signaling normal completion to the downstream subscribers, - * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) - * sequence of elements in response to the end-of-stream event. - * - * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. - * - * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. - * - * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with - * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. - */ - def timerTransform[U](name: String, mkTransformer: japi.Creator[TimerTransformer[Out, U]]): javadsl.Flow[In, U] - - /** - * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element - * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair - * of an empty collection and a stream containing the whole upstream unchanged. - */ - def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] - - /** - * This operation demultiplexes the incoming stream into separate output - * streams, one for each element key. The key is computed for each element - * using the given function. When a new key is encountered for the first time - * it is emitted to the downstream subscriber together with a fresh - * flow that will eventually produce all the elements of the substream - * for that key. Not consuming the elements from the created streams will - * stop this processor from processing more elements, therefore you must take - * care to unblock (or cancel) all of the produced streams even if you want - * to consume only one of them. - */ - def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] - - /** - * This operation applies the given predicate to all incoming elements and - * emits them to a stream of output streams, always beginning a new one with - * the current element if the given predicate returns true for it. This means - * that for the following series of predicate values, three substreams will - * be produced with lengths 1, 2, and 3: - * - * {{{ - * false, // element goes into first substream - * true, false, // elements go into second substream - * true, false, false // elements go into third substream - * }}} - */ - def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out]] - - /** - * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type [[Source]]. - */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U] - -} - -/** Java API */ -trait SourceOps[+Out] { - - // RUN WITH // - - /** - * Connect the `Sink` to this `Source` and then connect it to the `Source` and run it. - * - * The returned value the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a - * [[akka.stream.scaladsl2.SubscriberSource]] and and `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. - * - * @tparam M materialized type of given Source - */ - def runWith[M](sink: javadsl.KeyedSink[Out, M], materializer: FlowMaterializer): M - - /** - * Connect this `Source` to a `Source` and run it. - */ - def runWith(sink: javadsl.SimpleSink[Out], materializer: FlowMaterializer): Unit - - // COMMON OPS // - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. - */ - def map[T](f: japi.Function[Out, T]): javadsl.Source[T] - - /** - * Transform each input element into a sequence of output elements that is - * then flattened into the output stream. - */ - def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Source[T] - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by - * downstream may run in parallel and may complete in any order, but the elements that - * are emitted downstream are in the same order as from upstream. - * - * @see [[#mapAsyncUnordered]] - */ - def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T] - - /** - * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by - * downstream may run in parallel and each processed element will be emitted dowstream - * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream - * in the same order as from upstream. - * - * @see [[#mapAsync]] - */ - def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T] - - /** - * Only pass on those elements that satisfy the given predicate. - */ - def filter(p: japi.Predicate[Out]): javadsl.Source[Out] - - /** - * Transform this stream by applying the given partial function to each of the elements - * on which the function is defined as they pass through this processing step. - * Non-matching elements are filtered out. - */ - def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T] - - /** - * Chunk up this stream into groups of the given size, with the last group - * possibly smaller than requested due to end-of-stream. - * - * `n` must be positive, otherwise IllegalArgumentException is thrown. - */ - def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance]] - - /** - * Chunk up this stream into groups of elements received within a time window, - * or limited by the given number of elements, whatever happens first. - * Empty groups will not be emitted if no elements are received from upstream. - * The last group before end-of-stream will contain the buffered elements - * since the previously emitted group. - * - * `n` must be positive, and `d` must be greater than 0 seconds, otherwise - * IllegalArgumentException is thrown. - */ - def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance]] - - /** - * Discard the given number of elements at the beginning of the stream. - * No elements will be dropped if `n` is zero or negative. - */ - def drop(n: Int): javadsl.Source[Out] - - /** - * Discard the elements received within the given duration at beginning of the stream. - */ - def dropWithin(d: FiniteDuration): javadsl.Source[Out] - - /** - * Terminate processing (and cancel the upstream publisher) after the given - * number of elements. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. - * - * The stream will be completed without producing any elements if `n` is zero - * or negative. - */ - def take(n: Int): javadsl.Source[Out] - - /** - * Terminate processing (and cancel the upstream publisher) after the given - * duration. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. - * - * Note that this can be combined with [[#take]] to limit the number of elements - * within the duration. - */ - def takeWithin(d: FiniteDuration): javadsl.Source[Out] - - /** - * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary - * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the - * upstream publisher is faster. - * - * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not - * duplicate elements. - * - * @param seed Provides the first state for a conflated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate - */ - def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Source[S] - - /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older - * element until new element comes from the upstream. For example an expand step might repeat the last element for - * the subscriber until it receives an update from upstream. - * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. - * - * @param seed Provides the first state for extrapolation using the first unconsumed element - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. - */ - def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U] - - /** - * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. - * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no - * space available - * - * @param size The size of the buffer in element count - * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer - */ - def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Source[Out] - - /** - * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] - * function is invoked, expecting a (possibly empty) sequence of output elements - * to be produced. - * After handing off the elements produced from one input element to the downstream - * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end - * stream processing at this point; in that case the upstream subscription is - * canceled. Before signaling normal completion to the downstream subscribers, - * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) - * sequence of elements in response to the end-of-stream event. - * - * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. - * - * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. - * - * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with - * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. - */ - def transform[U](name: String, mkTransformer: japi.Creator[Transformer[Out, U]]): javadsl.Source[U] - - /** - * Transformation of a stream, with additional support for scheduled events. - * - * For each element the [[akka.stream.Transformer#onNext]] - * function is invoked, expecting a (possibly empty) sequence of output elements - * to be produced. - * After handing off the elements produced from one input element to the downstream - * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end - * stream processing at this point; in that case the upstream subscription is - * canceled. Before signaling normal completion to the downstream subscribers, - * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) - * sequence of elements in response to the end-of-stream event. - * - * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. - * - * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. - * - * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with - * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and - * therefore you do not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. - */ - def timerTransform[U](name: String, mkTransformer: japi.Creator[TimerTransformer[Out, U]]): javadsl.Source[U] - - /** - * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element - * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair - * of an empty collection and a stream containing the whole upstream unchanged. - */ - def prefixAndTail(n: Int): javadsl.Source[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance]]] - - /** - * This operation demultiplexes the incoming stream into separate output - * streams, one for each element key. The key is computed for each element - * using the given function. When a new key is encountered for the first time - * it is emitted to the downstream subscriber together with a fresh - * flow that will eventually produce all the elements of the substream - * for that key. Not consuming the elements from the created streams will - * stop this processor from processing more elements, therefore you must take - * care to unblock (or cancel) all of the produced streams even if you want - * to consume only one of them. - */ - def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] - - /** - * This operation applies the given predicate to all incoming elements and - * emits them to a stream of output streams, always beginning a new one with - * the current element if the given predicate returns true for it. This means - * that for the following series of predicate values, three substreams will - * be produced with lengths 1, 2, and 3: - * - * {{{ - * false, // element goes into first substream - * true, false, // elements go into second substream - * true, false, false // elements go into third substream - * }}} - */ - def splitWhen(p: japi.Predicate[Out]): javadsl.Source[javadsl.Source[Out]] - - /** - * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type [[Source]]. - */ - def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Source[U] - -} - -/** Java API */ -trait SinkOps[-In] { - - // RUN WITH // - - /** - * Connect the `Source` to this `Flow` and then connect it to the `Source` and run it. - * The returned tuple contains the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a - * [[akka.stream.scaladsl2.SubscriberSource]] and and `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. - * - * @tparam T materialized type of given Source - */ - def runWith[T](source: javadsl.KeyedSource[In, T], materializer: FlowMaterializer): T - - /** - * Connect this `Source` to a `Source` and run it. The returned value is the materialized value - * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl2.PublisherSink]]. - */ - def runWith(source: javadsl.SimpleSource[In], materializer: FlowMaterializer): Unit -} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/JavaConverters.scala index 0b948a9e2b..c189fac4cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/JavaConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/JavaConverters.scala @@ -4,9 +4,6 @@ package akka.stream.scaladsl2 import akka.stream.javadsl -import akka.stream.javadsl.FlowAdapter -import akka.stream.javadsl.SinkAdapter -import akka.stream.javadsl.SourceAdapter import akka.stream.scaladsl2 /** @@ -15,34 +12,40 @@ import akka.stream.scaladsl2 private[akka] object JavaConverters { implicit final class AddAsJavaSource[Out](val source: scaladsl2.Source[Out]) extends AnyVal { - def asJava: javadsl.Source[Out] = SourceAdapter(source) + def asJava: javadsl.Source[Out] = new javadsl.Source(source) } implicit final class AddAsJavaUndefinedSource[Out](val source: scaladsl2.UndefinedSource[Out]) extends AnyVal { def asJava: javadsl.UndefinedSource[Out] = new javadsl.UndefinedSource(source) } implicit final class AddAsJavaFlow[In, Out](val flow: scaladsl2.Flow[In, Out]) extends AnyVal { - def asJava: javadsl.Flow[In, Out] = new FlowAdapter[In, Out](flow) + def asJava: javadsl.Flow[In, Out] = new javadsl.Flow[In, Out](flow) } implicit final class AddAsJavaSink[In](val sink: scaladsl2.Sink[In]) extends AnyVal { - def asJava: javadsl.Sink[In] = SinkAdapter[In](sink) + def asJava: javadsl.Sink[In] = new javadsl.Sink[In](sink) } implicit final class AddAsJavaUndefinedSink[Out](val sink: scaladsl2.UndefinedSink[Out]) extends AnyVal { def asJava: javadsl.UndefinedSink[Out] = new javadsl.UndefinedSink(sink) } + implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl2.FlowGraphBuilder) extends AnyVal { + def asJava: javadsl.FlowGraphBuilder = new javadsl.FlowGraphBuilder(builder) + } implicit final class AddAsScalaSource[Out](val source: javadsl.Source[Out]) extends AnyVal { - def asScala: scaladsl2.Source[Out] = source.asInstanceOf[javadsl.SourceAdapter[Out]].asScala + def asScala: scaladsl2.Source[Out] = source.asInstanceOf[javadsl.Source[Out]].asScala } implicit final class AsAsScalaUndefinedSource[Out](val source: javadsl.UndefinedSource[Out]) extends AnyVal { def asScala: scaladsl2.UndefinedSource[Out] = source.asScala } implicit final class AddAsScalaFlow[In, Out](val flow: javadsl.Flow[In, Out]) extends AnyVal { - def asScala: scaladsl2.Flow[In, Out] = flow.asInstanceOf[javadsl.FlowAdapter[In, Out]].asScala + def asScala: scaladsl2.Flow[In, Out] = flow.asInstanceOf[javadsl.Flow[In, Out]].asScala } implicit final class AddAsScalaSink[In](val sink: javadsl.Sink[In]) extends AnyVal { - def asScala: scaladsl2.Sink[In] = sink.asInstanceOf[javadsl.SinkAdapter[In]].asScala + def asScala: scaladsl2.Sink[In] = sink.asInstanceOf[javadsl.Sink[In]].asScala } implicit final class AsAsScalaUndefinedSink[Out](val sink: javadsl.UndefinedSink[Out]) extends AnyVal { def asScala: scaladsl2.UndefinedSink[Out] = sink.asScala } + implicit final class AsAsScalaFlowGraphBuilder[Out](val builder: javadsl.FlowGraphBuilder) extends AnyVal { + def asScala: FlowGraphBuilder = builder.asScala + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala index 2874211b28..ac4da8368a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Pipe.scala @@ -81,7 +81,7 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As /** * Pipe with attached input and output, can be executed. */ -private[scaladsl2] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode]) extends RunnableFlow { +private[stream] final case class RunnablePipe(input: Source[_], output: Sink[_], ops: List[AstNode]) extends RunnableFlow { def run()(implicit materializer: FlowMaterializer): MaterializedMap = materializer.materialize(input, output, ops) }