provided cluster dependencies in Akka Typed

* and some additional cleanup
* on my mac upd doesn't work unless loopback address is used
This commit is contained in:
Patrik Nordwall 2017-09-22 12:17:04 +02:00
parent 846cfe0416
commit 579b56a0ee
10 changed files with 41 additions and 146 deletions

View file

@ -22,6 +22,7 @@ public class ClusterApiTest extends JUnitSuite {
"akka.remote.artery.enabled = true \n"+ "akka.remote.artery.enabled = true \n"+
"akka.remote.netty.tcp.port = 0 \n"+ "akka.remote.netty.tcp.port = 0 \n"+
"akka.remote.artery.canonical.port = 0 \n"+ "akka.remote.artery.canonical.port = 0 \n"+
"akka.remote.artery.canonical.hostname = 127.0.0.1 \n" +
"akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n"+ "akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n"+
"akka.coordinated-shutdown.terminate-actor-system = off \n"+ "akka.coordinated-shutdown.terminate-actor-system = off \n"+
"akka.actor { \n"+ "akka.actor { \n"+

View file

@ -150,7 +150,8 @@ public class ReplicatorTest extends JUnitSuite {
static Config config = ConfigFactory.parseString( static Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" + "akka.actor.provider = cluster \n" +
"akka.remote.netty.tcp.port = 0 \n" + "akka.remote.netty.tcp.port = 0 \n" +
"akka.remote.artery.canonical.port = 0 \n"); "akka.remote.artery.canonical.port = 0 \n" +
"akka.remote.artery.canonical.hostname = 127.0.0.1 \n");
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest",

View file

@ -23,6 +23,7 @@ object ClusterApiSpec {
akka.remote.artery.enabled = true akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0 akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.actor { akka.actor {
@ -62,8 +63,7 @@ class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures
// check that cached selfMember is updated // check that cached selfMember is updated
node1Probe.awaitAssert( node1Probe.awaitAssert(
clusterNode1.selfMember.status should ===(MemberStatus.Up) clusterNode1.selfMember.status should ===(MemberStatus.Up))
)
// subscribing to OnSelfUp when already up // subscribing to OnSelfUp when already up
clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[SelfUp]) clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[SelfUp])
@ -73,8 +73,7 @@ class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures
clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp]) clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp])
clusterNode2.manager ! Join(clusterNode1.selfMember.address) clusterNode2.manager ! Join(clusterNode1.selfMember.address)
node2Probe.awaitAssert( node2Probe.awaitAssert(
clusterNode2.selfMember.status should ===(MemberStatus.Up) clusterNode2.selfMember.status should ===(MemberStatus.Up))
)
node2Probe.expectMsgType[SelfUp] node2Probe.expectMsgType[SelfUp]
// events about node2 joining to subscriber on node1 // events about node2 joining to subscriber on node1
@ -92,8 +91,7 @@ class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures
// selfMember updated and self removed event gotten // selfMember updated and self removed event gotten
node2Probe.awaitAssert( node2Probe.awaitAssert(
clusterNode2.selfMember.status should ===(MemberStatus.Removed) clusterNode2.selfMember.status should ===(MemberStatus.Removed))
)
node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting)) node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting))
// subscribing to SelfRemoved when already removed yields immediate message back // subscribing to SelfRemoved when already removed yields immediate message back

View file

@ -22,23 +22,25 @@ import scala.concurrent.duration._
object ClusterSingletonApiSpec { object ClusterSingletonApiSpec {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
""" s"""
akka.actor { akka.actor {
provider = cluster provider = cluster
serialize-messages = off serialize-messages = off
allow-java-serialization = off allow-java-serialization = off
serializers { serializers {
test = "akka.typed.cluster.ClusterSingletonApiSpec$PingSerializer" test = "akka.typed.cluster.ClusterSingletonApiSpec$$PingSerializer"
} }
serialization-bindings { serialization-bindings {
"akka.typed.cluster.ClusterSingletonApiSpec$Ping" = test "akka.typed.cluster.ClusterSingletonApiSpec$$Ping" = test
"akka.typed.cluster.ClusterSingletonApiSpec$Pong$" = test "akka.typed.cluster.ClusterSingletonApiSpec$$Pong$$" = test
"akka.typed.cluster.ClusterSingletonApiSpec$Perish$" = test "akka.typed.cluster.ClusterSingletonApiSpec$$Perish$$" = test
} }
} }
akka.remote.netty.tcp.port = 0
akka.remote.artery.enabled = true akka.remote.artery.enabled = true
akka.remote.artery.canonical.port = 25552 akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.cluster.jmx.multi-mbeans-in-same-jvm = on
""") """)
@ -94,10 +96,8 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config)
adaptedSystem.name, adaptedSystem.name,
ConfigFactory.parseString( ConfigFactory.parseString(
""" """
akka.remote.artery.canonical.port = 0
akka.cluster.roles = ["singleton"] akka.cluster.roles = ["singleton"]
""" """).withFallback(adaptedSystem.settings.config))
).withFallback(adaptedSystem.settings.config))
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(adaptedSystem2) val clusterNode2 = Cluster(adaptedSystem2)

View file

@ -10,7 +10,6 @@ import akka.testkit.AkkaSpec
import akka.typed.{ ActorRef, ActorSystem } import akka.typed.{ ActorRef, ActorSystem }
import akka.typed.scaladsl.Actor import akka.typed.scaladsl.Actor
import akka.actor.{ ExtendedActorSystem, ActorSystem UntypedActorSystem } import akka.actor.{ ExtendedActorSystem, ActorSystem UntypedActorSystem }
import akka.cluster.Cluster
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.Promise import scala.concurrent.Promise

View file

@ -32,6 +32,7 @@ object ReplicatorSpec {
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0 akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
""") """)
sealed trait ClientCommand sealed trait ClientCommand

View file

@ -28,25 +28,26 @@ import scala.concurrent.duration._
object ClusterReceptionistSpec { object ClusterReceptionistSpec {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
""" s"""
akka.log-level = DEBUG
akka.actor { akka.actor {
provider = cluster provider = cluster
serialize-messages = off serialize-messages = off
allow-java-serialization = true allow-java-serialization = true
serializers { serializers {
test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$PingSerializer" test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$$PingSerializer"
} }
serialization-bindings { serialization-bindings {
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Ping" = test "akka.typed.cluster.receptionist.ClusterReceptionistSpec$$Ping" = test
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Pong$" = test "akka.typed.cluster.receptionist.ClusterReceptionistSpec$$Pong$$" = test
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Perish$" = test "akka.typed.cluster.receptionist.ClusterReceptionistSpec$$Perish$$" = test
# for now, using Java serializers is good enough (tm), see #23687 # for now, using Java serializers is good enough (tm), see #23687
# "akka.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = test # "akka.typed.internal.receptionist.ReceptionistImpl$$DefaultServiceKey" = test
} }
} }
akka.remote.artery.enabled = true akka.remote.artery.enabled = true
akka.remote.artery.canonical.port = 25552 akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.cluster.jmx.multi-mbeans-in-same-jvm = on
""") """)
@ -102,11 +103,7 @@ class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config)
val system2 = akka.actor.ActorSystem( val system2 = akka.actor.ActorSystem(
adaptedSystem.name, adaptedSystem.name,
ConfigFactory.parseString( adaptedSystem.settings.config)
"""
akka.remote.artery.canonical.port = 0
"""
).withFallback(adaptedSystem.settings.config))
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(system2) val clusterNode2 = Cluster(system2)

View file

@ -1,112 +0,0 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl.adapter
import java.nio.charset.StandardCharsets
import akka.Done
import akka.testkit.AkkaSpec
import akka.typed.{ ActorRef, ActorSystem }
import akka.typed.scaladsl.Actor
import akka.actor.{ ExtendedActorSystem, ActorSystem UntypedActorSystem }
import akka.cluster.Cluster
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Promise
import akka.typed.cluster.ActorRefResolver
import akka.typed.internal.adapter.ActorRefAdapter
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
override def identifier = 41
override def manifest(o: AnyRef) = "a"
override def toBinary(o: AnyRef) = o match {
case RemotingSpec.Ping(who)
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str)
RemotingSpec.Ping(ref)
}
}
object RemotingSpec {
def config = ConfigFactory.parseString(
s"""
akka {
loglevel = debug
actor {
provider = cluster
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.typed.scaladsl.adapter.PingSerializer"
}
serialization-bindings {
"akka.typed.scaladsl.adapter.RemotingSpec$$Ping" = test
}
}
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
case class Ping(sender: ActorRef[String])
}
class RemotingSpec extends AkkaSpec(RemotingSpec.config) {
import RemotingSpec._
val typedSystem = system.toTyped
"the adapted system" should {
"something something" in {
val pingPromise = Promise[Done]()
val ponger = Actor.immutable[Ping]((_, msg)
msg match {
case Ping(sender)
pingPromise.success(Done)
sender ! "pong"
Actor.stopped
})
// typed actor on system1
val pingPongActor = system.spawn(ponger, "pingpong")
val system2 = UntypedActorSystem(system.name + "-system2", RemotingSpec.config)
val typedSystem2 = system2.toTyped
try {
// resolve the actor from node2
val remoteRefStr = ActorRefResolver(typedSystem).toSerializationFormat(pingPongActor)
val remoteRef: ActorRef[Ping] =
ActorRefResolver(typedSystem2).resolveActorRef[Ping](remoteRefStr)
val pongPromise = Promise[Done]()
val recipient = system2.spawn(Actor.immutable[String] { (_, msg)
pongPromise.success(Done)
Actor.stopped
}, "recipient")
remoteRef ! Ping(recipient)
pingPromise.future.futureValue should ===(Done)
pongPromise.future.futureValue should ===(Done)
} finally {
system2.terminate()
}
}
}
}

View file

@ -154,6 +154,10 @@ private[typed] object ReceptionistImpl extends ReceptionistBehaviorProvider {
case SubscriberTerminated(key, subscriber) case SubscriberTerminated(key, subscriber)
next(newSubscriptions = subscriptions.removed(key)(subscriber)) next(newSubscriptions = subscriptions.removed(key)(subscriber))
case _: InternalCommand
// silence compiler exhaustive check
Actor.unhandled
} }
} }
} }

View file

@ -160,15 +160,21 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck")
lazy val typed = akkaModule("akka-typed") lazy val typed = akkaModule("akka-typed")
.dependsOn( .dependsOn(
testkit % "compile->compile;test->test", testkit % "compile->compile;test->test",
cluster % "compile->compile;test->test", cluster % "provided->compile",
clusterTools % "provided->compile",
clusterSharding % "provided->compile",
distributedData % "provided->compile")
lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->provided;test->test")
// the provided dependencies
.dependsOn(
cluster % "test->test",
clusterTools, clusterTools,
clusterSharding, clusterSharding,
distributedData, distributedData,
persistence % "compile->compile;test->test") persistence % "compile->compile;test->test")
lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->test")
lazy val typedTestkit = akkaModule("akka-typed-testkit") lazy val typedTestkit = akkaModule("akka-typed-testkit")
.dependsOn(typed, testkit % "compile->compile;test->test") .dependsOn(typed, testkit % "compile->compile;test->test")