Fix for unstable UnfoldResourceAsync tests #23382

This commit is contained in:
Johan Andrén 2017-10-15 18:02:46 +02:00 committed by GitHub
parent 3ad2db52c4
commit 501c3f9fb8

View file

@ -3,259 +3,272 @@
*/
package akka.stream.scaladsl
import java.io._
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorAttributes._
import akka.stream.Supervision._
import akka.stream.{ ActorMaterializer, _ }
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.{ ActorMaterializer, _ }
import akka.testkit.TestLatch
import akka.util.ByteString
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
object UnfoldResourceAsyncSourceSpec {
class ResourceDummy[T](
values: Seq[T],
// these can be used to control when the resource creates, reads first element and completes closing
createFuture: Future[Done] = Future.successful(Done),
firstReadFuture: Future[Done] = Future.successful(Done),
closeFuture: Future[Done] = Future.successful(Done))(implicit ec: ExecutionContext) {
private val iterator = values.iterator
private val createdP = Promise[Done]()
private val closedP = Promise[Done]()
private val firstReadP = Promise[Done]()
// these can be used to observe when the resource calls has happened
val created: Future[Done] = createdP.future
val firstElementRead: Future[Done] = firstReadP.future
val closed: Future[Done] = closedP.future
def create: Future[ResourceDummy[T]] = {
createdP.trySuccess(Done)
createFuture.map(_ this)
}
def read: Future[Option[T]] = {
if (!firstReadP.isCompleted) firstReadP.trySuccess(Done)
firstReadFuture.map { _
if (iterator.hasNext) Some(iterator.next())
else None
}
}
def close(): Future[Done] = {
closedP.trySuccess(Done)
closeFuture
}
}
}
class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
import UnfoldResourceAsyncSourceSpec._
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val manyLines = {
("a" * 100 + "\n") * 10 +
("b" * 100 + "\n") * 10 +
("c" * 100 + "\n") * 10 +
("d" * 100 + "\n") * 10 +
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
val manyLinesArray = manyLines.split("\n")
val manyLinesFile = {
val f = File.createTempFile("blocking-source-async-spec", ".tmp")
new FileWriter(f).append(manyLines).close()
f
}
val open: () Future[BufferedReader] = () Promise.successful(new BufferedReader(new FileReader(manyLinesFile))).future
val read: (BufferedReader) Future[Option[String]] = reader Promise.successful(Option(reader.readLine())).future
val close: (BufferedReader) Future[Done] =
reader {
reader.close()
Promise.successful(Done).future
}
import system.dispatcher
"Unfold Resource Async Source" must {
"read contents from a file" in assertAllStagesStopped {
val createPromise = Promise[BufferedReader]()
val readPromise = Promise[Option[String]]()
"unfold data from a resource" in assertAllStagesStopped {
val createPromise = Promise[Done]()
val closePromise = Promise[Done]()
val createPromiseCalled = Promise[Done]()
val readPromiseCalled = Promise[Done]()
val closePromiseCalled = Promise[Done]()
val values = 0 to 1000
val resource = new ResourceDummy[Int](
values,
createFuture = createPromise.future,
closeFuture = closePromise.future)
val resource = new BufferedReader(new FileReader(manyLinesFile))
val p = Source.unfoldResourceAsync[String, BufferedReader](
() {
createPromiseCalled.success(Done)
createPromise.future
},
reader {
readPromiseCalled.success(Done)
readPromise.future
},
reader {
closePromiseCalled.success(Done)
closePromise.future
})
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
val probe = TestSubscriber.probe[Int]()
Source.unfoldResourceAsync[Int, ResourceDummy[Int]](
resource.create _,
_.read,
_.close
).runWith(Sink.fromSubscriber(probe))
sub.request(1)
Await.ready(createPromiseCalled.future, 3.seconds)
c.expectNoMsg(200.millis)
createPromise.success(resource)
probe.request(1)
resource.created.futureValue
probe.expectNoMsg(200.millis)
createPromise.success(Done)
val chunks = manyLinesArray.toList.iterator
values.foreach { n
resource.firstElementRead.futureValue
probe.expectNext() should ===(n)
probe.request(1)
}
Await.ready(readPromiseCalled.future, 3.seconds)
c.expectNoMsg(200.millis)
readPromise.success(Option(resource.readLine()))
c.expectNext() should ===(chunks.next())
sub.cancel()
Await.ready(closePromiseCalled.future, 3.seconds)
resource.close()
resource.closed.futureValue
closePromise.success(Done)
probe.expectComplete()
}
"close resource successfully right after open" in assertAllStagesStopped {
val createPromise = Promise[BufferedReader]()
val readPromise = Promise[Option[String]]()
val closePromise = Promise[Done]()
val probe = TestSubscriber.probe[Int]()
val firstRead = Promise[Done]()
val resource = new ResourceDummy[Int](1 :: Nil, firstReadFuture = firstRead.future)
val createPromiseCalled = Promise[Done]()
val readPromiseCalled = Promise[Done]()
val closePromiseCalled = Promise[Done]()
Source.unfoldResourceAsync[Int, ResourceDummy[Int]](
resource.create _,
_.read,
_.close
).runWith(Sink.fromSubscriber(probe))
val resource = new BufferedReader(new FileReader(manyLinesFile))
val p = Source.unfoldResourceAsync[String, BufferedReader](
() {
createPromiseCalled.success(Done)
createPromise.future
},
reader {
readPromiseCalled.success(Done)
readPromise.future
},
reader {
closePromiseCalled.success(Done)
closePromise.future
})
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
probe.request(1L)
resource.firstElementRead.futureValue
// we cancel before we complete first read (racy)
probe.cancel()
Thread.sleep(100)
firstRead.success(Done)
Await.ready(createPromiseCalled.future, 3.seconds)
createPromise.success(resource)
sub.cancel()
Await.ready(closePromiseCalled.future, 3.seconds)
resource.close()
closePromise.success(Done)
resource.closed.futureValue
}
"continue when Strategy is Resume and exception happened" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](
open,
reader {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future
}, close).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
"fail when create throws exception" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Unit]()
Source.unfoldResourceAsync[Unit, Unit](
() throw TE("create failed"),
_ ???,
_ ???).runWith(Sink.fromSubscriber(probe))
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 49).foreach(i {
sub.request(1)
c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10))
})
sub.request(1)
c.expectComplete()
probe.ensureSubscription()
probe.expectError(TE("create failed"))
}
"close and open stream again when Strategy is Restart" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](
open,
reader {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future
}, close).withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
"fail when create returns failed future" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Unit]()
Source.unfoldResourceAsync[Unit, Unit](
() Future.failed(TE("create failed")),
_ ???,
_ ???).runWith(Sink.fromSubscriber(probe))
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 19).foreach(i {
sub.request(1)
c.expectNext() should ===(manyLinesArray(0))
})
sub.cancel()
probe.ensureSubscription()
probe.expectError(TE("create failed"))
}
"work with ByteString as well" in assertAllStagesStopped {
val chunkSize = 50
val buffer = new Array[Char](chunkSize)
val p = Source.unfoldResourceAsync[ByteString, Reader](
open,
reader {
val p = Promise[Option[ByteString]]
val s = reader.read(buffer)
if (s > 0) p.success(Some(ByteString(buffer.mkString("")).take(s))) else p.success(None)
p.future
"fail when close throws exception" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Unit]()
Source.unfoldResourceAsync[Unit, Unit](
() Future.successful(()),
_ Future.successful[Option[Unit]](None),
_ throw TE(""))
.runWith(Sink.fromSubscriber(probe))
probe.ensureSubscription()
probe.request(1L)
probe.expectError()
}
"fail when close returns failed future" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Unit]()
Source.unfoldResourceAsync[Unit, Unit](
() Future.successful(()),
_ Future.successful[Option[Unit]](None),
_ Future.failed(throw TE("")))
.runWith(Sink.fromSubscriber(probe))
probe.ensureSubscription()
probe.request(1L)
probe.expectError()
}
"continue when Strategy is Resume and read throws" in assertAllStagesStopped {
val result = Source.unfoldResourceAsync[Int, Iterator[Any]](
() Future.successful(List(1, 2, TE("read-error"), 3).iterator),
iterator
if (iterator.hasNext) {
iterator.next() match {
case n: Int Future.successful(Some(n))
case e: TE throw e
}
} else Future.successful(None),
_ Future.successful(Done)
).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
result.futureValue should ===(Seq(1, 2, 3))
}
"continue when Strategy is Resume and read returns failed future" in assertAllStagesStopped {
val result = Source.unfoldResourceAsync[Int, Iterator[Any]](
() Future.successful(List(1, 2, TE("read-error"), 3).iterator),
iterator
if (iterator.hasNext) {
iterator.next() match {
case n: Int Future.successful(Some(n))
case e: TE Future.failed(e)
}
} else Future.successful(None),
_ Future.successful(Done)
).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
result.futureValue should ===(Seq(1, 2, 3))
}
"close and open stream again when Strategy is Restart and read throws" in assertAllStagesStopped {
@volatile var failed = false
val startCount = new AtomicInteger(0)
val result = Source.unfoldResourceAsync[Int, Iterator[Int]](
() Future.successful {
startCount.incrementAndGet()
List(1, 2, 3).iterator
},
reader {
reader.close()
Promise.successful(Done).future
}).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
reader
if (!failed) {
failed = true
throw TE("read-error")
} else if (reader.hasNext) Future.successful(Some(reader.next))
else Future.successful(None),
_ Future.successful(Done)
).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
var remaining = manyLines
def nextChunk() = {
val (chunk, rest) = remaining.splitAt(chunkSize)
remaining = rest
chunk
}
result.futureValue should ===(Seq(1, 2, 3))
startCount.get should ===(2)
}
p.subscribe(c)
val sub = c.expectSubscription()
"close and open stream again when Strategy is Restart and read returns failed future" in assertAllStagesStopped {
@volatile var failed = false
val startCount = new AtomicInteger(0)
(0 to 121).foreach(i {
sub.request(1)
c.expectNext().utf8String should ===(nextChunk().toString)
})
sub.request(1)
c.expectComplete()
val result = Source.unfoldResourceAsync[Int, Iterator[Int]](
() Future.successful {
startCount.incrementAndGet()
List(1, 2, 3).iterator
},
reader
if (!failed) {
failed = true
Future.failed(TE("read-error"))
} else if (reader.hasNext) Future.successful(Some(reader.next))
else Future.successful(None),
_ Future.successful(Done)
).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
result.futureValue should ===(Seq(1, 2, 3))
startCount.get should ===(2)
}
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
val p = Source.unfoldResourceAsync[String, BufferedReader](
open,
read, close).runWith(TestSink.probe)(materializer)
val p = Source.unfoldResourceAsync[String, Unit](
() Promise[Unit].future, // never complete
_ ???,
_ ???).runWith(Sink.ignore)(materializer)
materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel()
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}
"fail when create throws exception" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](
() throw TE(""),
read, close).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
c.expectSubscription()
c.expectError(TE(""))
}
"fail when close throws exception" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](
open,
read, reader throw TE(""))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(61)
c.expectNextN(60)
c.expectError()
}
"close resource when stream is abruptly terminated" in {
import system.dispatcher
val closeLatch = TestLatch(1)
val mat = ActorMaterializer()
val p = Source.unfoldResourceAsync[String, BufferedReader](
open,
read,
reader Future.successful {
val p = Source.unfoldResourceAsync[String, Unit](
() Future.successful(()),
// a slow trickle of elements that never ends
_ akka.pattern.after(100.millis, system.scheduler)(Future.successful(Some("element"))),
_ Future.successful {
closeLatch.countDown()
Done
})
@ -268,7 +281,5 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
Await.ready(closeLatch, remainingOrDefault)
}
}
override def afterTermination(): Unit = {
manyLinesFile.delete()
}
}