cluster.joinSeedNodes(...) does not throw warnings when seed hostname contains underscores #25287 (#27222)
This commit is contained in:
parent
d4167bc930
commit
501848dcca
6 changed files with 103 additions and 20 deletions
|
|
@ -89,5 +89,33 @@ class ActorPathSpec extends WordSpec with Matchers {
|
||||||
ActorPath.fromString("akka://mysys/user/boom/*")
|
ActorPath.fromString("akka://mysys/user/boom/*")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"detect valid and invalid chars in host names when not using AddressFromURIString, e.g. docker host given name" in {
|
||||||
|
Seq(
|
||||||
|
Address("akka", "sys", Some("valid"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("is_valid.org"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("fu.is_valid.org"), Some(0))).forall(_.hasInvalidHostCharacters) shouldBe false
|
||||||
|
|
||||||
|
Seq(Address("akka", "sys", Some("in_valid"), Some(0)), Address("akka", "sys", Some("invalid._org"), Some(0)))
|
||||||
|
.forall(_.hasInvalidHostCharacters) shouldBe true
|
||||||
|
|
||||||
|
intercept[MalformedURLException](AddressFromURIString("akka://sys@in_valid:5001"))
|
||||||
|
}
|
||||||
|
|
||||||
|
"not fail fast if the check is called on valid chars in host names" in {
|
||||||
|
Seq(
|
||||||
|
Address("akka", "sys", Some("localhost"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("is_valid.org"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("fu.is_valid.org"), Some(0))).foreach(_.checkHostCharacters())
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail fast if the check is called when invalid chars are in host names" in {
|
||||||
|
Seq(
|
||||||
|
Address("akka", "sys", Some("localhost"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("is_valid.org"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("fu.is_valid.org"), Some(0))).foreach(_.checkHostCharacters())
|
||||||
|
|
||||||
|
intercept[IllegalArgumentException](Address("akka", "sys", Some("in_valid"), Some(0)).checkHostCharacters())
|
||||||
|
intercept[IllegalArgumentException](Address("akka", "sys", Some("invalid._org"), Some(0)).checkHostCharacters())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,12 @@ package akka.actor
|
||||||
import java.net.URI
|
import java.net.URI
|
||||||
import java.net.URISyntaxException
|
import java.net.URISyntaxException
|
||||||
import java.net.MalformedURLException
|
import java.net.MalformedURLException
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The address specifies the physical location under which an Actor can be
|
* The address specifies the physical location under which an Actor can be
|
||||||
* reached. Examples are local addresses, identified by the ActorSystem’s
|
* reached. Examples are local addresses, identified by the ActorSystem’s
|
||||||
|
|
@ -65,10 +68,27 @@ final case class Address private (protocol: String, system: String, host: Option
|
||||||
* `system@host:port`
|
* `system@host:port`
|
||||||
*/
|
*/
|
||||||
def hostPort: String = toString.substring(protocol.length + 3)
|
def hostPort: String = toString.substring(protocol.length + 3)
|
||||||
|
|
||||||
|
/** INTERNAL API
|
||||||
|
* Check if the address is not created through `AddressFromURIString`, if there
|
||||||
|
* are any unusual characters in the host string.
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
private[akka] def hasInvalidHostCharacters: Boolean =
|
||||||
|
host.exists(Address.InvalidHostRegex.findFirstIn(_).nonEmpty)
|
||||||
|
|
||||||
|
/** INTERNAL API */
|
||||||
|
@InternalApi
|
||||||
|
private[akka] def checkHostCharacters(): Unit =
|
||||||
|
require(!hasInvalidHostCharacters, s"Using invalid host characters '$host' in the Address is not allowed.")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object Address {
|
object Address {
|
||||||
|
|
||||||
|
// if underscore and no dot after, then invalid
|
||||||
|
val InvalidHostRegex = "_[^.]*$".r
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new Address with the specified protocol and system name
|
* Constructs a new Address with the specified protocol and system name
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,9 @@ sealed trait ClusterCommand
|
||||||
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
|
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
|
||||||
* cluster.
|
* cluster.
|
||||||
*/
|
*/
|
||||||
final case class Join(address: Address) extends ClusterCommand
|
final case class Join(address: Address) extends ClusterCommand {
|
||||||
|
address.checkHostCharacters()
|
||||||
|
}
|
||||||
|
|
||||||
object Join {
|
object Join {
|
||||||
|
|
||||||
|
|
@ -100,6 +102,7 @@ object Join {
|
||||||
* cluster or to join the same cluster again.
|
* cluster or to join the same cluster again.
|
||||||
*/
|
*/
|
||||||
final case class JoinSeedNodes(seedNodes: immutable.Seq[Address]) extends ClusterCommand {
|
final case class JoinSeedNodes(seedNodes: immutable.Seq[Address]) extends ClusterCommand {
|
||||||
|
seedNodes.foreach(_.checkHostCharacters())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Join the specified seed nodes without defining them in config.
|
* Java API: Join the specified seed nodes without defining them in config.
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
package akka.cluster.typed
|
package akka.cluster.typed
|
||||||
|
|
||||||
|
import akka.actor.Address
|
||||||
import akka.actor.typed.scaladsl.adapter._
|
import akka.actor.typed.scaladsl.adapter._
|
||||||
import akka.cluster.ClusterEvent._
|
import akka.cluster.ClusterEvent._
|
||||||
import akka.cluster.MemberStatus
|
import akka.cluster.MemberStatus
|
||||||
|
|
@ -43,6 +44,12 @@ class ClusterApiSpec extends ScalaTestWithActorTestKit(ClusterApiSpec.config) wi
|
||||||
|
|
||||||
"A typed Cluster" must {
|
"A typed Cluster" must {
|
||||||
|
|
||||||
|
"fail fast in a join attempt if invalid chars are in host names, e.g. docker host given name" in {
|
||||||
|
val address = Address("akka", "sys", Some("in_valid"), Some(0))
|
||||||
|
intercept[IllegalArgumentException](Join(address))
|
||||||
|
intercept[IllegalArgumentException](JoinSeedNodes(scala.collection.immutable.Seq(address)))
|
||||||
|
}
|
||||||
|
|
||||||
"join a cluster and observe events from both sides" in {
|
"join a cluster and observe events from both sides" in {
|
||||||
|
|
||||||
val system2 = akka.actor.ActorSystem(system.name, system.settings.config)
|
val system2 = akka.actor.ActorSystem(system.name, system.settings.config)
|
||||||
|
|
|
||||||
|
|
@ -300,8 +300,10 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
|
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
|
||||||
* cluster.
|
* cluster.
|
||||||
*/
|
*/
|
||||||
def join(address: Address): Unit =
|
def join(address: Address): Unit = {
|
||||||
|
address.checkHostCharacters()
|
||||||
clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
|
clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
|
||||||
|
}
|
||||||
|
|
||||||
private def fillLocal(address: Address): Address = {
|
private def fillLocal(address: Address): Address = {
|
||||||
// local address might be used if grabbed from actorRef.path.address
|
// local address might be used if grabbed from actorRef.path.address
|
||||||
|
|
@ -317,8 +319,10 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
* When it has successfully joined it must be restarted to be able to join another
|
* When it has successfully joined it must be restarted to be able to join another
|
||||||
* cluster or to join the same cluster again.
|
* cluster or to join the same cluster again.
|
||||||
*/
|
*/
|
||||||
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
|
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit = {
|
||||||
|
seedNodes.foreach(_.checkHostCharacters())
|
||||||
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
|
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
|
|
|
||||||
|
|
@ -4,26 +4,28 @@
|
||||||
|
|
||||||
package akka.cluster
|
package akka.cluster
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
import akka.testkit.ImplicitSender
|
|
||||||
import akka.actor.ExtendedActorSystem
|
|
||||||
import akka.actor.Address
|
|
||||||
import akka.cluster.InternalClusterAction._
|
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
import javax.management.ObjectName
|
|
||||||
|
|
||||||
import akka.testkit.TestProbe
|
|
||||||
import akka.actor.ActorSystem
|
|
||||||
import akka.actor.Props
|
|
||||||
import com.typesafe.config.ConfigFactory
|
|
||||||
import akka.actor.CoordinatedShutdown
|
|
||||||
import akka.cluster.ClusterEvent.MemberEvent
|
|
||||||
import akka.cluster.ClusterEvent._
|
|
||||||
import akka.stream.ActorMaterializer
|
|
||||||
import akka.stream.scaladsl.{ Sink, Source, StreamRefs }
|
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.actor.CoordinatedShutdown
|
||||||
|
import akka.actor.ExtendedActorSystem
|
||||||
|
import akka.actor.Props
|
||||||
|
import akka.cluster.ClusterEvent.MemberEvent
|
||||||
|
import akka.cluster.ClusterEvent._
|
||||||
|
import akka.cluster.InternalClusterAction._
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.scaladsl.Sink
|
||||||
|
import akka.stream.scaladsl.Source
|
||||||
|
import akka.stream.scaladsl.StreamRefs
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
import akka.testkit.ImplicitSender
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import javax.management.ObjectName
|
||||||
|
|
||||||
object ClusterSpec {
|
object ClusterSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -70,6 +72,25 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
expectMsgType[InitJoinNack]
|
expectMsgType[InitJoinNack]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"fail fast in a join if invalid chars in host names, e.g. docker host given name" in {
|
||||||
|
val addresses = scala.collection.immutable
|
||||||
|
.Seq(Address("akka", "sys", Some("in_valid"), Some(0)), Address("akka", "sys", Some("invalid._org"), Some(0)))
|
||||||
|
|
||||||
|
addresses.foreach(a => intercept[IllegalArgumentException](cluster.join(a)))
|
||||||
|
intercept[IllegalArgumentException](cluster.joinSeedNodes(addresses))
|
||||||
|
}
|
||||||
|
|
||||||
|
"not fail fast to attempt a join with valid chars in host names" in {
|
||||||
|
val addresses = scala.collection.immutable.Seq(
|
||||||
|
Address("akka", "sys", Some("localhost"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("is_valid.org"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("fu.is_valid.org"), Some(0)),
|
||||||
|
Address("akka", "sys", Some("fu_.is_valid.org"), Some(0)))
|
||||||
|
|
||||||
|
addresses.foreach(cluster.join)
|
||||||
|
cluster.joinSeedNodes(addresses)
|
||||||
|
}
|
||||||
|
|
||||||
"initially become singleton cluster when joining itself and reach convergence" in {
|
"initially become singleton cluster when joining itself and reach convergence" in {
|
||||||
clusterView.members.size should ===(0)
|
clusterView.members.size should ===(0)
|
||||||
cluster.join(selfAddress)
|
cluster.join(selfAddress)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue