+str #17284 allows deciding to stop after signaling terminal or not

This commit is contained in:
Konrad Malawski 2015-04-23 21:24:31 +02:00
parent 311c94f53a
commit 337850919e
5 changed files with 96 additions and 39 deletions

View file

@ -31,20 +31,24 @@ object ActorPublisherSpec {
case class TotalDemand(elements: Long) case class TotalDemand(elements: Long)
case class Produce(elem: String) case class Produce(elem: String)
case class Err(reason: String) case class Err(reason: String)
case class ErrThenStop(reason: String)
case object Boom case object Boom
case object Complete case object Complete
case object CompleteThenStop
case object ThreadName case object ThreadName
class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { class TestPublisher(probe: ActorRef) extends ActorPublisher[String] {
import akka.stream.actor.ActorPublisherMessage._ import akka.stream.actor.ActorPublisherMessage._
def receive = { def receive = {
case Request(element) probe ! TotalDemand(totalDemand) case Request(element) probe ! TotalDemand(totalDemand)
case Produce(elem) onNext(elem) case Produce(elem) onNext(elem)
case Err(reason) onError(new RuntimeException(reason) with NoStackTrace) case Err(reason) onError(new RuntimeException(reason) with NoStackTrace)
case Complete onComplete() case ErrThenStop(reason) onErrorThenStop(new RuntimeException(reason) with NoStackTrace)
case Boom throw new RuntimeException("boom") with NoStackTrace case Complete onComplete()
case ThreadName probe ! Thread.currentThread.getName case CompleteThenStop onCompleteThenStop()
case Boom throw new RuntimeException("boom") with NoStackTrace
case ThreadName probe ! Thread.currentThread.getName
} }
} }
@ -169,7 +173,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
s.expectError.getMessage should be("wrong") s.expectError.getMessage should be("wrong")
} }
"terminate after signalling error" in { "not terminate after signalling onError" in {
val probe = TestProbe() val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref)) val ref = system.actorOf(testPublisherProps(probe.ref))
val s = StreamTestKit.SubscriberProbe[String]() val s = StreamTestKit.SubscriberProbe[String]()
@ -178,7 +182,19 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
probe.watch(ref) probe.watch(ref)
ref ! Err("wrong") ref ! Err("wrong")
s.expectError.getMessage should be("wrong") s.expectError.getMessage should be("wrong")
probe.expectTerminated(ref, 200.millis) probe.expectNoMsg(200.millis)
}
"terminate after signalling onErrorThenStop" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
s.expectSubscription
probe.watch(ref)
ref ! ErrThenStop("wrong")
s.expectError.getMessage should be("wrong")
probe.expectTerminated(ref, 3.seconds)
} }
"signal error before subscribe" in { "signal error before subscribe" in {
@ -237,7 +253,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
s.expectComplete s.expectComplete
} }
"terminate after signalling onComplete" in { "not terminate after signalling onComplete" in {
val probe = TestProbe() val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref)) val ref = system.actorOf(testPublisherProps(probe.ref))
val s = StreamTestKit.SubscriberProbe[String]() val s = StreamTestKit.SubscriberProbe[String]()
@ -250,7 +266,23 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
ref ! Complete ref ! Complete
s.expectNext("elem-1") s.expectNext("elem-1")
s.expectComplete s.expectComplete
probe.expectTerminated(ref, 200.millis) probe.expectNoMsg(200.millis)
}
"terminate after signalling onCompleteThenStop" in {
val probe = TestProbe()
val ref = system.actorOf(testPublisherProps(probe.ref))
val s = StreamTestKit.SubscriberProbe[String]()
ActorPublisher[String](ref).subscribe(s)
val sub = s.expectSubscription
sub.request(3)
probe.expectMsg(TotalDemand(3))
probe.watch(ref)
ref ! Produce("elem-1")
ref ! CompleteThenStop
s.expectNext("elem-1")
s.expectComplete
probe.expectTerminated(ref, 3.seconds)
} }
"signal immediate onComplete" in { "signal immediate onComplete" in {

View file

@ -146,7 +146,7 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings
/** INTERNAL API */ /** INTERNAL API */
def system: ActorSystem private[akka] def system: ActorSystem
/** /**
* INTERNAL API: this might become public later * INTERNAL API: this might become public later

View file

@ -42,7 +42,8 @@ object ActorPublisher {
case object Active extends LifecycleState case object Active extends LifecycleState
case object Canceled extends LifecycleState case object Canceled extends LifecycleState
case object Completed extends LifecycleState case object Completed extends LifecycleState
final case class ErrorEmitted(cause: Throwable) extends LifecycleState case object CompleteThenStop extends LifecycleState
final case class ErrorEmitted(cause: Throwable, stop: Boolean) extends LifecycleState
} }
} }
@ -198,18 +199,12 @@ trait ActorPublisher[T] extends Actor {
/** /**
* Complete the stream. After that you are not allowed to * Complete the stream. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]]. * call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* After signalling completion the Actor will then stop itself as it has completed the protocol.
* When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe
* to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well)
* will be delayed until such [[Subscriber]] arrives.
*/ */
def onComplete(): Unit = lifecycleState match { def onComplete(): Unit = lifecycleState match {
case Active | PreSubscriber case Active | PreSubscriber
lifecycleState = Completed lifecycleState = Completed
if (subscriber ne null) { if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
try tryOnComplete(subscriber) finally context.stop(self) try tryOnComplete(subscriber) finally subscriber = null
} // otherwise onComplete will be called when the subscription arrives
case Completed case Completed
throw new IllegalStateException("onComplete must only be called once") throw new IllegalStateException("onComplete must only be called once")
case _: ErrorEmitted case _: ErrorEmitted
@ -217,6 +212,38 @@ trait ActorPublisher[T] extends Actor {
case Canceled // drop case Canceled // drop
} }
/**
* Complete the stream. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* After signalling completion the Actor will then stop itself as it has completed the protocol.
* When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe
* to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well)
* will be delayed until such [[Subscriber]] arrives.
*/
def onCompleteThenStop(): Unit = lifecycleState match {
case Active | PreSubscriber
lifecycleState = CompleteThenStop
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
try tryOnComplete(subscriber) finally context.stop(self)
case _ onComplete()
}
/**
* Terminate the stream with failure. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*/
def onError(cause: Throwable): Unit = lifecycleState match {
case Active | PreSubscriber
lifecycleState = ErrorEmitted(cause, false)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives
try tryOnError(subscriber, cause) finally subscriber = null
case _: ErrorEmitted
throw new IllegalStateException("onError must only be called once")
case Completed
throw new IllegalStateException("onError must not be called after onComplete")
case Canceled // drop
}
/** /**
* Terminate the stream with failure. After that you are not allowed to * Terminate the stream with failure. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]]. * call [[#onNext]], [[#onError]] and [[#onComplete]].
@ -226,17 +253,12 @@ trait ActorPublisher[T] extends Actor {
* to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well) * to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well)
* will be delayed until such [[Subscriber]] arrives. * will be delayed until such [[Subscriber]] arrives.
*/ */
def onError(cause: Throwable): Unit = lifecycleState match { def onErrorThenStop(cause: Throwable): Unit = lifecycleState match {
case Active | PreSubscriber case Active | PreSubscriber
lifecycleState = ErrorEmitted(cause) lifecycleState = ErrorEmitted(cause, stop = true)
if (subscriber ne null) { if (subscriber ne null) // otherwise onError will be called when the subscription arrives
try tryOnError(subscriber, cause) finally context.stop(self) try tryOnError(subscriber, cause) finally context.stop(self)
} // otherwise onError will be called when the subscription arrives case _ onError(cause)
case _: ErrorEmitted
throw new IllegalStateException("onError must only be called once")
case Completed
throw new IllegalStateException("onError must not be called after onComplete")
case Canceled // drop
} }
/** /**
@ -263,11 +285,14 @@ trait ActorPublisher[T] extends Actor {
subscriber = sub subscriber = sub
lifecycleState = Active lifecycleState = Active
tryOnSubscribe(sub, new ActorPublisherSubscription(self)) tryOnSubscribe(sub, new ActorPublisherSubscription(self))
case ErrorEmitted(cause) case ErrorEmitted(cause, stop)
context.stop(self) if (stop) context.stop(self)
tryOnSubscribe(sub, CancelledSubscription) tryOnSubscribe(sub, CancelledSubscription)
tryOnError(sub, cause) tryOnError(sub, cause)
case Completed case Completed
tryOnSubscribe(sub, CancelledSubscription)
tryOnComplete(sub)
case CompleteThenStop
context.stop(self) context.stop(self)
tryOnSubscribe(sub, CancelledSubscription) tryOnSubscribe(sub, CancelledSubscription)
tryOnComplete(sub) tryOnComplete(sub)

View file

@ -48,7 +48,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom
readAndSignal(initialBuffer) readAndSignal(initialBuffer)
} catch { } catch {
case ex: Exception case ex: Exception
onError(ex) onErrorThenStop(ex)
} }
super.preStart() super.preStart()
@ -83,7 +83,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom
if (totalDemand > 0) signalOnNexts() if (totalDemand > 0) signalOnNexts()
} }
} else if (eofEncountered) onComplete() } else if (eofEncountered) onCompleteThenStop()
/** BLOCKING I/O READ */ /** BLOCKING I/O READ */
def loadChunk() = try { def loadChunk() = try {
@ -107,7 +107,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom
} }
} catch { } catch {
case ex: Exception case ex: Exception
onError(ex) onErrorThenStop(ex)
} }
private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue

View file

@ -50,7 +50,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[
readAndSignal(initialBuffer) readAndSignal(initialBuffer)
} catch { } catch {
case ex: Exception case ex: Exception
onError(ex) onErrorThenStop(ex)
} }
super.preStart() super.preStart()
@ -85,7 +85,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[
if (totalDemand > 0) signalOnNexts() if (totalDemand > 0) signalOnNexts()
} }
} else if (eofEncountered) onComplete() } else if (eofEncountered) onCompleteThenStop()
/** BLOCKING I/O READ */ /** BLOCKING I/O READ */
def loadChunk() = try { def loadChunk() = try {
@ -106,7 +106,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[
} }
} catch { } catch {
case ex: Exception case ex: Exception
onError(ex) onErrorThenStop(ex)
} }
private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue
@ -115,7 +115,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[
super.postStop() super.postStop()
bytesReadPromise.trySuccess(readBytesTotal) bytesReadPromise.trySuccess(readBytesTotal)
if (chan ne null) chan.close() try if (chan ne null) chan.close()
if (raf ne null) raf.close() finally if (raf ne null) raf.close()
} }
} }