pekko/akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala

68 lines
1.8 KiB
Scala
Raw Normal View History

/*
 * Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
 */
package docs.stream
import akka.NotUsed
import akka.stream.KillSwitches
2017-07-20 23:02:34 +10:00
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.duration._
import scala.concurrent._
/**
 * Documentation examples showing how to restart a stream with exponential backoff
 * and how to stop such a stream with a kill switch.
 *
 * The code between the `//#restart-with-backoff-source` and `//#with-kill-switch`
 * marker comments is what the documentation refers to, so those markers must be kept
 * intact. The example body is passed to `compileOnlySpec` (from `CompileOnlySpec`,
 * not visible in this file); judging by its name it is presumably compiled but not
 * executed -- confirm against `docs.CompileOnlySpec`.
 */
class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
  // Supplies the implicit ExecutionContext required by `Future.flatMap` in the example below.
  import system.dispatcher

  // Mock akka-http interfaces
  object Http {
    // Lets the example read like the real API: `Http().singleRequest(...)`.
    def apply(): Http.type = this

    // Always succeeds immediately with Unit; no request is actually sent.
    def singleRequest(req: HttpRequest): Future[Unit] = Future.successful(())
  }

  case class HttpRequest(uri: String)

  case class Unmarshal(b: Any) {
    // Returns the future of a promise that is never completed: this mock never yields a value.
    def to[T]: Future[T] = Promise[T]().future
  }

  case class ServerSentEvent()

  // Placeholder for arbitrary application work done while the stream is running.
  def doSomethingElse(): Unit = ()

  "Restart stages" should {

    "demonstrate a restart with backoff source" in compileOnlySpec {
      //#restart-with-backoff-source
      val restartSource = RestartSource.withBackoff(
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
        maxRestarts = 20 // limits the number of restarts to 20
      ) { () =>
        // Create a source from a future of a source
        Source.futureSource {
          // Make a single request with akka-http
          Http()
            .singleRequest(HttpRequest(uri = "http://example.com/eventstream"))
            // Unmarshal it as a source of server sent events
            .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
        }
      }
      //#restart-with-backoff-source

      //#with-kill-switch
      val killSwitch = restartSource
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
        .run()

      doSomethingElse()

      killSwitch.shutdown()
      //#with-kill-switch
    }
  }
}