diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 7defef30f5..d880eac55a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -31,20 +31,24 @@ object ActorPublisherSpec { case class TotalDemand(elements: Long) case class Produce(elem: String) case class Err(reason: String) + case class ErrThenStop(reason: String) case object Boom case object Complete + case object CompleteThenStop case object ThreadName class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { import akka.stream.actor.ActorPublisherMessage._ def receive = { - case Request(element) ⇒ probe ! TotalDemand(totalDemand) - case Produce(elem) ⇒ onNext(elem) - case Err(reason) ⇒ onError(new RuntimeException(reason) with NoStackTrace) - case Complete ⇒ onComplete() - case Boom ⇒ throw new RuntimeException("boom") with NoStackTrace - case ThreadName ⇒ probe ! Thread.currentThread.getName + case Request(element) ⇒ probe ! TotalDemand(totalDemand) + case Produce(elem) ⇒ onNext(elem) + case Err(reason) ⇒ onError(new RuntimeException(reason) with NoStackTrace) + case ErrThenStop(reason) ⇒ onErrorThenStop(new RuntimeException(reason) with NoStackTrace) + case Complete ⇒ onComplete() + case CompleteThenStop ⇒ onCompleteThenStop() + case Boom ⇒ throw new RuntimeException("boom") with NoStackTrace + case ThreadName ⇒ probe ! Thread.currentThread.getName } } @@ -169,7 +173,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic s.expectError.getMessage should be("wrong") } - "terminate after signalling error" in { + "not terminate after signalling onError" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val s = StreamTestKit.SubscriberProbe[String]() @@ -178,7 +182,19 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic probe.watch(ref) ref ! Err("wrong") s.expectError.getMessage should be("wrong") - probe.expectTerminated(ref, 200.millis) + probe.expectNoMsg(200.millis) + } + + "terminate after signalling onErrorThenStop" in { + val probe = TestProbe() + val ref = system.actorOf(testPublisherProps(probe.ref)) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectSubscription + probe.watch(ref) + ref ! ErrThenStop("wrong") + s.expectError.getMessage should be("wrong") + probe.expectTerminated(ref, 3.seconds) } "signal error before subscribe" in { @@ -237,7 +253,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic s.expectComplete } - "terminate after signalling onComplete" in { + "not terminate after signalling onComplete" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) val s = StreamTestKit.SubscriberProbe[String]() @@ -250,7 +266,23 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic ref ! Complete s.expectNext("elem-1") s.expectComplete - probe.expectTerminated(ref, 200.millis) + probe.expectNoMsg(200.millis) + } + + "terminate after signalling onCompleteThenStop" in { + val probe = TestProbe() + val ref = system.actorOf(testPublisherProps(probe.ref)) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + val sub = s.expectSubscription + sub.request(3) + probe.expectMsg(TotalDemand(3)) + probe.watch(ref) + ref ! Produce("elem-1") + ref ! CompleteThenStop + s.expectNext("elem-1") + s.expectComplete + probe.expectTerminated(ref, 3.seconds) } "signal immediate onComplete" in { diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 98e9806767..07c26e1544 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -146,7 +146,7 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings /** INTERNAL API */ - def system: ActorSystem + private[akka] def system: ActorSystem /** * INTERNAL API: this might become public later diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 73c95e6b3f..5e3b097c28 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -42,7 +42,8 @@ object ActorPublisher { case object Active extends LifecycleState case object Canceled extends LifecycleState case object Completed extends LifecycleState - final case class ErrorEmitted(cause: Throwable) extends LifecycleState + case object CompleteThenStop extends LifecycleState + final case class ErrorEmitted(cause: Throwable, stop: Boolean) extends LifecycleState } } @@ -198,18 +199,12 @@ trait ActorPublisher[T] extends Actor { /** * Complete the stream. After that you are not allowed to * call [[#onNext]], [[#onError]] and [[#onComplete]]. - * - * After signalling completion the Actor will then stop itself as it has completed the protocol. - * When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe - * to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well) - * will be delayed until such [[Subscriber]] arrives. */ def onComplete(): Unit = lifecycleState match { case Active | PreSubscriber ⇒ lifecycleState = Completed - if (subscriber ne null) { - try tryOnComplete(subscriber) finally context.stop(self) - } // otherwise onComplete will be called when the subscription arrives + if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives + try tryOnComplete(subscriber) finally subscriber = null case Completed ⇒ throw new IllegalStateException("onComplete must only be called once") case _: ErrorEmitted ⇒ @@ -217,6 +212,38 @@ trait ActorPublisher[T] extends Actor { case Canceled ⇒ // drop } + /** + * Complete the stream. After that you are not allowed to + * call [[#onNext]], [[#onError]] and [[#onComplete]]. + * + * After signalling completion the Actor will then stop itself as it has completed the protocol. + * When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe + * to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well) + * will be delayed until such [[Subscriber]] arrives. + */ + def onCompleteThenStop(): Unit = lifecycleState match { + case Active | PreSubscriber ⇒ + lifecycleState = CompleteThenStop + if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives + try tryOnComplete(subscriber) finally context.stop(self) + case _ ⇒ onComplete() + } + + /** + * Terminate the stream with failure. After that you are not allowed to + * call [[#onNext]], [[#onError]] and [[#onComplete]]. + */ + def onError(cause: Throwable): Unit = lifecycleState match { + case Active | PreSubscriber ⇒ + lifecycleState = ErrorEmitted(cause, false) + if (subscriber ne null) // otherwise onError will be called when the subscription arrives + try tryOnError(subscriber, cause) finally subscriber = null + case _: ErrorEmitted ⇒ + throw new IllegalStateException("onError must only be called once") + case Completed ⇒ + throw new IllegalStateException("onError must not be called after onComplete") + case Canceled ⇒ // drop + } /** * Terminate the stream with failure. After that you are not allowed to * call [[#onNext]], [[#onError]] and [[#onComplete]]. @@ -226,17 +253,12 @@ trait ActorPublisher[T] extends Actor { * to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well) * will be delayed until such [[Subscriber]] arrives. */ - def onError(cause: Throwable): Unit = lifecycleState match { + def onErrorThenStop(cause: Throwable): Unit = lifecycleState match { case Active | PreSubscriber ⇒ - lifecycleState = ErrorEmitted(cause) - if (subscriber ne null) { + lifecycleState = ErrorEmitted(cause, stop = true) + if (subscriber ne null) // otherwise onError will be called when the subscription arrives try tryOnError(subscriber, cause) finally context.stop(self) - } // otherwise onError will be called when the subscription arrives - case _: ErrorEmitted ⇒ - throw new IllegalStateException("onError must only be called once") - case Completed ⇒ - throw new IllegalStateException("onError must not be called after onComplete") - case Canceled ⇒ // drop + case _ ⇒ onError(cause) } /** @@ -263,11 +285,14 @@ trait ActorPublisher[T] extends Actor { subscriber = sub lifecycleState = Active tryOnSubscribe(sub, new ActorPublisherSubscription(self)) - case ErrorEmitted(cause) ⇒ - context.stop(self) + case ErrorEmitted(cause, stop) ⇒ + if (stop) context.stop(self) tryOnSubscribe(sub, CancelledSubscription) tryOnError(sub, cause) case Completed ⇒ + tryOnSubscribe(sub, CancelledSubscription) + tryOnComplete(sub) + case CompleteThenStop ⇒ context.stop(self) tryOnSubscribe(sub, CancelledSubscription) tryOnComplete(sub) diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala index b866b3d1ad..33808e9748 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala @@ -48,7 +48,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom readAndSignal(initialBuffer) } catch { case ex: Exception ⇒ - onError(ex) + onErrorThenStop(ex) } super.preStart() @@ -83,7 +83,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom if (totalDemand > 0) signalOnNexts() } - } else if (eofEncountered) onComplete() + } else if (eofEncountered) onCompleteThenStop() /** BLOCKING I/O READ */ def loadChunk() = try { @@ -107,7 +107,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom } } catch { case ex: Exception ⇒ - onError(ex) + onErrorThenStop(ex) } private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala index 1b5ff9f363..436a086fbe 100644 --- a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala @@ -50,7 +50,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[ readAndSignal(initialBuffer) } catch { case ex: Exception ⇒ - onError(ex) + onErrorThenStop(ex) } super.preStart() @@ -85,7 +85,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[ if (totalDemand > 0) signalOnNexts() } - } else if (eofEncountered) onComplete() + } else if (eofEncountered) onCompleteThenStop() /** BLOCKING I/O READ */ def loadChunk() = try { @@ -106,7 +106,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[ } } catch { case ex: Exception ⇒ - onError(ex) + onErrorThenStop(ex) } private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue @@ -115,7 +115,7 @@ private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[ super.postStop() bytesReadPromise.trySuccess(readBytesTotal) - if (chan ne null) chan.close() - if (raf ne null) raf.close() + try if (chan ne null) chan.close() + finally if (raf ne null) raf.close() } }