diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index e925603dbc..8b07ce5dea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -13,6 +13,13 @@ import akka.pattern.ask import akka.stream.{ MaterializerSettings, Transformer } import akka.stream.impl.{ ActorProcessor, ActorPublisher, ExposedPublisher, TransformProcessorImpl } import akka.stream.scaladsl2._ +import akka.stream.TimerTransformer +import akka.stream.impl.TimerTransformerProcessorsImpl +import akka.stream.OverflowStrategy +import akka.stream.impl.ConflateImpl +import akka.stream.impl.ExpandImpl +import akka.stream.impl.BufferImpl +import akka.stream.impl.FanoutProcessorImpl /** * INTERNAL API @@ -24,6 +31,8 @@ private[akka] object Ast { case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode + case class GroupBy(f: Any ⇒ Any) extends AstNode { override def name = "groupBy" } @@ -40,6 +49,18 @@ private[akka] object Ast { override def name = "concatFlatten" } + case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode { + override def name = "conflate" + } + + case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode { + override def name = "expand" + } + + case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode { + override def name = "buffer" + } + } /** @@ -196,11 +217,14 @@ private[akka] object ActorProcessorFactory { val settings = materializer.settings (op match { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) + case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) - + case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) + case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) + case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) }).withDispatcher(settings.dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index cab2f2c6b4..ab9000620e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -3,12 +3,18 @@ */ package akka.stream.scaladsl2 +import scala.collection.immutable import scala.collection.immutable import akka.stream.impl2.Ast._ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.language.higherKinds import akka.stream.Transformer +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.Duration +import akka.util.Collections.EmptyImmutableSeq +import akka.stream.TimerTransformer +import akka.stream.OverflowStrategy /** * This is the interface from which all concrete Flows inherit. No generic @@ -17,10 +23,26 @@ import akka.stream.Transformer */ sealed trait Flow +object FlowOps { + private case object TakeWithinTimerKey + private case object DropWithinTimerKey + private case object GroupedWithinTimerKey + + private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = Nil + override def isComplete = true + } + + private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + override def onNext(elem: Any) = List(elem) + } +} + /** * Operations offered by flows with a free output side: the DSL flows left-to-right only. */ trait FlowOps[-In, +Out] { + import FlowOps._ type Repr[-I, +O] <: FlowOps[I, O] // Storing ops in reverse order @@ -53,6 +75,199 @@ trait FlowOps[-In, +Out] { override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil }) + /** + * Chunk up this stream into groups of the given size, with the last group + * possibly smaller than requested due to end-of-stream. + * + * `n` must be positive, otherwise IllegalArgumentException is thrown. + */ + def grouped(n: Int): Repr[In, immutable.Seq[Out]] = { + require(n > 0, "n must be greater than 0") + transform("grouped", () ⇒ new Transformer[Out, immutable.Seq[Out]] { + var buf: Vector[Out] = Vector.empty + override def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + val group = buf + buf = Vector.empty + List(group) + } else + Nil + } + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) + }) + } + + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = { + require(n > 0, "n must be greater than 0") + require(d > Duration.Zero) + timerTransform("groupedWithin", () ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { + schedulePeriodically(GroupedWithinTimerKey, d) + var buf: Vector[Out] = Vector.empty + + override def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + // start new time window + schedulePeriodically(GroupedWithinTimerKey, d) + emitGroup() + } else Nil + } + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) + override def onTimer(timerKey: Any) = emitGroup() + private def emitGroup(): immutable.Seq[immutable.Seq[Out]] = + if (buf.isEmpty) EmptyImmutableSeq + else { + val group = buf + buf = Vector.empty + List(group) + } + }) + } + + /** + * Discard the given number of elements at the beginning of the stream. + * No elements will be dropped if `n` is zero or negative. + */ + def drop(n: Int): Repr[In, Out] = + transform("drop", () ⇒ new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n <= 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + override def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + } + + override def onNext(in: Out) = delegate.onNext(in) + }) + + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): Repr[In, Out] = + timerTransform("dropWithin", () ⇒ new TimerTransformer[Out, Out] { + scheduleOnce(DropWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = + new Transformer[Out, Out] { + override def onNext(in: Out) = Nil + } + + override def onNext(in: Out) = delegate.onNext(in) + override def onTimer(timerKey: Any) = { + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + }) + + /** + * Terminate processing (and cancel the upstream publisher) after the given + * number of elements. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * The stream will be completed without producing any elements if `n` is zero + * or negative. + */ + def take(n: Int): Repr[In, Out] = + transform("take", () ⇒ new Transformer[Out, Out] { + var delegate: Transformer[Out, Out] = + if (n <= 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + else new Transformer[Out, Out] { + var c = n + override def onNext(in: Out) = { + c -= 1 + if (c == 0) + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + List(in) + } + } + + override def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + }) + + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): Repr[In, Out] = + timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { + scheduleOnce(TakeWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] + + override def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + override def onTimer(timerKey: Any) = { + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + }) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] = + andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) + + /** + * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the subscriber until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * subscriber. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Repr[In, U] = + andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) + + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] = { + require(size > 0, s"Buffer size must be larger than zero but was [$size]") + andThen(Buffer(size, overflowStrategy)) + } + /** * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] * function is invoked, expecting a (possibly empty) sequence of output elements @@ -125,6 +340,33 @@ trait FlowOps[-In, +Out] { case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") } + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Repr[In, U] = + andThen(TimerTransform(name, mkTransformer.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 5c2203c188..77ce7a241f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl2 import akka.actor.Props +import scala.collection.immutable import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } @@ -237,3 +238,31 @@ final case class ForeachSink[Out](f: Out ⇒ Unit) extends SinkWithKey[Out, Futu def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this) } +/** + * Invoke the given function for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. The sink holds a + * [[scala.concurrent.Future]] that will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is an error is signaled in the stream. + */ +final case class FoldSink[U, Out](zero: U)(f: (U, Out) ⇒ U) extends SinkWithKey[Out, Future[U]] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[U] = { + val promise = Promise[U]() + + FlowFrom(flowPublisher).transform("fold", () ⇒ new Transformer[Out, U] { + var state: U = zero + override def onNext(in: Out): immutable.Seq[U] = { state = f(state, in); Nil } + override def onTermination(e: Option[Throwable]) = { + e match { + case None ⇒ promise.success(state) + case Some(e) ⇒ promise.failure(e) + } + Nil + } + }).consume()(materializer.withNamePrefix(flowName)) + + promise.future + } + def future(m: MaterializedSink): Future[U] = m.getSinkFor(this) +} + diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala new file mode 100644 index 0000000000..b9d0df4cfe --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowBufferSpec.scala @@ -0,0 +1,190 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings +import akka.stream.OverflowStrategy + +class FlowBufferSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + .withFanOutBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = FlowMaterializer(settings) + + "Buffer" must { + + "pass elements through normally in backpressured mode" in { + val futureSink = FutureSink[Seq[Int]] + val mf = FlowFrom((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + withSink(futureSink).run() + val future = futureSink.future(mf) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "pass elements through normally in backpressured mode with buffer size one" in { + val futureSink = FutureSink[Seq[Int]] + val mf = FlowFrom((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + withSink(futureSink).run() + val future = futureSink.future(mf) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "pass elements through a chain of backpressured buffers of different size" in { + val futureSink = FutureSink[Seq[Int]] + val mf = FlowFrom((1 to 1000).iterator) + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) + .buffer(10, overflowStrategy = OverflowStrategy.backpressure) + .buffer(256, overflowStrategy = OverflowStrategy.backpressure) + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) + .buffer(5, overflowStrategy = OverflowStrategy.backpressure) + .buffer(128, overflowStrategy = OverflowStrategy.backpressure) + .grouped(1001) + .withSink(futureSink).run() + val future = futureSink.future(mf) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "accept elements that fit in the buffer while downstream is silent" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).buffer(100, overflowStrategy = OverflowStrategy.backpressure).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + // Fill up buffer + for (i ← 1 to 100) autoPublisher.sendNext(i) + + // drain + for (i ← 1 to 100) { + sub.request(1) + subscriber.expectNext(i) + } + sub.cancel() + } + + "drop head elements if buffer is full and configured so" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropHead).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoPublisher.sendNext(i) + + // drain + for (i ← 101 to 200) { + sub.request(1) + subscriber.expectNext(i) + } + + sub.request(1) + subscriber.expectNoMsg(1.seconds) + + autoPublisher.sendNext(-1) + sub.request(1) + subscriber.expectNext(-1) + + sub.cancel() + } + + "drop tail elements if buffer is full and configured so" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropTail).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoPublisher.sendNext(i) + + // drain + for (i ← 1 to 99) { + sub.request(1) + subscriber.expectNext(i) + } + + sub.request(1) + subscriber.expectNext(200) + + sub.request(1) + subscriber.expectNoMsg(1.seconds) + + autoPublisher.sendNext(-1) + sub.request(1) + subscriber.expectNext(-1) + + sub.cancel() + } + + "drop all elements if buffer is full and configured so" in { + val publisher = StreamTestKit.PublisherProbe[Int] + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + // Fill up buffer + for (i ← 1 to 150) autoPublisher.sendNext(i) + + // drain + for (i ← 101 to 150) { + sub.request(1) + subscriber.expectNext(i) + } + + sub.request(1) + subscriber.expectNoMsg(1.seconds) + + autoPublisher.sendNext(-1) + sub.request(1) + subscriber.expectNext(-1) + + sub.cancel() + } + + for (strategy ← List(OverflowStrategy.dropHead, OverflowStrategy.dropTail, OverflowStrategy.dropBuffer)) { + + s"work with $strategy if buffer size of one" in { + + val publisher = StreamTestKit.PublisherProbe[Int] + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).buffer(1, overflowStrategy = strategy).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoPublisher.sendNext(i) + + sub.request(1) + subscriber.expectNext(200) + + sub.request(1) + subscriber.expectNoMsg(1.seconds) + + autoPublisher.sendNext(-1) + sub.request(1) + subscriber.expectNext(-1) + + sub.cancel() + } + } + + } +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala new file mode 100644 index 0000000000..4750813e84 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings + +class FlowConflateSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) + + "Conflate" must { + + "pass-through elements unchanged when there is no rate difference" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + sub.request(1) + autoPublisher.sendNext(i) + subscriber.expectNext(i) + } + + sub.cancel() + } + + "conflate elements while downstream is silent" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + autoPublisher.sendNext(i) + } + subscriber.expectNoMsg(1.second) + sub.request(1) + subscriber.expectNext(5050) + sub.cancel() + } + + "work on a variable rate chain" in { + val foldSink = FoldSink[Int, Int](0)(_ + _) + val mf = FlowFrom((1 to 1000).iterator) + .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .withSink(foldSink) + .run() + val future = foldSink.future(mf) + Await.result(future, 10.seconds) should be(500500) + } + + "backpressure subscriber when upstream is slower" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + sub.request(1) + autoPublisher.sendNext(1) + subscriber.expectNext(1) + + sub.request(1) + subscriber.expectNoMsg(1.second) + autoPublisher.sendNext(2) + subscriber.expectNext(2) + + autoPublisher.sendNext(3) + autoPublisher.sendNext(4) + sub.request(1) + subscriber.expectNext(7) + + sub.request(1) + subscriber.expectNoMsg(1.second) + sub.cancel() + + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala new file mode 100644 index 0000000000..ed7e2a6d7e --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit2.ScriptedTest +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.testkit.StreamTestKit +import akka.stream.MaterializerSettings + +class FlowDropSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A Drop" must { + + "drop" in { + def script(d: Int) = Script((1 to 50) map { n ⇒ Seq(n) -> (if (n <= d) Nil else Seq(n)) }: _*) + (1 to 50) foreach { _ ⇒ + val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50) + runScript(script(d), settings)(_.drop(d)) + } + } + + "not drop anything for negative n" in { + val probe = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(List(1, 2, 3)).drop(-1).publishTo(probe) + probe.expectSubscription().request(10) + probe.expectNext(1) + probe.expectNext(2) + probe.expectNext(3) + probe.expectComplete() + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala new file mode 100644 index 0000000000..5ea6bef67d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowDropWithinSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A DropWithin" must { + + "deliver elements after the duration, but not before" in { + val input = Iterator.from(1) + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(p).dropWithin(1.second).publishTo(c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(100) + val demand1 = pSub.expectRequest + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequest + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequest + c.expectNoMsg(1500.millis) + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + ((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n ⇒ c.expectNext(n) } + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(200.millis) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala new file mode 100644 index 0000000000..b1199c4b00 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings + +class FlowExpandSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = FlowMaterializer(settings) + + "Expand" must { + + "pass-through elements unchanged when there is no rate difference" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + // Simply repeat the last element as an extrapolation step + FlowFrom(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + // Order is important here: If the request comes first it will be extrapolated! + autoPublisher.sendNext(i) + sub.request(1) + subscriber.expectNext(i) + } + + sub.cancel() + } + + "expand elements while upstream is silent" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + // Simply repeat the last element as an extrapolation step + FlowFrom(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + autoPublisher.sendNext(42) + + for (i ← 1 to 100) { + sub.request(1) + subscriber.expectNext(42) + } + + autoPublisher.sendNext(-42) + sub.request(1) + subscriber.expectNext(-42) + + sub.cancel() + } + + "work on a variable rate chain" in { + val foldSink = FoldSink[Set[Int], Int](Set.empty[Int])(_ + _) + val mf = FlowFrom((1 to 100).iterator) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)) + .withSink(foldSink) + .run() + val future = foldSink.future(mf) + + Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100)) + } + + "backpressure publisher when subscriber is slower" in { + val publisher = StreamTestKit.PublisherProbe[Int]() + val subscriber = StreamTestKit.SubscriberProbe[Int]() + + FlowFrom(publisher).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).publishTo(subscriber) + + val autoPublisher = new StreamTestKit.AutoPublisher(publisher) + val sub = subscriber.expectSubscription() + + autoPublisher.sendNext(1) + sub.request(1) + subscriber.expectNext(1) + sub.request(1) + subscriber.expectNext(1) + + var pending = autoPublisher.pendingRequests + // Deplete pending requests coming from input buffer + while (pending > 0) { + autoPublisher.subscription.sendNext(2) + pending -= 1 + } + + // The above sends are absorbed in the input buffer, and will result in two one-sized batch requests + pending += autoPublisher.subscription.expectRequest() + pending += autoPublisher.subscription.expectRequest() + while (pending > 0) { + autoPublisher.subscription.sendNext(2) + pending -= 1 + } + + publisher.expectNoMsg(1.second) + + sub.request(2) + subscriber.expectNext(2) + subscriber.expectNext(2) + + // Now production is resumed + autoPublisher.subscription.expectRequest() + + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala new file mode 100644 index 0000000000..f8f5e9532d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec + +class FlowFoldSpec extends AkkaSpec { + implicit val mat = FlowMaterializer() + import system.dispatcher + + "A Fold" must { + + "fold" in { + val input = 1 to 100 + val foldSink = FoldSink[Int, Int](0)(_ + _) + val mf = FlowFrom(input).withSink(foldSink).run() + val future = foldSink.future(mf) + val expected = input.fold(0)(_ + _) + Await.result(future, 5.seconds) should be(expected) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedSpec.scala new file mode 100644 index 0000000000..2b9d715820 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.collection.immutable +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit2.ScriptedTest +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.MaterializerSettings + +class FlowGroupedSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + "A Grouped" must { + + "group evenly" in { + def script = Script((1 to 20) map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) + } + + "group with rest" in { + def script = Script(((1 to 20).map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) } + :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala new file mode 100644 index 0000000000..f9f76e72f2 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit2.ScriptedTest +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + + implicit val materializer = FlowMaterializer() + + "A GroupedWithin" must { + + "group elements within the duration" in { + val input = Iterator.from(1) + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() + FlowFrom(p).groupedWithin(1000, 1.second).publishTo(c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(100) + val demand1 = pSub.expectRequest + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequest + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequest + c.expectNext((1 to (demand1 + demand2)).toVector) + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(300.millis) + c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) + c.expectNoMsg(300.millis) + pSub.expectRequest + val last = input.next() + pSub.sendNext(last) + pSub.sendComplete() + c.expectNext(List(last)) + c.expectComplete + c.expectNoMsg(200.millis) + } + + "deliver bufferd elements onComplete before the timeout" in { + val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() + FlowFrom(1 to 3).groupedWithin(1000, 10.second).publishTo(c) + val cSub = c.expectSubscription + cSub.request(100) + c.expectNext((1 to 3).toList) + c.expectComplete + c.expectNoMsg(200.millis) + } + + "buffer groups until requested from downstream" in { + val input = Iterator.from(1) + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() + FlowFrom(p).groupedWithin(1000, 1.second).publishTo(c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(1) + val demand1 = pSub.expectRequest + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNext((1 to demand1).toVector) + val demand2 = pSub.expectRequest + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(300.millis) + cSub.request(1) + c.expectNext(((demand1 + 1) to (demand1 + demand2)).toVector) + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "drop empty groups" in { + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() + FlowFrom(p).groupedWithin(1000, 500.millis).publishTo(c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(2) + pSub.expectRequest + c.expectNoMsg(600.millis) + pSub.sendNext(1) + pSub.sendNext(2) + c.expectNext(List(1, 2)) + // nothing more requested + c.expectNoMsg(1100.millis) + cSub.request(3) + c.expectNoMsg(600.millis) + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "reset time window when max elements reached" in { + val input = Iterator.from(1) + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() + FlowFrom(p).groupedWithin(3, 2.second).publishTo(c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.request(4) + val demand1 = pSub.expectRequest + demand1 should be(4) + c.expectNoMsg(1000.millis) + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.probe.within(1000.millis) { + c.expectNext((1 to 3).toVector) + } + c.expectNoMsg(1500.millis) + c.probe.within(1000.millis) { + c.expectNext(List(4)) + } + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "group evenly" in { + def script = Script((1 to 20) map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) + } + + "group with rest" in { + def script = Script(((1 to 20).map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) } + :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala index 7528d40cc7..9dc933fc93 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala @@ -3,7 +3,9 @@ */ package akka.stream.scaladsl2 -import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit2.ScriptedTest import akka.testkit.TestProbe import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala index 938afc9c3e..e75555fe76 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.stream.MaterializerSettings +import akka.stream.testkit.StreamTestKit.SubscriberProbe class FlowPrefixAndTailSpec extends AkkaSpec { @@ -30,14 +31,22 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val futureSink = newFutureSink val mf = FlowFrom(Nil).prefixAndTail(10).withSink(futureSink).run() val fut = futureSink.future(mf) - Await.result(fut, 3.seconds) should be((Nil, FlowFrom(EmptyPublisher[Int]))) + val (prefix, tailFlow) = Await.result(fut, 3.seconds) + prefix should be(Nil) + val tailSubscriber = SubscriberProbe[Int] + tailFlow.publishTo(tailSubscriber) + tailSubscriber.expectComplete() } "work on short input" in { val futureSink = newFutureSink val mf = FlowFrom(List(1, 2, 3)).prefixAndTail(10).withSink(futureSink).run() val fut = futureSink.future(mf) - Await.result(fut, 3.seconds) should be((List(1, 2, 3), FlowFrom(EmptyPublisher[Int]))) + val (prefix, tailFlow) = Await.result(fut, 3.seconds) + prefix should be(List(1, 2, 3)) + val tailSubscriber = SubscriberProbe[Int] + tailFlow.publishTo(tailSubscriber) + tailSubscriber.expectComplete() } "work on longer inputs" in { @@ -47,11 +56,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 5) - // FIXME enable this again, when grouped is implemented - // val futureSink2 = ??? - // val mf2 = tail.grouped(6).withSink(futureSink2).run() - // val fut2 = futureSink2.future(mf2) - // Await.result(fut2, 3.seconds) should be(6 to 10) + val futureSink2 = FutureSink[immutable.Seq[Int]] + val mf2 = tail.grouped(6).withSink(futureSink2).run() + val fut2 = futureSink2.future(mf2) + Await.result(fut2, 3.seconds) should be(6 to 10) } "handle zero take count" in { @@ -61,8 +69,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) - // FIXME enable this again, when grouped is implemented - // Await.result(FlowFrom(tail).grouped(11).toFuture(), 3.seconds) should be(1 to 10) + val futureSink2 = FutureSink[immutable.Seq[Int]] + val mf2 = tail.grouped(11).withSink(futureSink2).run() + val fut2 = futureSink2.future(mf2) + Await.result(fut2, 3.seconds) should be(1 to 10) } "handle negative take count" in { @@ -72,8 +82,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) - // FIXME enable this again, when grouped is implemented - // Await.result(FlowFrom(tail).grouped(11).toFuture(), 3.seconds) should be(1 to 10) + val futureSink2 = FutureSink[immutable.Seq[Int]] + val mf2 = tail.grouped(11).withSink(futureSink2).run() + val fut2 = futureSink2.future(mf2) + Await.result(fut2, 3.seconds) should be(1 to 10) } "work if size of take is equal to stream size" in { diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala new file mode 100644 index 0000000000..fd4ab09f7d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeSpec.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.actor.ActorSubscriberMessage.OnComplete +import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.impl.RequestMore +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit2.ScriptedTest +import akka.stream.testkit.StreamTestKit +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.MaterializerSettings + +class FlowTakeSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + muteDeadLetters(classOf[OnNext], OnComplete.getClass, classOf[RequestMore])() + + "A Take" must { + + "take" in { + def script(d: Int) = Script((1 to 50) map { n ⇒ Seq(n) -> (if (n > d) Nil else Seq(n)) }: _*) + (1 to 50) foreach { _ ⇒ + val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50) + runScript(script(d), settings)(_.take(d)) + } + } + + "not take anything for negative n" in { + val probe = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(List(1, 2, 3)).take(-1).publishTo(probe) + probe.expectSubscription().request(10) + probe.expectComplete() + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala new file mode 100644 index 0000000000..e0f820c634 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTakeWithinSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A TakeWithin" must { + + "deliver elements within the duration, but not afterwards" in { + val input = Iterator.from(1) + val p = StreamTestKit.PublisherProbe[Int]() + val c = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(p).takeWithin(1.second).publishTo(c) + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() + cSub.request(100) + val demand1 = pSub.expectRequest() + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequest() + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequest() + val sentN = demand1 + demand2 + (1 to sentN) foreach { n ⇒ c.expectNext(n) } + within(2.seconds) { + c.expectComplete() + } + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(200.millis) + } + + "deliver bufferd elements onComplete before the timeout" in { + val c = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(1 to 3).takeWithin(1.second).publishTo(c) + val cSub = c.expectSubscription() + c.expectNoMsg(200.millis) + cSub.request(100) + (1 to 3) foreach { n ⇒ c.expectNext(n) } + c.expectComplete() + c.expectNoMsg(200.millis) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala index e9da74ef59..b8cfe4034f 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala @@ -3,7 +3,8 @@ */ package akka.stream.scaladsl2 -import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit2.ScriptedTest import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/TimerTransformerSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/TimerTransformerSpec.scala new file mode 100644 index 0000000000..8070bd835b --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/TimerTransformerSpec.scala @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorCell +import akka.actor.ActorRef +import akka.actor.Props +import akka.stream.TimerTransformer.Scheduled +import akka.stream.testkit.AkkaSpec +import akka.testkit.TestDuration +import akka.testkit.TestKit +import akka.stream.TimerTransformer + +object TimerTransformerSpec { + case object TestSingleTimer + case object TestSingleTimerResubmit + case object TestCancelTimer + case object TestCancelTimerAck + case object TestRepeatedTimer + case class Tick(n: Int) + + def driverProps(probe: ActorRef): Props = + Props(classOf[Driver], probe).withDispatcher("akka.test.stream-dispatcher") + + class Driver(probe: ActorRef) extends Actor { + + // need implicit system for dilated + import context.system + + val tickCount = Iterator from 1 + + val transformer = new TimerTransformer[Int, Int] { + override def onNext(elem: Int): immutable.Seq[Int] = List(elem) + override def onTimer(timerKey: Any): immutable.Seq[Int] = { + val tick = Tick(tickCount.next()) + probe ! tick + if (timerKey == "TestSingleTimerResubmit" && tick.n == 1) + scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + else if (timerKey == "TestRepeatedTimer" && tick.n == 5) + cancelTimer("TestRepeatedTimer") + Nil + } + } + + override def preStart(): Unit = { + super.preStart() + transformer.start(context) + } + + override def postStop(): Unit = { + super.postStop() + transformer.stop() + } + + def receive = { + case TestSingleTimer ⇒ + transformer.scheduleOnce("TestSingleTimer", 500.millis.dilated) + case TestSingleTimerResubmit ⇒ + transformer.scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + case TestCancelTimer ⇒ + transformer.scheduleOnce("TestCancelTimer", 1.milli.dilated) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated) + transformer.cancelTimer("TestCancelTimer") + probe ! TestCancelTimerAck + transformer.scheduleOnce("TestCancelTimer", 500.milli.dilated) + case TestRepeatedTimer ⇒ + transformer.schedulePeriodically("TestRepeatedTimer", 100.millis.dilated) + case s: Scheduled ⇒ transformer.onScheduled(s) + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TimerTransformerSpec extends AkkaSpec { + import TimerTransformerSpec._ + + "A TimerTransformer" must { + + "receive single-shot timer" in { + val driver = system.actorOf(driverProps(testActor)) + within(2 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimer + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + } + } + + "resubmit single-shot timer" in { + val driver = system.actorOf(driverProps(testActor)) + within(2.5 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimerResubmit + expectMsg(Tick(1)) + } + within(1 second) { + expectMsg(Tick(2)) + } + expectNoMsg(1 second) + } + } + + "correctly cancel a named timer" in { + val driver = system.actorOf(driverProps(testActor)) + driver ! TestCancelTimer + within(500 millis) { + expectMsg(TestCancelTimerAck) + } + within(300 millis, 1 second) { + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + } + + "receive and cancel a repeated timer" in { + val driver = system.actorOf(driverProps(testActor)) + driver ! TestRepeatedTimer + val seq = receiveWhile(2 seconds) { + case t: Tick ⇒ t + } + seq should have length 5 + expectNoMsg(1 second) + } + + } + +}