From 19d0bdb14847b5053d769628fad0db4c93ed7f44 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 15 Mar 2016 16:45:50 +0100 Subject: [PATCH] fix ActorPublisher state machine, fixes #20031 --- .../scala/akka/stream/io/FileSourceSpec.scala | 16 +++++++++++++--- .../scala/akka/stream/actor/ActorPublisher.scala | 10 ++++------ .../akka/stream/impl/io/FilePublisher.scala | 4 ++-- .../stream/impl/io/InputStreamPublisher.scala | 6 +++--- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index afa51bef4a..c22a4ed60c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -4,7 +4,6 @@ package akka.stream.io import java.io.File -import java.io.FileWriter import java.util.Random import akka.actor.ActorSystem import akka.stream.ActorMaterializer @@ -24,6 +23,9 @@ import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ import akka.testkit.AkkaSpec +import java.io.OutputStreamWriter +import java.io.FileOutputStream +import java.nio.charset.StandardCharsets.UTF_8 object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) @@ -45,7 +47,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val testFile = { val f = File.createTempFile("file-source-spec", ".tmp") - new FileWriter(f).append(TestText).close() + new OutputStreamWriter(new FileOutputStream(f), UTF_8).append(TestText).close() f } @@ -60,7 +62,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val manyLines = { val f = File.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp") - val w = new FileWriter(f) + val w = new OutputStreamWriter(new FileOutputStream(f), UTF_8) (1 to LinesCount).foreach { l ⇒ w.append("a" * l).append("\n") } @@ -196,6 +198,14 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel() } finally shutdown(sys) } + + "not signal onComplete more than once" in { + FileIO.fromFile(testFile, 2 * TestText.length) + .runWith(TestSink.probe) + .requestNext(ByteString(TestText, UTF_8.name)) + .expectComplete() + .expectNoMsg(1.second) + } } override def afterTermination(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 1a13f11ea7..3efc9cfa66 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -192,11 +192,11 @@ trait ActorPublisher[T] extends Actor { * call [[#onNext]], [[#onError]] and [[#onComplete]]. */ def onComplete(): Unit = lifecycleState match { - case Active | PreSubscriber | CompleteThenStop ⇒ + case Active | PreSubscriber ⇒ lifecycleState = Completed if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives try tryOnComplete(subscriber) finally subscriber = null - case Completed ⇒ + case Completed | CompleteThenStop ⇒ throw new IllegalStateException("onComplete must only be called once") case _: ErrorEmitted ⇒ throw new IllegalStateException("onComplete must not be called after onError") @@ -225,13 +225,13 @@ trait ActorPublisher[T] extends Actor { * call [[#onNext]], [[#onError]] and [[#onComplete]]. */ def onError(cause: Throwable): Unit = lifecycleState match { - case Active | PreSubscriber | CompleteThenStop ⇒ + case Active | PreSubscriber ⇒ lifecycleState = ErrorEmitted(cause, stop = false) if (subscriber ne null) // otherwise onError will be called when the subscription arrives try tryOnError(subscriber, cause) finally subscriber = null case _: ErrorEmitted ⇒ throw new IllegalStateException("onError must only be called once") - case Completed ⇒ + case Completed | CompleteThenStop ⇒ throw new IllegalStateException("onError must not be called after onComplete") case Canceled ⇒ // drop } @@ -260,8 +260,6 @@ trait ActorPublisher[T] extends Actor { if (n < 1) { if (lifecycleState == Active) onError(numberOfElementsInRequestMustBePositiveException) - else - super.aroundReceive(receive, msg) } else { demand += n if (demand < 0) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 8486ffdb16..0f020a1fc2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -66,8 +66,8 @@ private[akka] final class FilePublisher(f: File, completionPromise: Promise[IORe def readAndSignal(maxReadAhead: Int): Unit = if (isActive) { - // Write previously buffered, read into buffer, write newly buffered - availableChunks = signalOnNexts(readAhead(maxReadAhead, signalOnNexts(availableChunks))) + // Write previously buffered, then refill buffer + availableChunks = readAhead(maxReadAhead, signalOnNexts(availableChunks)) if (totalDemand > 0 && isActive) self ! Continue } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 5cca525b33..4d601836ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -28,8 +28,8 @@ private[akka] object InputStreamPublisher { /** INTERNAL API */ private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) - extends akka.stream.actor.ActorPublisher[ByteString] - with ActorLogging { + extends akka.stream.actor.ActorPublisher[ByteString] + with ActorLogging { // TODO possibly de-duplicate with FilePublisher? @@ -47,7 +47,7 @@ private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Pro def readAndSignal(): Unit = if (isActive) { readAndEmit() - if (totalDemand > 0) self ! Continue + if (totalDemand > 0 && isActive) self ! Continue } def readAndEmit(): Unit = if (totalDemand > 0) try {