Merge pull request #16403 from drewhk/wip-15802-ticksource-cancellable-drewhk

=str: #15802: TickSource cancellable
This commit is contained in:
drewhk 2014-12-01 17:45:02 +01:00
commit 9ce116c18b
4 changed files with 103 additions and 75 deletions

View file

@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec {
c.expectNoMsg(200.millis)
}
"produce ticks with multiple subscribers" in {
"reject multiple subscribers, but keep the first" in {
val tickGen = Iterator from 1
val p = Source(1.second, 1.second, () "tick-" + tickGen.next()).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[String]()
@ -57,28 +57,24 @@ class TickSourceSpec extends AkkaSpec {
p.subscribe(c1)
p.subscribe(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
c2.expectError()
sub1.request(1)
sub2.request(2)
c1.expectNext("tick-1")
c2.expectNext("tick-1")
c2.expectNoMsg(200.millis)
c2.expectNext("tick-2")
c1.expectNoMsg(200.millis)
sub1.request(2)
sub2.request(2)
c1.expectNext("tick-3")
c2.expectNext("tick-3")
c1.expectNext("tick-2")
sub1.cancel()
sub2.cancel()
}
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run()
val tickSource = Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace)
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
awaitCond(cancellable.isCancelled)
}
"be usable with zip for a simple form of rate limiting" in {
@ -101,5 +97,25 @@ class TickSourceSpec extends AkkaSpec {
sub.cancel()
}
"be possible to cancel" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
val tickSource = Source(1.second, 500.millis, () "tick-" + tickGen.next())
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
cancellable.cancel()
awaitCond(cancellable.isCancelled)
sub.request(3)
c.expectComplete()
}
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
import akka.stream.MaterializerSettings
import org.reactivestreams.{ Subscriber, Subscription }
@ -15,20 +17,21 @@ import scala.util.control.NonFatal
* INTERNAL API
*/
private[akka] object TickPublisher {
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any, settings: MaterializerSettings): Props =
Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher)
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
settings: MaterializerSettings, cancelled: AtomicBoolean): Props =
Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher)
object TickPublisherSubscription {
case class Cancel(subscriber: Subscriber[_ >: Any])
case class RequestMore(elements: Long, subscriber: Subscriber[_ >: Any])
case object Cancel
case class RequestMore(elements: Long)
}
class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[_ >: Any]) extends Subscription {
class TickPublisherSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
def cancel(): Unit = ref ! Cancel(subscriber)
def cancel(): Unit = ref ! Cancel
def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(elements, subscriber)
else ref ! RequestMore(elements)
override def toString = "TickPublisherSubscription"
}
@ -38,16 +41,18 @@ private[akka] object TickPublisher {
/**
* INTERNAL API
*
* Elements are produced from the tick closure periodically with the specified interval.
* Each subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped for that subscriber.
* Elements are produced from the tick closure periodically with the specified interval. Supports only one subscriber.
* The subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped.
*/
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any, settings: MaterializerSettings) extends Actor with SoftShutdown {
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._
var exposedPublisher: ActorPublisher[Any] = _
val demand = mutable.Map.empty[Subscriber[_ >: Any], Long]
private var subscriber: Subscriber[_ >: Any] = null
private var demand: Long = 0
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
@ -72,51 +77,51 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
case Tick
try {
val tickElement = tick()
demand foreach {
case (subscriber, d)
if (d > 0) {
demand(subscriber) = d - 1
subscriber.onNext(tickElement)
}
if (demand > 0) {
demand -= 1
subscriber.onNext(tickElement)
}
} catch {
case NonFatal(e)
// tick closure throwed => onError downstream
demand foreach { case (subscriber, _) subscriber.onError(e) }
if (subscriber ne null) {
subscriber.onError(e)
subscriber = null
}
exposedPublisher.shutdown(Some(e))
context.stop(self)
}
case RequestMore(elements, subscriber)
demand.get(subscriber) match {
case Some(d) demand(subscriber) = d + elements
case None // canceled
case RequestMore(elements)
demand += elements
if (demand < 0) {
// Long has overflown, reactive-streams specification rule 3.17
exposedPublisher.shutdown(Some(
new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)))
context.stop(self)
}
case Cancel(subscriber) unregisterSubscriber(subscriber)
case Cancel
subscriber = null
context.stop(self)
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
}
def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
if (demand.contains(subscriber))
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
def registerSubscriber(s: Subscriber[_ >: Any]): Unit = {
if (subscriber ne null) s.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else {
val subscription = new TickPublisherSubscription(self, subscriber)
demand(subscriber) = 0
val subscription = new TickPublisherSubscription(self)
subscriber = s
subscriber.onSubscribe(subscription)
}
}
private def unregisterSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
demand -= subscriber
if (demand.isEmpty) {
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
softShutdown()
}
}
override def postStop(): Unit = {
tickTask.foreach(_.cancel)
cancelled.set(true)
if (subscriber ne null) subscriber.onComplete()
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
}

View file

@ -3,7 +3,9 @@
*/
package akka.stream.scaladsl
import akka.actor.{ Props, ActorRef }
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ PoisonPill, Cancellable, Props, ActorRef }
import akka.stream.impl._
import akka.stream.impl.Ast.AstNode
import org.reactivestreams.Publisher
@ -155,33 +157,29 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
(ActorPublisher[Out](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
name = s"$flowName-0-tick")), ())
}
/**
* This Source takes two Sources and concatenates them together by draining the elements coming from the first Source
* completely, then draining the elements arriving from the second Source. If the first Source is infinite then the
* second Source will be never drained.
*/
final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors?
override type MaterializedType = Cancellable
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val concatter = Concat[Out]
val concatGraph = FlowGraph { builder
builder
.addEdge(source1, Pipe.empty[Out], concatter.first)
.addEdge(source2, Pipe.empty[Out], concatter.second)
.addEdge(concatter.out, Sink(flowSubscriber))
}.run()(materializer)
val (pub, cancellable) = create(materializer, flowName)
pub.subscribe(flowSubscriber)
cancellable
}
override def isActive: Boolean = false
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val cancelled = new AtomicBoolean(false)
val ref =
materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings, cancelled),
name = s"$flowName-0-tick")
(ActorPublisher[Out](ref), new Cancellable {
override def cancel(): Boolean = {
if (!isCancelled) ref ! PoisonPill
true
}
override def isCancelled: Boolean = cancelled.get()
})
}
}
/**

View file

@ -130,7 +130,7 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] = // FIXME why is tick () => T and not T?
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): TickSource[T] =
TickSource(initialDelay, interval, tick)
/**
@ -181,7 +181,16 @@ object Source {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatSource(source1, source2)
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = {
val output = UndefinedSink[T]
val concat = Concat[T]
Source() { b
b.addEdge(source1, Pipe.empty[T], concat.first)
.addEdge(source2, Pipe.empty[T], concat.second)
.addEdge(concat.out, Pipe.empty[T], output)
output
}
}
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]