Provide Scheduler implicitly from implicit typed actorsystem (#27986)
This commit is contained in:
parent
78281ba92f
commit
6190f0bc58
13 changed files with 19 additions and 24 deletions
|
|
@ -8,7 +8,6 @@ import java.util.concurrent.TimeoutException
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.actor.testkit.typed.TestKitSettings
|
import akka.actor.testkit.typed.TestKitSettings
|
||||||
import akka.actor.testkit.typed.internal.ActorTestKitGuardian
|
import akka.actor.testkit.typed.internal.ActorTestKitGuardian
|
||||||
import akka.actor.testkit.typed.internal.TestKitUtils
|
import akka.actor.testkit.typed.internal.TestKitUtils
|
||||||
|
|
@ -144,11 +143,12 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
|
||||||
|
|
||||||
implicit def system: ActorSystem[Nothing] = internalSystem
|
implicit def system: ActorSystem[Nothing] = internalSystem
|
||||||
|
|
||||||
implicit def scheduler: Scheduler = system.scheduler
|
|
||||||
private val childName: Iterator[String] = Iterator.from(0).map(_.toString)
|
private val childName: Iterator[String] = Iterator.from(0).map(_.toString)
|
||||||
|
|
||||||
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
|
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
|
||||||
|
|
||||||
|
def scheduler: Scheduler = system.scheduler
|
||||||
|
|
||||||
def shutdownTestKit(): Unit = {
|
def shutdownTestKit(): Unit = {
|
||||||
ActorTestKit.shutdown(
|
ActorTestKit.shutdown(
|
||||||
system,
|
system,
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ import akka.actor.testkit.typed.internal.TestKitUtils
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
import akka.actor.typed.ActorSystem
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
import akka.actor.typed.Scheduler
|
|
||||||
import akka.actor.typed.Props
|
import akka.actor.typed.Props
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
@ -65,11 +64,6 @@ abstract class ActorTestKitBase(val testKit: ActorTestKit) {
|
||||||
*/
|
*/
|
||||||
implicit def timeout: Timeout = testKit.timeout
|
implicit def timeout: Timeout = testKit.timeout
|
||||||
|
|
||||||
/**
|
|
||||||
* See corresponding method on [[ActorTestKit]]
|
|
||||||
*/
|
|
||||||
implicit def scheduler: Scheduler = testKit.scheduler
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See corresponding method on [[ActorTestKit]]
|
* See corresponding method on [[ActorTestKit]]
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -130,7 +130,6 @@ class AskSpec extends ScalaTestWithActorTestKit with WordSpecLike with LogCaptur
|
||||||
import scaladsl.AskPattern._
|
import scaladsl.AskPattern._
|
||||||
import akka.actor.typed.scaladsl.adapter._
|
import akka.actor.typed.scaladsl.adapter._
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
implicit val scheduler = classicSystem.toTyped.scheduler
|
|
||||||
val typedLegacy: ActorRef[AnyRef] = legacyActor
|
val typedLegacy: ActorRef[AnyRef] = legacyActor
|
||||||
(typedLegacy.ask(Ping)).failed.futureValue should ===(ex)
|
(typedLegacy.ask(Ping)).failed.futureValue should ===(ex)
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,7 @@ object ReceptionistApiSpec {
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
val service: ActorRef[String] = ???
|
val service: ActorRef[String] = ???
|
||||||
val key: ServiceKey[String] = ServiceKey[String]("id")
|
val key: ServiceKey[String] = ServiceKey[String]("id")
|
||||||
val system: ActorSystem[Void] = ???
|
implicit val system: ActorSystem[Void] = ???
|
||||||
implicit val scheduler = system.scheduler
|
|
||||||
import system.executionContext
|
import system.executionContext
|
||||||
|
|
||||||
// registration from outside, without ack, should be rare
|
// registration from outside, without ack, should be rare
|
||||||
|
|
|
||||||
|
|
@ -410,10 +410,9 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
// asking someone requires a timeout and a scheduler, if the timeout hits without response
|
// asking someone requires a timeout if the timeout hits without response
|
||||||
// the ask is failed with a TimeoutException
|
// the ask is failed with a TimeoutException
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
implicit val scheduler = system.scheduler
|
|
||||||
|
|
||||||
val result: Future[CookieFabric.Cookies] = cookieFabric.ask(ref => CookieFabric.GiveMeCookies(ref))
|
val result: Future[CookieFabric.Cookies] = cookieFabric.ask(ref => CookieFabric.GiveMeCookies(ref))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,6 @@ import akka.actor.typed.ActorRef
|
||||||
import akka.actor.typed.ActorSystem
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.actor.typed.Props
|
import akka.actor.typed.Props
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.actor.typed.Scheduler
|
|
||||||
|
|
||||||
//#imports2
|
//#imports2
|
||||||
|
|
||||||
|
|
@ -57,14 +56,13 @@ class SpawnProtocolDocSpec extends ScalaTestWithActorTestKit with WordSpecLike w
|
||||||
"be able to spawn actors" in {
|
"be able to spawn actors" in {
|
||||||
//#system-spawn
|
//#system-spawn
|
||||||
|
|
||||||
val system: ActorSystem[SpawnProtocol.Command] =
|
implicit val system: ActorSystem[SpawnProtocol.Command] =
|
||||||
ActorSystem(HelloWorldMain(), "hello")
|
ActorSystem(HelloWorldMain(), "hello")
|
||||||
|
|
||||||
// needed in implicit scope for ask (?)
|
// needed in implicit scope for ask (?)
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
implicit val ec: ExecutionContext = system.executionContext
|
implicit val ec: ExecutionContext = system.executionContext
|
||||||
implicit val timeout: Timeout = Timeout(3.seconds)
|
implicit val timeout: Timeout = Timeout(3.seconds)
|
||||||
implicit val scheduler: Scheduler = system.scheduler
|
|
||||||
|
|
||||||
val greeter: Future[ActorRef[HelloWorld.Greet]] =
|
val greeter: Future[ActorRef[HelloWorld.Greet]] =
|
||||||
system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
|
system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
|
||||||
|
|
|
||||||
|
|
@ -422,14 +422,13 @@ object StyleGuideDocExamples {
|
||||||
object Ask {
|
object Ask {
|
||||||
import Messages.CounterProtocol._
|
import Messages.CounterProtocol._
|
||||||
|
|
||||||
val system: ActorSystem[Nothing] = ???
|
implicit val system: ActorSystem[Nothing] = ???
|
||||||
|
|
||||||
//#ask-1
|
//#ask-1
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
implicit val timeout = Timeout(3.seconds)
|
implicit val timeout = Timeout(3.seconds)
|
||||||
implicit val scheduler = system.scheduler
|
|
||||||
val counter: ActorRef[Command] = ???
|
val counter: ActorRef[Command] = ???
|
||||||
|
|
||||||
val result: Future[OperationResult] = counter.ask(replyTo => Increment(delta = 2, replyTo))
|
val result: Future[OperationResult] = counter.ask(replyTo => Increment(delta = 2, replyTo))
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import java.util.concurrent.TimeoutException
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.actor.{ Address, RootActorPath }
|
import akka.actor.{ Address, RootActorPath }
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.actor.typed.Scheduler
|
import akka.actor.typed.Scheduler
|
||||||
import akka.actor.typed.internal.{ adapter => adapt }
|
import akka.actor.typed.internal.{ adapter => adapt }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
|
@ -25,6 +26,12 @@ import com.github.ghik.silencer.silent
|
||||||
*/
|
*/
|
||||||
object AskPattern {
|
object AskPattern {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides a scheduler from an actor system (that will likely already be implicit in the scope) to minimize ask
|
||||||
|
* boilerplate.
|
||||||
|
*/
|
||||||
|
implicit def schedulerFromActorSystem(implicit system: ActorSystem[_]): Scheduler = system.scheduler
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See [[ask]]
|
* See [[ask]]
|
||||||
*
|
*
|
||||||
|
|
@ -53,7 +60,7 @@ object AskPattern {
|
||||||
* case class Request(msg: String, replyTo: ActorRef[Reply])
|
* case class Request(msg: String, replyTo: ActorRef[Reply])
|
||||||
* case class Reply(msg: String)
|
* case class Reply(msg: String)
|
||||||
*
|
*
|
||||||
* implicit val scheduler = system.scheduler
|
* implicit val system = ...
|
||||||
* implicit val timeout = Timeout(3.seconds)
|
* implicit val timeout = Timeout(3.seconds)
|
||||||
* val target: ActorRef[Request] = ...
|
* val target: ActorRef[Request] = ...
|
||||||
* val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo))
|
* val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo))
|
||||||
|
|
@ -86,7 +93,7 @@ object AskPattern {
|
||||||
* case class Request(msg: String, replyTo: ActorRef[Reply])
|
* case class Request(msg: String, replyTo: ActorRef[Reply])
|
||||||
* case class Reply(msg: String)
|
* case class Reply(msg: String)
|
||||||
*
|
*
|
||||||
* implicit val scheduler = system.scheduler
|
* implicit val system = ...
|
||||||
* implicit val timeout = Timeout(3.seconds)
|
* implicit val timeout = Timeout(3.seconds)
|
||||||
* val target: ActorRef[Request] = ...
|
* val target: ActorRef[Request] = ...
|
||||||
* val f: Future[Reply] = target.ask(replyTo => Request("hello", replyTo))
|
* val f: Future[Reply] = target.ask(replyTo => Request("hello", replyTo))
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,6 @@ class TypedActorBenchmark {
|
||||||
implicit var system: ActorSystem[Start] = _
|
implicit var system: ActorSystem[Start] = _
|
||||||
|
|
||||||
implicit val askTimeout = akka.util.Timeout(timeout)
|
implicit val askTimeout = akka.util.Timeout(timeout)
|
||||||
implicit def scheduler = system.scheduler
|
|
||||||
|
|
||||||
@Setup(Level.Trial)
|
@Setup(Level.Trial)
|
||||||
def setup(): Unit = {
|
def setup(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,6 @@ abstract class ClusterReceptionistUnreachabilitySpec
|
||||||
val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
|
val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
|
||||||
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
|
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
|
||||||
implicit val timeout: Timeout = 3.seconds
|
implicit val timeout: Timeout = 3.seconds
|
||||||
implicit val scheduler = typedSystem.scheduler
|
|
||||||
val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))
|
val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))
|
||||||
|
|
||||||
Await.result(f, 3.seconds)
|
Await.result(f, 3.seconds)
|
||||||
|
|
|
||||||
|
|
@ -618,6 +618,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
|
||||||
The reason is to encourage right usage and detect mistakes like not creating a new instance (via `setup`)
|
The reason is to encourage right usage and detect mistakes like not creating a new instance (via `setup`)
|
||||||
when the behavior is supervised and restarted.
|
when the behavior is supervised and restarted.
|
||||||
* `LoggingEventFilter` has been renamed to `LoggingTestKit` and its `intercept` method renamed to `assert`
|
* `LoggingEventFilter` has been renamed to `LoggingTestKit` and its `intercept` method renamed to `assert`
|
||||||
|
* Scala `ask` from `AskPattern` now implicitly converts an implicit `ActorSystem[_]` to `Scheduler` to eliminate some boilerplate.
|
||||||
|
|
||||||
#### Akka Typed Stream API changes
|
#### Akka Typed Stream API changes
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -296,7 +296,6 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
||||||
PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId)
|
PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId)
|
||||||
|
|
||||||
import system.executionContext
|
import system.executionContext
|
||||||
implicit val scheduler = system.scheduler
|
|
||||||
implicit val timeout = Timeout(3.seconds)
|
implicit val timeout = Timeout(3.seconds)
|
||||||
|
|
||||||
val bidProjection = new MyResumableProjection("bid")
|
val bidProjection = new MyResumableProjection("bid")
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.persistence.typed.scaladsl
|
package akka.persistence.typed.scaladsl
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorSystem
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContext
|
import scala.concurrent.ExecutionContext
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.typed.{ ActorRef, Behavior }
|
import akka.actor.typed.{ ActorRef, Behavior }
|
||||||
|
|
@ -64,7 +66,7 @@ object PersistentActorCompileOnlyTest {
|
||||||
def performSideEffect(sender: ActorRef[AcknowledgeSideEffect], correlationId: Int, data: String): Unit = {
|
def performSideEffect(sender: ActorRef[AcknowledgeSideEffect], correlationId: Int, data: String): Unit = {
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
import akka.actor.typed.scaladsl.AskPattern._
|
||||||
implicit val timeout: akka.util.Timeout = 1.second
|
implicit val timeout: akka.util.Timeout = 1.second
|
||||||
implicit val scheduler: akka.actor.typed.Scheduler = ???
|
implicit val system: ActorSystem[_] = ???
|
||||||
implicit val ec: ExecutionContext = ???
|
implicit val ec: ExecutionContext = ???
|
||||||
|
|
||||||
val response: Future[RecoveryComplete.Response] =
|
val response: Future[RecoveryComplete.Response] =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue