Fixed problems handling passive connections in Remoting #2878
- Fixed potential race in EndpointWrite when TakeOff is processed - Reenabled accidentally disabled gremlin due to changed defaults - Fixed EndpointRegistry incorrectly handling passive connections
This commit is contained in:
parent
d983104a7f
commit
b08c331577
14 changed files with 45 additions and 1502 deletions
|
|
@ -156,16 +156,6 @@ class RemoteActorRefProvider(
|
|||
// this enables reception of remote requests
|
||||
transport.start()
|
||||
|
||||
val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address)
|
||||
case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address)
|
||||
case _ ⇒ //ignore other
|
||||
}
|
||||
}), "RemoteClientLifeCycleListener")
|
||||
|
||||
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
||||
|
|
|
|||
|
|
@ -14,147 +14,6 @@ import akka.actor._
|
|||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
|
||||
/**
|
||||
* Remote life-cycle events.
|
||||
*/
|
||||
sealed trait RemoteLifeCycleEvent extends Serializable {
|
||||
def logLevel: Logging.LogLevel
|
||||
}
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteClient.
|
||||
*/
|
||||
trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent {
|
||||
def remoteAddress: Address
|
||||
final def getRemoteAddress: Address = remoteAddress
|
||||
}
|
||||
|
||||
/**
|
||||
* A RemoteClientError is a general error that is thrown within or from a RemoteClient
|
||||
*/
|
||||
case class RemoteClientError(
|
||||
@BeanProperty cause: Throwable,
|
||||
@transient remote: RemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClientLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
|
||||
override def toString: String = "RemoteClientError@" + remoteAddress + ": Error[" + Logging.stackTraceFor(cause) + "]"
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClientDisconnected is published when a RemoteClient's connection is disconnected
|
||||
*/
|
||||
case class RemoteClientDisconnected(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClientLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.DebugLevel
|
||||
override def toString: String = "RemoteClientDisconnected@" + remoteAddress
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClientConnected is published when a RemoteClient's connection is established
|
||||
*/
|
||||
case class RemoteClientConnected(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClientLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.DebugLevel
|
||||
override def toString: String = "RemoteClientConnected@" + remoteAddress
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClientStarted is published when a RemoteClient has started up
|
||||
*/
|
||||
case class RemoteClientStarted(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClientLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.InfoLevel
|
||||
override def toString: String = "RemoteClientStarted@" + remoteAddress
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClientShutdown is published when a RemoteClient has shut down
|
||||
*/
|
||||
case class RemoteClientShutdown(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClientLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.InfoLevel
|
||||
override def toString: String = "RemoteClientShutdown@" + remoteAddress
|
||||
}
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteServer.
|
||||
*/
|
||||
trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
|
||||
|
||||
/**
|
||||
* RemoteServerStarted is published when a local RemoteServer has started up
|
||||
*/
|
||||
case class RemoteServerStarted(
|
||||
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.InfoLevel
|
||||
override def toString: String = "RemoteServerStarted@" + remote
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteServerShutdown is published when a local RemoteServer has shut down
|
||||
*/
|
||||
case class RemoteServerShutdown(
|
||||
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.InfoLevel
|
||||
override def toString: String = "RemoteServerShutdown@" + remote
|
||||
}
|
||||
|
||||
/**
|
||||
* A RemoteServerError is a general error that is thrown within or from a RemoteServer
|
||||
*/
|
||||
case class RemoteServerError(
|
||||
@BeanProperty val cause: Throwable,
|
||||
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
|
||||
|
||||
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
|
||||
override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]"
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteServerClientConnected is published when an inbound connection has been established
|
||||
*/
|
||||
case class RemoteServerClientConnected(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.DebugLevel
|
||||
override def toString: String =
|
||||
"RemoteServerClientConnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]"
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteServerClientConnected is published when an inbound connection has been disconnected
|
||||
*/
|
||||
case class RemoteServerClientDisconnected(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.DebugLevel
|
||||
override def toString: String =
|
||||
"RemoteServerClientDisconnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]"
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteServerClientClosed is published when an inbound RemoteClient is closed
|
||||
*/
|
||||
case class RemoteServerClientClosed(
|
||||
@transient @BeanProperty remote: RemoteTransport,
|
||||
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
|
||||
override def logLevel: Logging.LogLevel = Logging.DebugLevel
|
||||
override def toString: String =
|
||||
"RemoteServerClientClosed@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]"
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
|
||||
*/
|
||||
class RemoteClientException private[akka] (
|
||||
message: String,
|
||||
@transient @BeanProperty val client: RemoteTransport,
|
||||
val remoteAddress: Address, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
/**
|
||||
* RemoteTransportException represents a general failure within a RemoteTransport,
|
||||
* such as inability to start, wrong configuration etc.
|
||||
|
|
@ -198,29 +57,11 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
|
|||
*/
|
||||
def start(): Unit
|
||||
|
||||
/**
|
||||
* Attempts to shut down a specific client connected to the supplied remote address
|
||||
*/
|
||||
def shutdownClientConnection(address: Address): Unit
|
||||
|
||||
/**
|
||||
* Attempts to restart a specific client connected to the supplied remote address, but only if the client is not shut down
|
||||
*/
|
||||
def restartClientConnection(address: Address): Unit
|
||||
|
||||
/**
|
||||
* Sends the given message to the recipient supplying the sender if any
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit
|
||||
|
||||
/**
|
||||
* Default implementation both publishes the message to the eventStream as well as logs it using the system logger
|
||||
*/
|
||||
def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
|
||||
system.eventStream.publish(message)
|
||||
if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a management command to the underlying transport stack. The call returns with a Future that indicates
|
||||
* if the command was handled successfully or dropped.
|
||||
|
|
@ -244,121 +85,4 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
|
|||
*/
|
||||
protected def logRemoteLifeCycleEvents: Boolean
|
||||
|
||||
/**
|
||||
* Returns a newly created AkkaRemoteProtocol with the given message payload.
|
||||
*/
|
||||
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol =
|
||||
AkkaRemoteProtocol.newBuilder.setPayload(rmp.toByteString).build
|
||||
|
||||
/**
|
||||
* Returns a newly created AkkaRemoteProtocol with the given control payload.
|
||||
*/
|
||||
def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol =
|
||||
AkkaRemoteProtocol.newBuilder.setInstruction(rcp).build
|
||||
|
||||
/**
|
||||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||
*/
|
||||
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol =
|
||||
ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(defaultAddress)).build
|
||||
|
||||
/**
|
||||
* Returns a new RemoteMessageProtocol containing the serialized representation of the given parameters.
|
||||
*/
|
||||
def createRemoteMessageProtocolBuilder(recipient: ActorRef, message: Any, senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
|
||||
val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient))
|
||||
if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
|
||||
|
||||
Serialization.currentTransportAddress.withValue(defaultAddress) {
|
||||
messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef]))
|
||||
}
|
||||
|
||||
messageBuilder
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this method with an inbound RemoteMessage and this will take care of security (see: "useUntrustedMode")
|
||||
* as well as making sure that the message ends up at its destination (best effort).
|
||||
* There is also a fair amount of logging produced by this method, which is good for debugging.
|
||||
*/
|
||||
def receiveMessage(remoteMessage: RemoteMessage): Unit = {
|
||||
val remoteDaemon = provider.remoteDaemon
|
||||
|
||||
remoteMessage.recipient match {
|
||||
case `remoteDaemon` ⇒
|
||||
if (useUntrustedMode) log.debug("dropping daemon message in untrusted mode")
|
||||
else {
|
||||
if (provider.remoteSettings.LogReceive) log.debug("received daemon message {}", remoteMessage)
|
||||
remoteMessage.payload match {
|
||||
case m @ (_: DaemonMsg | _: Terminated) ⇒
|
||||
try remoteDaemon ! m catch {
|
||||
case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender)
|
||||
}
|
||||
case x ⇒ log.debug("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
|
||||
}
|
||||
}
|
||||
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
|
||||
if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage)
|
||||
remoteMessage.payload match {
|
||||
case msg: PossiblyHarmful if useUntrustedMode ⇒ log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass)
|
||||
case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
|
||||
case msg ⇒ l.!(msg)(remoteMessage.sender)
|
||||
}
|
||||
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !useUntrustedMode ⇒
|
||||
if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage)
|
||||
remoteMessage.originalReceiver match {
|
||||
case AddressFromURIString(address) if provider.transport.addresses(address) ⇒
|
||||
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
|
||||
r.!(remoteMessage.payload)(remoteMessage.sender)
|
||||
case r ⇒
|
||||
log.debug("dropping message {} for non-local recipient {} arriving at {} inbound addresses are {}",
|
||||
remoteMessage.payloadClass, r, addresses, provider.transport.addresses)
|
||||
}
|
||||
case r ⇒
|
||||
log.debug("dropping message {} for unknown recipient {} arriving at {} inbound addresses are {}",
|
||||
remoteMessage.payloadClass, r, addresses, provider.transport.addresses)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteMessage is a wrapper around a message that has come in over the wire,
|
||||
* it allows to easily obtain references to the deserialized message, its intended recipient
|
||||
* and the sender.
|
||||
*/
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) {
|
||||
/**
|
||||
* Returns a String-representation of the ActorPath that this RemoteMessage is destined for
|
||||
*/
|
||||
def originalReceiver: String = input.getRecipient.getPath
|
||||
|
||||
/**
|
||||
* Returns an Option with the String representation of the ActorPath of the Actor who is the sender of this message
|
||||
*/
|
||||
def originalSender: Option[String] = if (input.hasSender) Some(input.getSender.getPath) else None
|
||||
|
||||
/**
|
||||
* Returns a reference to the Actor that sent this message, or DeadLetterActorRef if not present or found.
|
||||
*/
|
||||
lazy val sender: ActorRef =
|
||||
if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath)
|
||||
else system.deadLetters
|
||||
|
||||
/**
|
||||
* Returns a reference to the Actor that this message is destined for.
|
||||
* In case this returns a DeadLetterActorRef, you have access to the path using the "originalReceiver" method.
|
||||
*/
|
||||
lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver)
|
||||
|
||||
/**
|
||||
* Returns the message
|
||||
*/
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage)
|
||||
|
||||
def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
|
||||
|
||||
/**
|
||||
* Returns a String representation of this RemoteMessage, intended for debugging purposes.
|
||||
*/
|
||||
override def toString: String = "RemoteMessage: " + payloadClass + " to " + recipient + "<+{" + originalReceiver + "} from " + sender
|
||||
}
|
||||
|
|
|
|||
|
|
@ -220,17 +220,6 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: this is called in RemoteActorRefProvider to handle the lifecycle of connections (clients)
|
||||
// which is not how things work in the new remoting
|
||||
override def shutdownClientConnection(address: Address): Unit = {
|
||||
// Ignore
|
||||
}
|
||||
|
||||
// TODO: this is never called anywhere, should be taken out from RemoteTransport API
|
||||
override def restartClientConnection(address: Address): Unit = {
|
||||
// Ignore
|
||||
}
|
||||
|
||||
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
|
||||
case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
|
||||
case None ⇒ throw new IllegalStateException("Attempted to send remote message but Remoting is not running.")
|
||||
|
|
|
|||
|
|
@ -1,339 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote.netty
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.net.{ InetAddress, InetSocketAddress }
|
||||
import org.jboss.netty.util.{ Timeout, TimerTask, HashedWheelTimer }
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap
|
||||
import org.jboss.netty.channel.group.DefaultChannelGroup
|
||||
import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, DefaultChannelPipeline, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel }
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler
|
||||
import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler }
|
||||
import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
|
||||
import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected }
|
||||
import akka.AkkaException
|
||||
import akka.event.Logging
|
||||
import akka.actor.{ DeadLetter, Address, ActorRef }
|
||||
import akka.util.Switch
|
||||
import scala.util.control.NonFatal
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import scala.concurrent.duration._
|
||||
import java.nio.channels.ClosedChannelException
|
||||
|
||||
/**
|
||||
* This is the abstract baseclass for netty remote clients, currently there's only an
|
||||
* ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that
|
||||
* reuses an already established connection.
|
||||
*/
|
||||
private[akka] abstract class RemoteClient private[akka] (val netty: NettyRemoteTransport, val remoteAddress: Address) {
|
||||
|
||||
val log = Logging(netty.system, "RemoteClient")
|
||||
|
||||
val name = Logging.simpleName(this) + "@" + remoteAddress
|
||||
|
||||
private[remote] val runSwitch = new Switch()
|
||||
|
||||
private[remote] def isRunning = runSwitch.isOn
|
||||
|
||||
protected def currentChannel: Channel
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
|
||||
|
||||
def shutdown(): Boolean
|
||||
|
||||
/**
|
||||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
|
||||
if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
send((message, senderOption, recipient))
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress)
|
||||
netty.notifyListeners(RemoteClientError(exception, netty, remoteAddress))
|
||||
throw exception
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the message across the wire
|
||||
*/
|
||||
private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = {
|
||||
try {
|
||||
val channel = currentChannel
|
||||
if (channel.isOpen) {
|
||||
val f = channel.write(request)
|
||||
f.addListener(
|
||||
new ChannelFutureListener {
|
||||
import netty.system.deadLetters
|
||||
def operationComplete(future: ChannelFuture): Unit =
|
||||
if (future.isCancelled || !future.isSuccess) request match {
|
||||
case (msg, sender, recipient) ⇒ deadLetters ! DeadLetter(msg, sender.getOrElse(deadLetters), recipient)
|
||||
// We don't call notifyListeners here since we don't think failed message deliveries are errors
|
||||
/// If the connection goes down we'll get the error reporting done by the pipeline.
|
||||
}
|
||||
})
|
||||
// Check if we should back off
|
||||
if (!channel.isWritable) {
|
||||
val backoff = netty.settings.BackoffTimeout
|
||||
if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ netty.notifyListeners(RemoteClientError(e, netty, remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def toString: String = name
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node.
|
||||
*/
|
||||
private[akka] class ActiveRemoteClient private[akka] (
|
||||
netty: NettyRemoteTransport,
|
||||
remoteAddress: Address,
|
||||
localAddress: Address)
|
||||
extends RemoteClient(netty, remoteAddress) {
|
||||
|
||||
import netty.settings
|
||||
|
||||
//TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||
@volatile
|
||||
private var bootstrap: ClientBootstrap = _
|
||||
@volatile
|
||||
private var connection: ChannelFuture = _
|
||||
@volatile
|
||||
private[remote] var openChannels: DefaultChannelGroup = _
|
||||
|
||||
@volatile
|
||||
private var reconnectionDeadline: Option[Deadline] = None
|
||||
|
||||
def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg)
|
||||
|
||||
def currentChannel = connection.getChannel
|
||||
|
||||
/**
|
||||
* Connect to remote server.
|
||||
*/
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
|
||||
|
||||
// Returns whether the handshake was written to the channel or not
|
||||
def sendSecureCookie(connection: ChannelFuture): Boolean = {
|
||||
val future =
|
||||
if (!connection.isSuccess || !settings.EnableSSL) connection
|
||||
else connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly()
|
||||
|
||||
if (!future.isSuccess) {
|
||||
notifyListeners(RemoteClientError(future.getCause, netty, remoteAddress))
|
||||
false
|
||||
} else {
|
||||
ChannelAddress.set(connection.getChannel, Some(remoteAddress))
|
||||
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
|
||||
if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get)
|
||||
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
|
||||
.setSystem(localAddress.system)
|
||||
.setHostname(localAddress.host.get)
|
||||
.setPort(localAddress.port.get)
|
||||
.build)
|
||||
connection.getChannel.write(netty.createControlEnvelope(handshake.build))
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
def attemptReconnect(): Boolean = {
|
||||
val remoteIP = InetAddress.getByName(remoteAddress.host.get)
|
||||
log.debug("Remote client reconnecting to [{}|{}]", remoteAddress, remoteIP)
|
||||
connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get))
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
|
||||
sendSecureCookie(connection)
|
||||
}
|
||||
|
||||
runSwitch switchOn {
|
||||
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
|
||||
|
||||
val b = new ClientBootstrap(netty.clientChannelFactory)
|
||||
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true))
|
||||
b.setOption("tcpNoDelay", true)
|
||||
b.setOption("keepAlive", true)
|
||||
b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)
|
||||
settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz))
|
||||
settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz))
|
||||
settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz))
|
||||
settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz))
|
||||
settings.OutboundLocalAddress.foreach(s ⇒ b.setOption("localAddress", new InetSocketAddress(s, 0)))
|
||||
bootstrap = b
|
||||
|
||||
val remoteIP = InetAddress.getByName(remoteAddress.host.get)
|
||||
log.debug("Starting remote client connection to [{}|{}]", remoteAddress, remoteIP)
|
||||
|
||||
connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get))
|
||||
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
|
||||
|
||||
if (sendSecureCookie(connection)) {
|
||||
notifyListeners(RemoteClientStarted(netty, remoteAddress))
|
||||
true
|
||||
} else {
|
||||
connection.getChannel.close()
|
||||
openChannels.remove(connection.getChannel)
|
||||
false
|
||||
}
|
||||
} match {
|
||||
case true ⇒ true
|
||||
case false if reconnectIfAlreadyConnected ⇒
|
||||
log.debug("Remote client reconnecting to [{}]", remoteAddress)
|
||||
attemptReconnect()
|
||||
case false ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
// Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
|
||||
def shutdown() = runSwitch switchOff {
|
||||
log.debug("Shutting down remote client [{}]", name)
|
||||
|
||||
notifyListeners(RemoteClientShutdown(netty, remoteAddress))
|
||||
try {
|
||||
if ((connection ne null) && (connection.getChannel ne null)) {
|
||||
val channel = connection.getChannel
|
||||
ChannelAddress.remove(channel)
|
||||
// Try to disconnect first to reduce "connection reset by peer" events
|
||||
if (channel.isConnected) channel.disconnect()
|
||||
if (channel.isOpen) channel.close()
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if (openChannels ne null) openChannels.close.awaitUninterruptibly()
|
||||
} finally {
|
||||
connection = null
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("[{}] has been shut down", name)
|
||||
}
|
||||
|
||||
private[akka] def isWithinReconnectionTimeWindow: Boolean = reconnectionDeadline match {
|
||||
case None ⇒
|
||||
reconnectionDeadline = Some(Deadline.now + settings.ReconnectionTimeWindow)
|
||||
true
|
||||
case Some(deadline) ⇒
|
||||
val hasTimeLeft = deadline.hasTimeLeft
|
||||
if (hasTimeLeft)
|
||||
log.info("Will try to reconnect to remote server for another [{}] milliseconds", deadline.timeLeft.toMillis)
|
||||
hasTimeLeft
|
||||
}
|
||||
|
||||
private[akka] def resetReconnectionTimeWindow = reconnectionDeadline = None
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private[akka] class ActiveRemoteClientHandler(
|
||||
val name: String,
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: Address,
|
||||
val localAddress: Address,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: ActiveRemoteClient)
|
||||
extends IdleStateAwareChannelHandler {
|
||||
|
||||
def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() }
|
||||
}, 0, TimeUnit.MILLISECONDS)
|
||||
|
||||
override def channelIdle(ctx: ChannelHandlerContext, e: IdleStateEvent) {
|
||||
import IdleState._
|
||||
|
||||
def createHeartBeat(localAddress: Address, cookie: Option[String]): AkkaRemoteProtocol = {
|
||||
val beat = RemoteControlProtocol.newBuilder.setCommandType(CommandType.HEARTBEAT)
|
||||
if (cookie.nonEmpty) beat.setCookie(cookie.get)
|
||||
|
||||
client.netty.createControlEnvelope(
|
||||
beat.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
|
||||
.setSystem(localAddress.system)
|
||||
.setHostname(localAddress.host.get)
|
||||
.setPort(localAddress.port.get)
|
||||
.build).build)
|
||||
}
|
||||
|
||||
e.getState match {
|
||||
case READER_IDLE | ALL_IDLE ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) }
|
||||
case WRITER_IDLE ⇒ e.getChannel.write(createHeartBeat(localAddress, client.netty.settings.SecureCookie))
|
||||
}
|
||||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
try {
|
||||
event.getMessage match {
|
||||
case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒
|
||||
val rcp = arp.getInstruction
|
||||
rcp.getCommandType match {
|
||||
case CommandType.SHUTDOWN ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) }
|
||||
case _ ⇒ //Ignore others
|
||||
}
|
||||
case arp: AkkaRemoteProtocol if arp.hasPayload ⇒
|
||||
client.netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(arp.getPayload), client.netty.system))
|
||||
case other ⇒
|
||||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn {
|
||||
if (client.isWithinReconnectionTimeWindow) {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) =
|
||||
if (client.isRunning) {
|
||||
client.openChannels.remove(event.getChannel)
|
||||
client.connect(reconnectIfAlreadyConnected = true)
|
||||
}
|
||||
}, client.netty.settings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
|
||||
} else runOnceNow {
|
||||
client.netty.shutdownClientConnection(remoteAddress) // spawn in another thread
|
||||
}
|
||||
}
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
try {
|
||||
client.notifyListeners(RemoteClientConnected(client.netty, client.remoteAddress))
|
||||
client.resetReconnectionTimeWindow
|
||||
} catch {
|
||||
case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.notifyListeners(RemoteClientDisconnected(client.netty, client.remoteAddress))
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause")
|
||||
cause match {
|
||||
case _: ClosedChannelException ⇒ // Ignore
|
||||
case NonFatal(e) ⇒
|
||||
client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress))
|
||||
event.getChannel.close()
|
||||
case e: Throwable ⇒ throw e // Rethrow fatals
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] class PassiveRemoteClient(val currentChannel: Channel,
|
||||
netty: NettyRemoteTransport,
|
||||
remoteAddress: Address) extends RemoteClient(netty, remoteAddress) {
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
|
||||
netty.notifyListeners(RemoteClientStarted(netty, remoteAddress))
|
||||
log.debug("Starting remote client connection to [{}]", remoteAddress)
|
||||
}
|
||||
|
||||
def shutdown() = runSwitch switchOff {
|
||||
log.debug("Shutting down remote client [{}]", name)
|
||||
|
||||
netty.notifyListeners(RemoteClientShutdown(netty, remoteAddress))
|
||||
log.debug("[{}] has been shut down", name)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,335 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote.netty
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
import java.util.concurrent.Executors
|
||||
import scala.collection.mutable
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.NonFatal
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture }
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal }
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
|
||||
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
|
||||
import org.jboss.netty.handler.timeout.IdleStateHandler
|
||||
import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer }
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
|
||||
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted }
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
|
||||
import com.google.protobuf.MessageLite
|
||||
import scala.concurrent.Future
|
||||
|
||||
private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] {
|
||||
override def initialValue(ch: Channel): Option[Address] = None
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
|
||||
|
||||
import provider.remoteSettings
|
||||
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
// Workaround to emulate the support of multiple local addresses
|
||||
override def localAddressForRemote(remote: Address): Address = defaultAddress
|
||||
|
||||
// TODO replace by system.scheduler
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory)
|
||||
|
||||
val clientChannelFactory = {
|
||||
val boss, worker = settings.UseDispatcherForIO.map(system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
|
||||
new NioClientSocketChannelFactory(boss, worker, settings.ClientSocketWorkerPoolSize)
|
||||
}
|
||||
|
||||
/**
|
||||
* Backing scaffolding for the default implementation of NettyRemoteSupport.createPipeline.
|
||||
*/
|
||||
object PipelineFactory {
|
||||
/**
|
||||
* Construct a DefaultChannelPipeline from a sequence of handlers; to be used
|
||||
* in implementations of ChannelPipelineFactory.
|
||||
*/
|
||||
def apply(handlers: immutable.Seq[ChannelHandler]): DefaultChannelPipeline =
|
||||
(new DefaultChannelPipeline /: handlers) { (p, h) ⇒ p.addLast(Logging.simpleName(h.getClass), h); p }
|
||||
|
||||
/**
|
||||
* Constructs the NettyRemoteTransport default pipeline with the give “head” handler, which
|
||||
* is taken by-name to allow it not to be shared across pipelines.
|
||||
*
|
||||
* @param withTimeout determines whether an IdleStateHandler shall be included
|
||||
*/
|
||||
def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
|
||||
new ChannelPipelineFactory { override def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) }
|
||||
|
||||
/**
|
||||
* Construct a default protocol stack, excluding the “head” handler (i.e. the one which
|
||||
* actually dispatches the received messages to the local target actors).
|
||||
*/
|
||||
def defaultStack(withTimeout: Boolean, isClient: Boolean): immutable.Seq[ChannelHandler] =
|
||||
(if (settings.EnableSSL) List(NettySSLSupport(settings.SslSettings, NettyRemoteTransport.this.log, isClient)) else Nil) :::
|
||||
(if (withTimeout) List(timeout) else Nil) :::
|
||||
msgFormat :::
|
||||
authenticator :::
|
||||
executionHandler
|
||||
|
||||
/**
|
||||
* Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer.
|
||||
*/
|
||||
def timeout = new IdleStateHandler(timer,
|
||||
settings.ReadTimeout.toSeconds.toInt,
|
||||
settings.WriteTimeout.toSeconds.toInt,
|
||||
settings.AllTimeout.toSeconds.toInt)
|
||||
|
||||
/**
|
||||
* Construct frame&protobuf encoder/decoder.
|
||||
*/
|
||||
def msgFormat = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) ::
|
||||
new LengthFieldPrepender(4) ::
|
||||
new RemoteMessageDecoder ::
|
||||
new RemoteMessageEncoder(NettyRemoteTransport.this) ::
|
||||
Nil
|
||||
|
||||
/**
|
||||
* Construct an ExecutionHandler which is used to ensure that message dispatch does not
|
||||
* happen on a netty thread (that could be bad if re-sending over the network for
|
||||
* remote-deployed actors).
|
||||
*/
|
||||
val executionHandler = if (settings.ExecutionPoolSize != 0)
|
||||
List(new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(
|
||||
settings.ExecutionPoolSize,
|
||||
settings.MaxChannelMemorySize,
|
||||
settings.MaxTotalMemorySize,
|
||||
settings.ExecutionPoolKeepalive.length,
|
||||
settings.ExecutionPoolKeepalive.unit,
|
||||
AkkaProtocolMessageSizeEstimator,
|
||||
system.threadFactory)))
|
||||
else Nil
|
||||
|
||||
/**
|
||||
* Helps keep track of how many bytes are in flight
|
||||
*/
|
||||
object AkkaProtocolMessageSizeEstimator extends DefaultObjectSizeEstimator {
|
||||
override final def estimateSize(o: AnyRef): Int =
|
||||
o match {
|
||||
case proto: MessageLite ⇒
|
||||
val msgSize = proto.getSerializedSize
|
||||
val misalignment = msgSize % 8
|
||||
if (misalignment != 0) msgSize + 8 - misalignment else msgSize
|
||||
case msg ⇒ super.estimateSize(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct and authentication handler which uses the SecureCookie to somewhat
|
||||
* protect the TCP port from unauthorized use (don’t rely on it too much, though,
|
||||
* as this is NOT a cryptographic feature).
|
||||
*/
|
||||
def authenticator = if (settings.RequireCookie) List(new RemoteServerAuthenticationHandler(settings.SecureCookie)) else Nil
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is factored out to provide an extension point in case the
|
||||
* pipeline shall be changed. It is recommended to use
|
||||
*/
|
||||
def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
|
||||
PipelineFactory(Seq(endpoint), withTimeout, isClient)
|
||||
|
||||
private val remoteClients = new mutable.HashMap[Address, RemoteClient]
|
||||
private val clientsLock = new ReentrantReadWriteLock
|
||||
|
||||
override protected def useUntrustedMode = remoteSettings.UntrustedMode
|
||||
|
||||
override protected def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents
|
||||
|
||||
val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) ⇒ shutdown(); throw ex }
|
||||
|
||||
/**
|
||||
* Override this method to inject a subclass of NettyRemoteServer instead of
|
||||
* the normal one, e.g. for inserting security hooks. If this method throws
|
||||
* an exception, the transport will shut itself down and re-throw.
|
||||
*/
|
||||
protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this)
|
||||
|
||||
/**
|
||||
* Override this method to inject a subclass of RemoteClient instead of
|
||||
* the normal one, e.g. for inserting security hooks. Get this transport’s
|
||||
* address from `this.address`.
|
||||
*/
|
||||
protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, defaultAddress)
|
||||
|
||||
// the address is set in start() or from the RemoteServerHandler, whichever comes first
|
||||
private val _address = new AtomicReference[Address]
|
||||
private[akka] def setAddressFromChannel(ch: Channel) = {
|
||||
val addr = ch.getLocalAddress match {
|
||||
case sa: InetSocketAddress ⇒ sa
|
||||
case x ⇒ throw new RemoteTransportException("unknown local address type " + x.getClass, null)
|
||||
}
|
||||
_address.compareAndSet(null, Address("akka", remoteSettings.systemName, settings.Hostname, addr.getPort))
|
||||
}
|
||||
|
||||
// Workaround to emulate the support of multiple local addresses
|
||||
override def addresses = Set(address)
|
||||
def address = _address.get
|
||||
override def defaultAddress: Address = _address.get
|
||||
|
||||
lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + addresses + ")")
|
||||
|
||||
def start(): Unit = {
|
||||
server.start()
|
||||
setAddressFromChannel(server.channel)
|
||||
notifyListeners(RemoteServerStarted(this))
|
||||
}
|
||||
|
||||
def shutdown(): Future[Unit] = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients foreach {
|
||||
case (_, client) ⇒ try client.shutdown() catch {
|
||||
case NonFatal(e) ⇒ log.error(e, "failure while shutting down [{}]", client)
|
||||
}
|
||||
}
|
||||
remoteClients.clear()
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
try {
|
||||
if (server != null) server.shutdown()
|
||||
} finally {
|
||||
try {
|
||||
timer.stop()
|
||||
} finally {
|
||||
clientChannelFactory.releaseExternalResources()
|
||||
}
|
||||
}
|
||||
}
|
||||
Future successful (())
|
||||
}
|
||||
|
||||
def send(
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
recipient: RemoteActorRef): Unit = {
|
||||
|
||||
val recipientAddress = recipient.path.address
|
||||
|
||||
clientsLock.readLock.lock
|
||||
try {
|
||||
val client = remoteClients.get(recipientAddress) match {
|
||||
case Some(client) ⇒ client
|
||||
case None ⇒
|
||||
clientsLock.readLock.unlock
|
||||
clientsLock.writeLock.lock //Lock upgrade, not supported natively
|
||||
try {
|
||||
try {
|
||||
remoteClients.get(recipientAddress) match {
|
||||
//Recheck for addition, race between upgrades
|
||||
case Some(client) ⇒ client //If already populated by other writer
|
||||
case None ⇒ //Populate map
|
||||
val client = createClient(recipientAddress)
|
||||
remoteClients += recipientAddress -> client
|
||||
client
|
||||
}
|
||||
} finally {
|
||||
clientsLock.readLock.lock
|
||||
} //downgrade
|
||||
} finally {
|
||||
clientsLock.writeLock.unlock
|
||||
}
|
||||
}
|
||||
client.connect() // this will literally do nothing after the first time
|
||||
client.send(message, senderOption, recipient)
|
||||
} finally {
|
||||
clientsLock.readLock.unlock
|
||||
}
|
||||
}
|
||||
|
||||
def bindClient(remoteAddress: Address, client: RemoteClient): Boolean = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
if (remoteClients.contains(remoteAddress)) false
|
||||
else {
|
||||
client.connect()
|
||||
remoteClients.put(remoteAddress, client)
|
||||
true
|
||||
}
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def unbindClient(remoteAddress: Address): Unit = shutdownClientConnection(remoteAddress)
|
||||
|
||||
def shutdownClientConnection(remoteAddress: Address): Unit = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients.remove(remoteAddress) foreach (_.shutdown())
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def restartClientConnection(remoteAddress: Address): Unit = {
|
||||
clientsLock.readLock().lock()
|
||||
try {
|
||||
remoteClients.get(remoteAddress) foreach (_.connect(reconnectIfAlreadyConnected = true))
|
||||
} finally {
|
||||
clientsLock.readLock().unlock()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private[akka] class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder {
|
||||
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
|
||||
msg match {
|
||||
case (message: Any, sender: Option[_], recipient: ActorRef) ⇒
|
||||
super.encode(ctx, channel,
|
||||
remoteSupport.createMessageSendEnvelope(
|
||||
remoteSupport.createRemoteMessageProtocolBuilder(
|
||||
recipient,
|
||||
message,
|
||||
sender.asInstanceOf[Option[ActorRef]]).build))
|
||||
case _ ⇒ super.encode(ctx, channel, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
|
||||
private[akka] class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {
|
||||
protected val guard = new ReentrantReadWriteLock
|
||||
protected val open = new AtomicBoolean(true)
|
||||
|
||||
override def add(channel: Channel): Boolean = {
|
||||
guard.readLock().lock()
|
||||
try {
|
||||
if (open.get) {
|
||||
super.add(channel)
|
||||
} else {
|
||||
channel.close()
|
||||
false
|
||||
}
|
||||
} finally {
|
||||
guard.readLock().unlock()
|
||||
}
|
||||
}
|
||||
|
||||
override def close(): ChannelGroupFuture = {
|
||||
guard.writeLock().lock()
|
||||
try {
|
||||
if (open.getAndSet(false)) super.close()
|
||||
else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
|
||||
} finally {
|
||||
guard.writeLock().unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,185 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote.netty
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
|
||||
import akka.remote._
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.ClosedChannelException
|
||||
import java.util.concurrent.Executors
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.channel.group.ChannelGroup
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
||||
import scala.util.control.NonFatal
|
||||
import akka.AkkaException
|
||||
|
||||
private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
||||
|
||||
import netty.settings
|
||||
|
||||
val ip = InetAddress.getByName(settings.Hostname)
|
||||
|
||||
private val factory = {
|
||||
val boss, worker = settings.UseDispatcherForIO.map(netty.system.dispatchers.lookup) getOrElse Executors.newCachedThreadPool()
|
||||
new NioServerSocketChannelFactory(boss, worker, settings.ServerSocketWorkerPoolSize)
|
||||
}
|
||||
|
||||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
private val bootstrap = {
|
||||
val b = new ServerBootstrap(factory)
|
||||
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false))
|
||||
b.setOption("backlog", settings.Backlog)
|
||||
b.setOption("tcpNoDelay", true)
|
||||
b.setOption("child.keepAlive", true)
|
||||
b.setOption("reuseAddress", settings.ReuseAddress)
|
||||
settings.ReceiveBufferSize.foreach(sz ⇒ b.setOption("receiveBufferSize", sz))
|
||||
settings.SendBufferSize.foreach(sz ⇒ b.setOption("sendBufferSize", sz))
|
||||
settings.WriteBufferHighWaterMark.foreach(sz ⇒ b.setOption("writeBufferHighWaterMark", sz))
|
||||
settings.WriteBufferLowWaterMark.foreach(sz ⇒ b.setOption("writeBufferLowWaterMark", sz))
|
||||
b
|
||||
}
|
||||
|
||||
@volatile
|
||||
private[akka] var channel: Channel = _
|
||||
|
||||
def start(): Unit = {
|
||||
channel = bootstrap.bind(new InetSocketAddress(ip, settings.PortSelector))
|
||||
openChannels.add(channel)
|
||||
}
|
||||
|
||||
def shutdown() {
|
||||
try {
|
||||
val shutdownSignal = {
|
||||
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
|
||||
b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
|
||||
.setSystem(netty.address.system)
|
||||
.setHostname(netty.address.host.get)
|
||||
.setPort(netty.address.port.get)
|
||||
.build)
|
||||
if (settings.SecureCookie.nonEmpty)
|
||||
b.setCookie(settings.SecureCookie.get)
|
||||
b.build
|
||||
}
|
||||
openChannels.write(netty.createControlEnvelope(shutdownSignal))
|
||||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources()
|
||||
netty.notifyListeners(RemoteServerShutdown(netty))
|
||||
} catch {
|
||||
case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private[akka] class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler {
|
||||
val authenticated = new AnyRef
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match {
|
||||
case None ⇒ ctx.sendUpstream(event)
|
||||
case Some(cookie) ⇒
|
||||
ctx.getAttachment match {
|
||||
case `authenticated` ⇒ ctx.sendUpstream(event)
|
||||
case null ⇒ event.getMessage match {
|
||||
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒
|
||||
val instruction = remoteProtocol.getInstruction
|
||||
instruction.getCookie match {
|
||||
case `cookie` ⇒
|
||||
ctx.setAttachment(authenticated)
|
||||
ctx.sendUpstream(event)
|
||||
case _ ⇒
|
||||
throw new SecurityException(
|
||||
"The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie")
|
||||
}
|
||||
case _ ⇒
|
||||
throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not authorized!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private[akka] class RemoteServerHandler(
|
||||
val openChannels: ChannelGroup,
|
||||
val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
import netty.settings
|
||||
|
||||
private var addressToSet = true
|
||||
|
||||
// TODO look into moving that into onBind or similar, but verify that that is guaranteed to be the first to be called
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (addressToSet) {
|
||||
netty.setAddressFromChannel(event.getChannel)
|
||||
addressToSet = false
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
}
|
||||
|
||||
/**
|
||||
* ChannelOpen overridden to store open channels for a clean postStop of a node.
|
||||
* If a channel is closed before, it is automatically removed from the open channels group.
|
||||
*/
|
||||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
|
||||
|
||||
// TODO might want to log or otherwise signal that a TCP connection has been established here.
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = ()
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
netty.notifyListeners(RemoteServerClientDisconnected(netty, ChannelAddress.get(ctx.getChannel)))
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val address = ChannelAddress.get(ctx.getChannel)
|
||||
if (address.isDefined && settings.UsePassiveConnections)
|
||||
netty.unbindClient(address.get)
|
||||
|
||||
netty.notifyListeners(RemoteServerClientClosed(netty, address))
|
||||
ChannelAddress.remove(ctx.getChannel)
|
||||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
|
||||
event.getMessage match {
|
||||
case remote: AkkaRemoteProtocol if remote.hasPayload ⇒
|
||||
netty.receiveMessage(new RemoteMessage(RemoteMessageProtocol.parseFrom(remote.getPayload), netty.system))
|
||||
case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒
|
||||
val instruction = remote.getInstruction
|
||||
instruction.getCommandType match {
|
||||
case CommandType.CONNECT ⇒
|
||||
val origin = instruction.getOrigin
|
||||
val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort)
|
||||
ChannelAddress.set(event.getChannel, Option(inbound))
|
||||
|
||||
//If we want to reuse the inbound connections as outbound we need to get busy
|
||||
if (settings.UsePassiveConnections)
|
||||
netty.bindClient(inbound, new PassiveRemoteClient(event.getChannel, netty, inbound))
|
||||
|
||||
netty.notifyListeners(RemoteServerClientConnected(netty, Option(inbound)))
|
||||
case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed
|
||||
case CommandType.HEARTBEAT ⇒ //Other guy is still alive
|
||||
case _ ⇒ //Unknown command
|
||||
}
|
||||
case _ ⇒ //ignore
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty))
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause")
|
||||
cause match {
|
||||
case _: ClosedChannelException ⇒ // Ignore
|
||||
case NonFatal(e) ⇒
|
||||
netty.notifyListeners(RemoteServerError(e, netty))
|
||||
event.getChannel.close()
|
||||
case e: Throwable ⇒ throw e // Rethrow fatals
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,111 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote.netty
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import scala.concurrent.duration.Duration
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import java.net.InetAddress
|
||||
import akka.ConfigurationException
|
||||
import akka.japi.Util.immutableSeq
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.dispatch.ThreadPoolConfig
|
||||
import akka.util.Helpers
|
||||
|
||||
private[akka] class NettySettings(config: Config, val systemName: String) {
|
||||
|
||||
import config._
|
||||
|
||||
val BackoffTimeout: FiniteDuration = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS)
|
||||
|
||||
val SecureCookie: Option[String] = getString("secure-cookie") match {
|
||||
case "" ⇒ None
|
||||
case cookie ⇒ Some(cookie)
|
||||
}
|
||||
val RequireCookie: Boolean = {
|
||||
val requireCookie = getBoolean("require-cookie")
|
||||
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.netty.secure-cookie'.")
|
||||
requireCookie
|
||||
}
|
||||
|
||||
val UsePassiveConnections: Boolean = getBoolean("use-passive-connections")
|
||||
val UseDispatcherForIO: Option[String] = getString("use-dispatcher-for-io") match {
|
||||
case "" | null ⇒ None
|
||||
case dispatcher ⇒ Some(dispatcher)
|
||||
}
|
||||
|
||||
val ReconnectionTimeWindow: FiniteDuration = Duration(getMilliseconds("reconnection-time-window"), MILLISECONDS)
|
||||
val ReadTimeout: FiniteDuration = Duration(getMilliseconds("read-timeout"), MILLISECONDS)
|
||||
val WriteTimeout: FiniteDuration = Duration(getMilliseconds("write-timeout"), MILLISECONDS)
|
||||
val AllTimeout: FiniteDuration = Duration(getMilliseconds("all-timeout"), MILLISECONDS)
|
||||
val ReconnectDelay: FiniteDuration = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS)
|
||||
|
||||
val MessageFrameSize: Int = getBytes("message-frame-size").toInt
|
||||
|
||||
private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match {
|
||||
case 0 ⇒ None
|
||||
case x if x < 0 ⇒
|
||||
throw new ConfigurationException("Setting '%s' must be 0 or positive (and fit in an Int)" format s)
|
||||
case other ⇒ Some(other)
|
||||
}
|
||||
|
||||
val WriteBufferHighWaterMark: Option[Int] = optionSize("write-buffer-high-water-mark")
|
||||
val WriteBufferLowWaterMark: Option[Int] = optionSize("write-buffer-low-water-mark")
|
||||
val SendBufferSize: Option[Int] = optionSize("send-buffer-size")
|
||||
val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size")
|
||||
|
||||
val Hostname: String = getString("hostname") match {
|
||||
case "" ⇒ InetAddress.getLocalHost.getHostAddress
|
||||
case value ⇒ value
|
||||
}
|
||||
|
||||
val OutboundLocalAddress: Option[String] = getString("outbound-local-address") match {
|
||||
case "auto" | "" | null ⇒ None
|
||||
case some ⇒ Some(some)
|
||||
}
|
||||
|
||||
@deprecated("WARNING: This should only be used by professionals.", "2.0")
|
||||
val PortSelector: Int = getInt("port")
|
||||
|
||||
val ConnectionTimeout: FiniteDuration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
|
||||
|
||||
val Backlog: Int = getInt("backlog")
|
||||
|
||||
val ReuseAddress: Boolean = getString("reuse-address") match {
|
||||
case "off-for-windows" ⇒ !Helpers.isWindows
|
||||
case _ ⇒ getBoolean("reuse-address")
|
||||
}
|
||||
|
||||
val ExecutionPoolKeepalive: FiniteDuration = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS)
|
||||
|
||||
val ExecutionPoolSize: Int = getInt("execution-pool-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 0")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxChannelMemorySize: Long = getBytes("max-channel-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-channel-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxTotalMemorySize: Long = getBytes("max-total-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-total-memory-size is less than 0 bytes")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
private def computeWPS(config: Config): Int =
|
||||
ThreadPoolConfig.scaledPoolSize(
|
||||
config.getInt("pool-size-min"),
|
||||
config.getDouble("pool-size-factor"),
|
||||
config.getInt("pool-size-max"))
|
||||
|
||||
val ServerSocketWorkerPoolSize = computeWPS(config.getConfig("server-socket-worker-pool"))
|
||||
|
||||
val ClientSocketWorkerPoolSize = computeWPS(config.getConfig("client-socket-worker-pool"))
|
||||
|
||||
val SslSettings = new SSLSettings(config.getConfig("ssl"))
|
||||
|
||||
val EnableSSL = getBoolean("ssl.enable")
|
||||
}
|
||||
|
|
@ -2,20 +2,21 @@
|
|||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote.netty
|
||||
package akka.remote.transport.netty
|
||||
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
|
||||
import akka.remote.RemoteTransportException
|
||||
import akka.event.LoggingAdapter
|
||||
import java.io.{ IOException, FileNotFoundException, FileInputStream }
|
||||
import akka.remote.security.provider.AkkaProvider
|
||||
import java.security._
|
||||
import com.typesafe.config.Config
|
||||
import akka.japi.Util._
|
||||
import akka.ConfigurationException
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.japi.Util._
|
||||
import akka.remote.RemoteTransportException
|
||||
import akka.remote.security.provider.AkkaProvider
|
||||
import com.typesafe.config.Config
|
||||
import java.io.{ IOException, FileNotFoundException, FileInputStream }
|
||||
import java.security._
|
||||
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
private[akka] class SSLSettings(config: Config) {
|
||||
|
||||
import config._
|
||||
|
||||
val SSLKeyStore = Option(getString("key-store")).filter(_.length > 0)
|
||||
|
|
@ -3,30 +3,29 @@
|
|||
*/
|
||||
package akka.remote.transport.netty
|
||||
|
||||
import akka.{ OnlyCauseStackTrace, ConfigurationException }
|
||||
import akka.actor.{ Address, ExtendedActorSystem }
|
||||
import akka.dispatch.ThreadPoolConfig
|
||||
import akka.event.Logging
|
||||
import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup }
|
||||
import akka.remote.transport.AssociationHandle.HandleEventListener
|
||||
import akka.remote.transport.Transport._
|
||||
import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode }
|
||||
import akka.remote.transport.{ AssociationHandle, Transport }
|
||||
import akka.{ OnlyCauseStackTrace, ConfigurationException }
|
||||
import com.typesafe.config.Config
|
||||
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
|
||||
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.{ ConcurrentHashMap, Executors, CancellationException }
|
||||
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
|
||||
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
|
||||
import org.jboss.netty.channel._
|
||||
import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
|
||||
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
|
||||
import scala.concurrent.{ ExecutionContext, Promise, Future }
|
||||
import scala.util.{ Try, Random }
|
||||
import scala.util.Try
|
||||
import util.control.{ NoStackTrace, NonFatal }
|
||||
import akka.dispatch.ThreadPoolConfig
|
||||
import akka.remote.transport.AssociationHandle.HandleEventListener
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
object NettyTransportSettings {
|
||||
sealed trait Mode
|
||||
|
|
@ -209,7 +208,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
|||
|
||||
final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]()
|
||||
|
||||
val channelGroup = new DefaultDisposableChannelGroup("akka-netty-transport-driver-channelgroup-" +
|
||||
/*
|
||||
* Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap.
|
||||
* In the old remoting this was handled by using a custom subclass, guarding the close() method with a write-lock.
|
||||
* The usage of this class is safe in the new remoting, as close() is called after unbind() is finished, and no
|
||||
* outbound connections are initiated in the shutdown phase.
|
||||
*/
|
||||
val channelGroup = new DefaultChannelGroup("akka-netty-transport-driver-channelgroup-" +
|
||||
uniqueIdCounter.getAndIncrement)
|
||||
|
||||
private val clientChannelFactory: ChannelFactory = TransportMode match {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package akka.remote
|
|||
|
||||
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
||||
|
||||
import akka.remote.netty.NettyRemoteTransport
|
||||
import akka.actor.Actor
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.DefaultTimeout
|
||||
|
|
|
|||
|
|
@ -1,159 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote
|
||||
|
||||
import akka.testkit._
|
||||
import akka.actor._
|
||||
import com.typesafe.config._
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Await
|
||||
import scala.reflect.classTag
|
||||
import akka.pattern.ask
|
||||
import akka.event.Logging
|
||||
|
||||
object RemoteCommunicationSpec {
|
||||
class Echo extends Actor {
|
||||
var target: ActorRef = context.system.deadLetters
|
||||
|
||||
def receive = {
|
||||
case (p: Props, n: String) ⇒ sender ! context.actorOf(Props[Echo], n)
|
||||
case ex: Exception ⇒ throw ex
|
||||
case s: String ⇒ sender ! context.actorFor(s)
|
||||
case x ⇒ target = sender; sender ! x
|
||||
}
|
||||
|
||||
override def preStart() {}
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) {
|
||||
target ! "preRestart"
|
||||
}
|
||||
override def postRestart(cause: Throwable) {}
|
||||
override def postStop() {
|
||||
target ! "postStop"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class RemoteCommunicationSpec extends AkkaSpec("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.transport = "akka.remote.netty.NettyRemoteTransport"
|
||||
remote.netty {
|
||||
hostname = localhost
|
||||
port = 0
|
||||
}
|
||||
}
|
||||
""") with ImplicitSender with DefaultTimeout {
|
||||
|
||||
import RemoteCommunicationSpec._
|
||||
|
||||
val other = ActorSystem("remote-sys", system.settings.config)
|
||||
|
||||
val localAddr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||
val remoteAddr = other.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||
|
||||
val deploys = Seq(
|
||||
Deploy("/blub", scope = RemoteScope(remoteAddr)),
|
||||
Deploy("/looker/child", scope = RemoteScope(remoteAddr)),
|
||||
Deploy("/looker/child/grandchild", scope = RemoteScope(localAddr)))
|
||||
|
||||
def deploy(sys: ActorSystem, d: Deploy) {
|
||||
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
||||
}
|
||||
|
||||
for (d ← deploys) {
|
||||
deploy(system, d)
|
||||
deploy(other, d)
|
||||
}
|
||||
|
||||
val remote = other.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "ping" ⇒ sender ! (("pong", sender))
|
||||
}
|
||||
}), "echo")
|
||||
|
||||
val here = system.actorFor(RootActorPath(remoteAddr) / "user" / "echo")
|
||||
|
||||
override def afterTermination() {
|
||||
other.shutdown()
|
||||
}
|
||||
|
||||
"Remoting" must {
|
||||
|
||||
"support remote look-ups" in {
|
||||
here ! "ping"
|
||||
expectMsgPF() {
|
||||
case ("pong", s: AnyRef) if s eq testActor ⇒ true
|
||||
}
|
||||
}
|
||||
|
||||
"send error message for wrong address" in {
|
||||
val old = other.eventStream.logLevel
|
||||
other.eventStream.setLogLevel(Logging.DebugLevel)
|
||||
EventFilter.debug(start = "dropping", occurrences = 1).intercept {
|
||||
system.actorFor(RootActorPath(remoteAddr.copy(system = "remotesys")) / "user" / "echo") ! "ping"
|
||||
}(other)
|
||||
other.eventStream.setLogLevel(old)
|
||||
}
|
||||
|
||||
"support ask" in {
|
||||
Await.result(here ? "ping", timeout.duration) match {
|
||||
case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good
|
||||
case m ⇒ fail(m + " was not (pong, AskActorRef)")
|
||||
}
|
||||
}
|
||||
|
||||
"send dead letters on remote if actor does not exist" in {
|
||||
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
||||
system.actorFor(RootActorPath(remoteAddr) / "does" / "not" / "exist") ! "buh"
|
||||
}(other)
|
||||
}
|
||||
|
||||
"create and supervise children on remote node" in {
|
||||
val r = system.actorOf(Props[Echo], "blub")
|
||||
r.path.toString must be === s"akka://remote-sys@localhost:${remoteAddr.port.get}/remote/akka/RemoteCommunicationSpec@localhost:${localAddr.port.get}/user/blub"
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
r ! new Exception("crash")
|
||||
}(other)
|
||||
expectMsg("preRestart")
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
system.stop(r)
|
||||
expectMsg("postStop")
|
||||
}
|
||||
|
||||
"look-up actors across node boundaries" in {
|
||||
val l = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n)
|
||||
case s: String ⇒ sender ! context.actorFor(s)
|
||||
}
|
||||
}), "looker")
|
||||
l ! (Props[Echo], "child")
|
||||
val r = expectMsgType[ActorRef]
|
||||
r ! (Props[Echo], "grandchild")
|
||||
val remref = expectMsgType[ActorRef]
|
||||
remref.asInstanceOf[ActorRefScope].isLocal must be(true)
|
||||
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
|
||||
myref.isInstanceOf[RemoteActorRef] must be(true)
|
||||
myref ! 43
|
||||
expectMsg(43)
|
||||
lastSender must be theSameInstanceAs remref
|
||||
r.asInstanceOf[RemoteActorRef].getParent must be(l)
|
||||
system.actorFor("/user/looker/child") must be theSameInstanceAs r
|
||||
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
||||
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
||||
}
|
||||
|
||||
"not fail ask across node boundaries" in {
|
||||
import system.dispatcher
|
||||
val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo classTag[(String, ActorRef)]
|
||||
Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong"))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -8,7 +8,6 @@ import language.postfixOps
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import scala.concurrent.duration._
|
||||
import akka.remote.netty.NettyRemoteTransport
|
||||
import akka.util.Helpers
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -33,44 +32,6 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
LogRemoteLifeCycleEvents must be(true)
|
||||
}
|
||||
|
||||
"be able to parse Netty config elements" ignore {
|
||||
val settings =
|
||||
system.asInstanceOf[ExtendedActorSystem]
|
||||
.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
.transport.asInstanceOf[NettyRemoteTransport]
|
||||
.settings
|
||||
import settings._
|
||||
|
||||
BackoffTimeout must be(Duration.Zero)
|
||||
SecureCookie must be(None)
|
||||
RequireCookie must be(false)
|
||||
UsePassiveConnections must be(true)
|
||||
Hostname must not be "" // will be set to the local IP
|
||||
PortSelector must be(0)
|
||||
OutboundLocalAddress must be(None)
|
||||
MessageFrameSize must be(1048576)
|
||||
ConnectionTimeout must be(2 minutes)
|
||||
Backlog must be(4096)
|
||||
ReuseAddress must be(!Helpers.isWindows)
|
||||
ExecutionPoolKeepalive must be(1 minute)
|
||||
ExecutionPoolSize must be(4)
|
||||
MaxChannelMemorySize must be(0)
|
||||
MaxTotalMemorySize must be(0)
|
||||
ReconnectDelay must be(5 seconds)
|
||||
ReadTimeout must be(0 millis)
|
||||
WriteTimeout must be(10 seconds)
|
||||
AllTimeout must be(0 millis)
|
||||
ReconnectionTimeWindow must be(10 minutes)
|
||||
WriteBufferHighWaterMark must be(None)
|
||||
WriteBufferLowWaterMark must be(None)
|
||||
SendBufferSize must be(None)
|
||||
ReceiveBufferSize must be(None)
|
||||
ServerSocketWorkerPoolSize must be >= (2)
|
||||
ServerSocketWorkerPoolSize must be <= (8)
|
||||
ClientSocketWorkerPoolSize must be >= (2)
|
||||
ClientSocketWorkerPoolSize must be <= (8)
|
||||
}
|
||||
|
||||
"contain correct configuration values in reference.conf" in {
|
||||
val c = system.asInstanceOf[ExtendedActorSystem].
|
||||
provider.asInstanceOf[RemoteActorRefProvider].
|
||||
|
|
|
|||
|
|
@ -13,12 +13,12 @@ import scala.reflect.classTag
|
|||
import akka.pattern.ask
|
||||
import java.io.File
|
||||
import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController }
|
||||
import akka.remote.netty.{ NettySettings, NettySSLSupport }
|
||||
import javax.net.ssl.SSLException
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import akka.event.{ Logging, NoLogging, LoggingAdapter }
|
||||
import akka.remote.transport.netty.{ SSLSettings, NettySSLSupport }
|
||||
|
||||
object Configuration {
|
||||
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
|
||||
|
|
@ -33,14 +33,19 @@ object Configuration {
|
|||
filter-leeway = 10s
|
||||
default-timeout = 10s
|
||||
}
|
||||
remote.transport = "akka.remote.netty.NettyRemoteTransport"
|
||||
remote.netty {
|
||||
|
||||
remoting.enabled-transports = ["ssl"]
|
||||
|
||||
remoting.transports.ssl {
|
||||
hostname = localhost
|
||||
port = %d
|
||||
ssl {
|
||||
enable = on
|
||||
trust-store = "%s"
|
||||
key-store = "%s"
|
||||
key-store-password = "changeme"
|
||||
trust-store-password = "changeme"
|
||||
protocol = "TLSv1"
|
||||
random-number-generator = "%s"
|
||||
enabled-algorithms = [%s]
|
||||
sha1prng-random-source = "/dev/./urandom"
|
||||
|
|
@ -57,24 +62,24 @@ object Configuration {
|
|||
//if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled")
|
||||
|
||||
val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", ")))
|
||||
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty")
|
||||
val settings = new NettySettings(fullConfig, "placeholder")
|
||||
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remoting.transports.ssl.ssl")
|
||||
val settings = new SSLSettings(fullConfig)
|
||||
|
||||
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SslSettings.SSLRandomNumberGenerator,
|
||||
settings.SslSettings.SSLRandomSource, NoLogging)
|
||||
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator,
|
||||
settings.SSLRandomSource, NoLogging)
|
||||
|
||||
rng.nextInt() // Has to work
|
||||
settings.SslSettings.SSLRandomNumberGenerator foreach {
|
||||
settings.SSLRandomNumberGenerator foreach {
|
||||
sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng))
|
||||
}
|
||||
|
||||
val engine = NettySSLSupport.initializeClientSSL(settings.SslSettings, NoLogging).getEngine
|
||||
val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine
|
||||
val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet
|
||||
val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet
|
||||
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
|
||||
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
|
||||
engine.getSupportedProtocols.contains(settings.SslSettings.SSLProtocol.get) ||
|
||||
(throw new IllegalArgumentException("Protocol not supported: " + settings.SslSettings.SSLProtocol.get))
|
||||
engine.getSupportedProtocols.contains(settings.SSLProtocol.get) ||
|
||||
(throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get))
|
||||
|
||||
CipherConfig(true, config, cipher, localPort, remotePort)
|
||||
} catch {
|
||||
|
|
@ -119,11 +124,9 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten
|
|||
|
||||
implicit val timeout: Timeout = Timeout(10 seconds)
|
||||
|
||||
import RemoteCommunicationSpec._
|
||||
|
||||
lazy val other: ActorSystem = ActorSystem(
|
||||
"remote-sys",
|
||||
ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config))
|
||||
ConfigFactory.parseString("akka.remoting.transports.ssl.port = " + cipherConfig.remotePort).withFallback(system.settings.config))
|
||||
|
||||
override def afterTermination() {
|
||||
if (cipherConfig.runTest) {
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@ import akka.testkit._
|
|||
import akka.actor._
|
||||
import com.typesafe.config._
|
||||
import scala.concurrent.duration._
|
||||
import akka.remote.netty.{ SSLSettings, NettyRemoteTransport }
|
||||
import java.util.ArrayList
|
||||
import akka.remote.transport.netty.SSLSettings
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978ConfigSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue