Potential fix for #672
This commit is contained in:
parent
7582b1ec3a
commit
1e72fbfe8b
7 changed files with 15 additions and 19 deletions
|
|
@ -640,7 +640,7 @@ class LocalActorRef private[akka] (
|
||||||
initializeActorInstance
|
initializeActorInstance
|
||||||
|
|
||||||
if (isClientManaged_?)
|
if (isClientManaged_?)
|
||||||
Actor.remote.registerClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
|
Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
|
||||||
|
|
||||||
checkReceiveTimeout //Schedule the initial Receive timeout
|
checkReceiveTimeout //Schedule the initial Receive timeout
|
||||||
}
|
}
|
||||||
|
|
@ -663,7 +663,7 @@ class LocalActorRef private[akka] (
|
||||||
Actor.registry.unregister(this)
|
Actor.registry.unregister(this)
|
||||||
if (isRemotingEnabled) {
|
if (isRemotingEnabled) {
|
||||||
if (isClientManaged_?)
|
if (isClientManaged_?)
|
||||||
Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
|
Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
|
||||||
Actor.remote.unregister(this)
|
Actor.remote.unregister(this)
|
||||||
}
|
}
|
||||||
setActorSelfFields(actorInstance.get,null)
|
setActorSelfFields(actorInstance.get,null)
|
||||||
|
|
@ -1133,8 +1133,6 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
|
|
||||||
def start: ActorRef = synchronized {
|
def start: ActorRef = synchronized {
|
||||||
_status = ActorRefInternals.RUNNING
|
_status = ActorRefInternals.RUNNING
|
||||||
//if (clientManaged)
|
|
||||||
// Actor.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
|
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1142,8 +1140,6 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
if (_status == ActorRefInternals.RUNNING) {
|
if (_status == ActorRefInternals.RUNNING) {
|
||||||
_status = ActorRefInternals.SHUTDOWN
|
_status = ActorRefInternals.SHUTDOWN
|
||||||
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
|
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
|
||||||
// if (clientManaged)
|
|
||||||
// Actor.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -245,7 +245,7 @@ trait RemoteServerModule extends RemoteModule {
|
||||||
* Starts the server up
|
* Starts the server up
|
||||||
*/
|
*/
|
||||||
def start(): RemoteServerModule =
|
def start(): RemoteServerModule =
|
||||||
start(ReflectiveAccess.Remote.configDefaultAddress.getHostName,
|
start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress,
|
||||||
ReflectiveAccess.Remote.configDefaultAddress.getPort,
|
ReflectiveAccess.Remote.configDefaultAddress.getPort,
|
||||||
None)
|
None)
|
||||||
|
|
||||||
|
|
@ -253,7 +253,7 @@ trait RemoteServerModule extends RemoteModule {
|
||||||
* Starts the server up
|
* Starts the server up
|
||||||
*/
|
*/
|
||||||
def start(loader: ClassLoader): RemoteServerModule =
|
def start(loader: ClassLoader): RemoteServerModule =
|
||||||
start(ReflectiveAccess.Remote.configDefaultAddress.getHostName,
|
start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress,
|
||||||
ReflectiveAccess.Remote.configDefaultAddress.getPort,
|
ReflectiveAccess.Remote.configDefaultAddress.getPort,
|
||||||
Option(loader))
|
Option(loader))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ object Address {
|
||||||
def apply(hostname: String, port: Int) = new Address(hostname, port)
|
def apply(hostname: String, port: Int) = new Address(hostname, port)
|
||||||
def apply(inetAddress: InetSocketAddress): Address = inetAddress match {
|
def apply(inetAddress: InetSocketAddress): Address = inetAddress match {
|
||||||
case null => null
|
case null => null
|
||||||
case inet => new Address(inet.getHostName, inet.getPort)
|
case inet => new Address(inet.getAddress.getHostAddress, inet.getPort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,10 @@ import java.util.concurrent.atomic. {AtomicBoolean}
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class ReentrantGuard {
|
final class ReentrantGuard {
|
||||||
val lock = new ReentrantLock
|
val lock = new ReentrantLock
|
||||||
|
|
||||||
def withGuard[T](body: => T): T = {
|
final def withGuard[T](body: => T): T = {
|
||||||
lock.lock
|
lock.lock
|
||||||
try {
|
try {
|
||||||
body
|
body
|
||||||
|
|
@ -22,7 +22,7 @@ class ReentrantGuard {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def tryWithGuard[T](body: => T): T = {
|
final def tryWithGuard[T](body: => T): T = {
|
||||||
while(!lock.tryLock) { Thread.sleep(10) } // wait on the monitor to be unlocked
|
while(!lock.tryLock) { Thread.sleep(10) } // wait on the monitor to be unlocked
|
||||||
try {
|
try {
|
||||||
body
|
body
|
||||||
|
|
|
||||||
|
|
@ -144,7 +144,7 @@ abstract class RemoteClient private[akka] (
|
||||||
val module: NettyRemoteClientModule,
|
val module: NettyRemoteClientModule,
|
||||||
val remoteAddress: InetSocketAddress) extends Logging {
|
val remoteAddress: InetSocketAddress) extends Logging {
|
||||||
|
|
||||||
val name = this.getClass.getSimpleName + "@" + remoteAddress.getHostName + "::" + remoteAddress.getPort
|
val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
|
||||||
|
|
||||||
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
|
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
|
||||||
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
|
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
|
||||||
|
|
@ -525,7 +525,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
|
||||||
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
|
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
|
||||||
if (optimizeLocalScoped_?) {
|
if (optimizeLocalScoped_?) {
|
||||||
val home = this.address
|
val home = this.address
|
||||||
if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
|
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
|
||||||
val localRef = findActorByIdOrUuid(serviceId,serviceId)
|
val localRef = findActorByIdOrUuid(serviceId,serviceId)
|
||||||
if (localRef ne null) return localRef //Code significantly simpler with the return statement
|
if (localRef ne null) return localRef //Code significantly simpler with the return statement
|
||||||
}
|
}
|
||||||
|
|
@ -538,7 +538,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
|
||||||
|
|
||||||
if (optimizeLocalScoped_?) {
|
if (optimizeLocalScoped_?) {
|
||||||
val home = this.address
|
val home = this.address
|
||||||
if (host == home.getHostName && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
|
if ((host == home.getHostName || host == home.getAddress.getHostAddress) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
|
||||||
return new LocalActorRef(factory, None) // Code is much simpler with return
|
return new LocalActorRef(factory, None) // Code is much simpler with return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -597,7 +597,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
||||||
case Some(s) => s.name
|
case Some(s) => s.name
|
||||||
case None =>
|
case None =>
|
||||||
val a = ReflectiveAccess.Remote.configDefaultAddress
|
val a = ReflectiveAccess.Remote.configDefaultAddress
|
||||||
"NettyRemoteServer@" + a.getHostName + ":" + a.getPort
|
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _isRunning = new Switch(false)
|
private val _isRunning = new Switch(false)
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ object ActorSerialization {
|
||||||
private[akka] def toAddressProtocol(actorRef: ActorRef) = {
|
private[akka] def toAddressProtocol(actorRef: ActorRef) = {
|
||||||
val address = actorRef.homeAddress.getOrElse(Actor.remote.address)
|
val address = actorRef.homeAddress.getOrElse(Actor.remote.address)
|
||||||
AddressProtocol.newBuilder
|
AddressProtocol.newBuilder
|
||||||
.setHostname(address.getHostName)
|
.setHostname(address.getAddress.getHostAddress)
|
||||||
.setPort(address.getPort)
|
.setPort(address.getPort)
|
||||||
.build
|
.build
|
||||||
}
|
}
|
||||||
|
|
@ -162,7 +162,7 @@ object ActorSerialization {
|
||||||
format: Format[T]): ActorRef = {
|
format: Format[T]): ActorRef = {
|
||||||
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
|
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
|
||||||
homeAddress.foreach { addr =>
|
homeAddress.foreach { addr =>
|
||||||
val addressProtocol = AddressProtocol.newBuilder.setHostname(addr.getHostName).setPort(addr.getPort).build
|
val addressProtocol = AddressProtocol.newBuilder.setHostname(addr.getAddress.getHostAddress).setPort(addr.getPort).build
|
||||||
builder.setOriginalAddress(addressProtocol)
|
builder.setOriginalAddress(addressProtocol)
|
||||||
}
|
}
|
||||||
fromProtobufToLocalActorRef(builder.build, format, None)
|
fromProtobufToLocalActorRef(builder.build, format, None)
|
||||||
|
|
|
||||||
|
|
@ -540,7 +540,7 @@ object TypedActor extends Logging {
|
||||||
config match {
|
config match {
|
||||||
case null => actorOf(typedActor)
|
case null => actorOf(typedActor)
|
||||||
case c: TypedActorConfiguration if (c._host.isDefined) =>
|
case c: TypedActorConfiguration if (c._host.isDefined) =>
|
||||||
Actor.remote.actorOf(typedActor, c._host.get.getHostName, c._host.get.getPort)
|
Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort)
|
||||||
case _ => actorOf(typedActor)
|
case _ => actorOf(typedActor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue