Dead letters containing remote envelopes handled correctly #2959

- New DeadLetter class for handling remoting specific envelopes
 - Fixed error handling of name lookups
 - Name lookup is now handled via futures (future refactor opportunity)
This commit is contained in:
Endre Sándor Varga 2013-01-28 14:38:33 +01:00
parent f12e11df46
commit e0a9dd70ba
9 changed files with 71 additions and 31 deletions

View file

@ -106,22 +106,6 @@ abstract class ClusterDeathWatchSpec
}
"receive Terminated when watched node is unknown host" taggedAs LongRunningTest in {
runOn(first) {
val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject"
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path))
def receive = {
case t: Terminated testActor ! t.actor.path
}
}), name = "observer2")
expectMsg(path)
}
enterBarrier("after-2")
}
"receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in {
runOn(first) {
val path = RootActorPath(second) / "user" / "non-existing"
@ -135,7 +119,7 @@ abstract class ClusterDeathWatchSpec
expectMsg(path)
}
enterBarrier("after-3")
enterBarrier("after-2")
}
"be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) {
@ -172,7 +156,7 @@ abstract class ClusterDeathWatchSpec
testConductor.removeNode(fourth)
}
enterBarrier("after-4")
enterBarrier("after-3")
}
}

View file

@ -179,6 +179,14 @@ akka {
retry-window = 3 s
maximum-retries-in-window = 5
# The length of time to gate an address whose name lookup has failed.
# No connection attempts will be made to an address while it remains
# gated. Any messages sent to a gated address will be directed to dead
# letters instead. Name lookups are costly, and the time to recovery
# is typically large, therefore this setting should be a value in the
# order of seconds or minutes.
gate-unknown-addresses-for = 60 s
### Transports and adapters
# List of the transport drivers that will be loaded by the remoting.

View file

@ -172,7 +172,7 @@ private[remote] class EndpointWriter(
stash()
stay()
case Event(Status.Failure(e: InvalidAssociationException), _)
log.error(e, "Tried to associate with invalid remote address [{}]. " +
log.error("Tried to associate with invalid remote address [{}]. " +
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
case Event(Status.Failure(e), _)

View file

@ -8,11 +8,12 @@ import akka.actor._
import akka.dispatch._
import akka.event.{ Logging, LoggingAdapter, EventStream }
import akka.event.Logging.Error
import akka.serialization.{ Serialization, SerializationExtension }
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
import akka.pattern.pipe
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
import scala.throws
object RemoteActorRefProvider {
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
@ -59,6 +60,30 @@ object RemoteActorRefProvider {
}
}
/*
* Remoting wraps messages destined to a remote host in a remoting specific envelope: EndpointManager.Send
* As these wrapped messages might arrive to the dead letters of an EndpointWriter, they need to be unwrapped
* and handled as dead letters to the original (remote) destination. Without this special case, DeathWatch related
* functionality breaks, like the special handling of Watch messages arriving to dead letters.
*/
private class RemoteDeadLetterActorRef(_provider: ActorRefProvider,
_path: ActorPath,
_eventStream: EventStream) extends DeadLetterActorRef(_provider, _path, _eventStream) {
override def !(message: Any)(implicit sender: ActorRef): Unit = message match {
case EndpointManager.Send(m, senderOption, _) super.!(m)(senderOption.orNull)
case _ super.!(message)(sender)
}
override def specialHandle(msg: Any): Boolean = msg match {
case EndpointManager.Send(m, _, _) super.specialHandle(m)
case _ super.specialHandle(msg)
}
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
}
/**
@ -93,7 +118,7 @@ class RemoteActorRefProvider(
def log: LoggingAdapter = _log
override def rootPath: ActorPath = local.rootPath
override def deadLetters: InternalActorRef = local.deadLetters
override val deadLetters: InternalActorRef = new RemoteDeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
// these are only available after init()
override def rootGuardian: InternalActorRef = local.rootGuardian

View file

@ -31,6 +31,8 @@ class RemoteSettings(val config: Config) {
val RetryGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS)
val UnknownAddressGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS)
val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections")
val MaximumRetriesInWindow: Int = getInt("akka.remote.maximum-retries-in-window")

View file

@ -334,7 +334,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow) {
case InvalidAssociation(localAddress, remoteAddress, e)
endpoints.markAsQuarantined(remoteAddress, e)
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
Stop
case NonFatal(e)

View file

@ -318,12 +318,14 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
case _ None
}
def addressToSocketAddress(addr: Address): InetSocketAddress =
new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get)
// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] =
Future { new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) }
override def listen: Future[(Address, Promise[AssociationEventListener])] =
(Promise[(Address, Promise[AssociationEventListener])]() complete Try {
val address = addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector))
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
for {
address addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector))
} yield {
val newServerChannel = inboundBootstrap match {
case b: ServerBootstrap b.bind(address)
case b: ConnectionlessBootstrap b.bind(address)
@ -342,7 +344,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
(address, associationListenerPromise)
case None throw new NettyTransportException(s"Unknown local address type ${newServerChannel.getLocalAddress.getClass}")
}
}).future
}
}
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound"))
@ -350,7 +353,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
val bootstrap: ClientBootstrap = outboundBootstrap
(for {
readyChannel NettyFutureBridge(bootstrap.connect(addressToSocketAddress(remoteAddress))) map {
socketAddress addressToSocketAddress(remoteAddress)
readyChannel NettyFutureBridge(bootstrap.connect(socketAddress)) map {
channel
if (EnableSsl)
blocking {
@ -375,7 +379,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
readyChannel.getPipeline.get[ClientHandler](classOf[ClientHandler]).statusFuture
} yield handle) recover {
case c: CancellationException throw new NettyTransportException("Connection was cancelled") with NoStackTrace
case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
case u @ (_: UnknownHostException | _: SecurityException) throw new InvalidAssociationException(u.getMessage, u.getCause)
case NonFatal(t) throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace
}
}
}

View file

@ -33,6 +33,8 @@ class RemoteConfigSpec extends AkkaSpec(
LogRemoteLifecycleEvents must be(false)
LogReceive must be(false)
LogSend must be(false)
RetryGateClosedFor must be === 0.seconds
UnknownAddressGateClosedFor must be === 10.seconds
MaximumRetriesInWindow must be === 5
RetryWindow must be === 3.seconds
BackoffPeriod must be === 10.milliseconds

View file

@ -4,8 +4,10 @@
package akka.remote
import akka.testkit._
import akka.actor.{ ActorSystem, DeathWatchSpec }
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.actor.RootActorPath
import scala.concurrent.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString("""
@ -35,4 +37,16 @@ akka {
other.shutdown()
}
"receive Terminated when watched node is unknown host" in {
val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject"
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path))
def receive = {
case t: Terminated testActor ! t.actor.path
}
}), name = "observer2")
expectMsg(60.seconds, path)
}
}