diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index b0c27fb91c..b576ff68a3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -156,16 +156,6 @@ class RemoteActorRefProvider( // this enables reception of remote requests transport.start() - val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { - def receive = { - case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) - case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) - case _ ⇒ //ignore other - } - }), "RemoteClientLifeCycleListener") - - system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 0a580d84cf..0331402148 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -14,147 +14,6 @@ import akka.actor._ import scala.collection.immutable import scala.concurrent.Future -/** - * Remote life-cycle events. - */ -sealed trait RemoteLifeCycleEvent extends Serializable { - def logLevel: Logging.LogLevel -} - -/** - * Life-cycle events for RemoteClient. - */ -trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { - def remoteAddress: Address - final def getRemoteAddress: Address = remoteAddress -} - -/** - * A RemoteClientError is a general error that is thrown within or from a RemoteClient - */ -case class RemoteClientError( - @BeanProperty cause: Throwable, - @transient remote: RemoteTransport, - remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = "RemoteClientError@" + remoteAddress + ": Error[" + Logging.stackTraceFor(cause) + "]" -} - -/** - * RemoteClientDisconnected is published when a RemoteClient's connection is disconnected - */ -case class RemoteClientDisconnected( - @transient @BeanProperty remote: RemoteTransport, - remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.DebugLevel - override def toString: String = "RemoteClientDisconnected@" + remoteAddress -} - -/** - * RemoteClientConnected is published when a RemoteClient's connection is established - */ -case class RemoteClientConnected( - @transient @BeanProperty remote: RemoteTransport, - remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.DebugLevel - override def toString: String = "RemoteClientConnected@" + remoteAddress -} - -/** - * RemoteClientStarted is published when a RemoteClient has started up - */ -case class RemoteClientStarted( - @transient @BeanProperty remote: RemoteTransport, - remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.InfoLevel - override def toString: String = "RemoteClientStarted@" + remoteAddress -} - -/** - * RemoteClientShutdown is published when a RemoteClient has shut down - */ -case class RemoteClientShutdown( - @transient @BeanProperty remote: RemoteTransport, - remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.InfoLevel - override def toString: String = "RemoteClientShutdown@" + remoteAddress -} - -/** - * Life-cycle events for RemoteServer. - */ -trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent - -/** - * RemoteServerStarted is published when a local RemoteServer has started up - */ -case class RemoteServerStarted( - @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.InfoLevel - override def toString: String = "RemoteServerStarted@" + remote -} - -/** - * RemoteServerShutdown is published when a local RemoteServer has shut down - */ -case class RemoteServerShutdown( - @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.InfoLevel - override def toString: String = "RemoteServerShutdown@" + remote -} - -/** - * A RemoteServerError is a general error that is thrown within or from a RemoteServer - */ -case class RemoteServerError( - @BeanProperty val cause: Throwable, - @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - - override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]" -} - -/** - * RemoteServerClientConnected is published when an inbound connection has been established - */ -case class RemoteServerClientConnected( - @transient @BeanProperty remote: RemoteTransport, - @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.DebugLevel - override def toString: String = - "RemoteServerClientConnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" -} - -/** - * RemoteServerClientConnected is published when an inbound connection has been disconnected - */ -case class RemoteServerClientDisconnected( - @transient @BeanProperty remote: RemoteTransport, - @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.DebugLevel - override def toString: String = - "RemoteServerClientDisconnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" -} - -/** - * RemoteServerClientClosed is published when an inbound RemoteClient is closed - */ -case class RemoteServerClientClosed( - @transient @BeanProperty remote: RemoteTransport, - @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel: Logging.LogLevel = Logging.DebugLevel - override def toString: String = - "RemoteServerClientClosed@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" -} - -/** - * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. - */ -class RemoteClientException private[akka] ( - message: String, - @transient @BeanProperty val client: RemoteTransport, - val remoteAddress: Address, cause: Throwable = null) extends AkkaException(message, cause) - /** * RemoteTransportException represents a general failure within a RemoteTransport, * such as inability to start, wrong configuration etc. @@ -198,29 +57,11 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ def start(): Unit - /** - * Attempts to shut down a specific client connected to the supplied remote address - */ - def shutdownClientConnection(address: Address): Unit - - /** - * Attempts to restart a specific client connected to the supplied remote address, but only if the client is not shut down - */ - def restartClientConnection(address: Address): Unit - /** * Sends the given message to the recipient supplying the sender if any */ def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit - /** - * Default implementation both publishes the message to the eventStream as well as logs it using the system logger - */ - def notifyListeners(message: RemoteLifeCycleEvent): Unit = { - system.eventStream.publish(message) - if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message) - } - /** * Sends a management command to the underlying transport stack. The call returns with a Future that indicates * if the command was handled successfully or dropped. @@ -244,121 +85,4 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ protected def logRemoteLifeCycleEvents: Boolean - /** - * Returns a newly created AkkaRemoteProtocol with the given message payload. - */ - def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = - AkkaRemoteProtocol.newBuilder.setPayload(rmp.toByteString).build - - /** - * Returns a newly created AkkaRemoteProtocol with the given control payload. - */ - def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = - AkkaRemoteProtocol.newBuilder.setInstruction(rcp).build - - /** - * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. - */ - def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = - ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(defaultAddress)).build - - /** - * Returns a new RemoteMessageProtocol containing the serialized representation of the given parameters. - */ - def createRemoteMessageProtocolBuilder(recipient: ActorRef, message: Any, senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { - val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) - if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) - - Serialization.currentTransportAddress.withValue(defaultAddress) { - messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) - } - - messageBuilder - } - - /** - * Call this method with an inbound RemoteMessage and this will take care of security (see: "useUntrustedMode") - * as well as making sure that the message ends up at its destination (best effort). - * There is also a fair amount of logging produced by this method, which is good for debugging. - */ - def receiveMessage(remoteMessage: RemoteMessage): Unit = { - val remoteDaemon = provider.remoteDaemon - - remoteMessage.recipient match { - case `remoteDaemon` ⇒ - if (useUntrustedMode) log.debug("dropping daemon message in untrusted mode") - else { - if (provider.remoteSettings.LogReceive) log.debug("received daemon message {}", remoteMessage) - remoteMessage.payload match { - case m @ (_: DaemonMsg | _: Terminated) ⇒ - try remoteDaemon ! m catch { - case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender) - } - case x ⇒ log.debug("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) - } - } - case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ - if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) - remoteMessage.payload match { - case msg: PossiblyHarmful if useUntrustedMode ⇒ log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass) - case msg: SystemMessage ⇒ l.sendSystemMessage(msg) - case msg ⇒ l.!(msg)(remoteMessage.sender) - } - case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !useUntrustedMode ⇒ - if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage) - remoteMessage.originalReceiver match { - case AddressFromURIString(address) if provider.transport.addresses(address) ⇒ - // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) - r.!(remoteMessage.payload)(remoteMessage.sender) - case r ⇒ - log.debug("dropping message {} for non-local recipient {} arriving at {} inbound addresses are {}", - remoteMessage.payloadClass, r, addresses, provider.transport.addresses) - } - case r ⇒ - log.debug("dropping message {} for unknown recipient {} arriving at {} inbound addresses are {}", - remoteMessage.payloadClass, r, addresses, provider.transport.addresses) - } - } -} - -/** - * RemoteMessage is a wrapper around a message that has come in over the wire, - * it allows to easily obtain references to the deserialized message, its intended recipient - * and the sender. - */ -class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { - /** - * Returns a String-representation of the ActorPath that this RemoteMessage is destined for - */ - def originalReceiver: String = input.getRecipient.getPath - - /** - * Returns an Option with the String representation of the ActorPath of the Actor who is the sender of this message - */ - def originalSender: Option[String] = if (input.hasSender) Some(input.getSender.getPath) else None - - /** - * Returns a reference to the Actor that sent this message, or DeadLetterActorRef if not present or found. - */ - lazy val sender: ActorRef = - if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) - else system.deadLetters - - /** - * Returns a reference to the Actor that this message is destined for. - * In case this returns a DeadLetterActorRef, you have access to the path using the "originalReceiver" method. - */ - lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - - /** - * Returns the message - */ - lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage) - - def payloadClass: Class[_] = if (payload eq null) null else payload.getClass - - /** - * Returns a String representation of this RemoteMessage, intended for debugging purposes. - */ - override def toString: String = "RemoteMessage: " + payloadClass + " to " + recipient + "<+{" + originalReceiver + "} from " + sender } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 595b348cbe..3fbce8e10d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -220,17 +220,6 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc } } - // TODO: this is called in RemoteActorRefProvider to handle the lifecycle of connections (clients) - // which is not how things work in the new remoting - override def shutdownClientConnection(address: Address): Unit = { - // Ignore - } - - // TODO: this is never called anywhere, should be taken out from RemoteTransport API - override def restartClientConnection(address: Address): Unit = { - // Ignore - } - override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match { case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender) case None ⇒ throw new IllegalStateException("Attempted to send remote message but Remoting is not running.") diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala deleted file mode 100644 index f8a2943320..0000000000 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.remote.netty - -import java.util.concurrent.TimeUnit -import java.net.{ InetAddress, InetSocketAddress } -import org.jboss.netty.util.{ Timeout, TimerTask, HashedWheelTimer } -import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.channel.group.DefaultChannelGroup -import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, DefaultChannelPipeline, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel } -import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } -import org.jboss.netty.handler.execution.ExecutionHandler -import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } -import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol } -import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected } -import akka.AkkaException -import akka.event.Logging -import akka.actor.{ DeadLetter, Address, ActorRef } -import akka.util.Switch -import scala.util.control.NonFatal -import org.jboss.netty.handler.ssl.SslHandler -import scala.concurrent.duration._ -import java.nio.channels.ClosedChannelException - -/** - * This is the abstract baseclass for netty remote clients, currently there's only an - * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that - * reuses an already established connection. - */ -private[akka] abstract class RemoteClient private[akka] (val netty: NettyRemoteTransport, val remoteAddress: Address) { - - val log = Logging(netty.system, "RemoteClient") - - val name = Logging.simpleName(this) + "@" + remoteAddress - - private[remote] val runSwitch = new Switch() - - private[remote] def isRunning = runSwitch.isOn - - protected def currentChannel: Channel - - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean - - def shutdown(): Boolean - - /** - * Converts the message to the wireprotocol and sends the message across the wire - */ - def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) - send((message, senderOption, recipient)) - } else { - val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) - netty.notifyListeners(RemoteClientError(exception, netty, remoteAddress)) - throw exception - } - - /** - * Sends the message across the wire - */ - private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { - try { - val channel = currentChannel - if (channel.isOpen) { - val f = channel.write(request) - f.addListener( - new ChannelFutureListener { - import netty.system.deadLetters - def operationComplete(future: ChannelFuture): Unit = - if (future.isCancelled || !future.isSuccess) request match { - case (msg, sender, recipient) ⇒ deadLetters ! DeadLetter(msg, sender.getOrElse(deadLetters), recipient) - // We don't call notifyListeners here since we don't think failed message deliveries are errors - /// If the connection goes down we'll get the error reporting done by the pipeline. - } - }) - // Check if we should back off - if (!channel.isWritable) { - val backoff = netty.settings.BackoffTimeout - if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off - } - } - } catch { - case NonFatal(e) ⇒ netty.notifyListeners(RemoteClientError(e, netty, remoteAddress)) - } - } - - override def toString: String = name -} - -/** - * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node. - */ -private[akka] class ActiveRemoteClient private[akka] ( - netty: NettyRemoteTransport, - remoteAddress: Address, - localAddress: Address) - extends RemoteClient(netty, remoteAddress) { - - import netty.settings - - //TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation) - @volatile - private var bootstrap: ClientBootstrap = _ - @volatile - private var connection: ChannelFuture = _ - @volatile - private[remote] var openChannels: DefaultChannelGroup = _ - - @volatile - private var reconnectionDeadline: Option[Deadline] = None - - def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg) - - def currentChannel = connection.getChannel - - /** - * Connect to remote server. - */ - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { - - // Returns whether the handshake was written to the channel or not - def sendSecureCookie(connection: ChannelFuture): Boolean = { - val future = - if (!connection.isSuccess || !settings.EnableSSL) connection - else connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly() - - if (!future.isSuccess) { - notifyListeners(RemoteClientError(future.getCause, netty, remoteAddress)) - false - } else { - ChannelAddress.set(connection.getChannel, Some(remoteAddress)) - val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) - if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get) - handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(localAddress.system) - .setHostname(localAddress.host.get) - .setPort(localAddress.port.get) - .build) - connection.getChannel.write(netty.createControlEnvelope(handshake.build)) - true - } - } - - def attemptReconnect(): Boolean = { - val remoteIP = InetAddress.getByName(remoteAddress.host.get) - log.debug("Remote client reconnecting to [{}|{}]", remoteAddress, remoteIP) - connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get)) - openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - sendSecureCookie(connection) - } - - runSwitch switchOn { - openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) - - val b = new ClientBootstrap(netty.clientChannelFactory) - b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true)) - b.setOption("tcpNoDelay", true) - b.setOption("keepAlive", true) - b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) - settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz)) - settings.OutboundLocalAddress.foreach(s ⇒ b.setOption("localAddress", new InetSocketAddress(s, 0))) - bootstrap = b - - val remoteIP = InetAddress.getByName(remoteAddress.host.get) - log.debug("Starting remote client connection to [{}|{}]", remoteAddress, remoteIP) - - connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get)) - - openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - - if (sendSecureCookie(connection)) { - notifyListeners(RemoteClientStarted(netty, remoteAddress)) - true - } else { - connection.getChannel.close() - openChannels.remove(connection.getChannel) - false - } - } match { - case true ⇒ true - case false if reconnectIfAlreadyConnected ⇒ - log.debug("Remote client reconnecting to [{}]", remoteAddress) - attemptReconnect() - case false ⇒ false - } - } - - // Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients - def shutdown() = runSwitch switchOff { - log.debug("Shutting down remote client [{}]", name) - - notifyListeners(RemoteClientShutdown(netty, remoteAddress)) - try { - if ((connection ne null) && (connection.getChannel ne null)) { - val channel = connection.getChannel - ChannelAddress.remove(channel) - // Try to disconnect first to reduce "connection reset by peer" events - if (channel.isConnected) channel.disconnect() - if (channel.isOpen) channel.close() - } - } finally { - try { - if (openChannels ne null) openChannels.close.awaitUninterruptibly() - } finally { - connection = null - } - } - - log.debug("[{}] has been shut down", name) - } - - private[akka] def isWithinReconnectionTimeWindow: Boolean = reconnectionDeadline match { - case None ⇒ - reconnectionDeadline = Some(Deadline.now + settings.ReconnectionTimeWindow) - true - case Some(deadline) ⇒ - val hasTimeLeft = deadline.hasTimeLeft - if (hasTimeLeft) - log.info("Will try to reconnect to remote server for another [{}] milliseconds", deadline.timeLeft.toMillis) - hasTimeLeft - } - - private[akka] def resetReconnectionTimeWindow = reconnectionDeadline = None -} - -@ChannelHandler.Sharable -private[akka] class ActiveRemoteClientHandler( - val name: String, - val bootstrap: ClientBootstrap, - val remoteAddress: Address, - val localAddress: Address, - val timer: HashedWheelTimer, - val client: ActiveRemoteClient) - extends IdleStateAwareChannelHandler { - - def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() } - }, 0, TimeUnit.MILLISECONDS) - - override def channelIdle(ctx: ChannelHandlerContext, e: IdleStateEvent) { - import IdleState._ - - def createHeartBeat(localAddress: Address, cookie: Option[String]): AkkaRemoteProtocol = { - val beat = RemoteControlProtocol.newBuilder.setCommandType(CommandType.HEARTBEAT) - if (cookie.nonEmpty) beat.setCookie(cookie.get) - - client.netty.createControlEnvelope( - beat.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(localAddress.system) - .setHostname(localAddress.host.get) - .setPort(localAddress.port.get) - .build).build) - } - - e.getState match { - case READER_IDLE | ALL_IDLE ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) } - case WRITER_IDLE ⇒ e.getChannel.write(createHeartBeat(localAddress, client.netty.settings.SecureCookie)) - } - } - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { - try { - event.getMessage match { - case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ - val rcp = arp.getInstruction - rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) } - case _ ⇒ //Ignore others - } - case arp: AkkaRemoteProtocol if arp.hasPayload ⇒ - client.netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(arp.getPayload), client.netty.system)) - case other ⇒ - throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress) - } - } catch { - case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) - } - } - - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { - if (client.isWithinReconnectionTimeWindow) { - timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = - if (client.isRunning) { - client.openChannels.remove(event.getChannel) - client.connect(reconnectIfAlreadyConnected = true) - } - }, client.netty.settings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) - } else runOnceNow { - client.netty.shutdownClientConnection(remoteAddress) // spawn in another thread - } - } - - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - try { - client.notifyListeners(RemoteClientConnected(client.netty, client.remoteAddress)) - client.resetReconnectionTimeWindow - } catch { - case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) - } - } - - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.notifyListeners(RemoteClientDisconnected(client.netty, client.remoteAddress)) - } - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause") - cause match { - case _: ClosedChannelException ⇒ // Ignore - case NonFatal(e) ⇒ - client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) - event.getChannel.close() - case e: Throwable ⇒ throw e // Rethrow fatals - } - } -} - -private[akka] class PassiveRemoteClient(val currentChannel: Channel, - netty: NettyRemoteTransport, - remoteAddress: Address) extends RemoteClient(netty, remoteAddress) { - - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { - netty.notifyListeners(RemoteClientStarted(netty, remoteAddress)) - log.debug("Starting remote client connection to [{}]", remoteAddress) - } - - def shutdown() = runSwitch switchOff { - log.debug("Shutting down remote client [{}]", name) - - netty.notifyListeners(RemoteClientShutdown(netty, remoteAddress)) - log.debug("[{}] has been shut down", name) - } -} - diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala deleted file mode 100644 index a63394869e..0000000000 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.remote.netty - -import java.net.InetSocketAddress -import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } -import java.util.concurrent.locks.ReentrantReadWriteLock -import java.util.concurrent.Executors -import scala.collection.mutable -import scala.collection.immutable -import scala.util.control.NonFatal -import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal } -import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } -import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } -import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } -import org.jboss.netty.handler.timeout.IdleStateHandler -import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer } -import akka.event.Logging -import akka.remote.RemoteProtocol.AkkaRemoteProtocol -import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } -import scala.util.control.NonFatal -import akka.actor.{ ExtendedActorSystem, Address, ActorRef } -import com.google.protobuf.MessageLite -import scala.concurrent.Future - -private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] { - override def initialValue(ch: Channel): Option[Address] = None -} - -/** - * Provides the implementation of the Netty remote support - */ -private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { - - import provider.remoteSettings - - val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) - - // Workaround to emulate the support of multiple local addresses - override def localAddressForRemote(remote: Address): Address = defaultAddress - - // TODO replace by system.scheduler - val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory) - - val clientChannelFactory = { - val boss, worker = settings.UseDispatcherForIO.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool() - new NioClientSocketChannelFactory(boss, worker, settings.ClientSocketWorkerPoolSize) - } - - /** - * Backing scaffolding for the default implementation of NettyRemoteSupport.createPipeline. - */ - object PipelineFactory { - /** - * Construct a DefaultChannelPipeline from a sequence of handlers; to be used - * in implementations of ChannelPipelineFactory. - */ - def apply(handlers: immutable.Seq[ChannelHandler]): DefaultChannelPipeline = - (new DefaultChannelPipeline /: handlers) { (p, h) ⇒ p.addLast(Logging.simpleName(h.getClass), h); p } - - /** - * Constructs the NettyRemoteTransport default pipeline with the give “head” handler, which - * is taken by-name to allow it not to be shared across pipelines. - * - * @param withTimeout determines whether an IdleStateHandler shall be included - */ - def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = - new ChannelPipelineFactory { override def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) } - - /** - * Construct a default protocol stack, excluding the “head” handler (i.e. the one which - * actually dispatches the received messages to the local target actors). - */ - def defaultStack(withTimeout: Boolean, isClient: Boolean): immutable.Seq[ChannelHandler] = - (if (settings.EnableSSL) List(NettySSLSupport(settings.SslSettings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: - (if (withTimeout) List(timeout) else Nil) ::: - msgFormat ::: - authenticator ::: - executionHandler - - /** - * Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer. - */ - def timeout = new IdleStateHandler(timer, - settings.ReadTimeout.toSeconds.toInt, - settings.WriteTimeout.toSeconds.toInt, - settings.AllTimeout.toSeconds.toInt) - - /** - * Construct frame&protobuf encoder/decoder. - */ - def msgFormat = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) :: - new LengthFieldPrepender(4) :: - new RemoteMessageDecoder :: - new RemoteMessageEncoder(NettyRemoteTransport.this) :: - Nil - - /** - * Construct an ExecutionHandler which is used to ensure that message dispatch does not - * happen on a netty thread (that could be bad if re-sending over the network for - * remote-deployed actors). - */ - val executionHandler = if (settings.ExecutionPoolSize != 0) - List(new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor( - settings.ExecutionPoolSize, - settings.MaxChannelMemorySize, - settings.MaxTotalMemorySize, - settings.ExecutionPoolKeepalive.length, - settings.ExecutionPoolKeepalive.unit, - AkkaProtocolMessageSizeEstimator, - system.threadFactory))) - else Nil - - /** - * Helps keep track of how many bytes are in flight - */ - object AkkaProtocolMessageSizeEstimator extends DefaultObjectSizeEstimator { - override final def estimateSize(o: AnyRef): Int = - o match { - case proto: MessageLite ⇒ - val msgSize = proto.getSerializedSize - val misalignment = msgSize % 8 - if (misalignment != 0) msgSize + 8 - misalignment else msgSize - case msg ⇒ super.estimateSize(msg) - } - } - - /** - * Construct and authentication handler which uses the SecureCookie to somewhat - * protect the TCP port from unauthorized use (don’t rely on it too much, though, - * as this is NOT a cryptographic feature). - */ - def authenticator = if (settings.RequireCookie) List(new RemoteServerAuthenticationHandler(settings.SecureCookie)) else Nil - } - - /** - * This method is factored out to provide an extension point in case the - * pipeline shall be changed. It is recommended to use - */ - def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = - PipelineFactory(Seq(endpoint), withTimeout, isClient) - - private val remoteClients = new mutable.HashMap[Address, RemoteClient] - private val clientsLock = new ReentrantReadWriteLock - - override protected def useUntrustedMode = remoteSettings.UntrustedMode - - override protected def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents - - val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) ⇒ shutdown(); throw ex } - - /** - * Override this method to inject a subclass of NettyRemoteServer instead of - * the normal one, e.g. for inserting security hooks. If this method throws - * an exception, the transport will shut itself down and re-throw. - */ - protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this) - - /** - * Override this method to inject a subclass of RemoteClient instead of - * the normal one, e.g. for inserting security hooks. Get this transport’s - * address from `this.address`. - */ - protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, defaultAddress) - - // the address is set in start() or from the RemoteServerHandler, whichever comes first - private val _address = new AtomicReference[Address] - private[akka] def setAddressFromChannel(ch: Channel) = { - val addr = ch.getLocalAddress match { - case sa: InetSocketAddress ⇒ sa - case x ⇒ throw new RemoteTransportException("unknown local address type " + x.getClass, null) - } - _address.compareAndSet(null, Address("akka", remoteSettings.systemName, settings.Hostname, addr.getPort)) - } - - // Workaround to emulate the support of multiple local addresses - override def addresses = Set(address) - def address = _address.get - override def defaultAddress: Address = _address.get - - lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + addresses + ")") - - def start(): Unit = { - server.start() - setAddressFromChannel(server.channel) - notifyListeners(RemoteServerStarted(this)) - } - - def shutdown(): Future[Unit] = { - clientsLock.writeLock().lock() - try { - remoteClients foreach { - case (_, client) ⇒ try client.shutdown() catch { - case NonFatal(e) ⇒ log.error(e, "failure while shutting down [{}]", client) - } - } - remoteClients.clear() - } finally { - clientsLock.writeLock().unlock() - try { - if (server != null) server.shutdown() - } finally { - try { - timer.stop() - } finally { - clientChannelFactory.releaseExternalResources() - } - } - } - Future successful (()) - } - - def send( - message: Any, - senderOption: Option[ActorRef], - recipient: RemoteActorRef): Unit = { - - val recipientAddress = recipient.path.address - - clientsLock.readLock.lock - try { - val client = remoteClients.get(recipientAddress) match { - case Some(client) ⇒ client - case None ⇒ - clientsLock.readLock.unlock - clientsLock.writeLock.lock //Lock upgrade, not supported natively - try { - try { - remoteClients.get(recipientAddress) match { - //Recheck for addition, race between upgrades - case Some(client) ⇒ client //If already populated by other writer - case None ⇒ //Populate map - val client = createClient(recipientAddress) - remoteClients += recipientAddress -> client - client - } - } finally { - clientsLock.readLock.lock - } //downgrade - } finally { - clientsLock.writeLock.unlock - } - } - client.connect() // this will literally do nothing after the first time - client.send(message, senderOption, recipient) - } finally { - clientsLock.readLock.unlock - } - } - - def bindClient(remoteAddress: Address, client: RemoteClient): Boolean = { - clientsLock.writeLock().lock() - try { - if (remoteClients.contains(remoteAddress)) false - else { - client.connect() - remoteClients.put(remoteAddress, client) - true - } - } finally { - clientsLock.writeLock().unlock() - } - } - - def unbindClient(remoteAddress: Address): Unit = shutdownClientConnection(remoteAddress) - - def shutdownClientConnection(remoteAddress: Address): Unit = { - clientsLock.writeLock().lock() - try { - remoteClients.remove(remoteAddress) foreach (_.shutdown()) - } finally { - clientsLock.writeLock().unlock() - } - } - - def restartClientConnection(remoteAddress: Address): Unit = { - clientsLock.readLock().lock() - try { - remoteClients.get(remoteAddress) foreach (_.connect(reconnectIfAlreadyConnected = true)) - } finally { - clientsLock.readLock().unlock() - } - } - -} - -private[akka] class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder { - override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { - msg match { - case (message: Any, sender: Option[_], recipient: ActorRef) ⇒ - super.encode(ctx, channel, - remoteSupport.createMessageSendEnvelope( - remoteSupport.createRemoteMessageProtocolBuilder( - recipient, - message, - sender.asInstanceOf[Option[ActorRef]]).build)) - case _ ⇒ super.encode(ctx, channel, msg) - } - } -} - -private[akka] class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) - -private[akka] class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) { - protected val guard = new ReentrantReadWriteLock - protected val open = new AtomicBoolean(true) - - override def add(channel: Channel): Boolean = { - guard.readLock().lock() - try { - if (open.get) { - super.add(channel) - } else { - channel.close() - false - } - } finally { - guard.readLock().unlock() - } - } - - override def close(): ChannelGroupFuture = { - guard.writeLock().lock() - try { - if (open.getAndSet(false)) super.close() - else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") - } finally { - guard.writeLock().unlock() - } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala deleted file mode 100644 index 2141df30a9..0000000000 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.remote.netty - -import akka.actor.Address -import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol } -import akka.remote._ -import java.net.InetAddress -import java.net.InetSocketAddress -import java.nio.channels.ClosedChannelException -import java.util.concurrent.Executors -import org.jboss.netty.bootstrap.ServerBootstrap -import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.ChannelGroup -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import scala.util.control.NonFatal -import akka.AkkaException - -private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { - - import netty.settings - - val ip = InetAddress.getByName(settings.Hostname) - - private val factory = { - val boss, worker = settings.UseDispatcherForIO.map(netty.system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool() - new NioServerSocketChannelFactory(boss, worker, settings.ServerSocketWorkerPoolSize) - } - - // group of open channels, used for clean-up - private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - - private val bootstrap = { - val b = new ServerBootstrap(factory) - b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false)) - b.setOption("backlog", settings.Backlog) - b.setOption("tcpNoDelay", true) - b.setOption("child.keepAlive", true) - b.setOption("reuseAddress", settings.ReuseAddress) - settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz)) - b - } - - @volatile - private[akka] var channel: Channel = _ - - def start(): Unit = { - channel = bootstrap.bind(new InetSocketAddress(ip, settings.PortSelector)) - openChannels.add(channel) - } - - def shutdown() { - try { - val shutdownSignal = { - val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) - b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(netty.address.system) - .setHostname(netty.address.host.get) - .setPort(netty.address.port.get) - .build) - if (settings.SecureCookie.nonEmpty) - b.setCookie(settings.SecureCookie.get) - b.build - } - openChannels.write(netty.createControlEnvelope(shutdownSignal)) - openChannels.disconnect - openChannels.close.awaitUninterruptibly - bootstrap.releaseExternalResources() - netty.notifyListeners(RemoteServerShutdown(netty)) - } catch { - case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty)) - } - } -} - -@ChannelHandler.Sharable -private[akka] class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { - val authenticated = new AnyRef - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match { - case None ⇒ ctx.sendUpstream(event) - case Some(cookie) ⇒ - ctx.getAttachment match { - case `authenticated` ⇒ ctx.sendUpstream(event) - case null ⇒ event.getMessage match { - case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒ - val instruction = remoteProtocol.getInstruction - instruction.getCookie match { - case `cookie` ⇒ - ctx.setAttachment(authenticated) - ctx.sendUpstream(event) - case _ ⇒ - throw new SecurityException( - "The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie") - } - case _ ⇒ - throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not authorized!") - } - } - } -} - -@ChannelHandler.Sharable -private[akka] class RemoteServerHandler( - val openChannels: ChannelGroup, - val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler { - - import netty.settings - - private var addressToSet = true - - // TODO look into moving that into onBind or similar, but verify that that is guaranteed to be the first to be called - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (addressToSet) { - netty.setAddressFromChannel(event.getChannel) - addressToSet = false - } - super.handleUpstream(ctx, event) - } - - /** - * ChannelOpen overridden to store open channels for a clean postStop of a node. - * If a channel is closed before, it is automatically removed from the open channels group. - */ - override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) - - // TODO might want to log or otherwise signal that a TCP connection has been established here. - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = () - - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - netty.notifyListeners(RemoteServerClientDisconnected(netty, ChannelAddress.get(ctx.getChannel))) - } - - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val address = ChannelAddress.get(ctx.getChannel) - if (address.isDefined && settings.UsePassiveConnections) - netty.unbindClient(address.get) - - netty.notifyListeners(RemoteServerClientClosed(netty, address)) - ChannelAddress.remove(ctx.getChannel) - } - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { - event.getMessage match { - case remote: AkkaRemoteProtocol if remote.hasPayload ⇒ - netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(remote.getPayload), netty.system)) - case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ - val instruction = remote.getInstruction - instruction.getCommandType match { - case CommandType.CONNECT ⇒ - val origin = instruction.getOrigin - val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort) - ChannelAddress.set(event.getChannel, Option(inbound)) - - //If we want to reuse the inbound connections as outbound we need to get busy - if (settings.UsePassiveConnections) - netty.bindClient(inbound, new PassiveRemoteClient(event.getChannel, netty, inbound)) - - netty.notifyListeners(RemoteServerClientConnected(netty, Option(inbound))) - case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed - case CommandType.HEARTBEAT ⇒ //Other guy is still alive - case _ ⇒ //Unknown command - } - case _ ⇒ //ignore - } - } catch { - case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty)) - } - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause") - cause match { - case _: ClosedChannelException ⇒ // Ignore - case NonFatal(e) ⇒ - netty.notifyListeners(RemoteServerError(e, netty)) - event.getChannel.close() - case e: Throwable ⇒ throw e // Rethrow fatals - } - } -} - diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala deleted file mode 100644 index 2d4183b46d..0000000000 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.remote.netty - -import com.typesafe.config.Config -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit._ -import java.net.InetAddress -import akka.ConfigurationException -import akka.japi.Util.immutableSeq -import scala.concurrent.duration.FiniteDuration -import akka.dispatch.ThreadPoolConfig -import akka.util.Helpers - -private[akka] class NettySettings(config: Config, val systemName: String) { - - import config._ - - val BackoffTimeout: FiniteDuration = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS) - - val SecureCookie: Option[String] = getString("secure-cookie") match { - case "" ⇒ None - case cookie ⇒ Some(cookie) - } - val RequireCookie: Boolean = { - val requireCookie = getBoolean("require-cookie") - if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( - "Configuration option 'akka.remote.netty.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.netty.secure-cookie'.") - requireCookie - } - - val UsePassiveConnections: Boolean = getBoolean("use-passive-connections") - val UseDispatcherForIO: Option[String] = getString("use-dispatcher-for-io") match { - case "" | null ⇒ None - case dispatcher ⇒ Some(dispatcher) - } - - val ReconnectionTimeWindow: FiniteDuration = Duration(getMilliseconds("reconnection-time-window"), MILLISECONDS) - val ReadTimeout: FiniteDuration = Duration(getMilliseconds("read-timeout"), MILLISECONDS) - val WriteTimeout: FiniteDuration = Duration(getMilliseconds("write-timeout"), MILLISECONDS) - val AllTimeout: FiniteDuration = Duration(getMilliseconds("all-timeout"), MILLISECONDS) - val ReconnectDelay: FiniteDuration = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) - - val MessageFrameSize: Int = getBytes("message-frame-size").toInt - - private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match { - case 0 ⇒ None - case x if x < 0 ⇒ - throw new ConfigurationException("Setting '%s' must be 0 or positive (and fit in an Int)" format s) - case other ⇒ Some(other) - } - - val WriteBufferHighWaterMark: Option[Int] = optionSize("write-buffer-high-water-mark") - val WriteBufferLowWaterMark: Option[Int] = optionSize("write-buffer-low-water-mark") - val SendBufferSize: Option[Int] = optionSize("send-buffer-size") - val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size") - - val Hostname: String = getString("hostname") match { - case "" ⇒ InetAddress.getLocalHost.getHostAddress - case value ⇒ value - } - - val OutboundLocalAddress: Option[String] = getString("outbound-local-address") match { - case "auto" | "" | null ⇒ None - case some ⇒ Some(some) - } - - @deprecated("WARNING: This should only be used by professionals.", "2.0") - val PortSelector: Int = getInt("port") - - val ConnectionTimeout: FiniteDuration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) - - val Backlog: Int = getInt("backlog") - - val ReuseAddress: Boolean = getString("reuse-address") match { - case "off-for-windows" ⇒ !Helpers.isWindows - case _ ⇒ getBoolean("reuse-address") - } - - val ExecutionPoolKeepalive: FiniteDuration = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) - - val ExecutionPoolSize: Int = getInt("execution-pool-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 0") - case sz ⇒ sz - } - - val MaxChannelMemorySize: Long = getBytes("max-channel-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-channel-memory-size is less than 0 bytes") - case sz ⇒ sz - } - - val MaxTotalMemorySize: Long = getBytes("max-total-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-total-memory-size is less than 0 bytes") - case sz ⇒ sz - } - - private def computeWPS(config: Config): Int = - ThreadPoolConfig.scaledPoolSize( - config.getInt("pool-size-min"), - config.getDouble("pool-size-factor"), - config.getInt("pool-size-max")) - - val ServerSocketWorkerPoolSize = computeWPS(config.getConfig("server-socket-worker-pool")) - - val ClientSocketWorkerPoolSize = computeWPS(config.getConfig("client-socket-worker-pool")) - - val SslSettings = new SSLSettings(config.getConfig("ssl")) - - val EnableSSL = getBoolean("ssl.enable") -} diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala similarity index 99% rename from akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala rename to akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala index 65ae01f8b4..949ef235fb 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala @@ -2,20 +2,21 @@ * Copyright (C) 2009-2013 Typesafe Inc. */ -package akka.remote.netty +package akka.remote.transport.netty -import org.jboss.netty.handler.ssl.SslHandler -import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext } -import akka.remote.RemoteTransportException -import akka.event.LoggingAdapter -import java.io.{ IOException, FileNotFoundException, FileInputStream } -import akka.remote.security.provider.AkkaProvider -import java.security._ -import com.typesafe.config.Config -import akka.japi.Util._ import akka.ConfigurationException +import akka.event.LoggingAdapter +import akka.japi.Util._ +import akka.remote.RemoteTransportException +import akka.remote.security.provider.AkkaProvider +import com.typesafe.config.Config +import java.io.{ IOException, FileNotFoundException, FileInputStream } +import java.security._ +import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext } +import org.jboss.netty.handler.ssl.SslHandler private[akka] class SSLSettings(config: Config) { + import config._ val SSLKeyStore = Option(getString("key-store")).filter(_.length > 0) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 6550753214..d1fee3658b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -3,30 +3,29 @@ */ package akka.remote.transport.netty -import akka.{ OnlyCauseStackTrace, ConfigurationException } import akka.actor.{ Address, ExtendedActorSystem } +import akka.dispatch.ThreadPoolConfig import akka.event.Logging -import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup } +import akka.remote.transport.AssociationHandle.HandleEventListener import akka.remote.transport.Transport._ import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode } import akka.remote.transport.{ AssociationHandle, Transport } +import akka.{ OnlyCauseStackTrace, ConfigurationException } import com.typesafe.config.Config import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException } -import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException } +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ ConcurrentHashMap, Executors, CancellationException } import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap } import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer } import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener } +import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener } import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import org.jboss.netty.handler.ssl.SslHandler import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } import scala.concurrent.{ ExecutionContext, Promise, Future } -import scala.util.{ Try, Random } +import scala.util.Try import util.control.{ NoStackTrace, NonFatal } -import akka.dispatch.ThreadPoolConfig -import akka.remote.transport.AssociationHandle.HandleEventListener -import java.util.concurrent.atomic.AtomicInteger -import org.jboss.netty.handler.ssl.SslHandler object NettyTransportSettings { sealed trait Mode @@ -209,7 +208,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]() - val channelGroup = new DefaultDisposableChannelGroup("akka-netty-transport-driver-channelgroup-" + + /* + * Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap. + * In the old remoting this was handled by using a custom subclass, guarding the close() method with a write-lock. + * The usage of this class is safe in the new remoting, as close() is called after unbind() is finished, and no + * outbound connections are initiated in the shutdown phase. + */ + val channelGroup = new DefaultChannelGroup("akka-netty-transport-driver-channelgroup-" + uniqueIdCounter.getAndIncrement) private val clientChannelFactory: ChannelFactory = TransportMode match { diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index 752f7c4faa..fc0ca7cded 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -6,7 +6,6 @@ package akka.remote import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import akka.remote.netty.NettyRemoteTransport import akka.actor.Actor import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala deleted file mode 100644 index 18c3018827..0000000000 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.remote - -import akka.testkit._ -import akka.actor._ -import com.typesafe.config._ -import scala.concurrent.Future -import scala.concurrent.Await -import scala.reflect.classTag -import akka.pattern.ask -import akka.event.Logging - -object RemoteCommunicationSpec { - class Echo extends Actor { - var target: ActorRef = context.system.deadLetters - - def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo], n) - case ex: Exception ⇒ throw ex - case s: String ⇒ sender ! context.actorFor(s) - case x ⇒ target = sender; sender ! x - } - - override def preStart() {} - override def preRestart(cause: Throwable, msg: Option[Any]) { - target ! "preRestart" - } - override def postRestart(cause: Throwable) {} - override def postStop() { - target ! "postStop" - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RemoteCommunicationSpec extends AkkaSpec(""" -akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.transport = "akka.remote.netty.NettyRemoteTransport" - remote.netty { - hostname = localhost - port = 0 - } -} - """) with ImplicitSender with DefaultTimeout { - - import RemoteCommunicationSpec._ - - val other = ActorSystem("remote-sys", system.settings.config) - - val localAddr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val remoteAddr = other.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - - val deploys = Seq( - Deploy("/blub", scope = RemoteScope(remoteAddr)), - Deploy("/looker/child", scope = RemoteScope(remoteAddr)), - Deploy("/looker/child/grandchild", scope = RemoteScope(localAddr))) - - def deploy(sys: ActorSystem, d: Deploy) { - sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) - } - - for (d ← deploys) { - deploy(system, d) - deploy(other, d) - } - - val remote = other.actorOf(Props(new Actor { - def receive = { - case "ping" ⇒ sender ! (("pong", sender)) - } - }), "echo") - - val here = system.actorFor(RootActorPath(remoteAddr) / "user" / "echo") - - override def afterTermination() { - other.shutdown() - } - - "Remoting" must { - - "support remote look-ups" in { - here ! "ping" - expectMsgPF() { - case ("pong", s: AnyRef) if s eq testActor ⇒ true - } - } - - "send error message for wrong address" in { - val old = other.eventStream.logLevel - other.eventStream.setLogLevel(Logging.DebugLevel) - EventFilter.debug(start = "dropping", occurrences = 1).intercept { - system.actorFor(RootActorPath(remoteAddr.copy(system = "remotesys")) / "user" / "echo") ! "ping" - }(other) - other.eventStream.setLogLevel(old) - } - - "support ask" in { - Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") - } - } - - "send dead letters on remote if actor does not exist" in { - EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { - system.actorFor(RootActorPath(remoteAddr) / "does" / "not" / "exist") ! "buh" - }(other) - } - - "create and supervise children on remote node" in { - val r = system.actorOf(Props[Echo], "blub") - r.path.toString must be === s"akka://remote-sys@localhost:${remoteAddr.port.get}/remote/akka/RemoteCommunicationSpec@localhost:${localAddr.port.get}/user/blub" - r ! 42 - expectMsg(42) - EventFilter[Exception]("crash", occurrences = 1).intercept { - r ! new Exception("crash") - }(other) - expectMsg("preRestart") - r ! 42 - expectMsg(42) - system.stop(r) - expectMsg("postStop") - } - - "look-up actors across node boundaries" in { - val l = system.actorOf(Props(new Actor { - def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) - case s: String ⇒ sender ! context.actorFor(s) - } - }), "looker") - l ! (Props[Echo], "child") - val r = expectMsgType[ActorRef] - r ! (Props[Echo], "grandchild") - val remref = expectMsgType[ActorRef] - remref.asInstanceOf[ActorRefScope].isLocal must be(true) - val myref = system.actorFor(system / "looker" / "child" / "grandchild") - myref.isInstanceOf[RemoteActorRef] must be(true) - myref ! 43 - expectMsg(43) - lastSender must be theSameInstanceAs remref - r.asInstanceOf[RemoteActorRef].getParent must be(l) - system.actorFor("/user/looker/child") must be theSameInstanceAs r - Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - } - - "not fail ask across node boundaries" in { - import system.dispatcher - val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo classTag[(String, ActorRef)] - Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong")) - } - - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index be87d1a942..2cd9f3dda6 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -8,7 +8,6 @@ import language.postfixOps import akka.testkit.AkkaSpec import akka.actor.ExtendedActorSystem import scala.concurrent.duration._ -import akka.remote.netty.NettyRemoteTransport import akka.util.Helpers @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -33,44 +32,6 @@ class RemoteConfigSpec extends AkkaSpec( LogRemoteLifeCycleEvents must be(true) } - "be able to parse Netty config elements" ignore { - val settings = - system.asInstanceOf[ExtendedActorSystem] - .provider.asInstanceOf[RemoteActorRefProvider] - .transport.asInstanceOf[NettyRemoteTransport] - .settings - import settings._ - - BackoffTimeout must be(Duration.Zero) - SecureCookie must be(None) - RequireCookie must be(false) - UsePassiveConnections must be(true) - Hostname must not be "" // will be set to the local IP - PortSelector must be(0) - OutboundLocalAddress must be(None) - MessageFrameSize must be(1048576) - ConnectionTimeout must be(2 minutes) - Backlog must be(4096) - ReuseAddress must be(!Helpers.isWindows) - ExecutionPoolKeepalive must be(1 minute) - ExecutionPoolSize must be(4) - MaxChannelMemorySize must be(0) - MaxTotalMemorySize must be(0) - ReconnectDelay must be(5 seconds) - ReadTimeout must be(0 millis) - WriteTimeout must be(10 seconds) - AllTimeout must be(0 millis) - ReconnectionTimeWindow must be(10 minutes) - WriteBufferHighWaterMark must be(None) - WriteBufferLowWaterMark must be(None) - SendBufferSize must be(None) - ReceiveBufferSize must be(None) - ServerSocketWorkerPoolSize must be >= (2) - ServerSocketWorkerPoolSize must be <= (8) - ClientSocketWorkerPoolSize must be >= (2) - ClientSocketWorkerPoolSize must be <= (8) - } - "contain correct configuration values in reference.conf" in { val c = system.asInstanceOf[ExtendedActorSystem]. provider.asInstanceOf[RemoteActorRefProvider]. diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 5b40c48944..c7e2224efc 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -13,12 +13,12 @@ import scala.reflect.classTag import akka.pattern.ask import java.io.File import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController } -import akka.remote.netty.{ NettySettings, NettySSLSupport } import javax.net.ssl.SSLException import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ import akka.event.{ Logging, NoLogging, LoggingAdapter } +import akka.remote.transport.netty.{ SSLSettings, NettySSLSupport } object Configuration { // set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager" @@ -33,21 +33,26 @@ object Configuration { filter-leeway = 10s default-timeout = 10s } - remote.transport = "akka.remote.netty.NettyRemoteTransport" - remote.netty { + + remoting.enabled-transports = ["ssl"] + + remoting.transports.ssl { hostname = localhost port = %d ssl { enable = on trust-store = "%s" key-store = "%s" + key-store-password = "changeme" + trust-store-password = "changeme" + protocol = "TLSv1" random-number-generator = "%s" enabled-algorithms = [%s] sha1prng-random-source = "/dev/./urandom" } } } - """ + """ case class CipherConfig(runTest: Boolean, config: Config, cipher: String, localPort: Int, remotePort: Int) @@ -57,24 +62,24 @@ object Configuration { //if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", "))) - val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") - val settings = new NettySettings(fullConfig, "placeholder") + val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remoting.transports.ssl.ssl") + val settings = new SSLSettings(fullConfig) - val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SslSettings.SSLRandomNumberGenerator, - settings.SslSettings.SSLRandomSource, NoLogging) + val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, + settings.SSLRandomSource, NoLogging) rng.nextInt() // Has to work - settings.SslSettings.SSLRandomNumberGenerator foreach { + settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } - val engine = NettySSLSupport.initializeClientSSL(settings.SslSettings, NoLogging).getEngine + val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) - engine.getSupportedProtocols.contains(settings.SslSettings.SSLProtocol.get) || - (throw new IllegalArgumentException("Protocol not supported: " + settings.SslSettings.SSLProtocol.get)) + engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || + (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get)) CipherConfig(true, config, cipher, localPort, remotePort) } catch { @@ -119,11 +124,9 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten implicit val timeout: Timeout = Timeout(10 seconds) - import RemoteCommunicationSpec._ - lazy val other: ActorSystem = ActorSystem( "remote-sys", - ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config)) + ConfigFactory.parseString("akka.remoting.transports.ssl.port = " + cipherConfig.remotePort).withFallback(system.settings.config)) override def afterTermination() { if (cipherConfig.runTest) { diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala index bb396df189..88179b4ab1 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala @@ -4,8 +4,8 @@ import akka.testkit._ import akka.actor._ import com.typesafe.config._ import scala.concurrent.duration._ -import akka.remote.netty.{ SSLSettings, NettyRemoteTransport } import java.util.ArrayList +import akka.remote.transport.netty.SSLSettings @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket1978ConfigSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {