2016-01-29 22:06:36 -05:00
|
|
|
/**
 * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
 */
|
|
|
|
|
package akka.stream.scaladsl
|
|
|
|
|
|
2017-04-03 20:21:02 +02:00
|
|
|
import akka.stream.stage.{ GraphStage, GraphStageLogic }
|
2016-07-28 16:43:08 +08:00
|
|
|
import akka.stream.testkit.StreamSpec
|
2016-01-29 22:06:36 -05:00
|
|
|
import akka.stream.testkit.scaladsl.TestSink
|
2017-04-03 20:21:02 +02:00
|
|
|
import akka.stream._
|
2016-01-29 22:06:36 -05:00
|
|
|
import akka.stream.testkit.Utils._
|
|
|
|
|
|
|
|
|
|
import scala.util.control.NoStackTrace
|
|
|
|
|
|
2016-07-28 16:43:08 +08:00
|
|
|
/**
 * Behavioural tests for the `recoverWith` and `recoverWithRetries` stream operators.
 *
 * Each test builds a source that fails at a known element, attaches a recovery
 * handler and then drives the stream through a `TestSink` probe, asserting the
 * exact sequence of elements, completion or failure that reaches downstream.
 */
class FlowRecoverWithSpec extends StreamSpec {

  // The input buffer is pinned to a single element (initial and max size are both 1);
  // presumably so upstream is pulled in lock-step with the demand the probe signals.
  val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1)

  // Explicit type on the implicit: inferred implicit types are fragile in Scala 2
  // and are not accepted by Scala 3.
  implicit val materializer: ActorMaterializer = ActorMaterializer(settings)

  // Shared failure instance, so tests can assert on the very same exception object.
  val ex = new RuntimeException("ex") with NoStackTrace

  "A RecoverWith" must {

    // The upstream fails on element 3; the handler's source (0, -1) is emitted instead
    // and the stream then completes normally.
    "recover when there is a handler" in assertAllStagesStopped {
      Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a }
        .recoverWith { case _: Throwable ⇒ Source(List(0, -1)) }
        .runWith(TestSink.probe[Int])
        .request(2)
        .expectNextN(1 to 2)
        .request(1)
        .expectNext(0)
        .request(1)
        .expectNext(-1)
        .expectComplete()
    }

    // Downstream cancels while the recovery source still has an element left;
    // assertAllStagesStopped wraps the test body.
    "cancel substream if parent is terminated when there is a handler" in assertAllStagesStopped {
      Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a }
        .recoverWith { case _: Throwable ⇒ Source(List(0, -1)) }
        .runWith(TestSink.probe[Int])
        .request(2)
        .expectNextN(1 to 2)
        .request(1)
        .expectNext(0)
        .cancel()
    }

    // The handler only matches IndexOutOfBoundsException, so `ex` must reach downstream.
    "failed stream if handler is not for such exception type" in assertAllStagesStopped {
      Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a }
        .recoverWith { case _: IndexOutOfBoundsException ⇒ Source.single(0) }
        .runWith(TestSink.probe[Int])
        .request(1)
        .expectNext(1)
        .request(1)
        .expectError(ex)
    }

    // The recovery source is the failing source itself, so 1, 2 is expected to repeat
    // on every recovery until the probe cancels.
    "be able to recover with the same unmaterialized source if configured" in assertAllStagesStopped {
      val src = Source(1 to 3).map { a ⇒ if (a == 3) throw ex else a }
      src.recoverWith { case _: Throwable ⇒ src }
        .runWith(TestSink.probe[Int])
        .request(2)
        .expectNextN(1 to 2)
        .request(2)
        .expectNextN(1 to 2)
        .request(2)
        .expectNextN(1 to 2)
        .cancel()
    }

    "not influence stream when there is no exceptions" in assertAllStagesStopped {
      Source(1 to 3).map(identity)
        .recoverWith { case _: Throwable ⇒ Source.single(0) }
        .runWith(TestSink.probe[Int])
        .request(3)
        .expectNextN(1 to 3)
        .expectComplete()
    }

    "finish stream if it's empty" in assertAllStagesStopped {
      Source.empty.map(identity)
        .recoverWith { case _: Throwable ⇒ Source.single(0) }
        .runWith(TestSink.probe[Int])
        .request(3)
        .expectComplete()
    }

    // First failure (IndexOutOfBounds) switches to (11, 22), which itself fails on 22
    // with IllegalArgumentException; the same handler then switches again to (33, 44).
    "switch the second time if alternative source throws exception" in assertAllStagesStopped {
      Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a }
        .recoverWith {
          case _: IndexOutOfBoundsException ⇒
            Source(List(11, 22)).map(m ⇒ if (m == 22) throw new IllegalArgumentException() else m)
          case _: IllegalArgumentException ⇒ Source(List(33, 44))
        }.runWith(TestSink.probe[Int])
        .request(2)
        .expectNextN(List(1, 2))
        .request(2)
        .expectNextN(List(11, 33))
        .request(1)
        .expectNext(44)
        .expectComplete()
    }

    // The alternative source fails with `ex`, which the handler does not match,
    // so the failure is expected downstream.
    "terminate with exception if partial function fails to match after an alternative source failure" in assertAllStagesStopped {
      Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a }
        .recoverWith {
          case _: IndexOutOfBoundsException ⇒
            Source(List(11, 22)).map(m ⇒ if (m == 22) throw ex else m)
        }.runWith(TestSink.probe[Int])
        .request(2)
        .expectNextN(List(1, 2))
        .request(1)
        .expectNextN(List(11))
        .request(1)
        .expectError(ex)
    }

    // With 3 retries the always-failing alternative is expected to run three times
    // (11, 22 each time) before `ex` is finally propagated.
    "terminate with exception after set number of retries" in assertAllStagesStopped {
      Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a }
        .recoverWithRetries(3, {
          case _: Throwable ⇒
            Source(List(11, 22, 33)).map(m ⇒ if (m == 33) throw ex else m)
        }).runWith(TestSink.probe[Int])
        .request(100)
        .expectNextN(List(1, 2))
        .expectNextN(List(11, 22))
        .expectNextN(List(11, 22))
        .expectNextN(List(11, 22))
        .expectError(ex)
    }

    // The IllegalArgumentException is expected when the flow is built; nothing is run here.
    "throw IllegalArgumentException if number of retries is less than -1" in assertAllStagesStopped {
      intercept[IllegalArgumentException] {
        Flow[Int].recoverWithRetries(-2, { case _: Throwable ⇒ Source.empty[Int] })
      }
    }

    // The recovery source is a stage whose logic constructor throws, so the failure
    // happens while the alternative is being set up rather than while it is emitting.
    "fail correctly when materialization of recover source fails" in assertAllStagesStopped {
      val matFail = TE("fail!")

      object FailingInnerMat extends GraphStage[SourceShape[String]] {
        val out = Outlet[String]("out")
        val shape = SourceShape(out)
        override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
          throw matFail
        }
      }

      val result = Source.failed(TE("trigger")).recoverWithRetries(1, {
        case _: TE ⇒ Source.fromGraph(FailingInnerMat)
      }).runWith(Sink.ignore)

      result.failed.futureValue should ===(matFail)
    }

  }

}
|