diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index b5f5e8e52b..5d4e3d0a05 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -6,6 +6,7 @@ package akka.stream.impl import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.util.Try +import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import Ast.{ AstNode, Recover, Transform } import akka.stream.FlowMaterializer @@ -167,5 +168,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: }).consume(materializer) override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops) + + override def produceTo(materializer: FlowMaterializer, consumer: Consumer[O]) = + toProducer(materializer).produceTo(consumer) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2019a52d88..c2e4b8688c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -8,9 +8,8 @@ import scala.collection.immutable import scala.concurrent.Future import scala.util.Try import scala.util.control.NoStackTrace - +import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer - import akka.stream.FlowMaterializer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.FlowImpl @@ -257,6 +256,16 @@ trait Flow[+T] { */ def toProducer(materializer: FlowMaterializer): Producer[T @uncheckedVariance] + /** + * Attaches a consumer to this stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def produceTo(materializer: FlowMaterializer, consumer: Consumer[T @uncheckedVariance]): Unit + } /** diff --git a/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala new file mode 100644 index 0000000000..b3a5127905 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +class FlowProduceToConsumerSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings()) + + "A Flow with toProducer" must { + + "produce elements to the consumer" in { + val c = StreamTestKit.consumerProbe[Int] + Flow(List(1, 2, 3)).produceTo(materializer, c) + val s = c.expectSubscription() + s.requestMore(3) + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + } + +} \ No newline at end of file