From e0a9dd70ba8fb0e5df0e398d13aa043c24ceea83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 28 Jan 2013 14:38:33 +0100 Subject: [PATCH] Dead letters containing remote envelopes handled correctly #2959 - New DeadLetter class for handling remoting specific envelopes - Fixed error handling of name lookups - Name lookup is now handled via futures (future refactor opportunity) --- .../akka/cluster/ClusterDeathWatchSpec.scala | 20 ++----------- akka-remote/src/main/resources/reference.conf | 8 +++++ .../src/main/scala/akka/remote/Endpoint.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 29 +++++++++++++++++-- .../scala/akka/remote/RemoteSettings.scala | 2 ++ .../src/main/scala/akka/remote/Remoting.scala | 2 +- .../transport/netty/NettyTransport.scala | 21 +++++++++----- .../scala/akka/remote/RemoteConfigSpec.scala | 2 ++ .../akka/remote/RemoteDeathWatchSpec.scala | 16 +++++++++- 9 files changed, 71 insertions(+), 31 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index aca2ea196d..8a712dba0e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -106,22 +106,6 @@ abstract class ClusterDeathWatchSpec } - "receive Terminated when watched node is unknown host" taggedAs LongRunningTest in { - runOn(first) { - val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject" - system.actorOf(Props(new Actor { - context.watch(context.actorFor(path)) - def receive = { - case t: Terminated ⇒ testActor ! t.actor.path - } - }), name = "observer2") - - expectMsg(path) - } - - enterBarrier("after-2") - } - "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in { runOn(first) { val path = RootActorPath(second) / "user" / "non-existing" @@ -135,7 +119,7 @@ abstract class ClusterDeathWatchSpec expectMsg(path) } - enterBarrier("after-3") + enterBarrier("after-2") } "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) { @@ -172,7 +156,7 @@ abstract class ClusterDeathWatchSpec testConductor.removeNode(fourth) } - enterBarrier("after-4") + enterBarrier("after-3") } } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 8cf1adccc7..ce5a572484 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -179,6 +179,14 @@ akka { retry-window = 3 s maximum-retries-in-window = 5 + # The length of time to gate an address whose name lookup has failed. + # No connection attempts will be made to an address while it remains + # gated. Any messages sent to a gated address will be directed to dead + # letters instead. Name lookups are costly, and the time to recovery + # is typically large, therefore this setting should be a value in the + # order of seconds or minutes. + gate-unknown-addresses-for = 60 s + ### Transports and adapters # List of the transport drivers that will be loaded by the remoting. diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index e678fa9551..0b5cc1306d 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -172,7 +172,7 @@ private[remote] class EndpointWriter( stash() stay() case Event(Status.Failure(e: InvalidAssociationException), _) ⇒ - log.error(e, "Tried to associate with invalid remote address [{}]. " + + log.error("Tried to associate with invalid remote address [{}]. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) case Event(Status.Failure(e), _) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index d70dc96d2e..1f32d88140 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -8,11 +8,12 @@ import akka.actor._ import akka.dispatch._ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error -import akka.serialization.{ Serialization, SerializationExtension } +import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } import akka.pattern.pipe import scala.concurrent.Future import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } +import scala.throws object RemoteActorRefProvider { private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) @@ -59,6 +60,30 @@ object RemoteActorRefProvider { } } + + /* + * Remoting wraps messages destined to a remote host in a remoting specific envelope: EndpointManager.Send + * As these wrapped messages might arrive to the dead letters of an EndpointWriter, they need to be unwrapped + * and handled as dead letters to the original (remote) destination. Without this special case, DeathWatch related + * functionality breaks, like the special handling of Watch messages arriving to dead letters. + */ + private class RemoteDeadLetterActorRef(_provider: ActorRefProvider, + _path: ActorPath, + _eventStream: EventStream) extends DeadLetterActorRef(_provider, _path, _eventStream) { + + override def !(message: Any)(implicit sender: ActorRef): Unit = message match { + case EndpointManager.Send(m, senderOption, _) ⇒ super.!(m)(senderOption.orNull) + case _ ⇒ super.!(message)(sender) + } + + override def specialHandle(msg: Any): Boolean = msg match { + case EndpointManager.Send(m, _, _) ⇒ super.specialHandle(m) + case _ ⇒ super.specialHandle(msg) + } + + @throws(classOf[java.io.ObjectStreamException]) + override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized + } } /** @@ -93,7 +118,7 @@ class RemoteActorRefProvider( def log: LoggingAdapter = _log override def rootPath: ActorPath = local.rootPath - override def deadLetters: InternalActorRef = local.deadLetters + override val deadLetters: InternalActorRef = new RemoteDeadLetterActorRef(this, rootPath / "deadLetters", eventStream) // these are only available after init() override def rootGuardian: InternalActorRef = local.rootGuardian diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 9b57579304..2dbcea4e41 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -31,6 +31,8 @@ class RemoteSettings(val config: Config) { val RetryGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS) + val UnknownAddressGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS) + val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections") val MaximumRetriesInWindow: Int = getInt("akka.remote.maximum-retries-in-window") diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index adad076911..e78af62b27 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -334,7 +334,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow) { case InvalidAssociation(localAddress, remoteAddress, e) ⇒ - endpoints.markAsQuarantined(remoteAddress, e) + endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) Stop case NonFatal(e) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index a7b31042cf..0a3c1a65e4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -318,12 +318,14 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case _ ⇒ None } - def addressToSocketAddress(addr: Address): InetSocketAddress = - new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) + // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 + def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = + Future { new InetSocketAddress(InetAddress.getByName(addr.host.get), addr.port.get) } - override def listen: Future[(Address, Promise[AssociationEventListener])] = - (Promise[(Address, Promise[AssociationEventListener])]() complete Try { - val address = addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector)) + override def listen: Future[(Address, Promise[AssociationEventListener])] = { + for { + address ← addressToSocketAddress(Address("", "", settings.Hostname, settings.PortSelector)) + } yield { val newServerChannel = inboundBootstrap match { case b: ServerBootstrap ⇒ b.bind(address) case b: ConnectionlessBootstrap ⇒ b.bind(address) @@ -342,7 +344,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s (address, associationListenerPromise) case None ⇒ throw new NettyTransportException(s"Unknown local address type ${newServerChannel.getLocalAddress.getClass}") } - }).future + } + } override def associate(remoteAddress: Address): Future[AssociationHandle] = { if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound")) @@ -350,7 +353,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s val bootstrap: ClientBootstrap = outboundBootstrap (for { - readyChannel ← NettyFutureBridge(bootstrap.connect(addressToSocketAddress(remoteAddress))) map { + socketAddress ← addressToSocketAddress(remoteAddress) + readyChannel ← NettyFutureBridge(bootstrap.connect(socketAddress)) map { channel ⇒ if (EnableSsl) blocking { @@ -375,7 +379,8 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s readyChannel.getPipeline.get[ClientHandler](classOf[ClientHandler]).statusFuture } yield handle) recover { case c: CancellationException ⇒ throw new NettyTransportException("Connection was cancelled") with NoStackTrace - case NonFatal(t) ⇒ throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace + case u @ (_: UnknownHostException | _: SecurityException) ⇒ throw new InvalidAssociationException(u.getMessage, u.getCause) + case NonFatal(t) ⇒ throw new NettyTransportException(t.getMessage, t.getCause) with NoStackTrace } } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index cfdc8b1915..ae47123818 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -33,6 +33,8 @@ class RemoteConfigSpec extends AkkaSpec( LogRemoteLifecycleEvents must be(false) LogReceive must be(false) LogSend must be(false) + RetryGateClosedFor must be === 0.seconds + UnknownAddressGateClosedFor must be === 10.seconds MaximumRetriesInWindow must be === 5 RetryWindow must be === 3.seconds BackoffPeriod must be === 10.milliseconds diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index ca440806c7..ec0b55450f 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -4,8 +4,10 @@ package akka.remote import akka.testkit._ -import akka.actor.{ ActorSystem, DeathWatchSpec } +import akka.actor._ import com.typesafe.config.ConfigFactory +import akka.actor.RootActorPath +import scala.concurrent.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" @@ -35,4 +37,16 @@ akka { other.shutdown() } + "receive Terminated when watched node is unknown host" in { + val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path)) + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer2") + + expectMsg(60.seconds, path) + } + }