diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 4d51e0c946..a0a26b8d36 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -447,11 +447,48 @@ The ``InputStream`` will be closed when the ``Source`` is canceled from its down asOutputStream ^^^^^^^^^^^^^^ Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they -are emitted from the source +are emitted from the source. The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and closing the ``OutputStream`` will complete the ``Source``. +asJavaStream +^^^^^^^^^^^^ +Create a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. +Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + +The Java 8 a ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java +``Stream`` will cancel the inflow of this ``Sink``. Java ``Stream`` throws exception in case reactive stream failed. + +Be aware that Java 8 ``Stream`` blocks current thread while waiting on next element from downstream. + +fromJavaStream +^^^^^^^^^^^^^^ +Create a source that wraps Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its elements and send them +downstream on demand. + +javaCollector +^^^^^^^^^^^^^ +Create a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector`` +transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. +The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable +result container, optionally transformed into a final representation after all input elements have been processed. +The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially + +Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able +to handle multiple invocations. + +javaCollectorParallelUnordered +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Create a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 Collector +transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. +The ``Collector`` will trigger demand downstream.. Elements emitted through the stream will be accumulated into a mutable +result container, optionally transformed into a final representation after all input elements have been processed. +The ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``. + +Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able +to handle multiple invocations. + File IO Sinks and Sources ------------------------- Sources and sinks for reading and writing files can be found on ``FileIO``. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index b307e23ec3..09a5288523 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -436,11 +436,48 @@ The ``InputStream`` will be closed when the ``Source`` is canceled from its down asOutputStream ^^^^^^^^^^^^^^ Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they -are emitted from the source +are emitted from the source. The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and closing the ``OutputStream`` will complete the ``Source``. +asJavaStream +^^^^^^^^^^^^ +Create a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. +Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + +The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java +``Stream`` will cancel the inflow of this ``Sink``. Java ``Stream`` throws exception in case reactive stream failed. + +Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + +fromJavaStream +^^^^^^^^^^^^^^ +Create a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its elements and send them +downstream on demand. + +javaCollector +^^^^^^^^^^^^^ +Create a sink which materializes into a ``Future`` which will be completed with a result of the Java 8 ``Collector`` +transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. +The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable +result container, optionally transformed into a final representation after all input elements have been processed. +The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially + +Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able +to handle multiple invocations. + +javaCollectorParallelUnordered +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Create a sink which materializes into a ``Future`` which will be completed with a result of the Java 8 ``Collector`` +transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. +The ``Collector`` is triggering demand downstream. Elements emitted through the stream will be accumulated into a mutable +result container, optionally transformed into a final representation after all input elements have been processed. +The ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``. + +Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able +to handle multiple invocations. + File IO Sinks and Sources ------------------------- Sources and sinks for reading and writing files can be found on ``FileIO``. diff --git a/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java new file mode 100644 index 0000000000..ff8ceb2b53 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.io; + + +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; +import akka.stream.testkit.Utils; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +public class SinkAsJavaSourceTest extends StreamTest { + public SinkAsJavaSourceTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource", + Utils.UnboundedMailboxConfig()); + + @Test + public void mustBeAbleToUseAsJavaStream() throws Exception { + final List list = Arrays.asList(1, 2, 3); + final Sink> streamSink = StreamConverters.asJavaStream(); + java.util.stream.Stream javaStream= Source.from(list).runWith(streamSink, materializer); + assertEquals(list, javaStream.collect(Collectors.toList())); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 913317f345..29f9d914dc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -10,6 +10,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import akka.NotUsed; import akka.japi.function.Function; @@ -68,6 +69,14 @@ public class SinkTest extends StreamTest { probe.expectMsgEquals("done"); } + @Test + public void mustBeAbleToUseCollector() throws Exception { + final List list = Arrays.asList(1, 2, 3); + final Sink>> collectorSink = StreamConverters.javaCollector(Collectors::toList); + CompletionStage> result = Source.from(list).runWith(collectorSink, materializer); + assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToCombine() throws Exception { final JavaTestKit probe1 = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index 0071db3cbc..193c6887d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -12,8 +12,6 @@ import akka.stream.stage._ import akka.stream.testkit.Utils.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink import akka.stream.impl.fusing._ -import org.scalactic.ConversionCheckedTripleEquals -import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration.Duration class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 7ac81dd9d5..86c7e14ea5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -107,7 +107,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 01f3f9d0b0..aeebb483a1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -133,6 +133,17 @@ class QueueSinkSpec extends AkkaSpec { } + "cancel upstream on cancel" in assertAllStagesStopped { + val queue = Source.repeat(1).runWith(Sink.queue()) + queue.pull() + queue.cancel() + } + + "be able to cancel upstream right away" in assertAllStagesStopped { + val queue = Source.repeat(1).runWith(Sink.queue()) + queue.cancel() + } + "work with one element buffer" in assertAllStagesStopped { val sink = Sink.queue[Int]().withAttributes(inputBuffer(1, 1)) val probe = TestPublisher.manualProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala new file mode 100644 index 0000000000..3016309c9a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkAsJavaStreamSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.stream.Collectors +import akka.actor.ActorSystem +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl } +import akka.stream._ +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.AkkaSpec +import akka.util.ByteString + +class SinkAsJavaStreamSpec extends AkkaSpec(UnboundedMailboxConfig) { + val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorMaterializer(settings) + + "Java Stream Sink" must { + + "work in happy case" in { + val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream()) + javaSource.count() should ===(100) + } + + "fail if parent stream is failed" in { + val javaSource = Source(1 to 100).map(_ ⇒ throw TE("")).runWith(StreamConverters.asJavaStream()) + a[TE] shouldBe thrownBy { + javaSource.findFirst() + } + } + + "work with collector that is assigned to materialized value" in { + val javaSource = Source(1 to 100).map(_.toString).runWith(StreamConverters.asJavaStream()) + javaSource.collect(Collectors.joining(", ")) should ===((1 to 100).mkString(", ")) + } + + "work with empty stream" in { + val javaSource = Source.empty.runWith(StreamConverters.asJavaStream()) + javaSource.count() should ===(0) + } + + "work with endless stream" in Utils.assertAllStagesStopped { + val javaSource = Source.repeat(1).runWith(StreamConverters.asJavaStream()) + javaSource.limit(10).count() should ===(10) + javaSource.close() + } + + "work in separate IO dispatcher" in Utils.assertAllStagesStopped { + val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) + val materializer = ActorMaterializer()(sys) + + try { + TestSource.probe[ByteString].runWith(StreamConverters.asJavaStream())(materializer) + materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "asJavaStream").get + assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") + } finally shutdown(sys) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 77331f0b82..2e67760b12 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -3,22 +3,28 @@ */ package akka.stream.scaladsl +import java.util +import java.util.function +import java.util.function.{ BinaryOperator, BiConsumer, Supplier, ToIntFunction } +import java.util.stream.Collector.Characteristics +import java.util.stream.{ Collector, Collectors } import akka.stream._ +import akka.stream.testkit.Utils._ import akka.stream.testkit._ import org.scalactic.ConversionCheckedTripleEquals +import akka.testkit.DefaultTimeout import org.scalatest.concurrent.ScalaFutures -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import akka.testkit.AkkaSpec -class SinkSpec extends AkkaSpec { +class SinkSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { import GraphDSL.Implicits._ implicit val materializer = ActorMaterializer() "A Sink" must { - "be composable without importing modules" in { val probes = Array.fill(3)(TestSubscriber.manualProbe[Int]) val sink = Sink.fromGraph(GraphDSL.create() { implicit b ⇒ @@ -133,4 +139,129 @@ class SinkSpec extends AkkaSpec { } } + "Java collector Sink" must { + import scala.compat.java8.FunctionConverters._ + + class TestCollector(_supplier: () ⇒ Supplier[Array[Int]], + _accumulator: () ⇒ BiConsumer[Array[Int], Int], + _combiner: () ⇒ BinaryOperator[Array[Int]], + _finisher: () ⇒ function.Function[Array[Int], Int]) extends Collector[Int, Array[Int], Int] { + override def supplier(): Supplier[Array[Int]] = _supplier() + override def combiner(): BinaryOperator[Array[Int]] = _combiner() + override def finisher(): function.Function[Array[Int], Int] = _finisher() + override def accumulator(): BiConsumer[Array[Int], Int] = _accumulator() + override def characteristics(): util.Set[Characteristics] = util.Collections.emptySet() + } + + val intIdentity: ToIntFunction[Int] = new ToIntFunction[Int] { + override def applyAsInt(value: Int): Int = value + } + + def supplier(): Supplier[Array[Int]] = new Supplier[Array[Int]] { + override def get(): Array[Int] = Array.ofDim(1) + } + def accumulator(): BiConsumer[Array[Int], Int] = new BiConsumer[Array[Int], Int] { + override def accept(a: Array[Int], b: Int): Unit = a(0) = intIdentity.applyAsInt(b) + } + + def combiner(): BinaryOperator[Array[Int]] = new BinaryOperator[Array[Int]] { + override def apply(a: Array[Int], b: Array[Int]): Array[Int] = { + a(0) += b(0); a + } + } + def finisher(): function.Function[Array[Int], Int] = new function.Function[Array[Int], Int] { + override def apply(a: Array[Int]): Int = a(0) + } + + "work in the happy case" in { + Source(1 to 100).map(_.toString).runWith(StreamConverters.javaCollector(() ⇒ Collectors.joining(", "))) + .futureValue should ===((1 to 100).mkString(", ")) + } + + "work parallelly in the happy case" in { + Source(1 to 100).runWith(StreamConverters + .javaCollectorParallelUnordered(4)( + () ⇒ Collectors.summingInt[Int](intIdentity))) + .futureValue should ===(5050) + } + + "be reusable" in { + val sink = StreamConverters.javaCollector[Int, Integer](() ⇒ Collectors.summingInt[Int](intIdentity)) + Source(1 to 4).runWith(sink).futureValue should ===(10) + + // Collector has state so it preserves all previous elements that went though + Source(4 to 6).runWith(sink).futureValue should ===(15) + } + + "be reusable with parallel version" in { + val sink = StreamConverters.javaCollectorParallelUnordered(4)(() ⇒ Collectors.summingInt[Int](intIdentity)) + + Source(1 to 4).runWith(sink).futureValue should ===(10) + Source(4 to 6).runWith(sink).futureValue should ===(15) + } + + "fail if getting the supplier fails" in { + def failedSupplier(): Supplier[Array[Int]] = throw TE("") + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(failedSupplier, accumulator, combiner, finisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "fail if the supplier fails" in { + def failedSupplier(): Supplier[Array[Int]] = new Supplier[Array[Int]] { + override def get(): Array[Int] = throw TE("") + } + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(failedSupplier, accumulator, combiner, finisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "fail if getting the accumulator fails" in { + def failedAccumulator(): BiConsumer[Array[Int], Int] = throw TE("") + + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(supplier, failedAccumulator, combiner, finisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "fail if the accumulator fails" in { + def failedAccumulator(): BiConsumer[Array[Int], Int] = new BiConsumer[Array[Int], Int] { + override def accept(a: Array[Int], b: Int): Unit = throw TE("") + } + + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(supplier, failedAccumulator, combiner, finisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "fail if getting the finisher fails" in { + def failedFinisher(): function.Function[Array[Int], Int] = throw TE("") + + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(supplier, accumulator, combiner, failedFinisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + "fail if the finisher fails" in { + def failedFinisher(): function.Function[Array[Int], Int] = new function.Function[Array[Int], Int] { + override def apply(a: Array[Int]): Int = throw TE("") + } + val future = Source(1 to 100).runWith(StreamConverters.javaCollector( + () ⇒ new TestCollector(supplier, accumulator, combiner, failedFinisher))) + a[TE] shouldBe thrownBy { + Await.result(future, 300.millis) + } + } + + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 84118af688..a784c89d7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -322,4 +322,51 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { } } + "Java Stream source" must { + import scala.compat.java8.FunctionConverters._ + import java.util.stream.{ Stream, IntStream } + + def javaStreamInts = IntStream.iterate(1, { i: Int ⇒ i + 1 }.asJava) + + "work with Java collections" in { + val list = new java.util.LinkedList[Integer]() + list.add(0) + list.add(1) + list.add(2) + + StreamConverters.fromJavaStream(() ⇒ list.stream()).map(_.intValue).runWith(Sink.seq).futureValue should ===(List(0, 1, 2)) + } + + "work with primitive streams" in { + StreamConverters.fromJavaStream(() ⇒ IntStream.rangeClosed(1, 10)).map(_.intValue).runWith(Sink.seq).futureValue should ===(1 to 10) + } + + "work with an empty stream" in { + StreamConverters.fromJavaStream(() ⇒ Stream.empty[Int]()).runWith(Sink.seq).futureValue should ===(Nil) + } + + "work with an infinite stream" in { + StreamConverters.fromJavaStream(() ⇒ javaStreamInts).take(1000).runFold(0)(_ + _).futureValue should ===(500500) + } + + "work with a filtered stream" in { + StreamConverters.fromJavaStream(() ⇒ javaStreamInts.filter({ i: Int ⇒ i % 2 == 0 }.asJava)) + .take(1000).runFold(0)(_ + _).futureValue should ===(1001000) + } + + "properly report errors during iteration" in { + import akka.stream.testkit.Utils.TE + // Filtering is lazy on Java Stream + + val failyFilter: Int ⇒ Boolean = i ⇒ throw TE("failing filter") + + a[TE] must be thrownBy { + Await.result( + StreamConverters.fromJavaStream(() ⇒ javaStreamInts.filter(failyFilter.asJava)).runWith(Sink.ignore), + 3.seconds) + } + } + + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index ad8780e16a..3929c9f39c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,19 +3,28 @@ */ package akka.stream.impl +import akka.stream.impl.QueueSink.{ Output, Pull } import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule +import java.util.concurrent.atomic.AtomicReference +import java.util.function.BiConsumer +import akka.actor.{ ActorRef, Props } +import akka.stream.Attributes.InputBuffer +import akka.stream._ +import akka.stream.impl.StreamLayout.Module import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{ Promise, Future } +import scala.language.postfixOps +import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.stream.scaladsl.SinkQueue +import akka.stream.scaladsl.{ SinkQueueWithCancel, SinkQueue } import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ @@ -183,7 +192,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { - val in = Inlet[T]("lastOption.in") + val in: Inlet[T] = Inlet("lastOption.in") override val shape: SinkShape[T] = SinkShape.of(in) @@ -220,7 +229,7 @@ private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedV private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { - val in = Inlet[T]("headOption.in") + val in: Inlet[T] = Inlet("headOption.in") override val shape: SinkShape[T] = SinkShape.of(in) @@ -290,10 +299,16 @@ private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[Si } } +private[stream] object QueueSink { + sealed trait Output[+T] + final case class Pull[T](promise: Promise[Option[T]]) extends Output[T] + case object Cancel extends Output[Nothing] +} + /** * INTERNAL API */ -final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] { +final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") @@ -303,7 +318,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal override def toString: String = "QueueSink" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] { + val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] { type Received[E] = Try[Option[E]] val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max @@ -321,20 +336,25 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal pull(in) } - override def postStop(): Unit = stopCallback(promise ⇒ - promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))) + override def postStop(): Unit = stopCallback { + case Pull(promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")) + case _ ⇒ //do nothing + } - private val callback: AsyncCallback[Requested[T]] = - getAsyncCallback(promise ⇒ currentRequest match { - case Some(_) ⇒ - promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request")) - case None ⇒ - if (buffer.isEmpty) currentRequest = Some(promise) - else { - if (buffer.used == maxBuffer) tryPull(in) - sendDownstream(promise) - } - }) + private val callback: AsyncCallback[Output[T]] = + getAsyncCallback { + case QueueSink.Pull(pullPromise) ⇒ currentRequest match { + case Some(_) ⇒ + pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request")) + case None ⇒ + if (buffer.isEmpty) currentRequest = Some(pullPromise) + else { + if (buffer.used == maxBuffer) tryPull(in) + sendDownstream(pullPromise) + } + } + case QueueSink.Cancel ⇒ completeStage() + } def sendDownstream(promise: Requested[T]): Unit = { val e = buffer.dequeue() @@ -366,17 +386,58 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal }) } - (stageLogic, new SinkQueue[T] { + (stageLogic, new SinkQueueWithCancel[T] { override def pull(): Future[Option[T]] = { val p = Promise[Option[T]] - stageLogic.invoke(p) + stageLogic.invoke(Pull(p)) p.future } + override def cancel(): Unit = { + stageLogic.invoke(QueueSink.Cancel) + } }) } } -private[akka] final class SinkQueueAdapter[T](delegate: SinkQueue[T]) extends akka.stream.javadsl.SinkQueue[T] { +private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] { import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same } def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava + def cancel(): Unit = delegate.cancel() + } + +/** + * INTERNAL API + * + * Helper class to be able to express collection as a fold using mutable data + */ +private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { + lazy val accumulated = collector.supplier().get() + private lazy val accumulator = collector.accumulator() + + def update(elem: T): CollectorState[T, R] = { + accumulator.accept(accumulated, elem) + this + } + + def finish(): R = collector.finisher().apply(accumulated) +} + +/** + * INTERNAL API + * + * Helper class to be able to express reduce as a fold for parallel collector + */ +private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { + private var reduced: Any = null.asInstanceOf[Any] + private lazy val combiner = collector.combiner() + + def update(batch: Any): ReducerState[T, R] = { + if (reduced == null) reduced = batch + else reduced = combiner(reduced, batch) + this + } + + def finish(): R = collector.finisher().apply(reduced) +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 9c660d004c..e654d45551 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -100,6 +100,9 @@ private[stream] object Stages { val fileSource = name("fileSource") and IODispatcher val unfoldResourceSource = name("unfoldResourceSource") and IODispatcher val unfoldResourceSourceAsync = name("unfoldResourceSourceAsync") and IODispatcher + val asJavaStream = name("asJavaStream") and IODispatcher + val javaCollectorParallelUnordered = name("javaCollectorParallelUnordered") + val javaCollector = name("javaCollector") val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") @@ -117,7 +120,8 @@ private[stream] object Stages { val queueSink = name("queueSink") val outputStreamSink = name("outputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher - val fileSink = name("fileSource") and IODispatcher + val fileSink = name("fileSink") and IODispatcher + val fromJavaStream = name("fromJavaStream") } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala index 22853d38ed..1e55c5a8c8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala @@ -63,3 +63,14 @@ trait SinkQueue[T] { */ def pull(): CompletionStage[Optional[T]] } + +/** + * This trait adds cancel support to [[SinkQueue]]. + */ +trait SinkQueueWithCancel[T] extends SinkQueue[T] { + /** + * Cancel the stream. + */ + def cancel(): Unit +} + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index b8ce7d091a..cfcd9b0b11 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -12,7 +12,7 @@ import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.compat.java8.OptionConverters._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ Future, ExecutionContext } import scala.util.Try import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters.FutureOps @@ -229,8 +229,8 @@ object Sink { } /** - * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. - * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. + * Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueue]]. + * [[akka.stream.javadsl.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``. * `CompletionStage` completes when element is available. * * Before calling pull method second time you need to wait until previous CompletionStage completes. @@ -240,12 +240,12 @@ object Sink { * upstream and then stop back pressure. You can configure size of input * buffer by using [[Sink.withAttributes]] method. * - * For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None + * For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueue]] including last None * as completion marker * - * @see [[akka.stream.SinkQueue]] + * @see [[akka.stream.javadsl.SinkQueueWithCancel]] */ - def queue[T](): Sink[T, SinkQueue[T]] = + def queue[T](): Sink[T, SinkQueueWithCancel[T]] = new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_))) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index d8b76df8dd..9b4ecb7de1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -4,15 +4,17 @@ package akka.stream.javadsl import java.io.{ InputStream, OutputStream } +import java.util.stream.Collector import akka.japi.function import akka.stream.{ scaladsl, javadsl } import akka.stream.IOResult import akka.util.ByteString import scala.concurrent.duration.FiniteDuration import java.util.concurrent.CompletionStage +import akka.NotUsed /** - * Converters for interacting with the blocking `java.io` streams APIs + * Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams */ object StreamConverters { /** @@ -22,7 +24,7 @@ object StreamConverters { * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it. * @@ -40,7 +42,7 @@ object StreamConverters { * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] * will cancel the stream when the [[OutputStream]] is no longer writable. @@ -61,7 +63,7 @@ object StreamConverters { * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. @@ -75,7 +77,7 @@ object StreamConverters { * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. @@ -91,7 +93,7 @@ object StreamConverters { * except the final element, which will be up to `chunkSize` in size. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion. * @@ -106,7 +108,7 @@ object StreamConverters { * except the last element, which will be up to 8192 in size. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. @@ -122,7 +124,7 @@ object StreamConverters { * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] * will complete this [[Source]]. @@ -140,7 +142,7 @@ object StreamConverters { * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] * will complete this [[Source]]. @@ -148,4 +150,58 @@ object StreamConverters { def asOutputStream(): javadsl.Source[ByteString, OutputStream] = new Source(scaladsl.StreamConverters.asOutputStream()) + + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * Java 8 ``Stream`` throws exception in case reactive stream failed. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * configured through the ``akka.stream.blocking-io-dispatcher``. + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream()) + + /** + * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its + * elements and send them downstream on demand. + * + * Example usage: `Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10))` + * + * You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream + * and the rest of flow. + */ + def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]](stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] = + new Source(scaladsl.StreamConverters.fromJavaStream(stream.create)) + + /** + * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector`` + * transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. + * The Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable + * result container, optionally transformed into a final representation after all input elements have been processed. + * The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially + * + * Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able + * to handle multiple invocations. + */ + def javaCollector[T, R](collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] = + new Sink(scaladsl.StreamConverters.javaCollector[T, R](() ⇒ collector.create()).toCompletionStage()) + + /** + * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector`` + * transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. + * The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable + * result container, optionally transformed into a final representation after all input elements have been processed. + * ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``. + * + * Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able + * to handle multiple invocations. + */ + def javaCollectorParallelUnordered[T, R](parallelism: Int)(collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] = + new Sink(scaladsl.StreamConverters.javaCollectorParallelUnordered[T, R](parallelism)(() ⇒ collector.create()).toCompletionStage()) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index 9a7fb20483..8dc05c6aa3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -62,3 +62,13 @@ trait SinkQueue[T] { */ def pull(): Future[Option[T]] } + +/** + * This trait adds cancel support to [[SinkQueue]]. + */ +trait SinkQueueWithCancel[T] extends SinkQueue[T] { + /** + * Cancel the stream. This method returns right away without waiting for actual finalizing stream. + */ + def cancel(): Unit +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 1896978c61..27737f245f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,6 +3,9 @@ */ package akka.stream.scaladsl +import java.util.{ Spliterators, Spliterator } +import java.util.stream.StreamSupport + import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts import akka.actor.{ Status, ActorRef, Props } @@ -15,7 +18,8 @@ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.Duration.Inf +import scala.concurrent.{ Await, ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** @@ -326,8 +330,8 @@ object Sink { } /** - * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. - * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. + * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]]. + * [[akka.stream.scaladsl.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. * `Future` completes when element is available. * * Before calling pull method second time you need to wait until previous Future completes. @@ -337,11 +341,11 @@ object Sink { * upstream and then stop back pressure. You can configure size of input * buffer by using [[Sink.withAttributes]] method. * - * For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None + * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None * as completion marker * - * @see [[akka.stream.SinkQueue]] + * @see [[akka.stream.scaladsl.SinkQueueWithCancel]] */ - def queue[T](): Sink[T, SinkQueue[T]] = + def queue[T](): Sink[T, SinkQueueWithCancel[T]] = Sink.fromGraph(new QueueSink()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index a8cfd9e779..b9a1408a24 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -4,17 +4,23 @@ package akka.stream.scaladsl import java.io.{ OutputStream, InputStream } +import java.util.Spliterators +import java.util.concurrent.atomic.AtomicReference +import java.util.stream.{Collector, StreamSupport} -import akka.stream.IOResult +import akka.stream.{Attributes, SinkShape, IOResult} +import akka.stream.impl._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, OutputStreamSourceStage, InputStreamSource } import akka.util.ByteString -import scala.concurrent.Future +import scala.concurrent.duration.Duration._ +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import akka.NotUsed /** - * Converters for interacting with the blocking `java.io` streams APIs + * Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams */ object StreamConverters { @@ -27,7 +33,7 @@ object StreamConverters { * except the final element, which will be up to `chunkSize` in size. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. @@ -47,7 +53,7 @@ object StreamConverters { * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] * will complete this [[Source]]. @@ -64,7 +70,7 @@ object StreamConverters { * and a possible exception if IO operation was not completed successfully. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false. * * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] @@ -80,7 +86,7 @@ object StreamConverters { * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and * closing the [[InputStream]] will cancel this [[Sink]]. @@ -90,4 +96,105 @@ object StreamConverters { def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] = Sink.fromGraph(new InputStreamSinkStage(readTimeout)) + /** + * Creates a sink which materializes into a ``Future`` which will be completed with result of the Java 8 ``Collector`` transformation + * and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The ``Collector`` will trigger + * demand downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed + * into a final representation after all input elements have been processed. The ``Collector`` can also do reduction + * at the end. Reduction processing is performed sequentially + * + * Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able + * to handle multiple invocations. + */ + def javaCollector[T, R](collectorFactory: () ⇒ java.util.stream.Collector[T, _ <: Any, R]): Sink[T, Future[R]] = + Flow[T].fold(() ⇒ + new CollectorState[T,R](collectorFactory().asInstanceOf[Collector[T, Any, R]])) { (state, elem) ⇒ () ⇒ state().update(elem) } + .map(state ⇒ state().finish()) + .toMat(Sink.head)(Keep.right).withAttributes(DefaultAttributes.javaCollector) + + /** + * Creates a sink which materializes into a ``Future`` which will be completed with result of the Java 8 ``Collector`` transformation + * and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The ``Collector`` will trigger demand + * downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed + * into a final representation after all input elements have been processed. The ``Collector`` can also do reduction + * at the end. Reduction processing is performed in parallel based on graph ``Balance``. + * + * Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able + * to handle multiple invocations. + */ + def javaCollectorParallelUnordered[T, R](parallelism: Int)(collectorFactory: () ⇒ java.util.stream.Collector[T, _ <: Any, R]): Sink[T, Future[R]] = { + if (parallelism == 1) javaCollector[T, R](collectorFactory) + else { + Sink.fromGraph(GraphDSL.create(Sink.head[R]) { implicit b ⇒ + sink ⇒ + import GraphDSL.Implicits._ + val collector = collectorFactory().asInstanceOf[Collector[T, Any, R]] + val balance = b.add(Balance[T](parallelism)) + val merge = b.add(Merge[() ⇒ CollectorState[T, R]](parallelism)) + + for (i ← 0 until parallelism) { + val worker = Flow[T] + .fold(() => new CollectorState(collector)) { (state, elem) ⇒ () ⇒ state().update(elem) } + .async + + balance.out(i) ~> worker ~> merge.in(i) + } + + merge.out + .fold(() => new ReducerState(collector)) { (state, elem) ⇒ () ⇒ state().update(elem().accumulated) } + .map(state => state().finish()) ~> sink.in + + SinkShape(balance.in) + }).withAttributes(DefaultAttributes.javaCollectorParallelUnordered) + } + } + + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * Java 8 ``Stream`` throws exception in case reactive stream failed. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * configured through the ``akka.stream.blocking-io-dispatcher``. + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = { + Sink.fromGraph(new QueueSink[T]()) + .mapMaterializedValue(queue ⇒ StreamSupport.stream( + Spliterators.spliteratorUnknownSize(new java.util.Iterator[T] { + var nextElementFuture: Future[Option[T]] = queue.pull() + var nextElement: Option[T] = null + + override def hasNext: Boolean = { + nextElement = Await.result(nextElementFuture, Inf) + nextElement.isDefined + } + + override def next(): T = { + val next = nextElement.get + nextElementFuture = queue.pull() + next + } + }, 0), false).onClose(new Runnable { def run = queue.cancel() })) + .withAttributes(DefaultAttributes.asJavaStream) + } + + /** + * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its + * elements and send them downstream on demand. + * + * Example usage: `Source.fromJavaStream(() ⇒ IntStream.rangeClosed(1, 10))` + * + * You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream`` + * and the rest of flow. + */ + def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](stream: () ⇒ java.util.stream.BaseStream[T, S]): Source[T, NotUsed] = { + import scala.collection.JavaConverters._ + Source.fromIterator(() ⇒ stream().iterator().asScala).withAttributes(DefaultAttributes.fromJavaStream) + } + }