Merge pull request #20141 from agolubev/19443-add-support-for-Java-Stream-agolubev

19443 add support for java stream agolubev
This commit is contained in:
drewhk 2016-05-03 11:59:59 +02:00
commit c3c8a0bc73
18 changed files with 684 additions and 59 deletions

View file

@ -447,11 +447,48 @@ The ``InputStream`` will be closed when the ``Source`` is canceled from its down
asOutputStream
^^^^^^^^^^^^^^
Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they
are emitted from the source
are emitted from the source.
The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and
closing the ``OutputStream`` will complete the ``Source``.
asJavaStream
^^^^^^^^^^^^
Create a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
The Java 8 a ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
``Stream`` will cancel the inflow of this ``Sink``. Java ``Stream`` throws exception in case reactive stream failed.
Be aware that Java 8 ``Stream`` blocks current thread while waiting on next element from downstream.
fromJavaStream
^^^^^^^^^^^^^^
Create a source that wraps Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its elements and send them
downstream on demand.
javaCollector
^^^^^^^^^^^^^
Create a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector``
transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially
Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
to handle multiple invocations.
javaCollectorParallelUnordered
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Create a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 Collector
transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
The ``Collector`` will trigger demand downstream.. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``.
Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
to handle multiple invocations.
File IO Sinks and Sources
-------------------------
Sources and sinks for reading and writing files can be found on ``FileIO``.

View file

@ -436,11 +436,48 @@ The ``InputStream`` will be closed when the ``Source`` is canceled from its down
asOutputStream
^^^^^^^^^^^^^^
Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they
are emitted from the source
are emitted from the source.
The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and
closing the ``OutputStream`` will complete the ``Source``.
asJavaStream
^^^^^^^^^^^^
Create a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
``Stream`` will cancel the inflow of this ``Sink``. Java ``Stream`` throws exception in case reactive stream failed.
Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
fromJavaStream
^^^^^^^^^^^^^^
Create a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its elements and send them
downstream on demand.
javaCollector
^^^^^^^^^^^^^
Create a sink which materializes into a ``Future`` which will be completed with a result of the Java 8 ``Collector``
transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially
Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
to handle multiple invocations.
javaCollectorParallelUnordered
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Create a sink which materializes into a ``Future`` which will be completed with a result of the Java 8 ``Collector``
transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
The ``Collector`` is triggering demand downstream. Elements emitted through the stream will be accumulated into a mutable
result container, optionally transformed into a final representation after all input elements have been processed.
The ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``.
Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
to handle multiple invocations.
File IO Sinks and Sources
-------------------------
Sources and sinks for reading and writing files can be found on ``FileIO``.

View file

@ -0,0 +1,39 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.io;
import akka.stream.StreamTest;
import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters;
import akka.stream.testkit.Utils;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
public class SinkAsJavaSourceTest extends StreamTest {
public SinkAsJavaSourceTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource",
Utils.UnboundedMailboxConfig());
@Test
public void mustBeAbleToUseAsJavaStream() throws Exception {
final List<Integer> list = Arrays.asList(1, 2, 3);
final Sink<Integer, Stream<Integer>> streamSink = StreamConverters.asJavaStream();
java.util.stream.Stream<Integer> javaStream= Source.from(list).runWith(streamSink, materializer);
assertEquals(list, javaStream.collect(Collectors.toList()));
}
}

View file

@ -10,6 +10,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import akka.NotUsed;
import akka.japi.function.Function;
@ -68,6 +69,14 @@ public class SinkTest extends StreamTest {
probe.expectMsgEquals("done");
}
@Test
public void mustBeAbleToUseCollector() throws Exception {
final List<Integer> list = Arrays.asList(1, 2, 3);
final Sink<Integer, CompletionStage<List<Integer>>> collectorSink = StreamConverters.javaCollector(Collectors::toList);
CompletionStage<List<Integer>> result = Source.from(list).runWith(collectorSink, materializer);
assertEquals(list, result.toCompletableFuture().get(1, TimeUnit.SECONDS));
}
@Test
public void mustBeAbleToCombine() throws Exception {
final JavaTestKit probe1 = new JavaTestKit(system);

View file

@ -12,8 +12,6 @@ import akka.stream.stage._
import akka.stream.testkit.Utils.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.impl.fusing._
import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration.Duration
class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit {

View file

@ -107,7 +107,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
Source.fromIterator(() Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}

View file

@ -133,6 +133,17 @@ class QueueSinkSpec extends AkkaSpec {
}
"cancel upstream on cancel" in assertAllStagesStopped {
val queue = Source.repeat(1).runWith(Sink.queue())
queue.pull()
queue.cancel()
}
"be able to cancel upstream right away" in assertAllStagesStopped {
val queue = Source.repeat(1).runWith(Sink.queue())
queue.cancel()
}
"work with one element buffer" in assertAllStagesStopped {
val sink = Sink.queue[Int]().withAttributes(inputBuffer(1, 1))
val probe = TestPublisher.manualProbe[Int]()

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.stream.Collectors
import akka.actor.ActorSystem
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl }
import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.util.ByteString
class SinkAsJavaStreamSpec extends AkkaSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
"Java Stream Sink" must {
"work in happy case" in {
val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream())
javaSource.count() should ===(100)
}
"fail if parent stream is failed" in {
val javaSource = Source(1 to 100).map(_ throw TE("")).runWith(StreamConverters.asJavaStream())
a[TE] shouldBe thrownBy {
javaSource.findFirst()
}
}
"work with collector that is assigned to materialized value" in {
val javaSource = Source(1 to 100).map(_.toString).runWith(StreamConverters.asJavaStream())
javaSource.collect(Collectors.joining(", ")) should ===((1 to 100).mkString(", "))
}
"work with empty stream" in {
val javaSource = Source.empty.runWith(StreamConverters.asJavaStream())
javaSource.count() should ===(0)
}
"work with endless stream" in Utils.assertAllStagesStopped {
val javaSource = Source.repeat(1).runWith(StreamConverters.asJavaStream())
javaSource.limit(10).count() should ===(10)
javaSource.close()
}
"work in separate IO dispatcher" in Utils.assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
TestSource.probe[ByteString].runWith(StreamConverters.asJavaStream())(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "asJavaStream").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}
}
}

View file

@ -3,22 +3,28 @@
*/
package akka.stream.scaladsl
import java.util
import java.util.function
import java.util.function.{ BinaryOperator, BiConsumer, Supplier, ToIntFunction }
import java.util.stream.Collector.Characteristics
import java.util.stream.{ Collector, Collectors }
import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.scalactic.ConversionCheckedTripleEquals
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
class SinkSpec extends AkkaSpec {
class SinkSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
import GraphDSL.Implicits._
implicit val materializer = ActorMaterializer()
"A Sink" must {
"be composable without importing modules" in {
val probes = Array.fill(3)(TestSubscriber.manualProbe[Int])
val sink = Sink.fromGraph(GraphDSL.create() { implicit b
@ -133,4 +139,129 @@ class SinkSpec extends AkkaSpec {
}
}
"Java collector Sink" must {
import scala.compat.java8.FunctionConverters._
class TestCollector(_supplier: () Supplier[Array[Int]],
_accumulator: () BiConsumer[Array[Int], Int],
_combiner: () BinaryOperator[Array[Int]],
_finisher: () function.Function[Array[Int], Int]) extends Collector[Int, Array[Int], Int] {
override def supplier(): Supplier[Array[Int]] = _supplier()
override def combiner(): BinaryOperator[Array[Int]] = _combiner()
override def finisher(): function.Function[Array[Int], Int] = _finisher()
override def accumulator(): BiConsumer[Array[Int], Int] = _accumulator()
override def characteristics(): util.Set[Characteristics] = util.Collections.emptySet()
}
val intIdentity: ToIntFunction[Int] = new ToIntFunction[Int] {
override def applyAsInt(value: Int): Int = value
}
def supplier(): Supplier[Array[Int]] = new Supplier[Array[Int]] {
override def get(): Array[Int] = Array.ofDim(1)
}
def accumulator(): BiConsumer[Array[Int], Int] = new BiConsumer[Array[Int], Int] {
override def accept(a: Array[Int], b: Int): Unit = a(0) = intIdentity.applyAsInt(b)
}
def combiner(): BinaryOperator[Array[Int]] = new BinaryOperator[Array[Int]] {
override def apply(a: Array[Int], b: Array[Int]): Array[Int] = {
a(0) += b(0); a
}
}
def finisher(): function.Function[Array[Int], Int] = new function.Function[Array[Int], Int] {
override def apply(a: Array[Int]): Int = a(0)
}
"work in the happy case" in {
Source(1 to 100).map(_.toString).runWith(StreamConverters.javaCollector(() Collectors.joining(", ")))
.futureValue should ===((1 to 100).mkString(", "))
}
"work parallelly in the happy case" in {
Source(1 to 100).runWith(StreamConverters
.javaCollectorParallelUnordered(4)(
() Collectors.summingInt[Int](intIdentity)))
.futureValue should ===(5050)
}
"be reusable" in {
val sink = StreamConverters.javaCollector[Int, Integer](() Collectors.summingInt[Int](intIdentity))
Source(1 to 4).runWith(sink).futureValue should ===(10)
// Collector has state so it preserves all previous elements that went though
Source(4 to 6).runWith(sink).futureValue should ===(15)
}
"be reusable with parallel version" in {
val sink = StreamConverters.javaCollectorParallelUnordered(4)(() Collectors.summingInt[Int](intIdentity))
Source(1 to 4).runWith(sink).futureValue should ===(10)
Source(4 to 6).runWith(sink).futureValue should ===(15)
}
"fail if getting the supplier fails" in {
def failedSupplier(): Supplier[Array[Int]] = throw TE("")
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(failedSupplier, accumulator, combiner, finisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
"fail if the supplier fails" in {
def failedSupplier(): Supplier[Array[Int]] = new Supplier[Array[Int]] {
override def get(): Array[Int] = throw TE("")
}
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(failedSupplier, accumulator, combiner, finisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
"fail if getting the accumulator fails" in {
def failedAccumulator(): BiConsumer[Array[Int], Int] = throw TE("")
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(supplier, failedAccumulator, combiner, finisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
"fail if the accumulator fails" in {
def failedAccumulator(): BiConsumer[Array[Int], Int] = new BiConsumer[Array[Int], Int] {
override def accept(a: Array[Int], b: Int): Unit = throw TE("")
}
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(supplier, failedAccumulator, combiner, finisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
"fail if getting the finisher fails" in {
def failedFinisher(): function.Function[Array[Int], Int] = throw TE("")
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(supplier, accumulator, combiner, failedFinisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
"fail if the finisher fails" in {
def failedFinisher(): function.Function[Array[Int], Int] = new function.Function[Array[Int], Int] {
override def apply(a: Array[Int]): Int = throw TE("")
}
val future = Source(1 to 100).runWith(StreamConverters.javaCollector(
() new TestCollector(supplier, accumulator, combiner, failedFinisher)))
a[TE] shouldBe thrownBy {
Await.result(future, 300.millis)
}
}
}
}

View file

@ -322,4 +322,51 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
}
}
"Java Stream source" must {
import scala.compat.java8.FunctionConverters._
import java.util.stream.{ Stream, IntStream }
def javaStreamInts = IntStream.iterate(1, { i: Int i + 1 }.asJava)
"work with Java collections" in {
val list = new java.util.LinkedList[Integer]()
list.add(0)
list.add(1)
list.add(2)
StreamConverters.fromJavaStream(() list.stream()).map(_.intValue).runWith(Sink.seq).futureValue should ===(List(0, 1, 2))
}
"work with primitive streams" in {
StreamConverters.fromJavaStream(() IntStream.rangeClosed(1, 10)).map(_.intValue).runWith(Sink.seq).futureValue should ===(1 to 10)
}
"work with an empty stream" in {
StreamConverters.fromJavaStream(() Stream.empty[Int]()).runWith(Sink.seq).futureValue should ===(Nil)
}
"work with an infinite stream" in {
StreamConverters.fromJavaStream(() javaStreamInts).take(1000).runFold(0)(_ + _).futureValue should ===(500500)
}
"work with a filtered stream" in {
StreamConverters.fromJavaStream(() javaStreamInts.filter({ i: Int i % 2 == 0 }.asJava))
.take(1000).runFold(0)(_ + _).futureValue should ===(1001000)
}
"properly report errors during iteration" in {
import akka.stream.testkit.Utils.TE
// Filtering is lazy on Java Stream
val failyFilter: Int Boolean = i throw TE("failing filter")
a[TE] must be thrownBy {
Await.result(
StreamConverters.fromJavaStream(() javaStreamInts.filter(failyFilter.asJava)).runWith(Sink.ignore),
3.seconds)
}
}
}
}

View file

@ -3,19 +3,28 @@
*/
package akka.stream.impl
import akka.stream.impl.QueueSink.{ Output, Pull }
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BiConsumer
import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.concurrent.{ Promise, Future }
import scala.language.postfixOps
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.stream.scaladsl.SinkQueue
import akka.stream.scaladsl.{ SinkQueueWithCancel, SinkQueue }
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
@ -183,7 +192,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in = Inlet[T]("lastOption.in")
val in: Inlet[T] = Inlet("lastOption.in")
override val shape: SinkShape[T] = SinkShape.of(in)
@ -220,7 +229,7 @@ private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedV
private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in = Inlet[T]("headOption.in")
val in: Inlet[T] = Inlet("headOption.in")
override val shape: SinkShape[T] = SinkShape.of(in)
@ -290,10 +299,16 @@ private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[Si
}
}
private[stream] object QueueSink {
sealed trait Output[+T]
final case class Pull[T](promise: Promise[Option[T]]) extends Output[T]
case object Cancel extends Output[Nothing]
}
/**
* INTERNAL API
*/
final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] {
final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
@ -303,7 +318,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
override def toString: String = "QueueSink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] {
type Received[E] = Try[Option[E]]
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
@ -321,20 +336,25 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
pull(in)
}
override def postStop(): Unit = stopCallback(promise
promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")))
override def postStop(): Unit = stopCallback {
case Pull(promise) promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))
case _ //do nothing
}
private val callback: AsyncCallback[Requested[T]] =
getAsyncCallback(promise currentRequest match {
case Some(_)
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None
if (buffer.isEmpty) currentRequest = Some(promise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(promise)
}
})
private val callback: AsyncCallback[Output[T]] =
getAsyncCallback {
case QueueSink.Pull(pullPromise) currentRequest match {
case Some(_)
pullPromise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
case None
if (buffer.isEmpty) currentRequest = Some(pullPromise)
else {
if (buffer.used == maxBuffer) tryPull(in)
sendDownstream(pullPromise)
}
}
case QueueSink.Cancel completeStage()
}
def sendDownstream(promise: Requested[T]): Unit = {
val e = buffer.dequeue()
@ -366,17 +386,58 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
})
}
(stageLogic, new SinkQueue[T] {
(stageLogic, new SinkQueueWithCancel[T] {
override def pull(): Future[Option[T]] = {
val p = Promise[Option[T]]
stageLogic.invoke(p)
stageLogic.invoke(Pull(p))
p.future
}
override def cancel(): Unit = {
stageLogic.invoke(QueueSink.Cancel)
}
})
}
}
private[akka] final class SinkQueueAdapter[T](delegate: SinkQueue[T]) extends akka.stream.javadsl.SinkQueue[T] {
private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext same }
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
def cancel(): Unit = delegate.cancel()
}
/**
* INTERNAL API
*
* Helper class to be able to express collection as a fold using mutable data
*/
private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
lazy val accumulated = collector.supplier().get()
private lazy val accumulator = collector.accumulator()
def update(elem: T): CollectorState[T, R] = {
accumulator.accept(accumulated, elem)
this
}
def finish(): R = collector.finisher().apply(accumulated)
}
/**
* INTERNAL API
*
* Helper class to be able to express reduce as a fold for parallel collector
*/
private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
private var reduced: Any = null.asInstanceOf[Any]
private lazy val combiner = collector.combiner()
def update(batch: Any): ReducerState[T, R] = {
if (reduced == null) reduced = batch
else reduced = combiner(reduced, batch)
this
}
def finish(): R = collector.finisher().apply(reduced)
}

View file

@ -100,6 +100,9 @@ private[stream] object Stages {
val fileSource = name("fileSource") and IODispatcher
val unfoldResourceSource = name("unfoldResourceSource") and IODispatcher
val unfoldResourceSourceAsync = name("unfoldResourceSourceAsync") and IODispatcher
val asJavaStream = name("asJavaStream") and IODispatcher
val javaCollectorParallelUnordered = name("javaCollectorParallelUnordered")
val javaCollector = name("javaCollector")
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")
@ -117,7 +120,8 @@ private[stream] object Stages {
val queueSink = name("queueSink")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
val fileSink = name("fileSource") and IODispatcher
val fileSink = name("fileSink") and IODispatcher
val fromJavaStream = name("fromJavaStream")
}
import DefaultAttributes._

View file

@ -63,3 +63,14 @@ trait SinkQueue[T] {
*/
def pull(): CompletionStage[Optional[T]]
}
/**
* This trait adds cancel support to [[SinkQueue]].
*/
trait SinkQueueWithCancel[T] extends SinkQueue[T] {
/**
* Cancel the stream.
*/
def cancel(): Unit
}

View file

@ -12,7 +12,7 @@ import akka.stream.impl.StreamLayout
import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, ExecutionContext }
import scala.util.Try
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters.FutureOps
@ -229,8 +229,8 @@ object Sink {
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]].
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``.
* Creates a `Sink` that is materialized as an [[akka.stream.javadsl.SinkQueue]].
* [[akka.stream.javadsl.SinkQueue.pull]] method is pulling element from the stream and returns ``CompletionStage[Option[T]]``.
* `CompletionStage` completes when element is available.
*
* Before calling pull method second time you need to wait until previous CompletionStage completes.
@ -240,12 +240,12 @@ object Sink {
* upstream and then stop back pressure. You can configure size of input
* buffer by using [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None
* For stream completion you need to pull all elements from [[akka.stream.javadsl.SinkQueue]] including last None
* as completion marker
*
* @see [[akka.stream.SinkQueue]]
* @see [[akka.stream.javadsl.SinkQueueWithCancel]]
*/
def queue[T](): Sink[T, SinkQueue[T]] =
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_)))
}

View file

@ -4,15 +4,17 @@
package akka.stream.javadsl
import java.io.{ InputStream, OutputStream }
import java.util.stream.Collector
import akka.japi.function
import akka.stream.{ scaladsl, javadsl }
import akka.stream.IOResult
import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.CompletionStage
import akka.NotUsed
/**
* Converters for interacting with the blocking `java.io` streams APIs
* Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams
*/
object StreamConverters {
/**
@ -22,7 +24,7 @@ object StreamConverters {
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it.
*
@ -40,7 +42,7 @@ object StreamConverters {
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
* will cancel the stream when the [[OutputStream]] is no longer writable.
@ -61,7 +63,7 @@ object StreamConverters {
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
@ -75,7 +77,7 @@ object StreamConverters {
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
@ -91,7 +93,7 @@ object StreamConverters {
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion.
*
@ -106,7 +108,7 @@ object StreamConverters {
* except the last element, which will be up to 8192 in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
@ -122,7 +124,7 @@ object StreamConverters {
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
@ -140,7 +142,7 @@ object StreamConverters {
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
@ -148,4 +150,58 @@ object StreamConverters {
def asOutputStream(): javadsl.Source[ByteString, OutputStream] =
new Source(scaladsl.StreamConverters.asOutputStream())
/**
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
*
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
* ``Stream`` will cancel the inflow of this ``Sink``.
*
* Java 8 ``Stream`` throws exception in case reactive stream failed.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
* configured through the ``akka.stream.blocking-io-dispatcher``.
*/
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream())
/**
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
* elements and send them downstream on demand.
*
* Example usage: `Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10))`
*
* You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream
* and the rest of flow.
*/
def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]](stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.StreamConverters.fromJavaStream(stream.create))
/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector``
* transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
* The Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
* result container, optionally transformed into a final representation after all input elements have been processed.
* The ``Collector`` can also do reduction at the end. Reduction processing is performed sequentially
*
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollector[T, R](collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] =
new Sink(scaladsl.StreamConverters.javaCollector[T, R](() collector.create()).toCompletionStage())
/**
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java 8 ``Collector``
* transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.
* The ``Collector`` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
* result container, optionally transformed into a final representation after all input elements have been processed.
* ``Collector`` can also do reduction at the end. Reduction processing is performed in parallel based on graph ``Balance``.
*
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollectorParallelUnordered[T, R](parallelism: Int)(collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] =
new Sink(scaladsl.StreamConverters.javaCollectorParallelUnordered[T, R](parallelism)(() collector.create()).toCompletionStage())
}

View file

@ -62,3 +62,13 @@ trait SinkQueue[T] {
*/
def pull(): Future[Option[T]]
}
/**
* This trait adds cancel support to [[SinkQueue]].
*/
trait SinkQueueWithCancel[T] extends SinkQueue[T] {
/**
* Cancel the stream. This method returns right away without waiting for actual finalizing stream.
*/
def cancel(): Unit
}

View file

@ -3,6 +3,9 @@
*/
package akka.stream.scaladsl
import java.util.{ Spliterators, Spliterator }
import java.util.stream.StreamSupport
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ Status, ActorRef, Props }
@ -15,7 +18,8 @@ import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
@ -326,8 +330,8 @@ object Sink {
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]].
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]].
* [[akka.stream.scaladsl.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* `Future` completes when element is available.
*
* Before calling pull method second time you need to wait until previous Future completes.
@ -337,11 +341,11 @@ object Sink {
* upstream and then stop back pressure. You can configure size of input
* buffer by using [[Sink.withAttributes]] method.
*
* For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None
* For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]] including last None
* as completion marker
*
* @see [[akka.stream.SinkQueue]]
* @see [[akka.stream.scaladsl.SinkQueueWithCancel]]
*/
def queue[T](): Sink[T, SinkQueue[T]] =
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
Sink.fromGraph(new QueueSink())
}

View file

@ -4,17 +4,23 @@
package akka.stream.scaladsl
import java.io.{ OutputStream, InputStream }
import java.util.Spliterators
import java.util.concurrent.atomic.AtomicReference
import java.util.stream.{Collector, StreamSupport}
import akka.stream.IOResult
import akka.stream.{Attributes, SinkShape, IOResult}
import akka.stream.impl._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, OutputStreamSourceStage, InputStreamSource }
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.Duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import akka.NotUsed
/**
* Converters for interacting with the blocking `java.io` streams APIs
* Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams
*/
object StreamConverters {
@ -27,7 +33,7 @@ object StreamConverters {
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
@ -47,7 +53,7 @@ object StreamConverters {
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
@ -64,7 +70,7 @@ object StreamConverters {
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
* If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false.
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
@ -80,7 +86,7 @@ object StreamConverters {
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
@ -90,4 +96,105 @@ object StreamConverters {
def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] =
Sink.fromGraph(new InputStreamSinkStage(readTimeout))
/**
* Creates a sink which materializes into a ``Future`` which will be completed with result of the Java 8 ``Collector`` transformation
* and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The ``Collector`` will trigger
* demand downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed
* into a final representation after all input elements have been processed. The ``Collector`` can also do reduction
* at the end. Reduction processing is performed sequentially
*
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollector[T, R](collectorFactory: () java.util.stream.Collector[T, _ <: Any, R]): Sink[T, Future[R]] =
Flow[T].fold(()
new CollectorState[T,R](collectorFactory().asInstanceOf[Collector[T, Any, R]])) { (state, elem) () state().update(elem) }
.map(state state().finish())
.toMat(Sink.head)(Keep.right).withAttributes(DefaultAttributes.javaCollector)
/**
* Creates a sink which materializes into a ``Future`` which will be completed with result of the Java 8 ``Collector`` transformation
* and reduction operations. This allows usage of Java 8 streams transformations for reactive streams. The ``Collector`` will trigger demand
* downstream. Elements emitted through the stream will be accumulated into a mutable result container, optionally transformed
* into a final representation after all input elements have been processed. The ``Collector`` can also do reduction
* at the end. Reduction processing is performed in parallel based on graph ``Balance``.
*
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollectorParallelUnordered[T, R](parallelism: Int)(collectorFactory: () java.util.stream.Collector[T, _ <: Any, R]): Sink[T, Future[R]] = {
if (parallelism == 1) javaCollector[T, R](collectorFactory)
else {
Sink.fromGraph(GraphDSL.create(Sink.head[R]) { implicit b
sink
import GraphDSL.Implicits._
val collector = collectorFactory().asInstanceOf[Collector[T, Any, R]]
val balance = b.add(Balance[T](parallelism))
val merge = b.add(Merge[() CollectorState[T, R]](parallelism))
for (i 0 until parallelism) {
val worker = Flow[T]
.fold(() => new CollectorState(collector)) { (state, elem) () state().update(elem) }
.async
balance.out(i) ~> worker ~> merge.in(i)
}
merge.out
.fold(() => new ReducerState(collector)) { (state, elem) () state().update(elem().accumulated) }
.map(state => state().finish()) ~> sink.in
SinkShape(balance.in)
}).withAttributes(DefaultAttributes.javaCollectorParallelUnordered)
}
}
/**
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
*
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
* ``Stream`` will cancel the inflow of this ``Sink``.
*
* Java 8 ``Stream`` throws exception in case reactive stream failed.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
* configured through the ``akka.stream.blocking-io-dispatcher``.
*/
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = {
Sink.fromGraph(new QueueSink[T]())
.mapMaterializedValue(queue StreamSupport.stream(
Spliterators.spliteratorUnknownSize(new java.util.Iterator[T] {
var nextElementFuture: Future[Option[T]] = queue.pull()
var nextElement: Option[T] = null
override def hasNext: Boolean = {
nextElement = Await.result(nextElementFuture, Inf)
nextElement.isDefined
}
override def next(): T = {
val next = nextElement.get
nextElementFuture = queue.pull()
next
}
}, 0), false).onClose(new Runnable { def run = queue.cancel() }))
.withAttributes(DefaultAttributes.asJavaStream)
}
/**
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
* elements and send them downstream on demand.
*
* Example usage: `Source.fromJavaStream(() ⇒ IntStream.rangeClosed(1, 10))`
*
* You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream``
* and the rest of flow.
*/
def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](stream: () java.util.stream.BaseStream[T, S]): Source[T, NotUsed] = {
import scala.collection.JavaConverters._
Source.fromIterator(() stream().iterator().asScala).withAttributes(DefaultAttributes.fromJavaStream)
}
}