Merge pull request #15274 from akka/wip-15080-tick-producer-patriknw

+str #15080 Add tick producer
This commit is contained in:
Patrik Nordwall 2014-05-30 11:55:17 +02:00
commit 4351601fc2
6 changed files with 285 additions and 0 deletions

View file

@ -21,6 +21,7 @@ import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import akka.actor.Extension
import akka.stream.actor.ActorConsumer
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
@ -108,6 +109,11 @@ private[akka] object Ast {
name = s"$flowName-0-future"), Some(future))
}
}
final case class TickProducerNode[I](interval: FiniteDuration, tick: () I) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
new ActorProducer(materializer.context.actorOf(TickProducer.props(interval, tick, materializer.settings),
name = s"$flowName-0-tick"))
}
}
/**

View file

@ -0,0 +1,136 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.spi.Subscription
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.stream.MaterializerSettings
import scala.util.control.NonFatal
import akka.actor.Cancellable
/**
* INTERNAL API
*/
private[akka] object TickProducer {
def props(interval: FiniteDuration, tick: () Any, settings: MaterializerSettings): Props =
Props(new TickProducer(interval, tick, settings)).withDispatcher(settings.dispatcher)
object TickProducerSubscription {
case class Cancel(subscriber: Subscriber[Any])
case class RequestMore(elements: Int, subscriber: Subscriber[Any])
}
class TickProducerSubscription(ref: ActorRef, subscriber: Subscriber[Any])
extends Subscription {
import TickProducerSubscription._
def cancel(): Unit = ref ! Cancel(subscriber)
def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else ref ! RequestMore(elements, subscriber)
override def toString = "TickProducerSubscription"
}
private case object Tick
}
/**
* INTERNAL API
*
* Elements are produced from the tick closure periodically with the specified interval.
* Each subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped for that subscriber.
*/
private[akka] class TickProducer(interval: FiniteDuration, tick: () Any, settings: MaterializerSettings) extends Actor with SoftShutdown {
import TickProducer._
import TickProducer.TickProducerSubscription._
var exposedPublisher: ActorPublisher[Any] = _
val demand = mutable.Map.empty[Subscriber[Any], Long]
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
var tickTask: Option[Cancellable] = None
def receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
context.become(waitingForFirstSubscriber)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
context.setReceiveTimeout(Duration.Undefined)
import context.dispatcher
tickTask = Some(context.system.scheduler.schedule(interval, interval, self, Tick))
context.become(active)
}
def active: Receive = {
case Tick
ActorBasedFlowMaterializer.withCtx(context) {
try {
val tickElement = tick()
demand foreach {
case (subscriber, d)
if (d > 0) {
demand(subscriber) = d - 1
subscriber.onNext(tickElement)
}
}
} catch {
case NonFatal(e)
// tick closure throwed => onError downstream
demand foreach { case (subscriber, _) subscriber.onError(e) }
}
}
case RequestMore(elements, subscriber)
demand.get(subscriber) match {
case Some(d) demand(subscriber) = d + elements
case None // canceled
}
case Cancel
softShutdown()
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (demand.contains(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
else {
val subscription = new TickProducerSubscription(self, subscriber)
demand(subscriber) = 0
subscriber.onSubscribe(subscription)
}
}
private def unregisterSubscriber(subscriber: Subscriber[Any]): Unit = {
demand -= subscriber
if (demand.isEmpty) {
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
softShutdown()
}
}
override def postStop(): Unit = {
tickTask.foreach(_.cancel)
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
}
}

View file

@ -19,6 +19,7 @@ import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Flow SFlow }
import org.reactivestreams.api.Consumer
import scala.concurrent.duration.FiniteDuration
/**
* Java API
@ -64,6 +65,16 @@ object Flow {
*/
def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() block.call()))
/**
* Elements are produced from the tick `Callable` periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def create[T](interval: FiniteDuration, tick: Callable[T]): Flow[T] =
new FlowAdapter(SFlow.apply(interval, () tick.call()))
}
/**

View file

@ -13,6 +13,8 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.Ast.FutureProducerNode
import akka.stream.impl.FlowImpl
import akka.stream.impl.Ast.TickProducerNode
import scala.concurrent.duration.FiniteDuration
/**
* Scala API
@ -59,6 +61,15 @@ object Flow {
*/
def apply[T](future: Future[T]): Flow[T] = FlowImpl(FutureProducerNode(future), Nil)
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](interval: FiniteDuration, tick: () T): Flow[T] = FlowImpl(TickProducerNode(interval, tick), Nil)
}
/**

View file

@ -495,4 +495,26 @@ public class FlowTest {
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
}
@Test
public void mustProduceTicks() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Callable<String> tick = new Callable<String>() {
private int count = 1;
@Override
public String call() {
return "tick-" + (count++);
}
};
Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), tick).foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
probe.expectMsgEquals("tick-1");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-2");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}
}

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import scala.util.control.NoStackTrace
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TickProducerSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
dispatcher = "akka.test.stream-dispatcher"))
"A Flow based on tick producer" must {
"produce ticks" in {
val tickGen = Iterator from 1
val c = StreamTestKit.consumerProbe[String]
Flow(1.second, () "tick-" + tickGen.next()).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(3)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
sub.cancel()
c.expectNoMsg(200.millis)
}
"drop ticks when not requested" in {
val tickGen = Iterator from 1
val c = StreamTestKit.consumerProbe[String]
Flow(1.second, () "tick-" + tickGen.next()).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(2)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(1400.millis)
sub.requestMore(2)
c.expectNext("tick-4")
c.expectNoMsg(200.millis)
c.expectNext("tick-5")
sub.cancel()
c.expectNoMsg(200.millis)
}
"produce ticks with multiple subscribers" in {
val tickGen = Iterator from 1
val p = Flow(1.second, () "tick-" + tickGen.next()).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[String]
val c2 = StreamTestKit.consumerProbe[String]
p.produceTo(c1)
p.produceTo(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
sub2.requestMore(2)
c1.expectNext("tick-1")
c2.expectNext("tick-1")
c2.expectNoMsg(200.millis)
c2.expectNext("tick-2")
c1.expectNoMsg(200.millis)
sub1.requestMore(2)
sub2.requestMore(2)
c1.expectNext("tick-3")
c2.expectNext("tick-3")
sub1.cancel()
sub2.cancel()
}
"signal onError when tick closure throws" in {
val tickGen = Iterator from 1
val c = StreamTestKit.consumerProbe[String]
Flow(1.second, () throw new RuntimeException("tick err") with NoStackTrace).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(3)
c.expectError.getMessage should be("tick err")
}
"be usable with zip for a simple form of rate limiting" in {
val c = StreamTestKit.consumerProbe[Int]
val rate = Flow(1.second, () "tick").toProducer(materializer)
Flow(1 to 100).zip(rate).map { case (n, _) n }.produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(1000)
c.expectNext(1)
c.expectNoMsg(200.millis)
c.expectNext(2)
c.expectNoMsg(200.millis)
sub.cancel()
}
}
}