Merge pull request #15074 from akka/wip-15073-flow-produceTo-patriknw

+str #15073 Add Flow produceTo consumer
This commit is contained in:
Patrik Nordwall 2014-05-07 14:00:25 +02:00
commit 9cf563af3f
3 changed files with 43 additions and 2 deletions

View file

@ -6,6 +6,7 @@ package akka.stream.impl
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import Ast.{ AstNode, Recover, Transform }
import akka.stream.FlowMaterializer
@ -167,5 +168,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
}).consume(materializer)
override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops)
override def produceTo(materializer: FlowMaterializer, consumer: Consumer[O]) =
toProducer(materializer).produceTo(consumer)
}

View file

@ -8,9 +8,8 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NoStackTrace
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import akka.stream.FlowMaterializer
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.FlowImpl
@ -257,6 +256,16 @@ trait Flow[+T] {
*/
def toProducer(materializer: FlowMaterializer): Producer[T @uncheckedVariance]
/**
* Attaches a consumer to this stream.
*
* *This will materialize the flow and initiate its execution.*
*
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def produceTo(materializer: FlowMaterializer, consumer: Consumer[T @uncheckedVariance]): Unit
}
/**

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
class FlowProduceToConsumerSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings())
"A Flow with toProducer" must {
"produce elements to the consumer" in {
val c = StreamTestKit.consumerProbe[Int]
Flow(List(1, 2, 3)).produceTo(materializer, c)
val s = c.expectSubscription()
s.requestMore(3)
c.expectNext(1)
c.expectNext(2)
c.expectNext(3)
c.expectComplete()
}
}
}