- Fixes in contrib and docs
- Fixed getExternalAddressFor
This commit is contained in:
parent
f1177464ad
commit
6bd64d55bd
5 changed files with 15 additions and 18 deletions
|
|
@ -10,7 +10,7 @@ import akka.remote.testkit.MultiNodeConfig
|
||||||
import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
import akka.remote.testkit.STMultiNodeSpec
|
import akka.remote.testkit.STMultiNodeSpec
|
||||||
import org.scalatest.BeforeAndAfterEach
|
import org.scalatest.BeforeAndAfterEach
|
||||||
import akka.remote.testconductor.Direction
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.testkit.ImplicitSender
|
import akka.testkit.ImplicitSender
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ object RemoteDeploymentDocSpec {
|
||||||
|
|
||||||
class RemoteDeploymentDocSpec extends AkkaSpec("""
|
class RemoteDeploymentDocSpec extends AkkaSpec("""
|
||||||
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
akka.remote.netty.port = 0
|
akka.remoting.transports.tcp.port = 0
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
|
|
||||||
import RemoteDeploymentDocSpec._
|
import RemoteDeploymentDocSpec._
|
||||||
|
|
@ -42,8 +42,8 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"demonstrate address extractor" in {
|
"demonstrate address extractor" in {
|
||||||
//#make-address
|
//#make-address
|
||||||
val one = AddressFromURIString("akka://sys@host:1234")
|
val one = AddressFromURIString("tcp.akka://sys@host:1234")
|
||||||
val two = Address("akka", "sys", "host", 1234) // this gives the same
|
val two = Address("tcp.akka", "sys", "host", 1234) // this gives the same
|
||||||
//#make-address
|
//#make-address
|
||||||
one must be === two
|
one must be === two
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -228,7 +228,7 @@ class RemoteActorRefProvider(
|
||||||
def getExternalAddressFor(addr: Address): Option[Address] = {
|
def getExternalAddressFor(addr: Address): Option[Address] = {
|
||||||
addr match {
|
addr match {
|
||||||
case _ if hasAddress(addr) ⇒ Some(local.rootPath.address)
|
case _ if hasAddress(addr) ⇒ Some(local.rootPath.address)
|
||||||
case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.localAddressForRemote(addr))
|
case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.defaultAddress)
|
||||||
case _ ⇒ None
|
case _ ⇒ None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -292,7 +292,7 @@ private[remote] object EndpointManager {
|
||||||
|
|
||||||
def removeIfNotGated(endpoint: ActorRef): Unit = {
|
def removeIfNotGated(endpoint: ActorRef): Unit = {
|
||||||
endpointToAddress.get(endpoint) foreach { address ⇒
|
endpointToAddress.get(endpoint) foreach { address ⇒
|
||||||
addressToEndpointAndPolicy.get(address) foreach {
|
addressToEndpointAndPolicy.get(address) foreach {
|
||||||
case Pass(_) ⇒ addressToEndpointAndPolicy = addressToEndpointAndPolicy - address
|
case Pass(_) ⇒ addressToEndpointAndPolicy = addressToEndpointAndPolicy - address
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
}
|
}
|
||||||
|
|
@ -475,9 +475,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
AkkaPduProtobufCodec))
|
AkkaPduProtobufCodec))
|
||||||
.withDispatcher("akka.remoting.writer-dispatcher"),
|
.withDispatcher("akka.remoting.writer-dispatcher"),
|
||||||
"endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()))
|
"endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()))
|
||||||
|
|
||||||
context.watch(endpoint)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime()
|
private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime()
|
||||||
|
|
|
||||||
|
|
@ -20,15 +20,15 @@ private[remote] trait UdpHandlers extends CommonHandlers with HasTransport {
|
||||||
msg: ChannelBuffer,
|
msg: ChannelBuffer,
|
||||||
remoteSocketAddress: InetSocketAddress): Unit = {
|
remoteSocketAddress: InetSocketAddress): Unit = {
|
||||||
transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener) match {
|
transport.udpConnectionTable.putIfAbsent(remoteSocketAddress, listener) match {
|
||||||
case null => listener notify InboundPayload(ByteString(msg.array()))
|
case null ⇒ listener notify InboundPayload(ByteString(msg.array()))
|
||||||
case oldReader =>
|
case oldReader ⇒
|
||||||
throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" +
|
throw new NettyTransportException(s"Listener $listener attempted to register for remote address $remoteSocketAddress" +
|
||||||
s" but $oldReader was already registered.", null)
|
s" but $oldReader was already registered.", null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e.getRemoteAddress match {
|
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e.getRemoteAddress match {
|
||||||
case inetSocketAddress: InetSocketAddress =>
|
case inetSocketAddress: InetSocketAddress ⇒
|
||||||
if (!transport.udpConnectionTable.containsKey(inetSocketAddress)) {
|
if (!transport.udpConnectionTable.containsKey(inetSocketAddress)) {
|
||||||
e.getChannel.setReadable(false)
|
e.getChannel.setReadable(false)
|
||||||
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
|
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
|
||||||
|
|
@ -36,7 +36,7 @@ private[remote] trait UdpHandlers extends CommonHandlers with HasTransport {
|
||||||
val listener = transport.udpConnectionTable.get(inetSocketAddress)
|
val listener = transport.udpConnectionTable.get(inetSocketAddress)
|
||||||
listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
|
listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
|
||||||
}
|
}
|
||||||
case _ =>
|
case _ ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit
|
def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit
|
||||||
|
|
@ -57,9 +57,9 @@ private[remote] class UdpClientHandler(_transport: NettyTransport, _statusPromis
|
||||||
}
|
}
|
||||||
|
|
||||||
private[remote] class UdpAssociationHandle(val localAddress: Address,
|
private[remote] class UdpAssociationHandle(val localAddress: Address,
|
||||||
val remoteAddress: Address,
|
val remoteAddress: Address,
|
||||||
private val channel: Channel,
|
private val channel: Channel,
|
||||||
private val transport: NettyTransport) extends AssociationHandle {
|
private val transport: NettyTransport) extends AssociationHandle {
|
||||||
|
|
||||||
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
|
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
|
||||||
|
|
||||||
|
|
@ -74,6 +74,6 @@ private[remote] class UdpAssociationHandle(val localAddress: Address,
|
||||||
}
|
}
|
||||||
|
|
||||||
override def disassociate(): Unit = try channel.close()
|
override def disassociate(): Unit = try channel.close()
|
||||||
finally transport.udpConnectionTable.remove(transport.addressToSocketAddress(remoteAddress))
|
finally transport.udpConnectionTable.remove(transport.addressToSocketAddress(remoteAddress))
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue