(Prefer) non symbolic typed ask 26306

This commit is contained in:
Johan Andrén 2019-03-01 15:06:50 +01:00 committed by GitHub
commit 26b0869318
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 171 additions and 46 deletions

View file

@ -30,7 +30,7 @@ cache:
script: script:
- jabba use "adopt@~1.8.202-08" - jabba use "adopt@~1.8.202-08"
# need to override as the default is to test # need to override as the default is to test
- sbt -jvm-opts .jvmopts-travis ++$TRAVIS_SCALA_VERSION update mimaReportBinaryIssues test:compile - sbt -jvm-opts .jvmopts-travis ";++$TRAVIS_SCALA_VERSION update ;++$TRAVIS_SCALA_VERSION mimaReportBinaryIssues ;++$TRAVIS_SCALA_VERSION test:compile"
# make 'git branch' work again # make 'git branch' work again
- git branch -f "$TRAVIS_BRANCH" && git checkout "$TRAVIS_BRANCH" - git branch -f "$TRAVIS_BRANCH" && git checkout "$TRAVIS_BRANCH"
# check policies, if on master also upload # check policies, if on master also upload

View file

@ -159,7 +159,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
* guardian * guardian
*/ */
def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] = def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
Await.result(internalSystem ? (ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration) Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration)
/** /**
* Spawn the given behavior. This is created as a child of the test kit * Spawn the given behavior. This is created as a child of the test kit
@ -173,7 +173,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
* guardian * guardian
*/ */
def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] = def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
Await.result(internalSystem ? (ActorTestKitGuardian.SpawnActor(name, behavior, _, props)), timeout.duration) Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActor(name, behavior, _, props)), timeout.duration)
/** /**
* Stop the actor under test and wait until it terminates. * Stop the actor under test and wait until it terminates.
@ -181,7 +181,7 @@ final class ActorTestKit private[akka] (val name: String, val config: Config, se
* Other actors will not be stopped by this method. * Other actors will not be stopped by this method.
*/ */
def stop[T](ref: ActorRef[T], max: FiniteDuration = timeout.duration): Unit = try { def stop[T](ref: ActorRef[T], max: FiniteDuration = timeout.duration): Unit = try {
Await.result(internalSystem ? { x: ActorRef[ActorTestKitGuardian.Ack.type] ActorTestKitGuardian.StopActor(ref, x) }, max) Await.result(internalSystem.ask { x: ActorRef[ActorTestKitGuardian.Ack.type] ActorTestKitGuardian.StopActor(ref, x) }, max)
} catch { } catch {
case _: TimeoutException case _: TimeoutException
assert(false, s"timeout ($max) during stop() waiting for actor [${ref.path}] to stop") assert(false, s"timeout ($max) during stop() waiting for actor [${ref.path}] to stop")

View file

@ -43,7 +43,7 @@ object AsyncTestingExampleSpec {
} }
private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = { private def publish(i: Int)(implicit timeout: Timeout): Future[Try[Int]] = {
publisher ? (ref Message(i, ref)) publisher.ask(ref Message(i, ref))
} }
} }

View file

@ -18,9 +18,11 @@ import scala.util.Success
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike import org.scalatest.WordSpecLike
import scala.concurrent.Future
object AskSpec { object AskSpec {
sealed trait Msg sealed trait Msg
final case class Foo(s: String)(val replyTo: ActorRef[String]) extends Msg final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg
final case class Stop(replyTo: ActorRef[Unit]) extends Msg final case class Stop(replyTo: ActorRef[Unit]) extends Msg
} }
@ -51,12 +53,14 @@ class AskSpec extends ScalaTestWithActorTestKit("""
"Ask pattern" must { "Ask pattern" must {
"fail the future if the actor is already terminated" in { "fail the future if the actor is already terminated" in {
val ref = spawn(behavior) val ref = spawn(behavior)
(ref ? Stop).futureValue val stopResult: Future[Unit] = ref.ask(Stop)
stopResult.futureValue
val probe = createTestProbe() val probe = createTestProbe()
probe.expectTerminated(ref, probe.remainingOrDefault) probe.expectTerminated(ref, probe.remainingOrDefault)
val answer = val answer: Future[String] =
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept { EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
ref ? Foo("bar") ref.ask(Foo("bar", _))
} }
val result = answer.failed.futureValue val result = answer.failed.futureValue
result shouldBe a[TimeoutException] result shouldBe a[TimeoutException]
@ -65,7 +69,13 @@ class AskSpec extends ScalaTestWithActorTestKit("""
"succeed when the actor is alive" in { "succeed when the actor is alive" in {
val ref = spawn(behavior) val ref = spawn(behavior)
val response = ref ? Foo("bar") val response: Future[String] = ref.ask(Foo("bar", _))
response.futureValue should ===("foo")
}
"provide a symbolic alias that works the same" in {
val ref = spawn(behavior)
val response: Future[String] = ref ? (Foo("bar", _))
response.futureValue should ===("foo") response.futureValue should ===("foo")
} }
@ -73,7 +83,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
val actor = spawn(Behaviors.empty[Foo]) val actor = spawn(Behaviors.empty[Foo])
implicit val timeout: Timeout = 10.millis implicit val timeout: Timeout = 10.millis
EventFilter.warning(pattern = ".*unhandled message.*", occurrences = 1).intercept { EventFilter.warning(pattern = ".*unhandled message.*", occurrences = 1).intercept {
val answer = actor ? Foo("bar") val answer: Future[String] = actor.ask(Foo("bar", _))
val result = answer.failed.futureValue val result = answer.failed.futureValue
result shouldBe a[TimeoutException] result shouldBe a[TimeoutException]
result.getMessage should startWith("Ask timed out on") result.getMessage should startWith("Ask timed out on")
@ -90,9 +100,9 @@ class AskSpec extends ScalaTestWithActorTestKit("""
fail("this test must only run in an adapted actor system") fail("this test must only run in an adapted actor system")
} }
val answer = val answer: Future[String] =
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept { EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 1).intercept {
noSuchActor ? Foo("bar") noSuchActor.ask(Foo("bar", _))
} }
val result = answer.failed.futureValue val result = answer.failed.futureValue
result shouldBe a[TimeoutException] result shouldBe a[TimeoutException]
@ -120,7 +130,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
implicit val timeout: Timeout = 3.seconds implicit val timeout: Timeout = 3.seconds
implicit val scheduler = untypedSystem.toTyped.scheduler implicit val scheduler = untypedSystem.toTyped.scheduler
val typedLegacy: ActorRef[AnyRef] = legacyActor val typedLegacy: ActorRef[AnyRef] = legacyActor
(typedLegacy ? Ping).failed.futureValue should ===(ex) (typedLegacy.ask(Ping)).failed.futureValue should ===(ex)
} finally { } finally {
akka.testkit.TestKit.shutdownActorSystem(untypedSystem) akka.testkit.TestKit.shutdownActorSystem(untypedSystem)
} }

View file

@ -46,7 +46,7 @@ class SpawnProtocolSpec extends ScalaTestWithActorTestKit with WordSpecLike {
val parent = spawn(SpawnProtocol.behavior, "parent2") val parent = spawn(SpawnProtocol.behavior, "parent2")
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
implicit val timeout = Timeout(5.seconds) implicit val timeout = Timeout(5.seconds)
val parentReply = parent ? SpawnProtocol.Spawn(target, "child", Props.empty) val parentReply = parent.ask(SpawnProtocol.Spawn(target, "child", Props.empty))
val child = parentReply.futureValue val child = parentReply.futureValue
val childReply = TestProbe[Pong.type]() val childReply = TestProbe[Pong.type]()
child ! Ping(childReply.ref) child ! Ping(childReply.ref)

View file

@ -110,7 +110,7 @@ class LocalReceptionistSpec extends ScalaTestWithActorTestKit with WordSpecLike
"work with ask" in { "work with ask" in {
val receptionist = spawn(LocalReceptionist.behavior) val receptionist = spawn(LocalReceptionist.behavior)
val serviceA = spawn(behaviorA) val serviceA = spawn(behaviorA)
val f: Future[Registered] = receptionist ? (Register(ServiceKeyA, serviceA, _)) val f: Future[Registered] = receptionist.ask(Register(ServiceKeyA, serviceA, _))
f.futureValue should be(Registered(ServiceKeyA, serviceA)) f.futureValue should be(Registered(ServiceKeyA, serviceA))
} }

View file

@ -31,7 +31,7 @@ object ReceptionistApiSpec {
// needs the explicit type on the future and the extra parenthesises // needs the explicit type on the future and the extra parenthesises
// to work // to work
val registered: Future[Receptionist.Registered] = val registered: Future[Receptionist.Registered] =
system.receptionist ? (Receptionist.Register(key, service, _)) system.receptionist.ask(Receptionist.Register(key, service, _))
registered.foreach { registered.foreach {
case key.Registered(ref) case key.Registered(ref)
// ref is the right type here // ref is the right type here
@ -41,7 +41,7 @@ object ReceptionistApiSpec {
// one-off ask outside of actor, should be uncommon but not rare // one-off ask outside of actor, should be uncommon but not rare
val found: Future[Receptionist.Listing] = val found: Future[Receptionist.Listing] =
system.receptionist ? (Receptionist.Find(key, _)) system.receptionist.ask(Receptionist.Find(key, _))
found.foreach { found.foreach {
case key.Listing(instances) case key.Listing(instances)
instances.foreach(_ ! "woho") instances.foreach(_ ! "woho")

View file

@ -63,15 +63,15 @@ class DispatchersDocSpec extends ScalaTestWithActorTestKit(DispatchersDocSpec.co
val probe = TestProbe[Dispatcher]() val probe = TestProbe[Dispatcher]()
val actor: ActorRef[SpawnProtocol] = spawn(SpawnProtocol.behavior) val actor: ActorRef[SpawnProtocol] = spawn(SpawnProtocol.behavior)
val withDefault = (actor ? Spawn(giveMeYourDispatcher, "default", Props.empty)).futureValue val withDefault = actor.ask(Spawn(giveMeYourDispatcher, "default", Props.empty)).futureValue
withDefault ! WhichDispatcher(probe.ref) withDefault ! WhichDispatcher(probe.ref)
probe.receiveMessage().id shouldEqual "akka.actor.default-dispatcher" probe.receiveMessage().id shouldEqual "akka.actor.default-dispatcher"
val withBlocking = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.blocking())).futureValue val withBlocking = actor.ask(Spawn(giveMeYourDispatcher, "default", DispatcherSelector.blocking())).futureValue
withBlocking ! WhichDispatcher(probe.ref) withBlocking ! WhichDispatcher(probe.ref)
probe.receiveMessage().id shouldEqual "akka.actor.default-blocking-io-dispatcher" probe.receiveMessage().id shouldEqual "akka.actor.default-blocking-io-dispatcher"
val withCustom = (actor ? Spawn(giveMeYourDispatcher, "default", DispatcherSelector.fromConfig("your-dispatcher"))).futureValue val withCustom = actor.ask(Spawn(giveMeYourDispatcher, "default", DispatcherSelector.fromConfig("your-dispatcher"))).futureValue
withCustom ! WhichDispatcher(probe.ref) withCustom ! WhichDispatcher(probe.ref)
probe.receiveMessage().id shouldEqual "your-dispatcher" probe.receiveMessage().id shouldEqual "your-dispatcher"
} }

View file

@ -368,7 +368,7 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
implicit val timeout: Timeout = 3.seconds implicit val timeout: Timeout = 3.seconds
implicit val scheduler = system.scheduler implicit val scheduler = system.scheduler
val result: Future[Cookies] = cookieActorRef ? (ref GiveMeCookies(ref)) val result: Future[Cookies] = cookieActorRef.ask(ref GiveMeCookies(ref))
// the response callback will be executed on this execution context // the response callback will be executed on this execution context
implicit val ec = system.executionContext implicit val ec = system.executionContext

View file

@ -61,7 +61,7 @@ class SpawnProtocolDocSpec extends ScalaTestWithActorTestKit with WordSpecLike {
implicit val scheduler: Scheduler = system.scheduler implicit val scheduler: Scheduler = system.scheduler
val greeter: Future[ActorRef[HelloWorld.Greet]] = val greeter: Future[ActorRef[HelloWorld.Greet]] =
system ? SpawnProtocol.Spawn(behavior = HelloWorld.greeter, name = "greeter", props = Props.empty) system.ask(SpawnProtocol.Spawn(behavior = HelloWorld.greeter, name = "greeter", props = Props.empty))
val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message)
context.log.info("Greeting for {} from {}", message.whom, message.from) context.log.info("Greeting for {} from {}", message.whom, message.from)
@ -69,7 +69,7 @@ class SpawnProtocolDocSpec extends ScalaTestWithActorTestKit with WordSpecLike {
} }
val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] = val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
system ? SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty) system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty))
for (greeterRef greeter; replyToRef greetedReplyTo) { for (greeterRef greeter; replyToRef greetedReplyTo) {
greeterRef ! HelloWorld.Greet("Akka", replyToRef) greeterRef ! HelloWorld.Greet("Akka", replyToRef)

View file

@ -83,7 +83,7 @@ import akka.util.JavaDurationConverters._
// Scala API impl // Scala API impl
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
pipeToSelf((target ? createRequest)(responseTimeout, system.scheduler))(mapResponse) pipeToSelf((target.ask(createRequest))(responseTimeout, system.scheduler))(mapResponse)
} }
// Java API impl // Java API impl

View file

@ -31,5 +31,5 @@ import scala.compat.java8.FutureConverters._
*/ */
object AskPattern { object AskPattern {
def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] = def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava (actor.ask(message.apply)(timeout.asScala, scheduler)).toJava
} }

View file

@ -19,19 +19,20 @@ import akka.actor.typed.internal.InternalRecipientRef
/** /**
* The ask-pattern implements the initiator side of a requestreply protocol. * The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask".
* *
* See [[AskPattern.Askable.?]] for details * See [[AskPattern.Askable.ask]] for details
*/ */
object AskPattern { object AskPattern {
/** /**
* See [[?]] * See [[ask]]
*/ */
implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal { implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal {
/** /**
* The ask-pattern implements the initiator side of a requestreply protocol. * The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask". * The `?` operator is pronounced as "ask" (and a convenience symbolic operation
* kept since the previous ask API, if unsure which one to use, prefer the non-symbolic
* method as it leads to fewer surprises with the scope of the `replyTo` function)
* *
* Note that if you are inside of an actor you should prefer [[ActorContext.ask]] * Note that if you are inside of an actor you should prefer [[ActorContext.ask]]
* as that provides better safety. * as that provides better safety.
@ -51,10 +52,45 @@ object AskPattern {
* implicit val scheduler = system.scheduler * implicit val scheduler = system.scheduler
* implicit val timeout = Timeout(3.seconds) * implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ... * val target: ActorRef[Request] = ...
* val f: Future[Reply] = target ? replyTo => (Request("hello", replyTo)) * val f: Future[Reply] = target ? (replyTo => (Request("hello", replyTo)))
* }}} * }}}
*
* Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for
* the `ActorRef`.
*/ */
def ?[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, @unused scheduler: Scheduler): Future[U] = { def ?[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, @unused scheduler: Scheduler): Future[U] = {
ask(replyTo)(timeout, scheduler)
}
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
*
* Note that if you are inside of an actor you should prefer [[ActorContext.ask]]
* as that provides better safety.
*
* The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the
* message that is sent to the target Actor in order to function as a reply-to
* address, therefore the argument to the ask / `?`
* operator is not the message itself but a function that given the reply-to
* address will create the message.
*
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val scheduler = system.scheduler
* implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ...
* val f: Future[Reply] = target.ask(replyTo => (Request("hello", replyTo)))
* // alternatively
* val f2: Future[Reply] = target.ask(Request("hello", _))
* // note that the explicit type on f2 is important for the compiler
* // to understand the type of the wildcard
* }}}
*/
def ask[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, @unused scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it // We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219 // because it might be needed when we move to a 'native' typed runtime, see #24219
ref match { ref match {

View file

@ -0,0 +1,10 @@
# Akka Microbenchmarks
This subproject contains some microbenchmarks excercising key parts of Akka Typed.
You can run them like:
project akka-bench-jmh-typed
jmh:run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
Use 'jmh:run -h' to get an overview of the available options.

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka
import org.openjdk.jmh.results.RunResult
import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.CommandLineOptions
object BenchRunner {
def main(args: Array[String]) = {
import scala.collection.JavaConverters._
val args2 = args.toList.flatMap {
case "quick" "-i 1 -wi 1 -f1 -t1".split(" ").toList
case "full" "-i 10 -wi 4 -f3 -t1".split(" ").toList
case "jitwatch" "-jvmArgs=-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation" :: Nil
case other other :: Nil
}
val opts = new CommandLineOptions(args2: _*)
val results = new Runner(opts).run()
val report = results.asScala.map { result: RunResult
val bench = result.getParams.getBenchmark
val params = result.getParams.getParamsKeys.asScala.map(key s"$key=${result.getParams.getParam(key)}").mkString("_")
val score = result.getAggregatedResult.getPrimaryResult.getScore.round
val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit
s"\t${bench}_${params}\t$score\t$unit"
}
report.toList.sorted.foreach(println)
}
}

View file

@ -99,7 +99,7 @@ class TypedActorBenchmark {
@Benchmark @Benchmark
@OperationsPerInvocation(totalMessages) @OperationsPerInvocation(totalMessages)
def echo(): Unit = { def echo(): Unit = {
Await.result(system ? Start, timeout) Await.result(system.ask(Start), timeout)
} }
} }

View file

@ -1,6 +1,8 @@
# Akka Microbenchmarks # Akka Microbenchmarks
This subproject contains some microbenchmarks excercising key parts of Akka. This subproject contains some microbenchmarks excercising key parts of Akka. (Excluding typed which has its
own jmh module)
You can run them like: You can run them like:

View file

@ -106,14 +106,14 @@ object ReplicatorSpec {
implicit val scheduler: Scheduler = ??? implicit val scheduler: Scheduler = ???
implicit val cluster: Cluster = ??? implicit val cluster: Cluster = ???
val reply1: Future[GetResponse[GCounter]] = replicator ? Replicator.Get(Key, Replicator.ReadLocal) val reply1: Future[GetResponse[GCounter]] = replicator.ask(Replicator.Get(Key, Replicator.ReadLocal))
val reply2: Future[UpdateResponse[GCounter]] = val reply2: Future[UpdateResponse[GCounter]] =
replicator ? Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1) replicator.ask(Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1))
val reply3: Future[DeleteResponse[GCounter]] = replicator ? Replicator.Delete(Key, Replicator.WriteLocal) val reply3: Future[DeleteResponse[GCounter]] = replicator.ask(Replicator.Delete(Key, Replicator.WriteLocal))
val reply4: Future[ReplicaCount] = replicator ? Replicator.GetReplicaCount() val reply4: Future[ReplicaCount] = replicator.ask(Replicator.GetReplicaCount())
// suppress unused compiler warnings // suppress unused compiler warnings
println("" + reply1 + reply2 + reply3 + reply4) println("" + reply1 + reply2 + reply3 + reply4)

View file

@ -6,13 +6,14 @@ package akka.persistence.typed.scaladsl
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler import akka.actor.typed.scaladsl.TimerScheduler
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SideEffect import akka.persistence.typed.SideEffect
import scala.concurrent.Future
object PersistentActorCompileOnlyTest { object PersistentActorCompileOnlyTest {
import akka.persistence.typed.scaladsl.EventSourcedBehavior._ import akka.persistence.typed.scaladsl.EventSourcedBehavior._
@ -57,7 +58,7 @@ object PersistentActorCompileOnlyTest {
case class EventsInFlight(nextCorrelationId: Int, dataByCorrelationId: Map[Int, String]) case class EventsInFlight(nextCorrelationId: Int, dataByCorrelationId: Map[Int, String])
case class Request(correlationId: Int, data: String)(sender: ActorRef[Response]) case class Request(correlationId: Int, data: String, sender: ActorRef[Response])
case class Response(correlationId: Int) case class Response(correlationId: Int)
val sideEffectProcessor: ActorRef[Request] = ??? val sideEffectProcessor: ActorRef[Request] = ???
@ -67,8 +68,10 @@ object PersistentActorCompileOnlyTest {
implicit val scheduler: akka.actor.Scheduler = ??? implicit val scheduler: akka.actor.Scheduler = ???
implicit val ec: ExecutionContext = ??? implicit val ec: ExecutionContext = ???
(sideEffectProcessor ? Request(correlationId, data)) val response: Future[RecoveryComplete.Response] =
.map(response AcknowledgeSideEffect(response.correlationId)) sideEffectProcessor.ask(Request(correlationId, data, _))
response.map(response AcknowledgeSideEffect(response.correlationId))
.foreach(sender ! _) .foreach(sender ! _)
} }

View file

@ -21,7 +21,7 @@ initialize := {
initialize.value initialize.value
} }
akka.AkkaBuild.buildSettings // akka.AkkaBuild.buildSettings
shellPrompt := { s => Project.extract(s).currentProject.id + " > " } shellPrompt := { s => Project.extract(s).currentProject.id + " > " }
resolverSettings resolverSettings
@ -46,6 +46,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
actorTyped, actorTypedTests, actorTestkitTyped, actorTyped, actorTypedTests, actorTestkitTyped,
persistenceTyped, persistenceTyped,
clusterTyped, clusterShardingTyped, clusterTyped, clusterShardingTyped,
benchJmhTyped,
streamTyped, streamTyped,
discovery discovery
) )
@ -58,8 +59,9 @@ lazy val root = Project(
.settings(unidocRootIgnoreProjects := .settings(unidocRootIgnoreProjects :=
(CrossVersion.partialVersion(scalaVersion.value) match { (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) if n == 11 ⇒ aggregatedProjects // ignore all, don't unidoc when scalaVersion is 2.11 case Some((2, n)) if n == 11 ⇒ aggregatedProjects // ignore all, don't unidoc when scalaVersion is 2.11
case _ ⇒ Seq(remoteTests, benchJmh, protobuf, akkaScalaNightly, docs) case _ ⇒ Seq(remoteTests, benchJmh, benchJmhTyped, protobuf, akkaScalaNightly, docs)
}) }),
crossScalaVersions := Nil, // Allows some modules (typed) to be only for 2.12 sbt/sbt#3465
) )
.settings( .settings(
unmanagedSources in(Compile, headerCreate) := (baseDirectory.value / "project").**("*.scala").get unmanagedSources in(Compile, headerCreate) := (baseDirectory.value / "project").**("*.scala").get
@ -102,8 +104,7 @@ lazy val benchJmh = akkaModule("akka-bench-jmh")
Seq( Seq(
actor, actor,
stream, streamTests, stream, streamTests,
persistence, persistenceTyped, persistence, distributedData,
distributedData, clusterTyped,
testkit testkit
).map(_ % "compile->compile;compile->test"): _* ).map(_ % "compile->compile;compile->test"): _*
) )
@ -111,6 +112,21 @@ lazy val benchJmh = akkaModule("akka-bench-jmh")
.enablePlugins(JmhPlugin, ScaladocNoVerificationOfDiagrams, NoPublish, CopyrightHeader) .enablePlugins(JmhPlugin, ScaladocNoVerificationOfDiagrams, NoPublish, CopyrightHeader)
.disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr) .disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr)
// typed benchmarks only on 2.12+
lazy val benchJmhTyped = akkaModule("akka-bench-jmh-typed")
.dependsOn(
Seq(
persistenceTyped,
distributedData, clusterTyped,
testkit, benchJmh
).map(_ % "compile->compile;compile->test"): _*
)
.settings(Dependencies.benchJmh)
.settings(AkkaBuild.noScala211)
.enablePlugins(JmhPlugin, ScaladocNoVerificationOfDiagrams, NoPublish, CopyrightHeader)
.disablePlugins(MimaPlugin, WhiteSourcePlugin, ValidatePullRequest, CopyrightHeaderInPr)
lazy val camel = akkaModule("akka-camel") lazy val camel = akkaModule("akka-camel")
.dependsOn(actor, slf4j, testkit % "test->test") .dependsOn(actor, slf4j, testkit % "test->test")
.settings(Dependencies.camel) .settings(Dependencies.camel)
@ -247,6 +263,7 @@ lazy val docs = akkaModule("akka-docs")
resolvers += Resolver.jcenterRepo, resolvers += Resolver.jcenterRepo,
deployRsyncArtifact := List((paradox in Compile).value -> s"www/docs/akka/${version.value}") deployRsyncArtifact := List((paradox in Compile).value -> s"www/docs/akka/${version.value}")
) )
.settings(AkkaBuild.noScala211)
.enablePlugins( .enablePlugins(
AkkaParadoxPlugin, DeployRsync, NoPublish, ParadoxBrowse, AkkaParadoxPlugin, DeployRsync, NoPublish, ParadoxBrowse,
ScaladocNoVerificationOfDiagrams, ScaladocNoVerificationOfDiagrams,
@ -395,6 +412,7 @@ lazy val actorTyped = akkaModule("akka-actor-typed")
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.actor.typed")) // fine for now, eventually new module name to become typed.actor .settings(AutomaticModuleName.settings("akka.actor.typed")) // fine for now, eventually new module name to become typed.actor
.settings(OSGi.actorTyped) .settings(OSGi.actorTyped)
.settings(AkkaBuild.noScala211)
.settings( .settings(
initialCommands := """ initialCommands := """
import akka.actor.typed._ import akka.actor.typed._
@ -417,6 +435,7 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
) )
.settings(Dependencies.persistenceShared) .settings(Dependencies.persistenceShared)
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AkkaBuild.noScala211)
.settings(AutomaticModuleName.settings("akka.persistence.typed")) .settings(AutomaticModuleName.settings("akka.persistence.typed"))
.settings(OSGi.persistenceTyped) .settings(OSGi.persistenceTyped)
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
@ -435,6 +454,7 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
remoteTests % "test->test" remoteTests % "test->test"
) )
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AkkaBuild.noScala211)
.settings(AutomaticModuleName.settings("akka.cluster.typed")) .settings(AutomaticModuleName.settings("akka.cluster.typed"))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
.configs(MultiJvm) .configs(MultiJvm)
@ -451,6 +471,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
remoteTests % "test->test" remoteTests % "test->test"
) )
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AkkaBuild.noScala211)
.settings(AutomaticModuleName.settings("akka.cluster.sharding.typed")) .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
// To be able to import ContainerFormats.proto // To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
@ -467,6 +488,7 @@ lazy val streamTyped = akkaModule("akka-stream-typed")
actorTypedTests % "test->test" actorTypedTests % "test->test"
) )
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AkkaBuild.noScala211)
.settings(AutomaticModuleName.settings("akka.stream.typed")) .settings(AutomaticModuleName.settings("akka.stream.typed"))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
.enablePlugins(ScaladocNoVerificationOfDiagrams) .enablePlugins(ScaladocNoVerificationOfDiagrams)
@ -475,6 +497,7 @@ lazy val actorTestkitTyped = akkaModule("akka-actor-testkit-typed")
.dependsOn(actorTyped, testkit % "compile->compile;test->test") .dependsOn(actorTyped, testkit % "compile->compile;test->test")
.settings(AutomaticModuleName.settings("akka.actor.testkit.typed")) .settings(AutomaticModuleName.settings("akka.actor.testkit.typed"))
.settings(Dependencies.actorTestkitTyped) .settings(Dependencies.actorTestkitTyped)
.settings(AkkaBuild.noScala211)
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
lazy val actorTypedTests = akkaModule("akka-actor-typed-tests") lazy val actorTypedTests = akkaModule("akka-actor-typed-tests")
@ -483,6 +506,7 @@ lazy val actorTypedTests = akkaModule("akka-actor-typed-tests")
actorTestkitTyped % "compile->compile;test->test" actorTestkitTyped % "compile->compile;test->test"
) )
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.settings(AkkaBuild.noScala211)
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
.enablePlugins(NoPublish) .enablePlugins(NoPublish)

View file

@ -233,6 +233,11 @@ object AkkaBuild {
javacOptions in test ++= Seq("-Xdoclint:none"), javacOptions in test ++= Seq("-Xdoclint:none"),
javacOptions in doc ++= Seq("-Xdoclint:none", "--ignore-source-errors")) javacOptions in doc ++= Seq("-Xdoclint:none", "--ignore-source-errors"))
lazy val noScala211 = Seq(
crossScalaVersions := crossScalaVersions.value.filterNot(_.startsWith("2.11"))
)
def loadSystemProperties(fileName: String): Unit = { def loadSystemProperties(fileName: String): Unit = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
val file = new File(fileName) val file = new File(fileName)