From 1a96c230704ffe5585ef9115fd075f48399acf26 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 20 May 2014 13:46:35 +0200 Subject: [PATCH] +str #15091 Add time driven operators * TimerTransformer with support for named timers * dropWithin * takeWithin * groupedWithin --- .../scala/akka/stream/TimerTransformer.scala | 130 ++++++++++++++++ .../impl/ActorBasedFlowMaterializer.scala | 1 + .../akka/stream/impl/ActorProcessor.scala | 3 + .../scala/akka/stream/impl/FlowImpl.scala | 64 ++++++++ .../stream/impl/SingleStreamProcessors.scala | 11 +- .../impl/TimerTransformerProcessorsImpl.scala | 73 +++++++++ .../main/scala/akka/stream/javadsl/Duct.scala | 36 +++++ .../main/scala/akka/stream/javadsl/Flow.scala | 35 +++++ .../scala/akka/stream/scaladsl/Duct.scala | 29 ++++ .../scala/akka/stream/scaladsl/Flow.scala | 29 ++++ .../java/akka/stream/javadsl/FlowTest.java | 48 +++--- .../akka/stream/FlowDropWithinSpec.scala | 42 ++++++ .../akka/stream/FlowGroupedWithinSpec.scala | 139 ++++++++++++++++++ .../akka/stream/FlowTakeWithinSpec.scala | 55 +++++++ .../stream/FlowTimerTransformerSpec.scala | 73 +++++++++ .../akka/stream/TimerTransformerSpec.scala | 132 +++++++++++++++++ .../akka/stream/testkit/ScriptedTest.scala | 4 +- 17 files changed, 878 insertions(+), 26 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/TimerTransformer.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala create mode 100644 akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/TimerTransformerSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala new file mode 100644 index 0000000000..8dc6d75232 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.collection.immutable +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorContext +import akka.actor.Cancellable + +/** + * [[Transformer]] with support for scheduling keyed (named) timer events. + */ +abstract class TimerTransformer[-T, +U] extends Transformer[T, U] { + import TimerTransformer._ + private val timers = mutable.Map[Any, Timer]() + private val timerIdGen = Iterator from 1 + + private var context: Option[ActorContext] = None + // when scheduling before `start` we must queue the operations + private var queued = List.empty[Queued] + + /** + * INTERNAL API + */ + private[akka] final def start(ctx: ActorContext): Unit = { + context = Some(ctx) + queued.reverse.foreach { + case QueuedSchedule(timerKey, interval) ⇒ schedulePeriodically(timerKey, interval) + case QueuedScheduleOnce(timerKey, delay) ⇒ scheduleOnce(timerKey, delay) + case QueuedCancelTimer(timerKey) ⇒ cancelTimer(timerKey) + } + queued = Nil + } + + /** + * INTERNAL API + */ + private[akka] final def stop(): Unit = { + timers.foreach { case (_, Timer(_, task)) ⇒ task.cancel() } + timers.clear() + } + + /** + * Schedule timer to call [[#onTimer]] periodically with the given interval. + * Any existing timer with the same key will automatically be canceled before + * adding the new timer. + */ + def schedulePeriodically(timerKey: Any, interval: FiniteDuration): Unit = + context match { + case Some(ctx) ⇒ + cancelTimer(timerKey) + val id = timerIdGen.next() + val task = ctx.system.scheduler.schedule(interval, interval, ctx.self, + Scheduled(timerKey, id, repeating = true))(ctx.dispatcher) + timers(timerKey) = Timer(id, task) + case None ⇒ + queued = QueuedSchedule(timerKey, interval) :: queued + } + + /** + * Schedule timer to call [[#onTimer]] after given delay. + * Any existing timer with the same key will automatically be canceled before + * adding the new timer. + */ + def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit = + context match { + case Some(ctx) ⇒ + cancelTimer(timerKey) + val id = timerIdGen.next() + val task = ctx.system.scheduler.scheduleOnce(delay, ctx.self, + Scheduled(timerKey, id, repeating = false))(ctx.dispatcher) + timers(timerKey) = Timer(id, task) + case None ⇒ + queued = QueuedScheduleOnce(timerKey, delay) :: queued + } + + /** + * Cancel timer, ensuring that the [[#onTimer]] is not subsequently called. + * @param timerKey key of the timer to cancel + */ + def cancelTimer(timerKey: Any): Unit = + timers.get(timerKey).foreach { t ⇒ + t.task.cancel() + timers -= timerKey + } + + /** + * Inquire whether the timer is still active. Returns true unless the + * timer does not exist, has previously been canceled or if it was a + * single-shot timer that was already triggered. + */ + final def isTimerActive(timerKey: Any): Boolean = timers contains timerKey + + /** + * INTERNAL API + */ + private[akka] def onScheduled(scheduled: Scheduled): immutable.Seq[U] = { + val Id = scheduled.timerId + timers.get(scheduled.timerKey) match { + case Some(Timer(Id, _)) ⇒ + if (!scheduled.repeating) timers -= scheduled.timerKey + onTimer(scheduled.timerKey) + case _ ⇒ Nil // already canceled, or re-scheduled + } + } + + /** + * Will be called when the scheduled timer is triggered. + * @param timerKey key of the scheduled timer + */ + def onTimer(timerKey: Any): immutable.Seq[U] +} + +/** + * INTERNAL API + */ +private object TimerTransformer { + case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) + + sealed trait Queued + case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued + case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued + case class QueuedCancelTimer(timerKey: Any) extends Queued + + case class Timer(id: Int, task: Cancellable) + +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index e335b19d0c..e172552a01 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -22,6 +22,7 @@ import akka.actor.ActorSystem import akka.actor.Extension import akka.stream.actor.ActorConsumer import scala.concurrent.duration.FiniteDuration +import akka.stream.TimerTransformer /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 81b8d4f192..b97493dde4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -14,6 +14,7 @@ import org.reactivestreams.api.Consumer import akka.stream.actor.ActorSubscriber import akka.stream.actor.ActorConsumer.{ OnNext, OnError, OnComplete, OnSubscribe } import org.reactivestreams.spi.Subscription +import akka.stream.TimerTransformer /** * INTERNAL API @@ -23,6 +24,8 @@ private[akka] object ActorProcessor { import Ast._ def props(settings: MaterializerSettings, op: AstNode): Props = (op match { + case Transform(transformer: TimerTransformer[_, _]) ⇒ + Props(new TimerTransformerProcessorsImpl(settings, transformer)) case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 79e3e4ee0d..8b4a46e7d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -16,6 +16,9 @@ import scala.util.Success import scala.util.Failure import org.reactivestreams.api.Consumer import akka.stream.scaladsl.Duct +import scala.concurrent.duration.FiniteDuration +import akka.stream.TimerTransformer +import akka.util.Collections.EmptyImmutableSeq /** * INTERNAL API @@ -116,6 +119,10 @@ private[akka] object Builder { val SuccessUnit = Success[Unit](()) private val ListOfUnit = List(()) + private case object TakeWithinTimerKey + private case object DropWithinTimerKey + private case object GroupedWithinTimerKey + private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { override def onNext(elem: Any) = Nil override def isComplete = true @@ -192,6 +199,23 @@ private[akka] trait Builder[Out] { override def name = "drop" }) + def dropWithin(d: FiniteDuration): Thing[Out] = + transform(new TimerTransformer[Out, Out] { + scheduleOnce(DropWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = + new Transformer[Out, Out] { + override def onNext(in: Out) = Nil + } + + override def onNext(in: Out) = delegate.onNext(in) + override def onTimer(timerKey: Any) = { + delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + override def name = "dropWithin" + }) + def take(n: Int): Thing[Out] = transform(new Transformer[Out, Out] { var delegate: Transformer[Out, Out] = @@ -211,6 +235,21 @@ private[akka] trait Builder[Out] { override def name = "take" }) + def takeWithin(d: FiniteDuration): Thing[Out] = + transform(new TimerTransformer[Out, Out] { + scheduleOnce(TakeWithinTimerKey, d) + + var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] + + override def onNext(in: Out) = delegate.onNext(in) + override def isComplete = delegate.isComplete + override def onTimer(timerKey: Any) = { + delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + Nil + } + override def name = "takeWithin" + }) + def prefixAndTail(n: Int): Thing[(immutable.Seq[Out], Producer[Out])] = andThen(PrefixAndTail(n)) def grouped(n: Int): Thing[immutable.Seq[Out]] = @@ -229,6 +268,31 @@ private[akka] trait Builder[Out] { override def name = "grouped" }) + def groupedWithin(n: Int, d: FiniteDuration): Thing[immutable.Seq[Out]] = + transform(new TimerTransformer[Out, immutable.Seq[Out]] { + schedulePeriodically(GroupedWithinTimerKey, d) + var buf: Vector[Out] = Vector.empty + + override def onNext(in: Out) = { + buf :+= in + if (buf.size == n) { + // start new time window + schedulePeriodically(GroupedWithinTimerKey, d) + emitGroup() + } else Nil + } + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) + override def onTimer(timerKey: Any) = emitGroup() + private def emitGroup(): immutable.Seq[immutable.Seq[Out]] = + if (buf.isEmpty) EmptyImmutableSeq + else { + val group = buf + buf = Vector.empty + List(group) + } + override def name = "groupedWithin" + }) + def mapConcat[U](f: Out ⇒ immutable.Seq[U]): Thing[U] = transform(new Transformer[Out, U] { override def onNext(in: Out) = f(in) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 4bb80c4be9..594b26cf50 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -19,6 +19,11 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran var emits = immutable.Seq.empty[Any] var errorEvent: Option[Throwable] = None + override def preStart(): Unit = { + super.preStart() + nextPhase(running) + } + override def onError(e: Throwable): Unit = { try { transformer.onError(e) @@ -32,7 +37,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran def isCompleted = false } - val running: TransferPhase = TransferPhase(NeedsInputAndDemandOrCompletion) { () ⇒ + private val runningPhase: TransferPhase = TransferPhase(NeedsInputAndDemandOrCompletion) { () ⇒ if (primaryInputs.inputsDepleted) nextPhase(terminate) else { emits = transformer.onNext(primaryInputs.dequeueInputElement()) @@ -41,6 +46,8 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran } } + def running: TransferPhase = runningPhase + val terminate = TransferPhase(Always) { () ⇒ emits = transformer.onTermination(errorEvent) emitAndThen(completedPhase) @@ -63,8 +70,6 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran if (emits.isEmpty) nextPhase(phaseAfterFlush) } - nextPhase(running) - override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" override def softShutdown(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala new file mode 100644 index 0000000000..ade6c3057b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import java.util.LinkedList +import akka.stream.MaterializerSettings +import akka.stream.TimerTransformer + +/** + * INTERNAL API + */ +private[akka] class TimerTransformerProcessorsImpl( + _settings: MaterializerSettings, + transformer: TimerTransformer[Any, Any]) + extends TransformProcessorImpl(_settings, transformer) { + import TimerTransformer._ + + override def preStart(): Unit = { + super.preStart() + transformer.start(context) + } + + override def postStop(): Unit = { + super.postStop() + transformer.stop() + } + + val schedulerInputs: Inputs = new DefaultInputTransferStates { + val queue = new LinkedList[Any] + + override def dequeueInputElement(): Any = queue.removeFirst() + + override def subreceive: SubReceive = new SubReceive({ + case s: Scheduled ⇒ + transformer.onScheduled(s) foreach { elem ⇒ + queue.add(elem) + } + pump() + }) + + override def cancel(): Unit = () + override def isClosed: Boolean = false + override def inputsDepleted: Boolean = false + override def inputsAvailable: Boolean = !queue.isEmpty + } + + override def receive = super.receive orElse schedulerInputs.subreceive + + object RunningCondition extends TransferState { + def isReady = { + ((primaryInputs.inputsAvailable || schedulerInputs.inputsAvailable || transformer.isComplete) && + primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted + } + def isCompleted = false + } + + private val runningPhase: TransferPhase = TransferPhase(RunningCondition) { () ⇒ + if (primaryInputs.inputsDepleted || (transformer.isComplete && !schedulerInputs.inputsAvailable)) { + nextPhase(terminate) + } else if (schedulerInputs.inputsAvailable) { + emits = List(schedulerInputs.dequeueInputElement()) + emitAndThen(running) + } else { + emits = transformer.onNext(primaryInputs.dequeueInputElement()) + if (transformer.isComplete) emitAndThen(terminate) + else emitAndThen(running) + } + } + + override def running: TransferPhase = runningPhase + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 42c0c99262..1d12145561 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -18,6 +18,7 @@ import akka.japi.Util.immutableSeq import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Duct ⇒ SDuct } import akka.stream.impl.Ast +import scala.concurrent.duration.FiniteDuration /** * Java API @@ -86,6 +87,11 @@ abstract class Duct[In, Out] { */ def drop(n: Int): Duct[In, Out] + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): Duct[In, Out] + /** * Terminate processing (and cancel the upstream producer) after the given * number of elements. Due to input buffering some elements may have been @@ -94,12 +100,32 @@ abstract class Duct[In, Out] { */ def take(n: Int): Duct[In, Out] + /** + * Terminate processing (and cancel the upstream producer) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): Duct[In, Out] + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. */ def grouped(n: Int): Duct[In, java.util.List[Out]] + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + */ + def groupedWithin(n: Int, d: FiniteDuration): Duct[In, java.util.List[Out]] + /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. @@ -123,6 +149,9 @@ abstract class Duct[In, Out] { * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and * therefore you don not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[akka.stream.TimerTransformer]] if you need support + * for scheduled events in the transformer. */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] @@ -313,11 +342,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def drop(n: Int): Duct[In, T] = new DuctAdapter(delegate.drop(n)) + override def dropWithin(d: FiniteDuration): Duct[In, T] = new DuctAdapter(delegate.dropWithin(d)) + override def take(n: Int): Duct[In, T] = new DuctAdapter(delegate.take(n)) + override def takeWithin(d: FiniteDuration): Duct[In, T] = new DuctAdapter(delegate.takeWithin(d)) + override def grouped(n: Int): Duct[In, java.util.List[T]] = new DuctAdapter(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + def groupedWithin(n: Int, d: FiniteDuration): Duct[In, java.util.List[T]] = + new DuctAdapter(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + override def mapConcat[U](f: Function[T, java.util.List[U]]): Duct[In, U] = new DuctAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem)))) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c2bc467d17..c9cd543af4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -149,6 +149,11 @@ abstract class Flow[T] { */ def drop(n: Int): Flow[T] + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): Flow[T] + /** * Terminate processing (and cancel the upstream producer) after the given * number of elements. Due to input buffering some elements may have been @@ -157,12 +162,32 @@ abstract class Flow[T] { */ def take(n: Int): Flow[T] + /** + * Terminate processing (and cancel the upstream producer) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): Flow[T] + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. */ def grouped(n: Int): Flow[java.util.List[T]] + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + */ + def groupedWithin(n: Int, d: FiniteDuration): Flow[java.util.List[T]] + /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. @@ -186,6 +211,9 @@ abstract class Flow[T] { * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and * therefore you do not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[akka.stream.TimerTransformer]] if you need support + * for scheduled events in the transformer. */ def transform[U](transformer: Transformer[T, U]): Flow[U] @@ -386,11 +414,18 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def drop(n: Int): Flow[T] = new FlowAdapter(delegate.drop(n)) + override def dropWithin(d: FiniteDuration): Flow[T] = new FlowAdapter(delegate.dropWithin(d)) + override def take(n: Int): Flow[T] = new FlowAdapter(delegate.take(n)) + override def takeWithin(d: FiniteDuration): Flow[T] = new FlowAdapter(delegate.takeWithin(d)) + override def grouped(n: Int): Flow[java.util.List[T]] = new FlowAdapter(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + override def groupedWithin(n: Int, d: FiniteDuration): Flow[java.util.List[T]] = + new FlowAdapter(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step + override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] = new FlowAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem)))) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index ef950e9275..74cb076180 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -11,6 +11,7 @@ import org.reactivestreams.api.Producer import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.impl.DuctImpl import akka.stream.impl.Ast +import scala.concurrent.duration.FiniteDuration object Duct { @@ -75,6 +76,11 @@ trait Duct[In, +Out] { */ def drop(n: Int): Duct[In, Out] + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): Duct[In, Out] + /** * Terminate processing (and cancel the upstream producer) after the given * number of elements. Due to input buffering some elements may have been @@ -83,12 +89,32 @@ trait Duct[In, +Out] { */ def take(n: Int): Duct[In, Out] + /** + * Terminate processing (and cancel the upstream producer) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): Duct[In, Out] + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. */ def grouped(n: Int): Duct[In, immutable.Seq[Out]] + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + */ + def groupedWithin(n: Int, d: FiniteDuration): Duct[In, immutable.Seq[Out]] + /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. @@ -112,6 +138,9 @@ trait Duct[In, +Out] { * ordinary instance variables. The [[Transformer]] is executed by an actor and * therefore you don not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[akka.stream.TimerTransformer]] if you need support + * for scheduled events in the transformer. */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index dc63b98614..b1aff5b0f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -15,6 +15,7 @@ import akka.stream.impl.Ast.FutureProducerNode import akka.stream.impl.FlowImpl import akka.stream.impl.Ast.TickProducerNode import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.FiniteDuration /** * Scala API @@ -142,6 +143,11 @@ trait Flow[+T] { */ def drop(n: Int): Flow[T] + /** + * Discard the elements received within the given duration at beginning of the stream. + */ + def dropWithin(d: FiniteDuration): Flow[T] + /** * Terminate processing (and cancel the upstream producer) after the given * number of elements. Due to input buffering some elements may have been @@ -150,12 +156,32 @@ trait Flow[+T] { */ def take(n: Int): Flow[T] + /** + * Terminate processing (and cancel the upstream producer) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + */ + def takeWithin(d: FiniteDuration): Flow[T] + /** * Chunk up this stream into groups of the given size, with the last group * possibly smaller than requested due to end-of-stream. */ def grouped(n: Int): Flow[immutable.Seq[T]] + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + */ + def groupedWithin(n: Int, d: FiniteDuration): Flow[immutable.Seq[T]] + /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. @@ -181,6 +207,9 @@ trait Flow[+T] { * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and * therefore you do not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[akka.stream.TimerTransformer]] if you need support + * for scheduled events in the transformer. */ def transform[U](transformer: Transformer[T, U]): Flow[U] diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 92fc309427..8dc1cacd3d 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -53,27 +53,33 @@ public class FlowTest { final JavaTestKit probe = new JavaTestKit(system); final String[] lookup = { "a", "b", "c", "d", "e", "f" }; final java.util.Iterator input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator(); - Flow.create(input).drop(2).take(3).map(new Function() { - public String apply(Integer elem) { - return lookup[elem]; - } - }).filter(new Predicate() { - public boolean test(String elem) { - return !elem.equals("c"); - } - }).grouped(2).mapConcat(new Function, java.util.List>() { - public java.util.List apply(java.util.List elem) { - return elem; - } - }).fold("", new Function2() { - public String apply(String acc, String elem) { - return acc + elem; - } - }).foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }).consume(materializer); + Flow.create(input).drop(2).take(3).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)) + .map(new Function() { + public String apply(Integer elem) { + return lookup[elem]; + } + }).filter(new Predicate() { + public boolean test(String elem) { + return !elem.equals("c"); + } + }).grouped(2).mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)) + .mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).fold("", new Function2() { + public String apply(String acc, String elem) { + return acc + elem; + } + }).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); probe.expectMsgEquals("de"); diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala new file mode 100644 index 0000000000..4fb9a8a038 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowDropWithinSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + dispatcher = "akka.test.stream-dispatcher")) + + "A DropWithin" must { + + "deliver elements after the duration, but not before" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[Int] + Flow(p).dropWithin(1.second).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(100) + val demand1 = pSub.expectRequestMore + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequestMore + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequestMore + c.expectNoMsg(1500.millis) + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + ((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n ⇒ c.expectNext(n) } + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(200.millis) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala new file mode 100644 index 0000000000..3540f46c2d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.stream.scaladsl.Flow +import akka.stream.testkit.ScriptedTest + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + val materializer = FlowMaterializer(settings) + + "A GroupedWithin" must { + + "group elements within the duration" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[immutable.Seq[Int]] + Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(100) + val demand1 = pSub.expectRequestMore + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequestMore + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequestMore + c.expectNext((1 to (demand1 + demand2)).toVector) + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(300.millis) + c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) + c.expectNoMsg(300.millis) + pSub.expectRequestMore + val last = input.next() + pSub.sendNext(last) + pSub.sendComplete() + c.expectNext(List(last)) + c.expectComplete + c.expectNoMsg(200.millis) + } + + "deliver bufferd elements onComplete before the timeout" in { + val c = StreamTestKit.consumerProbe[immutable.Seq[Int]] + Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(materializer, c) + val cSub = c.expectSubscription + cSub.requestMore(100) + c.expectNext((1 to 3).toList) + c.expectComplete + c.expectNoMsg(200.millis) + } + + "buffer groups until requested from downstream" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[immutable.Seq[Int]] + Flow(p).groupedWithin(1000, 1.second).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(1) + val demand1 = pSub.expectRequestMore + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNext((1 to demand1).toVector) + val demand2 = pSub.expectRequestMore + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(300.millis) + cSub.requestMore(1) + c.expectNext(((demand1 + 1) to (demand1 + demand2)).toVector) + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "drop empty groups" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[immutable.Seq[Int]] + Flow(p).groupedWithin(1000, 500.millis).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(2) + pSub.expectRequestMore + c.expectNoMsg(600.millis) + pSub.sendNext(1) + pSub.sendNext(2) + c.expectNext(List(1, 2)) + // nothing more requested + c.expectNoMsg(1100.millis) + cSub.requestMore(3) + c.expectNoMsg(600.millis) + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "reset time window when max elements reached" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[immutable.Seq[Int]] + Flow(p).groupedWithin(3, 2.second).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(4) + val demand1 = pSub.expectRequestMore + demand1 should be(4) + c.expectNoMsg(1000.millis) + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.probe.within(1000.millis) { + c.expectNext((1 to 3).toVector) + } + c.expectNoMsg(1500.millis) + c.probe.within(1000.millis) { + c.expectNext(List(4)) + } + pSub.sendComplete() + c.expectComplete + c.expectNoMsg(100.millis) + } + + "group evenly" in { + def script = Script((1 to 20) map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) + } + + "group with rest" in { + def script = Script(((1 to 20).map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) } + :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala new file mode 100644 index 0000000000..3cf34bb975 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.stream.scaladsl.Flow + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTakeWithinSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + dispatcher = "akka.test.stream-dispatcher")) + + "A TakeWithin" must { + + "deliver elements within the duration, but not afterwards" in { + val input = Iterator.from(1) + val p = StreamTestKit.producerProbe[Int] + val c = StreamTestKit.consumerProbe[Int] + Flow(p).takeWithin(1.second).produceTo(materializer, c) + val pSub = p.expectSubscription + val cSub = c.expectSubscription + cSub.requestMore(100) + val demand1 = pSub.expectRequestMore + (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequestMore + (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequestMore + val sentN = demand1 + demand2 + (1 to sentN) foreach { n ⇒ c.expectNext(n) } + within(2.seconds) { + c.expectComplete + } + (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNoMsg(200.millis) + } + + "deliver bufferd elements onComplete before the timeout" in { + val input = Iterator.from(1) + val c = StreamTestKit.consumerProbe[Int] + Flow(1 to 3).takeWithin(1.second).produceTo(materializer, c) + val cSub = c.expectSubscription + c.expectNoMsg(200.millis) + cSub.requestMore(100) + (1 to 3) foreach { n ⇒ c.expectNext(n) } + c.expectComplete + c.expectNoMsg(200.millis) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala new file mode 100644 index 0000000000..22b1fd4eaa --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.AkkaSpec +import akka.testkit.EventFilter +import com.typesafe.config.ConfigFactory +import akka.stream.scaladsl.Flow +import akka.testkit.TestProbe +import scala.util.control.NoStackTrace +import scala.collection.immutable + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowTimerTransformerSpec extends AkkaSpec { + + import system.dispatcher + + val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + + "A Flow with TimerTransformer operations" must { + "produce scheduled ticks as expected" in { + val p = StreamTestKit.producerProbe[Int] + val p2 = Flow(p). + transform(new TimerTransformer[Int, Int] { + schedulePeriodically("tick", 100.millis) + var tickCount = 0 + override def onNext(elem: Int) = List(elem) + override def onTimer(timerKey: Any) = { + tickCount += 1 + if (tickCount == 3) cancelTimer("tick") + List(tickCount) + } + override def isComplete: Boolean = !isTimerActive("tick") + }). + toProducer(materializer) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(5) + consumer.expectNext(1) + consumer.expectNext(2) + consumer.expectNext(3) + consumer.expectComplete() + } + + "schedule ticks when last transformation step (consume)" in { + val p = StreamTestKit.producerProbe[Int] + val p2 = Flow(p). + transform(new TimerTransformer[Int, Int] { + schedulePeriodically("tick", 100.millis) + var tickCount = 0 + override def onNext(elem: Int) = List(elem) + override def onTimer(timerKey: Any) = { + tickCount += 1 + if (tickCount == 3) cancelTimer("tick") + testActor ! "tick-" + tickCount + List(tickCount) + } + override def isComplete: Boolean = !isTimerActive("tick") + }). + consume(materializer) + val pSub = p.expectSubscription + expectMsg("tick-1") + expectMsg("tick-2") + expectMsg("tick-3") + pSub.sendComplete() + } + + } +} diff --git a/akka-stream/src/test/scala/akka/stream/TimerTransformerSpec.scala b/akka-stream/src/test/scala/akka/stream/TimerTransformerSpec.scala new file mode 100644 index 0000000000..8053bdc737 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/TimerTransformerSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorCell +import akka.actor.ActorRef +import akka.actor.Props +import akka.stream.TimerTransformer.Scheduled +import akka.stream.testkit.AkkaSpec +import akka.testkit.TestDuration +import akka.testkit.TestKit + +object TimerTransformerSpec { + case object TestSingleTimer + case object TestSingleTimerResubmit + case object TestCancelTimer + case object TestCancelTimerAck + case object TestRepeatedTimer + case class Tick(n: Int) + + def driverProps(probe: ActorRef): Props = + Props(classOf[Driver], probe).withDispatcher("akka.test.stream-dispatcher") + + class Driver(probe: ActorRef) extends Actor { + + // need implicit system for dilated + import context.system + + val tickCount = Iterator from 1 + + val transformer = new TimerTransformer[Int, Int] { + override def onNext(elem: Int): immutable.Seq[Int] = List(elem) + override def onTimer(timerKey: Any): immutable.Seq[Int] = { + val tick = Tick(tickCount.next()) + probe ! tick + if (timerKey == "TestSingleTimerResubmit" && tick.n == 1) + scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + else if (timerKey == "TestRepeatedTimer" && tick.n == 5) + cancelTimer("TestRepeatedTimer") + Nil + } + } + + override def preStart(): Unit = { + super.preStart() + transformer.start(context) + } + + override def postStop(): Unit = { + super.postStop() + transformer.stop() + } + + def receive = { + case TestSingleTimer ⇒ + transformer.scheduleOnce("TestSingleTimer", 500.millis.dilated) + case TestSingleTimerResubmit ⇒ + transformer.scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated) + case TestCancelTimer ⇒ + transformer.scheduleOnce("TestCancelTimer", 1.milli.dilated) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated) + transformer.cancelTimer("TestCancelTimer") + probe ! TestCancelTimerAck + transformer.scheduleOnce("TestCancelTimer", 500.milli.dilated) + case TestRepeatedTimer ⇒ + transformer.schedulePeriodically("TestRepeatedTimer", 100.millis.dilated) + case s: Scheduled ⇒ transformer.onScheduled(s) + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TimerTransformerSpec extends AkkaSpec { + import TimerTransformerSpec._ + + "A TimerTransformer" must { + + "receive single-shot timer" in { + val driver = system.actorOf(driverProps(testActor)) + within(2 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimer + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + } + } + + "resubmit single-shot timer" in { + val driver = system.actorOf(driverProps(testActor)) + within(2.5 seconds) { + within(500 millis, 1 second) { + driver ! TestSingleTimerResubmit + expectMsg(Tick(1)) + } + within(1 second) { + expectMsg(Tick(2)) + } + expectNoMsg(1 second) + } + } + + "correctly cancel a named timer" in { + val driver = system.actorOf(driverProps(testActor)) + driver ! TestCancelTimer + within(500 millis) { + expectMsg(TestCancelTimerAck) + } + within(300 millis, 1 second) { + expectMsg(Tick(1)) + } + expectNoMsg(1 second) + } + + "receive and cancel a repeated timer" in { + val driver = system.actorOf(driverProps(testActor)) + driver ! TestRepeatedTimer + val seq = receiveWhile(2 seconds) { + case t: Tick ⇒ t + } + seq should have length 5 + expectNoMsg(1 second) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 190109d7dc..90c6dfb532 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -3,7 +3,6 @@ */ package akka.stream.testkit -import org.scalatest._ import akka.actor.ActorSystem import scala.annotation.tailrec import scala.collection.immutable @@ -12,8 +11,9 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.stream.scaladsl.Flow import akka.stream.MaterializerSettings +import org.scalatest.Matchers -trait ScriptedTest extends ShouldMatchers { +trait ScriptedTest extends Matchers { class ScriptException(msg: String) extends RuntimeException(msg)