From 9b7c2a8cb4a3d13d47a12d020ca9c1da8a8ab22c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 7 May 2018 17:00:46 +0200 Subject: [PATCH] UnfoldResourceSource closing twice on failure, #24924 * Don't write to actual disk in test * Silence expected log errors * Bugfix #24924 UnfoldResource closed resource twice on failure * One more case where close would be called twice * Toggle closed flag after operation in restartState * open false _before_ closing in restartState --- .../scaladsl/UnfoldResourceSourceSpec.scala | 98 ++++++++++++++----- .../stream/impl/UnfoldResourceSource.scala | 23 +++-- 2 files changed, 85 insertions(+), 36 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala index 587c931180..6be2d49551 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala @@ -5,17 +5,22 @@ package akka.stream.scaladsl import java.io._ +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorSystem import akka.stream.ActorAttributes._ import akka.stream.Supervision._ -import akka.stream.{ ActorMaterializer, _ } import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } -import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.{ ActorMaterializer, _ } +import akka.testkit.EventFilter import akka.util.ByteString +import com.google.common.jimfs.{ Configuration, Jimfs } import scala.concurrent.duration._ @@ -24,7 +29,9 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") implicit val materializer = ActorMaterializer(settings) - val manyLines = { + private val fs = Jimfs.newFileSystem("UnfoldResourceSourceSpec", Configuration.unix()) + + private val manyLines = { ("a" * 100 + "\n") * 10 + ("b" * 100 + "\n") * 10 + ("c" * 100 + "\n") * 10 + @@ -32,18 +39,18 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { ("e" * 100 + "\n") * 10 + ("f" * 100 + "\n") * 10 } - val manyLinesArray = manyLines.split("\n") + private val manyLinesArray = manyLines.split("\n") - val manyLinesFile = { - val f = File.createTempFile("blocking-source-spec", ".tmp") - new FileWriter(f).append(manyLines).close() - f + private val manyLinesPath = { + val file = Files.createFile(fs.getPath("/test.dat")) + Files.write(file, manyLines.getBytes(StandardCharsets.UTF_8)) } + private def newBufferedReader() = Files.newBufferedReader(manyLinesPath, StandardCharsets.UTF_8) "Unfold Resource Source" must { "read contents from a file" in assertAllStagesStopped { val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ Option(reader.readLine()), reader ⇒ reader.close()) .runWith(Sink.asPublisher(false)) @@ -70,7 +77,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { "continue when Strategy is Resume and exception happened" in assertAllStagesStopped { val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ { val s = reader.readLine() if (s != null && s.contains("b")) throw TE("") else Option(s) @@ -92,7 +99,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { "close and open stream again when Strategy is Restart" in assertAllStagesStopped { val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ { val s = reader.readLine() if (s != null && s.contains("b")) throw TE("") else Option(s) @@ -115,7 +122,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val chunkSize = 50 val buffer = new Array[Char](chunkSize) val p = Source.unfoldResource[ByteString, Reader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ { val s = reader.read(buffer) if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None @@ -147,7 +154,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val materializer = ActorMaterializer()(sys) try { val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ Option(reader.readLine()), reader ⇒ reader.close()).runWith(TestSink.probe)(materializer) @@ -158,21 +165,23 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "fail when create throws exception" in assertAllStagesStopped { - val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ throw TE(""), - reader ⇒ Option(reader.readLine()), - reader ⇒ reader.close()) - .runWith(Sink.asPublisher(false)) - val c = TestSubscriber.manualProbe[String]() - p.subscribe(c) + EventFilter[TE](occurrences = 1).intercept { + val p = Source.unfoldResource[String, BufferedReader]( + () ⇒ throw TE(""), + reader ⇒ Option(reader.readLine()), + reader ⇒ reader.close()) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) - c.expectSubscription() - c.expectError(TE("")) + c.expectSubscription() + c.expectError(TE("")) + } } "fail when close throws exception" in assertAllStagesStopped { val p = Source.unfoldResource[String, BufferedReader]( - () ⇒ new BufferedReader(new FileReader(manyLinesFile)), + () ⇒ newBufferedReader(), reader ⇒ Option(reader.readLine()), reader ⇒ throw TE("")) .runWith(Sink.asPublisher(false)) @@ -181,11 +190,48 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val sub = c.expectSubscription() sub.request(61) - c.expectNextN(60) - c.expectError(TE("")) + + EventFilter[TE](occurrences = 1).intercept { + c.expectNextN(60) + c.expectError(TE("")) + } } + + // issue #24924 + "not close the resource twice when read fails" in { + val closedCounter = new AtomicInteger(0) + val probe = Source.unfoldResource[Int, Int]( + () ⇒ 23, // the best resource there is + _ ⇒ throw TE("failing read"), + _ ⇒ closedCounter.incrementAndGet() + ).runWith(TestSink.probe[Int]) + + probe.request(1) + probe.expectError(TE("failing read")) + closedCounter.get() should ===(1) + } + + // issue #24924 + "not close the resource twice when read fails and then close fails" in { + val closedCounter = new AtomicInteger(0) + val probe = Source.unfoldResource[Int, Int]( + () ⇒ 23, // the best resource there is + _ ⇒ throw TE("failing read"), + { _ ⇒ + closedCounter.incrementAndGet() + if (closedCounter.get == 1) throw TE("boom") + } + ).runWith(TestSink.probe[Int]) + + EventFilter[TE](occurrences = 1).intercept { + probe.request(1) + probe.expectError(TE("boom")) + } + closedCounter.get() should ===(1) + } + } override def afterTermination(): Unit = { - manyLinesFile.delete() + fs.close() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala index 7d5efee55a..c04b73af93 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala @@ -44,16 +44,18 @@ import scala.util.control.NonFatal case None ⇒ closeStage() } } catch { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ - close(blockingStream) - failStage(ex) - case Supervision.Restart ⇒ - restartState() - resumingMode = true - case Supervision.Resume ⇒ - resumingMode = true - } + case NonFatal(ex) ⇒ + decider(ex) match { + case Supervision.Stop ⇒ + open = false + close(blockingStream) + failStage(ex) + case Supervision.Restart ⇒ + restartState() + resumingMode = true + case Supervision.Resume ⇒ + resumingMode = true + } } if (resumingMode) onPull() } @@ -61,6 +63,7 @@ import scala.util.control.NonFatal override def onDownstreamFinish(): Unit = closeStage() private def restartState(): Unit = { + open = false close(blockingStream) blockingStream = create() open = true