UnfoldResourceSource closing twice on failure, #24924

* Don't write to actual disk in test

* Silence expected log errors

* Bugfix #24924 UnfoldResource closed resource twice on failure

* One more case where close would be called twice

* Toggle closed flag after operation in restartState

* open false _before_ closing in restartState
This commit is contained in:
Johan Andrén 2018-05-07 17:00:46 +02:00 committed by Patrik Nordwall
parent 969cdb873a
commit 9b7c2a8cb4
2 changed files with 85 additions and 36 deletions

View file

@ -5,17 +5,22 @@
package akka.stream.scaladsl
import java.io._
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.ActorAttributes._
import akka.stream.Supervision._
import akka.stream.{ ActorMaterializer, _ }
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.{ ActorMaterializer, _ }
import akka.testkit.EventFilter
import akka.util.ByteString
import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.concurrent.duration._
@ -24,7 +29,9 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val manyLines = {
private val fs = Jimfs.newFileSystem("UnfoldResourceSourceSpec", Configuration.unix())
private val manyLines = {
("a" * 100 + "\n") * 10 +
("b" * 100 + "\n") * 10 +
("c" * 100 + "\n") * 10 +
@ -32,18 +39,18 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
val manyLinesArray = manyLines.split("\n")
private val manyLinesArray = manyLines.split("\n")
val manyLinesFile = {
val f = File.createTempFile("blocking-source-spec", ".tmp")
new FileWriter(f).append(manyLines).close()
f
private val manyLinesPath = {
val file = Files.createFile(fs.getPath("/test.dat"))
Files.write(file, manyLines.getBytes(StandardCharsets.UTF_8))
}
private def newBufferedReader() = Files.newBufferedReader(manyLinesPath, StandardCharsets.UTF_8)
"Unfold Resource Source" must {
"read contents from a file" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader Option(reader.readLine()),
reader reader.close())
.runWith(Sink.asPublisher(false))
@ -70,7 +77,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
"continue when Strategy is Resume and exception happened" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else Option(s)
@ -92,7 +99,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
"close and open stream again when Strategy is Restart" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else Option(s)
@ -115,7 +122,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val chunkSize = 50
val buffer = new Array[Char](chunkSize)
val p = Source.unfoldResource[ByteString, Reader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader {
val s = reader.read(buffer)
if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None
@ -147,7 +154,7 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val materializer = ActorMaterializer()(sys)
try {
val p = Source.unfoldResource[String, BufferedReader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader Option(reader.readLine()),
reader reader.close()).runWith(TestSink.probe)(materializer)
@ -158,21 +165,23 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}
"fail when create throws exception" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](
() throw TE(""),
reader Option(reader.readLine()),
reader reader.close())
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
EventFilter[TE](occurrences = 1).intercept {
val p = Source.unfoldResource[String, BufferedReader](
() throw TE(""),
reader Option(reader.readLine()),
reader reader.close())
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
c.expectSubscription()
c.expectError(TE(""))
c.expectSubscription()
c.expectError(TE(""))
}
}
"fail when close throws exception" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](
() new BufferedReader(new FileReader(manyLinesFile)),
() newBufferedReader(),
reader Option(reader.readLine()),
reader throw TE(""))
.runWith(Sink.asPublisher(false))
@ -181,11 +190,48 @@ class UnfoldResourceSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val sub = c.expectSubscription()
sub.request(61)
c.expectNextN(60)
c.expectError(TE(""))
EventFilter[TE](occurrences = 1).intercept {
c.expectNextN(60)
c.expectError(TE(""))
}
}
// issue #24924
"not close the resource twice when read fails" in {
val closedCounter = new AtomicInteger(0)
val probe = Source.unfoldResource[Int, Int](
() 23, // the best resource there is
_ throw TE("failing read"),
_ closedCounter.incrementAndGet()
).runWith(TestSink.probe[Int])
probe.request(1)
probe.expectError(TE("failing read"))
closedCounter.get() should ===(1)
}
// issue #24924
"not close the resource twice when read fails and then close fails" in {
val closedCounter = new AtomicInteger(0)
val probe = Source.unfoldResource[Int, Int](
() 23, // the best resource there is
_ throw TE("failing read"),
{ _
closedCounter.incrementAndGet()
if (closedCounter.get == 1) throw TE("boom")
}
).runWith(TestSink.probe[Int])
EventFilter[TE](occurrences = 1).intercept {
probe.request(1)
probe.expectError(TE("boom"))
}
closedCounter.get() should ===(1)
}
}
override def afterTermination(): Unit = {
manyLinesFile.delete()
fs.close()
}
}

View file

@ -44,16 +44,18 @@ import scala.util.control.NonFatal
case None closeStage()
}
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop
close(blockingStream)
failStage(ex)
case Supervision.Restart
restartState()
resumingMode = true
case Supervision.Resume
resumingMode = true
}
case NonFatal(ex)
decider(ex) match {
case Supervision.Stop
open = false
close(blockingStream)
failStage(ex)
case Supervision.Restart
restartState()
resumingMode = true
case Supervision.Resume
resumingMode = true
}
}
if (resumingMode) onPull()
}
@ -61,6 +63,7 @@ import scala.util.control.NonFatal
override def onDownstreamFinish(): Unit = closeStage()
private def restartState(): Unit = {
open = false
close(blockingStream)
blockingStream = create()
open = true