=str #15755 implement Sources

* Implement IteratorSource
* and add tests for IterableSource and IteratorSource
* Implement ThunkSource
* Implement FutureSource
* Implement TickSource
This commit is contained in:
Patrik Nordwall 2014-09-02 14:48:51 +02:00 committed by Roland Kuhn
parent c21a72a5a8
commit 4bee84f149
7 changed files with 770 additions and 45 deletions

View file

@ -17,6 +17,7 @@ import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.impl.ActorPublisher
import akka.stream.impl.IterablePublisher
import akka.stream.impl.IteratorPublisher
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.ActorProcessor
import akka.stream.impl.ExposedPublisher
@ -25,6 +26,15 @@ import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.MaterializedFlow
import akka.stream.scaladsl2.IterableSource
import akka.stream.impl.EmptyPublisher
import akka.stream.scaladsl2.IteratorSource
import akka.stream.scaladsl2.PublisherSource
import akka.stream.scaladsl2.ThunkSource
import akka.stream.impl.SimpleCallbackPublisher
import akka.stream.scaladsl2.FutureSource
import akka.stream.impl.FuturePublisher
import akka.stream.impl.ErrorPublisher
import akka.stream.impl.TickPublisher
import akka.stream.scaladsl2.TickSource
/**
* INTERNAL API
@ -69,6 +79,22 @@ private[akka] case class ActorBasedFlowMaterializer(
// Ops come in reverse order
override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = {
val flowName = createFlowName()
// FIXME specialcasing, otherwise some tests fail in FlowIterableSpec due to the injected identityProcessor:
// - "have value equality of publisher"
// - "produce elements to later subscriber"
def specialCase: PartialFunction[Source[In], Publisher[Out]] = {
case PublisherSource(p) p.asInstanceOf[Publisher[Out]]
case src: IterableSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]]
case src: IteratorSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]]
case src: TickSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]]
}
if (ops.isEmpty && specialCase.isDefinedAt(source)) {
val p = specialCase(source)
val sinkValue = sink.attach(p, this)
new MaterializedFlow(source, None, sink, sinkValue)
} else {
val (s, p) =
if (ops.isEmpty) {
val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
@ -82,6 +108,7 @@ private[akka] case class ActorBasedFlowMaterializer(
val sourceValue = source.attach(s, this, flowName)
val sinkValue = sink.attach(p, this)
new MaterializedFlow(source, sourceValue, sink, sinkValue)
}
}
@ -99,6 +126,35 @@ private[akka] case class ActorBasedFlowMaterializer(
name = s"$flowName-0-iterable"), Some(source.iterable))
}
override def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] = {
if (source.iterator.isEmpty) EmptyPublisher[In]
else ActorPublisher[In](actorOf(IteratorPublisher.props(source.iterator, settings),
name = s"$flowName-0-iterator"))
}
override def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] = {
ActorPublisher[In](actorOf(SimpleCallbackPublisher.props(settings, source.f),
name = s"$flowName-0-thunk"))
}
override def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] = {
source.future.value match {
case Some(Success(element))
ActorPublisher[In](actorOf(IterablePublisher.props(List(element), settings),
name = s"$flowName-0-future"), Some(source.future))
case Some(Failure(t))
ErrorPublisher(t).asInstanceOf[Publisher[In]]
case None
ActorPublisher[In](actorOf(FuturePublisher.props(source.future, settings),
name = s"$flowName-0-future"), Some(source.future))
}
}
override def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] = {
ActorPublisher[In](actorOf(TickPublisher.props(source.initialDelay, source.interval, source.tick, settings),
name = s"$flowName-0-tick"))
}
private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl)

View file

@ -16,6 +16,7 @@ import akka.stream.impl.EmptyPublisher
import akka.stream.impl.IterablePublisher
import akka.stream.impl2.ActorBasedFlowMaterializer
import org.reactivestreams._
import scala.concurrent.duration.FiniteDuration
sealed trait Flow
@ -27,15 +28,63 @@ object FlowFrom {
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
/**
* Helper to create `Flow` with [[Source]] from `Iterable`.
* Example usage: `FlowFrom(Seq(1,2,3))`
* Helper to create `Flow` with [[Source]] from `Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](i: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(i))
def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher))
/**
* Helper to create `Flow` with [[Source]] from `Publisher`.
* Helper to create `Flow` with [[Source]] from `Iterator`.
* Example usage: `FlowFrom(Seq(1,2,3).iterator)`
*
* Start a new `Flow` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p))
def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator))
/**
* Helper to create `Flow` with [[Source]] from `Iterable`.
* Example usage: `FlowFrom(Seq(1,2,3))`
*
* Starts a new `Flow` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable))
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the `Callable` returns a `None`.
* The stream ends exceptionally when an exception is thrown from the `Callable`.
*/
def apply[T](f: () Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f))
/**
* Start a new `Flow` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future))
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): FlowWithSource[T, T] =
FlowFrom[T].withSource(TickSource(initialDelay, interval, tick))
}
trait Source[+In] {
@ -57,8 +106,7 @@ final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): Subscriber[In] =
flowSubscriber
def subscriber(m: MaterializedSource): Subscriber[In] =
m.getSourceFor(this)
def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this)
}
/**
@ -67,7 +115,18 @@ final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] {
final case class PublisherSource[In](p: Publisher[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
p.subscribe(flowSubscriber)
p
None
}
}
/**
* [[Source]] from `Iterator`
*/
final case class IteratorSource[In](iterator: Iterator[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
@ -78,15 +137,41 @@ final case class IterableSource[In](iterable: immutable.Iterable[In]) extends So
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
iterable
None
}
}
/**
* [[Source]] from closure
*/
final case class ThunkSource[In](f: () Option[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* [[Source]] from closure
*/
final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () In) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* [[Source]] from `Future`
*/
final case class FutureSource[In](f: Future[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = ???
final case class FutureSource[In](future: Future[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
trait Sink[-Out] {

View file

@ -3,19 +3,17 @@
*/
package akka.stream.scaladsl2
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import org.reactivestreams.Publisher
import akka.actor.ActorContext
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.stream.MaterializerSettings
import akka.stream.impl2.ActorBasedFlowMaterializer
import akka.stream.impl2.Ast
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.duration._
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.ActorContext
import akka.stream.impl2.StreamSupervisor
import akka.stream.impl2.FlowNameCounter
import akka.stream.MaterializerSettings
import org.reactivestreams.Processor
import akka.stream.impl2.StreamSupervisor
object FlowMaterializer {
@ -25,26 +23,68 @@ object FlowMaterializer {
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create one actor that in turn creates actors for the transformation steps.
*
* The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the
* configuration of the `context`'s underlying [[akka.actor.ActorSystem]].
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
val system = context match {
case s: ExtendedActorSystem s
case c: ActorContext c.system
case null throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
"got [${_contex.getClass.getName}]")
def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
val system = actorSystemOf(context)
val settings = materializerSettings getOrElse MaterializerSettings(system)
apply(settings, namePrefix.getOrElse("flow"))(context)
}
/**
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = {
val system = actorSystemOf(context)
new ActorBasedFlowMaterializer(
settings,
context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
materializerSettings,
context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)),
FlowNameCounter(system).counter,
namePrefix.getOrElse("flow"))
namePrefix)
}
/**
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer =
apply(Some(materializerSettings), None)
/**
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create(context: ActorRefFactory): FlowMaterializer =
apply()(context)
/**
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
@ -52,7 +92,33 @@ object FlowMaterializer {
* will be used to create one actor that in turn creates actors for the transformation steps.
*/
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
apply(settings)(context)
apply(Option(settings), None)(context)
/**
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*
* The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer =
apply(Option(settings), Option(namePrefix))(context)
private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
val system = context match {
case s: ExtendedActorSystem s
case c: ActorContext c.system
case null throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _
throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]")
}
system
}
}
/**
@ -78,5 +144,13 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In]
}

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
import akka.stream.MaterializerSettings
import scala.util.control.NoStackTrace
class FlowFromFutureSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
implicit val materializer = FlowMaterializer(settings)
"A Flow based on a Future" must {
"produce one element from already successful Future" in {
val p = FlowFrom(Future.successful(1)).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
c.expectNoMsg(100.millis)
sub.request(1)
c.expectNext(1)
c.expectComplete()
}
"produce error from already failed Future" in {
val ex = new RuntimeException("test") with NoStackTrace
val p = FlowFrom(Future.failed[Int](ex)).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectError(ex)
}
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val p = FlowFrom(promise.future).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNoMsg(100.millis)
promise.success(1)
c.expectNext(1)
c.expectComplete()
c.expectNoMsg(100.millis)
}
"produce one element when Future is completed but not before request" in {
val promise = Promise[Int]()
val p = FlowFrom(promise.future).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
promise.success(1)
c.expectNoMsg(200.millis)
sub.request(1)
c.expectNext(1)
c.expectComplete()
}
"produce elements with multiple subscribers" in {
val promise = Promise[Int]()
val p = FlowFrom(promise.future).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
promise.success(1)
sub2.request(2)
c1.expectNext(1)
c2.expectNext(1)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val promise = Promise[Int]()
val p = FlowFrom(promise.future).toPublisher()
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)
p.subscribe(c1)
val sub1 = c1.expectSubscription()
sub1.request(1)
promise.success(1)
c1.expectNext(1)
c1.expectComplete()
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(1)
c2.expectNext(1)
c2.expectComplete()
}
"allow cancel before receiving element" in {
val promise = Promise[Int]()
val p = FlowFrom(promise.future).toPublisher()
val keepAlive = StreamTestKit.SubscriberProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(keepAlive)
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
sub.cancel()
c.expectNoMsg(500.millis)
promise.success(1)
c.expectNoMsg(200.millis)
}
}
}

View file

@ -0,0 +1,154 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.testkit.StreamTestKit.{ OnComplete, OnError, OnNext }
import scala.concurrent.duration._
import akka.stream.MaterializerSettings
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIterableSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 512)
implicit val materializer = FlowMaterializer(settings)
"A Flow based on an iterable" must {
"produce elements" in {
val p = FlowFrom(List(1, 2, 3)).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNext(1)
c.expectNoMsg(100.millis)
sub.request(2)
c.expectNext(2)
c.expectNext(3)
c.expectComplete()
}
"complete empty" in {
val p = FlowFrom(List.empty[Int]).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
c.expectNoMsg(100.millis)
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = FlowFrom(List(1, 2, 3)).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
sub2.request(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(2)
c1.expectNoMsg(100.millis)
c2.expectNoMsg(100.millis)
sub1.request(2)
sub2.request(2)
c1.expectNext(2)
c1.expectNext(3)
c2.expectNext(3)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val p = FlowFrom(List(1, 2, 3)).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
val sub1 = c1.expectSubscription()
sub1.request(1)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(2)
// starting from first element, new iterator per subscriber
c2.expectNext(1)
c2.expectNext(2)
c2.expectNoMsg(100.millis)
sub2.request(1)
c2.expectNext(3)
c2.expectComplete()
sub1.request(2)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
}
"produce elements with one transformation step" in {
val p = FlowFrom(List(1, 2, 3)).map(_ * 2).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(10)
c.expectNext(2)
c.expectNext(4)
c.expectNext(6)
c.expectComplete()
}
"produce elements with two transformation steps" ignore {
// val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher()
// val c = StreamTestKit.SubscriberProbe[Int]()
// p.subscribe(c)
// val sub = c.expectSubscription()
// sub.request(10)
// c.expectNext(4)
// c.expectNext(8)
// c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = FlowFrom(1 to count).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
"have value equality of publisher" in {
val p1 = FlowFrom(List(1, 2, 3)).toPublisher()
val p2 = FlowFrom(List(1, 2, 3)).toPublisher()
p1 should be(p2)
p2 should be(p1)
val p3 = FlowFrom(List(1, 2, 3, 4)).toPublisher()
p1 should not be (p3)
p3 should not be (p1)
val p4 = FlowFrom(Vector.empty[String]).toPublisher()
val p5 = FlowFrom(Set.empty[String]).toPublisher()
p1 should not be (p4)
p4 should be(p5)
p5 should be(p4)
val p6 = FlowFrom(List(1, 2, 3).iterator).toPublisher()
p1 should not be (p6)
p6 should not be (p1)
}
}
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.OnComplete
import akka.stream.testkit.StreamTestKit.OnError
import akka.stream.MaterializerSettings
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIteratorSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
.withFanOutBuffer(initialSize = 4, maxSize = 4)
implicit val materializer = FlowMaterializer(settings)
"A Flow based on an iterator" must {
"produce elements" in {
val p = FlowFrom(List(1, 2, 3).iterator).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNext(1)
c.expectNoMsg(100.millis)
sub.request(3)
c.expectNext(2)
c.expectNext(3)
c.expectComplete()
}
"complete empty" in {
val p = FlowFrom(List.empty[Int].iterator).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
c.expectNoMsg(100.millis)
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = FlowFrom(List(1, 2, 3).iterator).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
sub2.request(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(2)
c1.expectNoMsg(100.millis)
c2.expectNoMsg(100.millis)
sub1.request(2)
sub2.request(2)
c1.expectNext(2)
c1.expectNext(3)
c2.expectNext(3)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val p = FlowFrom(List(1, 2, 3).iterator).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
val sub1 = c1.expectSubscription()
sub1.request(1)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(3)
// element 1 is already gone
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
sub1.request(3)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
}
"produce elements with one transformation step" in {
val p = FlowFrom(List(1, 2, 3).iterator).map(_ * 2).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(10)
c.expectNext(2)
c.expectNext(4)
c.expectNext(6)
c.expectComplete()
}
// FIXME enable test when filter is implemented
"produce elements with two transformation steps" ignore {
// val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher()
// val c = StreamTestKit.SubscriberProbe[Int]()
// p.subscribe(c)
// val sub = c.expectSubscription()
// sub.request(10)
// c.expectNext(4)
// c.expectNext(8)
// c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = FlowFrom((1 to count).iterator).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -0,0 +1,98 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import scala.util.control.NoStackTrace
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TickPublisherSpec extends AkkaSpec {
implicit val materializer = FlowMaterializer()
"A Flow based on tick publisher" must {
"produce ticks" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
FlowFrom(1.second, 500.millis, () "tick-" + tickGen.next()).publishTo(c)
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
sub.cancel()
c.expectNoMsg(200.millis)
}
"drop ticks when not requested" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
FlowFrom(1.second, 1.second, () "tick-" + tickGen.next()).publishTo(c)
val sub = c.expectSubscription()
sub.request(2)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(1400.millis)
sub.request(2)
c.expectNext("tick-4")
c.expectNoMsg(200.millis)
c.expectNext("tick-5")
sub.cancel()
c.expectNoMsg(200.millis)
}
"produce ticks with multiple subscribers" in {
val tickGen = Iterator from 1
val p = FlowFrom(1.second, 1.second, () "tick-" + tickGen.next()).toPublisher()
val c1 = StreamTestKit.SubscriberProbe[String]()
val c2 = StreamTestKit.SubscriberProbe[String]()
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
sub2.request(2)
c1.expectNext("tick-1")
c2.expectNext("tick-1")
c2.expectNoMsg(200.millis)
c2.expectNext("tick-2")
c1.expectNoMsg(200.millis)
sub1.request(2)
sub2.request(2)
c1.expectNext("tick-3")
c2.expectNext("tick-3")
sub1.cancel()
sub2.cancel()
}
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
FlowFrom(1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).publishTo(c)
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
}
// FIXME enable this test again when zip is back
"be usable with zip for a simple form of rate limiting" ignore {
// val c = StreamTestKit.SubscriberProbe[Int]()
// val rate = FlowFrom(1.second, 1.second, () "tick").toPublisher()
// FlowFrom(1 to 100).zip(rate).map { case (n, _) n }.publishTo(c)
// val sub = c.expectSubscription()
// sub.request(1000)
// c.expectNext(1)
// c.expectNoMsg(200.millis)
// c.expectNext(2)
// c.expectNoMsg(200.millis)
// sub.cancel()
}
}
}