=clu #18345 Support local address in cluster commands
* and clarify the doc sample for leave
This commit is contained in:
parent
4cbfe3d682
commit
f98b4c1f5e
6 changed files with 113 additions and 7 deletions
|
|
@ -261,7 +261,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* cluster or to join the same cluster again.
|
* cluster or to join the same cluster again.
|
||||||
*/
|
*/
|
||||||
def join(address: Address): Unit =
|
def join(address: Address): Unit =
|
||||||
clusterCore ! ClusterUserAction.JoinTo(address)
|
clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
|
||||||
|
|
||||||
|
private def fillLocal(address: Address): Address = {
|
||||||
|
// local address might be used if grabbed from actorRef.path.address
|
||||||
|
if (address.hasLocalScope && address.system == selfAddress.system) selfAddress
|
||||||
|
else address
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join the specified seed nodes without defining them in config.
|
* Join the specified seed nodes without defining them in config.
|
||||||
|
|
@ -272,7 +278,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* cluster or to join the same cluster again.
|
* cluster or to join the same cluster again.
|
||||||
*/
|
*/
|
||||||
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
|
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
|
||||||
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector)
|
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
|
|
@ -300,7 +306,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* still be necessary to set the node’s status to Down in order to complete the removal.
|
* still be necessary to set the node’s status to Down in order to complete the removal.
|
||||||
*/
|
*/
|
||||||
def leave(address: Address): Unit =
|
def leave(address: Address): Unit =
|
||||||
clusterCore ! ClusterUserAction.Leave(address)
|
clusterCore ! ClusterUserAction.Leave(fillLocal(address))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send command to DOWN the node specified by 'address'.
|
* Send command to DOWN the node specified by 'address'.
|
||||||
|
|
@ -311,7 +317,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* this method.
|
* this method.
|
||||||
*/
|
*/
|
||||||
def down(address: Address): Unit =
|
def down(address: Address): Unit =
|
||||||
clusterCore ! ClusterUserAction.Down(address)
|
clusterCore ! ClusterUserAction.Down(fillLocal(address))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The supplied thunk will be run, once, when current cluster member is `Up`.
|
* The supplied thunk will be run, once, when current cluster member is `Up`.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,9 @@ import java.lang.management.ManagementFactory
|
||||||
import javax.management.ObjectName
|
import javax.management.ObjectName
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.Props
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
object ClusterSpec {
|
object ClusterSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -28,7 +31,7 @@ object ClusterSpec {
|
||||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
akka.remote.log-remote-lifecycle-events = off
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
akka.remote.netty.tcp.port = 0
|
akka.remote.netty.tcp.port = 0
|
||||||
# akka.loglevel = DEBUG
|
#akka.loglevel = DEBUG
|
||||||
"""
|
"""
|
||||||
|
|
||||||
final case class GossipTo(address: Address)
|
final case class GossipTo(address: Address)
|
||||||
|
|
@ -107,5 +110,30 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
callbackProbe.expectMsg("OnMemberRemoved")
|
callbackProbe.expectMsg("OnMemberRemoved")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"allow join and leave with local address" in {
|
||||||
|
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
"""))
|
||||||
|
try {
|
||||||
|
val ref = sys2.actorOf(Props.empty)
|
||||||
|
Cluster(sys2).join(ref.path.address) // address doesn't contain full address information
|
||||||
|
within(5.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys2).state.members.size should ===(1)
|
||||||
|
Cluster(sys2).state.members.head.status should ===(MemberStatus.Up)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cluster(sys2).leave(ref.path.address)
|
||||||
|
within(5.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys2).isTerminated should ===(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
shutdown(sys2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,9 @@ above.
|
||||||
|
|
||||||
A more graceful exit can be performed if you tell the cluster that a node shall leave.
|
A more graceful exit can be performed if you tell the cluster that a node shall leave.
|
||||||
This can be performed using :ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`.
|
This can be performed using :ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`.
|
||||||
It can also be performed programmatically with ``Cluster.get(system).leave(address)``.
|
It can also be performed programmatically with:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/cluster/ClusterDocTest.java#leave
|
||||||
|
|
||||||
Note that this command can be issued to any member in the cluster, not necessarily the
|
Note that this command can be issued to any member in the cluster, not necessarily the
|
||||||
one that is leaving. The cluster extension, but not the actor system or JVM, of the
|
one that is leaving. The cluster extension, but not the actor system or JVM, of the
|
||||||
|
|
|
||||||
41
akka-docs/rst/java/code/docs/cluster/ClusterDocTest.java
Normal file
41
akka-docs/rst/java/code/docs/cluster/ClusterDocTest.java
Normal file
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package docs.cluster;
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.cluster.Cluster;
|
||||||
|
import akka.testkit.JavaTestKit;
|
||||||
|
|
||||||
|
|
||||||
|
public class ClusterDocTest {
|
||||||
|
|
||||||
|
static ActorSystem system;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
system = ActorSystem.create("ClusterDocTest",
|
||||||
|
ConfigFactory.parseString(ClusterDocSpec.config()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
JavaTestKit.shutdownActorSystem(system);
|
||||||
|
system = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void demonstrateLeave() {
|
||||||
|
//#leave
|
||||||
|
final Cluster cluster = Cluster.get(system);
|
||||||
|
cluster.leave(cluster.selfAddress());
|
||||||
|
//#leave
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -152,7 +152,9 @@ above.
|
||||||
|
|
||||||
A more graceful exit can be performed if you tell the cluster that a node shall leave.
|
A more graceful exit can be performed if you tell the cluster that a node shall leave.
|
||||||
This can be performed using :ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`.
|
This can be performed using :ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`.
|
||||||
It can also be performed programmatically with ``Cluster(system).leave(address)``.
|
It can also be performed programmatically with:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/cluster/ClusterDocSpec.scala#leave
|
||||||
|
|
||||||
Note that this command can be issued to any member in the cluster, not necessarily the
|
Note that this command can be issued to any member in the cluster, not necessarily the
|
||||||
one that is leaving. The cluster extension, but not the actor system or JVM, of the
|
one that is leaving. The cluster extension, but not the actor system or JVM, of the
|
||||||
|
|
|
||||||
27
akka-docs/rst/scala/code/docs/cluster/ClusterDocSpec.scala
Normal file
27
akka-docs/rst/scala/code/docs/cluster/ClusterDocSpec.scala
Normal file
|
|
@ -0,0 +1,27 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package docs.cluster
|
||||||
|
|
||||||
|
import akka.cluster.Cluster
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
object ClusterDocSpec {
|
||||||
|
|
||||||
|
val config =
|
||||||
|
"""
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClusterDocSpec extends AkkaSpec(ClusterDocSpec.config) {
|
||||||
|
|
||||||
|
"demonstrate leave" in {
|
||||||
|
//#leave
|
||||||
|
val cluster = Cluster(system)
|
||||||
|
cluster.leave(cluster.selfAddress)
|
||||||
|
//#leave
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue