Reversed protocol scheme order in Addresses
This commit is contained in:
parent
b6ad46e88c
commit
fe22c7515f
22 changed files with 127 additions and 127 deletions
|
|
@ -24,8 +24,8 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
|||
}
|
||||
|
||||
"An AccrualFailureDetector" must {
|
||||
val conn = Address("tcp.akka", "", "localhost", 2552)
|
||||
val conn2 = Address("tcp.akka", "", "localhost", 2553)
|
||||
val conn = Address("akka.tcp", "", "localhost", 2552)
|
||||
val conn2 = Address("akka.tcp", "", "localhost", 2553)
|
||||
|
||||
def fakeTimeGenerator(timeIntervals: immutable.Seq[Long]): () ⇒ Long = {
|
||||
var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c))
|
||||
|
|
|
|||
|
|
@ -23,11 +23,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
|
|||
with BeforeAndAfterEach with ImplicitSender {
|
||||
|
||||
var publisher: ActorRef = _
|
||||
val a1 = Member(Address("tcp.akka", "sys", "a", 2552), Up)
|
||||
val b1 = Member(Address("tcp.akka", "sys", "b", 2552), Up)
|
||||
val c1 = Member(Address("tcp.akka", "sys", "c", 2552), Joining)
|
||||
val c2 = Member(Address("tcp.akka", "sys", "c", 2552), Up)
|
||||
val d1 = Member(Address("tcp.akka", "sys", "a", 2551), Up)
|
||||
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
|
||||
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
|
||||
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
|
||||
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
|
||||
val d1 = Member(Address("akka.tcp", "sys", "a", 2551), Up)
|
||||
|
||||
val g0 = Gossip(members = SortedSet(a1)).seen(a1.address)
|
||||
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address)
|
||||
|
|
|
|||
|
|
@ -15,19 +15,19 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
|
|||
import MemberStatus._
|
||||
import ClusterEvent._
|
||||
|
||||
val a1 = Member(Address("tcp.akka", "sys", "a", 2552), Up)
|
||||
val a2 = Member(Address("tcp.akka", "sys", "a", 2552), Joining)
|
||||
val a3 = Member(Address("tcp.akka", "sys", "a", 2552), Removed)
|
||||
val b1 = Member(Address("tcp.akka", "sys", "b", 2552), Up)
|
||||
val b2 = Member(Address("tcp.akka", "sys", "b", 2552), Removed)
|
||||
val b3 = Member(Address("tcp.akka", "sys", "b", 2552), Down)
|
||||
val c1 = Member(Address("tcp.akka", "sys", "c", 2552), Leaving)
|
||||
val c2 = Member(Address("tcp.akka", "sys", "c", 2552), Up)
|
||||
val d1 = Member(Address("tcp.akka", "sys", "d", 2552), Leaving)
|
||||
val d2 = Member(Address("tcp.akka", "sys", "d", 2552), Removed)
|
||||
val e1 = Member(Address("tcp.akka", "sys", "e", 2552), Joining)
|
||||
val e2 = Member(Address("tcp.akka", "sys", "e", 2552), Up)
|
||||
val e3 = Member(Address("tcp.akka", "sys", "e", 2552), Down)
|
||||
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
|
||||
val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining)
|
||||
val a3 = Member(Address("akka.tcp", "sys", "a", 2552), Removed)
|
||||
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
|
||||
val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed)
|
||||
val b3 = Member(Address("akka.tcp", "sys", "b", 2552), Down)
|
||||
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving)
|
||||
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
|
||||
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving)
|
||||
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
|
||||
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
|
||||
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
|
||||
val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down)
|
||||
|
||||
def converge(gossip: Gossip): (Gossip, Set[Address]) =
|
||||
((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) ⇒ (gs._1.seen(m.address), gs._2 + m.address) }
|
||||
|
|
|
|||
|
|
@ -14,12 +14,12 @@ import scala.collection.immutable
|
|||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
|
||||
|
||||
val selfAddress = Address("tcp.akka", "sys", "myself", 2552)
|
||||
val aa = Address("tcp.akka", "sys", "aa", 2552)
|
||||
val bb = Address("tcp.akka", "sys", "bb", 2552)
|
||||
val cc = Address("tcp.akka", "sys", "cc", 2552)
|
||||
val dd = Address("tcp.akka", "sys", "dd", 2552)
|
||||
val ee = Address("tcp.akka", "sys", "ee", 2552)
|
||||
val selfAddress = Address("akka.tcp", "sys", "myself", 2552)
|
||||
val aa = Address("akka.tcp", "sys", "aa", 2552)
|
||||
val bb = Address("akka.tcp", "sys", "bb", 2552)
|
||||
val cc = Address("akka.tcp", "sys", "cc", 2552)
|
||||
val dd = Address("akka.tcp", "sys", "dd", 2552)
|
||||
val ee = Address("akka.tcp", "sys", "ee", 2552)
|
||||
|
||||
val emptyState = ClusterHeartbeatSenderState.empty(selfAddress, 3)
|
||||
|
||||
|
|
|
|||
|
|
@ -14,17 +14,17 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
|
||||
import MemberStatus._
|
||||
|
||||
val a1 = Member(Address("tcp.akka", "sys", "a", 2552), Up)
|
||||
val a2 = Member(Address("tcp.akka", "sys", "a", 2552), Joining)
|
||||
val b1 = Member(Address("tcp.akka", "sys", "b", 2552), Up)
|
||||
val b2 = Member(Address("tcp.akka", "sys", "b", 2552), Removed)
|
||||
val c1 = Member(Address("tcp.akka", "sys", "c", 2552), Leaving)
|
||||
val c2 = Member(Address("tcp.akka", "sys", "c", 2552), Up)
|
||||
val c3 = Member(Address("tcp.akka", "sys", "c", 2552), Exiting)
|
||||
val d1 = Member(Address("tcp.akka", "sys", "d", 2552), Leaving)
|
||||
val d2 = Member(Address("tcp.akka", "sys", "d", 2552), Removed)
|
||||
val e1 = Member(Address("tcp.akka", "sys", "e", 2552), Joining)
|
||||
val e2 = Member(Address("tcp.akka", "sys", "e", 2552), Up)
|
||||
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
|
||||
val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining)
|
||||
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
|
||||
val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed)
|
||||
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving)
|
||||
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
|
||||
val c3 = Member(Address("akka.tcp", "sys", "c", 2552), Exiting)
|
||||
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving)
|
||||
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
|
||||
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
|
||||
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
|
||||
|
||||
"A Gossip" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,11 @@ import scala.collection.immutable
|
|||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class HeartbeatNodeRingSpec extends WordSpec with MustMatchers {
|
||||
|
||||
val aa = Address("tcp.akka", "sys", "aa", 2552)
|
||||
val bb = Address("tcp.akka", "sys", "bb", 2552)
|
||||
val cc = Address("tcp.akka", "sys", "cc", 2552)
|
||||
val dd = Address("tcp.akka", "sys", "dd", 2552)
|
||||
val ee = Address("tcp.akka", "sys", "ee", 2552)
|
||||
val aa = Address("akka.tcp", "sys", "aa", 2552)
|
||||
val bb = Address("akka.tcp", "sys", "bb", 2552)
|
||||
val cc = Address("akka.tcp", "sys", "cc", 2552)
|
||||
val dd = Address("akka.tcp", "sys", "dd", 2552)
|
||||
val ee = Address("akka.tcp", "sys", "ee", 2552)
|
||||
|
||||
val nodes = Set(aa, bb, cc, dd, ee)
|
||||
|
||||
|
|
|
|||
|
|
@ -35,8 +35,8 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
"be sorted by address correctly" in {
|
||||
import Member.ordering
|
||||
// sorting should be done on host and port, only
|
||||
val m1 = Member(Address("tcp.akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m2 = Member(Address("tcp.akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m2 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
|
||||
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
|
||||
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
|
||||
|
|
@ -48,9 +48,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"have stable equals and hashCode" in {
|
||||
val m1 = Member(Address("tcp.akka", "sys1", "host1", 9000), MemberStatus.Joining)
|
||||
val m2 = Member(Address("tcp.akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m3 = Member(Address("tcp.akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Joining)
|
||||
val m2 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m3 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
|
||||
m1 must be(m2)
|
||||
m1.hashCode must be(m2.hashCode)
|
||||
|
|
@ -60,8 +60,8 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"have consistent ordering and equals" in {
|
||||
val address1 = Address("tcp.akka", "sys1", "host1", 9001)
|
||||
val address2 = Address("tcp.akka", "sys1", "host1", 9002)
|
||||
val address1 = Address("akka.tcp", "sys1", "host1", 9001)
|
||||
val address2 = Address("akka.tcp", "sys1", "host1", 9002)
|
||||
|
||||
val x = Member(address1, Exiting)
|
||||
val y = Member(address1, Removed)
|
||||
|
|
@ -71,9 +71,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
|
||||
"work with SortedSet" in {
|
||||
val address1 = Address("tcp.akka", "sys1", "host1", 9001)
|
||||
val address2 = Address("tcp.akka", "sys1", "host1", 9002)
|
||||
val address3 = Address("tcp.akka", "sys1", "host1", 9003)
|
||||
val address1 = Address("akka.tcp", "sys1", "host1", 9001)
|
||||
val address2 = Address("akka.tcp", "sys1", "host1", 9002)
|
||||
val address3 = Address("akka.tcp", "sys1", "host1", 9003)
|
||||
|
||||
(SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member])
|
||||
(SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member])
|
||||
|
|
|
|||
|
|
@ -14,8 +14,8 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
|
|||
|
||||
val collector = createMetricsCollector
|
||||
|
||||
val node1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), 1, collector.sample.metrics)
|
||||
val node2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), 1, collector.sample.metrics)
|
||||
val node1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), 1, collector.sample.metrics)
|
||||
val node2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), 1, collector.sample.metrics)
|
||||
|
||||
val nodes: Seq[NodeMetrics] = {
|
||||
(1 to 100).foldLeft(List(node1, node2)) { (nodes, _) ⇒
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
|
|||
|
||||
"A MetricsGossip" must {
|
||||
"add new NodeMetrics" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
|
||||
m1.metrics.size must be > (3)
|
||||
m2.metrics.size must be > (3)
|
||||
|
|
@ -35,8 +35,8 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
|
|||
}
|
||||
|
||||
"merge peer metrics" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
|
||||
val g1 = MetricsGossip.empty :+ m1 :+ m2
|
||||
g1.nodes.size must be(2)
|
||||
|
|
@ -51,9 +51,9 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
|
|||
}
|
||||
|
||||
"merge an existing metric set for a node and update node ring" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m3 = NodeMetrics(Address("tcp.akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m3 = NodeMetrics(Address("akka.tcp", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
|
||||
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
|
||||
|
||||
val g1 = MetricsGossip.empty :+ m1 :+ m2
|
||||
|
|
@ -72,14 +72,14 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
|
|||
}
|
||||
|
||||
"get the current NodeMetrics if it exists in the local nodes" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val g1 = MetricsGossip.empty :+ m1
|
||||
g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
|
||||
}
|
||||
|
||||
"remove a node if it is no longer Up" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
|
||||
val g1 = MetricsGossip.empty :+ m1 :+ m2
|
||||
g1.nodes.size must be(2)
|
||||
|
|
@ -91,8 +91,8 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
|
|||
}
|
||||
|
||||
"filter nodes" in {
|
||||
val m1 = NodeMetrics(Address("tcp.akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("tcp.akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
|
||||
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
|
||||
|
||||
val g1 = MetricsGossip.empty :+ m1 :+ m2
|
||||
g1.nodes.size must be(2)
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ import akka.actor.Address
|
|||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class NodeMetricsSpec extends WordSpec with MustMatchers {
|
||||
|
||||
val node1 = Address("tcp.akka", "sys", "a", 2554)
|
||||
val node2 = Address("tcp.akka", "sys", "a", 2555)
|
||||
val node1 = Address("akka.tcp", "sys", "a", 2554)
|
||||
val node2 = Address("akka.tcp", "sys", "a", 2555)
|
||||
|
||||
"NodeMetrics must" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -19,10 +19,10 @@ class MetricsSelectorSpec extends WordSpec with MustMatchers {
|
|||
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = Map.empty
|
||||
}
|
||||
|
||||
val a1 = Address("tcp.akka", "sys", "a1", 2551)
|
||||
val b1 = Address("tcp.akka", "sys", "b1", 2551)
|
||||
val c1 = Address("tcp.akka", "sys", "c1", 2551)
|
||||
val d1 = Address("tcp.akka", "sys", "d1", 2551)
|
||||
val a1 = Address("akka.tcp", "sys", "a1", 2551)
|
||||
val b1 = Address("akka.tcp", "sys", "b1", 2551)
|
||||
val c1 = Address("akka.tcp", "sys", "c1", 2551)
|
||||
val d1 = Address("akka.tcp", "sys", "d1", 2551)
|
||||
|
||||
val decayFactor = Some(0.18)
|
||||
|
||||
|
|
|
|||
|
|
@ -16,10 +16,10 @@ class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
|
|||
akka.remote.netty.tcp.port = 0
|
||||
""")) {
|
||||
|
||||
val a1 = Address("tcp.akka", "sys", "a1", 2551)
|
||||
val b1 = Address("tcp.akka", "sys", "b1", 2551)
|
||||
val c1 = Address("tcp.akka", "sys", "c1", 2551)
|
||||
val d1 = Address("tcp.akka", "sys", "d1", 2551)
|
||||
val a1 = Address("akka.tcp", "sys", "a1", 2551)
|
||||
val b1 = Address("akka.tcp", "sys", "b1", 2551)
|
||||
val c1 = Address("akka.tcp", "sys", "c1", 2551)
|
||||
val d1 = Address("akka.tcp", "sys", "d1", 2551)
|
||||
|
||||
val refA = system.actorFor(RootActorPath(a1) / "user" / "a")
|
||||
val refB = system.actorFor(RootActorPath(b1) / "user" / "b")
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
|
|||
import RemoteDeploymentDocSpec._
|
||||
|
||||
val other = ActorSystem("remote", system.settings.config)
|
||||
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("tcp.akka", "s", "host", 1)).get
|
||||
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka.tcp", "s", "host", 1)).get
|
||||
|
||||
override def afterTermination() { other.shutdown() }
|
||||
|
||||
|
|
@ -42,8 +42,8 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
|
|||
|
||||
"demonstrate address extractor" in {
|
||||
//#make-address
|
||||
val one = AddressFromURIString("tcp.akka://sys@host:1234")
|
||||
val two = Address("tcp.akka", "sys", "host", 1234) // this gives the same
|
||||
val one = AddressFromURIString("akka.tcp://sys@host:1234")
|
||||
val two = Address("akka.tcp", "sys", "host", 1234) // this gives the same
|
||||
//#make-address
|
||||
one must be === two
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ trait Conductor { this: TestConductorExt ⇒
|
|||
throttle(node, target, direction, 0f)
|
||||
|
||||
private def requireTestConductorTranport(): Unit =
|
||||
if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
|
||||
if (!transport.defaultAddress.protocol.contains(".trttl.gremlin."))
|
||||
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
|
||||
"(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.")
|
||||
|
||||
|
|
|
|||
|
|
@ -44,13 +44,13 @@ object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with Ex
|
|||
trait SchemeAugmenter {
|
||||
protected def addedSchemeIdentifier: String
|
||||
|
||||
protected def augmentScheme(originalScheme: String): String = s"$originalScheme.$addedSchemeIdentifier"
|
||||
protected def augmentScheme(originalScheme: String): String = s"$addedSchemeIdentifier.$originalScheme"
|
||||
|
||||
protected def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol))
|
||||
|
||||
protected def removeScheme(scheme: String): String =
|
||||
if (scheme.endsWith(s".$addedSchemeIdentifier"))
|
||||
scheme.take(scheme.length - addedSchemeIdentifier.length - 1)
|
||||
if (scheme.startsWith(s"$addedSchemeIdentifier."))
|
||||
scheme.drop(addedSchemeIdentifier.length + 1)
|
||||
else scheme
|
||||
|
||||
protected def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol))
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
|||
|
||||
implicit val executionContext: ExecutionContext = system.dispatcher
|
||||
|
||||
override val schemeIdentifier: String = TransportMode + (if (EnableSsl) ".ssl" else "")
|
||||
override val schemeIdentifier: String = (if (EnableSsl) "ssl." else "") + TransportMode
|
||||
override val maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification
|
||||
|
||||
private final val isDatagram: Boolean = TransportMode == Udp
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ akka {
|
|||
actor {
|
||||
provider = "akka.remote.RemoteActorRefProvider"
|
||||
deployment {
|
||||
/watchers.remote = "tcp.akka://other@localhost:2666"
|
||||
/watchers.remote = "akka.tcp://other@localhost:2666"
|
||||
}
|
||||
}
|
||||
remote.netty.tcp {
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ akka {
|
|||
/blub {
|
||||
router = round-robin
|
||||
nr-of-instances = 2
|
||||
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
|
||||
target.nodes = ["akka.tcp://remote-sys@localhost:12347"]
|
||||
}
|
||||
/elastic-blub {
|
||||
router = round-robin
|
||||
|
|
@ -37,10 +37,10 @@ akka {
|
|||
lower-bound = 2
|
||||
upper-bound = 3
|
||||
}
|
||||
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
|
||||
target.nodes = ["akka.tcp://remote-sys@localhost:12347"]
|
||||
}
|
||||
/remote-blub {
|
||||
remote = "tcp.akka://remote-sys@localhost:12347"
|
||||
remote = "akka.tcp://remote-sys@localhost:12347"
|
||||
router = round-robin
|
||||
nr-of-instances = 2
|
||||
}
|
||||
|
|
@ -48,12 +48,12 @@ akka {
|
|||
remote = "akka://RemoteRouterSpec"
|
||||
router = round-robin
|
||||
nr-of-instances = 2
|
||||
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
|
||||
target.nodes = ["akka.tcp://remote-sys@localhost:12347"]
|
||||
}
|
||||
/local-blub2 {
|
||||
router = round-robin
|
||||
nr-of-instances = 4
|
||||
target.nodes = ["tcp.akka://remote-sys@localhost:12347"]
|
||||
target.nodes = ["akka.tcp://remote-sys@localhost:12347"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -85,13 +85,13 @@ akka.actor.deployment {
|
|||
val children = replies.toSet
|
||||
children must have size 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy its children on remote host driven by programatic definition" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2),
|
||||
Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub2")
|
||||
Seq(Address("akka.tcp", "remote-sys", "localhost", 12347)))), "blub2")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
|
|
@ -99,7 +99,7 @@ akka.actor.deployment {
|
|||
val children = replies.toSet
|
||||
children must have size 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
|
|
@ -112,13 +112,13 @@ akka.actor.deployment {
|
|||
val children = replies.toSet
|
||||
children.size must be >= 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy remote routers based on configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub")
|
||||
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
|
||||
router.path.address.toString must be("akka.tcp://remote-sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
|
|
@ -128,14 +128,14 @@ akka.actor.deployment {
|
|||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy remote routers based on explicit deployment" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-blub2")
|
||||
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://remote-sys@localhost:12347")))), "remote-blub2")
|
||||
router.path.address.toString must be("akka.tcp://remote-sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
|
|
@ -145,13 +145,13 @@ akka.actor.deployment {
|
|||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment be overridden by local configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub")
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://remote-sys@localhost:12347")))), "local-blub")
|
||||
router.path.address.toString must be("akka://RemoteRouterSpec")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
|
|
@ -161,15 +161,15 @@ akka.actor.deployment {
|
|||
children must have size 2
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head.address must be(Address("tcp.akka", "remote-sys", "localhost", 12347))
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
parents.head.address must be(Address("akka.tcp", "remote-sys", "localhost", 12347))
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment router be overridden by local configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "local-blub2")
|
||||
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://remote-sys@localhost:12347")))), "local-blub2")
|
||||
router.path.address.toString must be("akka.tcp://remote-sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
|
|
@ -179,14 +179,14 @@ akka.actor.deployment {
|
|||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment be overridden by remote configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("tcp.akka://remote-sys@localhost:12347")))), "remote-override")
|
||||
router.path.address.toString must be("tcp.akka://remote-sys@localhost:12347")
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka.tcp://remote-sys@localhost:12347")))), "remote-override")
|
||||
router.path.address.toString must be("akka.tcp://remote-sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
|
|
@ -196,7 +196,7 @@ akka.actor.deployment {
|
|||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "tcp.akka://remote-sys@localhost:12347")
|
||||
children foreach (_.address.toString must be === "akka.tcp://remote-sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
|
|
@ -206,7 +206,7 @@ akka.actor.deployment {
|
|||
}
|
||||
val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig(
|
||||
RoundRobinRouter(1, supervisorStrategy = escalator),
|
||||
Seq(Address("tcp.akka", "remote-sys", "localhost", 12347)))), "blub3")
|
||||
Seq(Address("akka.tcp", "remote-sys", "localhost", 12347)))), "blub3")
|
||||
|
||||
router ! CurrentRoutees
|
||||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||
|
|
|
|||
|
|
@ -77,9 +77,9 @@ object RemotingSpec {
|
|||
}
|
||||
|
||||
actor.deployment {
|
||||
/blub.remote = "test.akka://remote-sys@localhost:12346"
|
||||
/looker/child.remote = "test.akka://remote-sys@localhost:12346"
|
||||
/looker/child/grandchild.remote = "test.akka://RemotingSpec@localhost:12345"
|
||||
/blub.remote = "akka.test://remote-sys@localhost:12346"
|
||||
/looker/child.remote = "akka.test://remote-sys@localhost:12346"
|
||||
/looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345"
|
||||
}
|
||||
}
|
||||
""".format(
|
||||
|
|
@ -105,11 +105,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
(name, proto) ← Seq(
|
||||
"/gonk" -> "tcp",
|
||||
"/zagzag" -> "udp",
|
||||
"/roghtaar" -> "tcp.ssl")
|
||||
"/roghtaar" -> "ssl.tcp")
|
||||
) deploy(system, Deploy(name, scope = RemoteScope(addr(other, proto))))
|
||||
|
||||
def addr(sys: ActorSystem, proto: String) =
|
||||
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"$proto.akka", "", "", 0)).get
|
||||
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
||||
def port(sys: ActorSystem, proto: String) = addr(sys, proto).port.get
|
||||
def deploy(sys: ActorSystem, d: Deploy) {
|
||||
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
||||
|
|
@ -121,7 +121,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
}
|
||||
}), "echo")
|
||||
|
||||
val here = system.actorFor("test.akka://remote-sys@localhost:12346/user/echo")
|
||||
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
|
||||
|
||||
override def afterTermination() {
|
||||
other.shutdown()
|
||||
|
|
@ -137,7 +137,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
|
||||
"send error message for wrong address" in {
|
||||
filterEvents(EventFilter[EndpointException](occurrences = 6), EventFilter.error(start = "Association", occurrences = 6)) {
|
||||
system.actorFor("test.akka://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -150,13 +150,13 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
|
||||
"send dead letters on remote if actor does not exist" in {
|
||||
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
||||
system.actorFor("test.akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
|
||||
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
|
||||
}(other)
|
||||
}
|
||||
|
||||
"create and supervise children on remote node" in {
|
||||
val r = system.actorOf(Props[Echo], "blub")
|
||||
r.path.toString must be === "test.akka://remote-sys@localhost:12346/remote/test.akka/RemotingSpec@localhost:12345/user/blub"
|
||||
r.path.toString must be === "akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/blub"
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -201,7 +201,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (TCP)" in {
|
||||
val r = system.actorOf(Props[Echo], "gonk")
|
||||
r.path.toString must be ===
|
||||
s"tcp.akka://remote-sys@localhost:${port(other, "tcp")}/remote/tcp.akka/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
|
||||
s"akka.tcp://remote-sys@localhost:${port(other, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -217,7 +217,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (UDP)" in {
|
||||
val r = system.actorOf(Props[Echo], "zagzag")
|
||||
r.path.toString must be ===
|
||||
s"udp.akka://remote-sys@localhost:${port(other, "udp")}/remote/udp.akka/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
|
||||
s"akka.udp://remote-sys@localhost:${port(other, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
|
||||
r ! 42
|
||||
expectMsg(10.seconds, 42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -233,7 +233,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (SSL)" in {
|
||||
val r = system.actorOf(Props[Echo], "roghtaar")
|
||||
r.path.toString must be ===
|
||||
s"tcp.ssl.akka://remote-sys@localhost:${port(other, "tcp.ssl")}/remote/tcp.ssl.akka/RemotingSpec@localhost:${port(system, "tcp.ssl")}/user/roghtaar"
|
||||
s"akka.ssl.tcp://remote-sys@localhost:${port(other, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
|
||||
r ! 42
|
||||
expectMsg(10.seconds, 42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
|
|||
|
|
@ -63,10 +63,10 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
|||
""")
|
||||
|
||||
val localAddress = Address("test", "testsystem", "testhost", 1234)
|
||||
val localAkkaAddress = Address("test.akka", "testsystem", "testhost", 1234)
|
||||
val localAkkaAddress = Address("akka.test", "testsystem", "testhost", 1234)
|
||||
|
||||
val remoteAddress = Address("test", "testsystem2", "testhost2", 1234)
|
||||
val remoteAkkaAddress = Address("test.akka", "testsystem2", "testhost2", 1234)
|
||||
val remoteAkkaAddress = Address("akka.test", "testsystem2", "testhost2", 1234)
|
||||
|
||||
val codec = AkkaPduProtobufCodec
|
||||
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
|||
val addressATest: Address = Address("test", "testsytemA", "testhostA", 4321)
|
||||
val addressBTest: Address = Address("test", "testsytemB", "testhostB", 5432)
|
||||
|
||||
val addressA: Address = addressATest.copy(protocol = s"${addressATest.protocol}.$schemeIdentifier")
|
||||
val addressB: Address = addressBTest.copy(protocol = s"${addressBTest.protocol}.$schemeIdentifier")
|
||||
val nonExistingAddress = Address("test." + schemeIdentifier, "nosystem", "nohost", 0)
|
||||
val addressA: Address = addressATest.copy(protocol = s"$schemeIdentifier.${addressATest.protocol}")
|
||||
val addressB: Address = addressBTest.copy(protocol = s"$schemeIdentifier.${addressATest.protocol}")
|
||||
val nonExistingAddress = Address(schemeIdentifier + ".test", "nosystem", "nohost", 0)
|
||||
|
||||
def freshTransport(testTransport: TestTransport): Transport
|
||||
def wrapTransport(transport: Transport): Transport =
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
|
|||
class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {
|
||||
|
||||
def transportName = "ThrottlerTransportAdapter"
|
||||
def schemeIdentifier = "trttl.akka"
|
||||
def schemeIdentifier = "akka.trttl"
|
||||
def freshTransport(testTransport: TestTransport) =
|
||||
new ThrottlerTransportAdapter(testTransport, system.asInstanceOf[ExtendedActorSystem])
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue