Revert "+str #15802 Make TickSource cancellable"

This reverts commit 9e19ab095cd6ae37ea689d00031d58a54a90c912.
This commit is contained in:
Patrik Nordwall 2014-11-07 10:54:30 +01:00
parent d9d905071b
commit d1daec8590
5 changed files with 18 additions and 82 deletions

View file

@ -75,14 +75,10 @@ class TickSourceSpec extends AkkaSpec {
"signal onError when tick closure throws" in {
val c = StreamTestKit.SubscriberProbe[String]()
val tickSource = Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace)
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
Source[String](1.second, 1.second, () throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(3)
c.expectError.getMessage should be("tick err")
awaitCond(cancellable.isCancelled)
c.expectNoMsg(100.millis)
}
"be usable with zip for a simple form of rate limiting" in {
@ -105,25 +101,5 @@ class TickSourceSpec extends AkkaSpec {
sub.cancel()
}
"be possible to cancel" in {
val tickGen = Iterator from 1
val c = StreamTestKit.SubscriberProbe[String]()
val tickSource = Source(1.second, 500.millis, () "tick-" + tickGen.next())
val m = tickSource.to(Sink(c)).run()
val cancellable = m.get(tickSource)
val sub = c.expectSubscription()
sub.request(3)
c.expectNoMsg(600.millis)
c.expectNext("tick-1")
c.expectNoMsg(200.millis)
c.expectNext("tick-2")
c.expectNoMsg(200.millis)
c.expectNext("tick-3")
cancellable.cancel()
awaitCond(cancellable.isCancelled)
sub.request(3)
c.expectComplete()
}
}
}

View file

@ -6,19 +6,17 @@ package akka.stream.impl
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
import org.reactivestreams.{ Subscriber, Subscription }
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicBoolean
/**
* INTERNAL API
*/
private[akka] object TickPublisher {
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
cancelled: AtomicBoolean, settings: MaterializerSettings): Props =
Props(new TickPublisher(initialDelay, interval, tick, cancelled, settings)).
withDispatcher(settings.dispatcher)
def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any, settings: MaterializerSettings): Props =
Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher)
object TickPublisherSubscription {
case class Cancel(subscriber: Subscriber[_ >: Any])
@ -44,14 +42,12 @@ private[akka] object TickPublisher {
* Each subscriber will receive the tick element if it has requested any elements,
* otherwise the tick element is dropped for that subscriber.
*/
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any,
cancelled: AtomicBoolean, settings: MaterializerSettings) extends Actor with SoftShutdown {
private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Any, settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
import akka.stream.impl.TickPublisher._
var exposedPublisher: ActorPublisher[Any] = _
val demand = mutable.Map.empty[Subscriber[_ >: Any], Long]
var failed = false
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
@ -86,9 +82,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
} catch {
case NonFatal(e)
// tick closure throwed => onError downstream
failed = true
demand foreach { case (subscriber, _) subscriber.onError(e) }
softShutdown()
}
case RequestMore(elements, subscriber)
@ -123,13 +117,8 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
override def postStop(): Unit = {
tickTask.foreach(_.cancel)
cancelled.set(true)
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
if (!failed)
demand.foreach {
case (subscriber, _) subscriber.onComplete()
}
}
}

View file

@ -4,6 +4,7 @@
package akka.stream.javadsl
import java.util.concurrent.Callable
import akka.actor.ActorRef
import akka.actor.Props
import akka.japi.Util
@ -11,13 +12,13 @@ import akka.stream._
import akka.stream.scaladsl.PropsSource
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import scala.language.implicitConversions
import akka.actor.Cancellable
/** Java API */
object Source {
@ -109,13 +110,9 @@ object Source {
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*
* The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this
* `KeyedSource` and that can be used for stopping the tick source and thereby
* completing the stream.
*/
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] =
new KeyedSource(scaladsl.Source(initialDelay, interval, () tick.call()))
def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] =
new Source(scaladsl.Source(initialDelay, interval, () tick.call()))
/**
* Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects
@ -148,7 +145,7 @@ object Source {
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def subscriber[T](): KeyedSource[T, Subscriber[T]] =
def subscriber[T](): KeyedSource[Subscriber[T], T] =
new KeyedSource(scaladsl.Source.subscriber)
/**

View file

@ -10,15 +10,13 @@ import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast.AstNode
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
import scala.util.Success
import akka.actor.Cancellable
import akka.actor.PoisonPill
import java.util.concurrent.atomic.AtomicBoolean
sealed trait ActorFlowSource[+Out] extends Source[Out] {
@ -183,34 +181,14 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*
* The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this
* `TickSource` and that can be used for stopping the tick source and thereby
* completing the stream.
*/
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends KeyedActorFlowSource[Out] {
override type MaterializedType = Cancellable
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Cancellable = {
val (pub, cancellable) = create(materializer, flowName)
pub.subscribe(flowSubscriber)
cancellable
}
final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () Out) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val cancelled = new AtomicBoolean(false)
val ref = materializer.actorOf(TickPublisher.props(initialDelay, interval, tick,
cancelled, materializer.settings), name = s"$flowName-0-tick")
val cancellable = new Cancellable {
override def cancel(): Boolean = {
if (!isCancelled)
ref ! PoisonPill
true
}
override def isCancelled: Boolean = cancelled.get
}
(ActorPublisher[Out](ref), cancellable)
}
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
(ActorPublisher[Out](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
name = s"$flowName-0-tick")), ())
}
/**

View file

@ -132,12 +132,8 @@ object Source {
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*
* The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this
* `TickSource` and that can be used for stopping the tick source and thereby
* completing the stream.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): TickSource[T] =
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] =
TickSource(initialDelay, interval, tick)
/**