From 6c3985f1c589ef94898b65ee39372f73c94b53d6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 29 Apr 2014 15:47:37 +0200 Subject: [PATCH] +str #15073 Add Flow produceTo consumer * Fixes #15073 * Only sugar on top of toProducer produceTo --- .../scala/akka/stream/impl/FlowImpl.scala | 4 +++ .../scala/akka/stream/scaladsl/Flow.scala | 15 ++++++++-- .../stream/FlowProduceToConsumerSpec.scala | 28 +++++++++++++++++++ 3 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index aefd7fe63b..700dc8f51e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -6,6 +6,7 @@ package akka.stream.impl import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.util.Try +import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import Ast.{ AstNode, Recover, Transform } import akka.stream.FlowMaterializer @@ -107,5 +108,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: onComplete = ok ⇒ { if (ok) callback(SuccessUnit); Nil }).consume(materializer) override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops) + + override def produceTo(materializer: FlowMaterializer, consumer: Consumer[O]) = + toProducer(materializer).produceTo(consumer) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9ada03cf4e..3d75d199cd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -8,9 +8,8 @@ import scala.collection.immutable import scala.concurrent.Future import scala.util.Try import scala.util.control.NoStackTrace - +import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer - import akka.stream.FlowMaterializer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.FlowImpl @@ -189,7 +188,7 @@ trait Flow[+T] { * the current element if the given predicate returns true for it. This means * that for the following series of predicate values, three substreams will * be produced with lengths 1, 2, and 3: - * + * * {{{ * false, // element goes into first substream * true, false, // elements go into second substream @@ -262,5 +261,15 @@ trait Flow[+T] { */ def toProducer(materializer: FlowMaterializer): Producer[T @uncheckedVariance] + /** + * Attaches a consumer to this stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def produceTo(materializer: FlowMaterializer, consumer: Consumer[T @uncheckedVariance]): Unit + } diff --git a/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala new file mode 100644 index 0000000000..b3a5127905 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowProduceToConsumerSpec.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +class FlowProduceToConsumerSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings()) + + "A Flow with toProducer" must { + + "produce elements to the consumer" in { + val c = StreamTestKit.consumerProbe[Int] + Flow(List(1, 2, 3)).produceTo(materializer, c) + val s = c.expectSubscription() + s.requestMore(3) + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + } + +} \ No newline at end of file