Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-03-29 09:37:41 +02:00
commit 8c24a985b1
10 changed files with 464 additions and 88 deletions

View file

@ -14,7 +14,10 @@ package object actor {
ref.asInstanceOf[ActorRef]
type Uuid = com.eaio.uuid.UUID
def newUuid(): Uuid = new Uuid()
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode)
def uuidFrom(uuid: String): Uuid = new Uuid(uuid)
}

View file

@ -6,18 +6,14 @@ package akka.actor
import akka.event.EventHandler
import akka.dispatch._
import akka.config.Config._
import akka.config.Supervision._
import akka.AkkaException
import akka.util._
import ReflectiveAccess._
import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap }
import java.lang.reflect.Field
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
@ -39,7 +35,7 @@ private[akka] object ActorRefInternals {
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
/**
* Sends the specified message to the channel
* Scala API
@ -943,7 +939,7 @@ class LocalActorRef private[akka] (
performRestart
true
} catch {
case e =>
case e =>
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
} finally {
@ -1009,7 +1005,7 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
EventHandler.error(reason, this, message.toString)
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@ -1121,9 +1117,9 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = Actor.remote.send[T](
message, senderOption, senderFuture,
homeAddress.get, timeout,
false, this, None,
message, senderOption, senderFuture,
homeAddress.get, timeout,
false, this, None,
actorType, loader)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
@ -1201,8 +1197,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/
def id: String
def id_=(id: String): Unit
def id_=(id: String): Unit
/**
* User overridable callback/setting.
* <p/>

View file

@ -0,0 +1,44 @@
package akka.remoteinterface
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
import akka.actor.Actor
import akka.event.EventHandler
/**
* Remote client and server event listener that pipes the events to the standard Akka EventHander.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteEventHandler extends Actor {
import EventHandler._
self.id = ID
self.dispatcher = EventHandlerDispatcher
def receive = {
// client
case RemoteClientError(cause, client, address) => EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
case RemoteClientWriteFailed(request, cause, client, address) => EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString))
case RemoteClientDisconnected(client, address) => EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
case RemoteClientConnected(client, address) => EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
case RemoteClientStarted(client, address) => EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)
case RemoteClientShutdown(client, address) => EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString)
// server
case RemoteServerError(cause, server) => EventHandler.error(cause, server, "RemoteServerError")
case RemoteServerWriteFailed(request, cause, server, clientAddress) => EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString))
case RemoteServerStarted(server) => EventHandler.info(server, "RemoteServerStarted")
case RemoteServerShutdown(server) => EventHandler.info(server, "RemoteServerShutdown")
case RemoteServerClientConnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientDisconnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientClosed(server, clientAddress) => EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString)
case _ => //ignore other
}
}

View file

@ -5,14 +5,16 @@
package akka.remoteinterface
import akka.japi.Creator
import java.net.InetSocketAddress
import akka.actor._
import akka.util._
import akka.dispatch.CompletableFuture
import akka.config.Config.{config, TIME_UNIT}
import java.util.concurrent.ConcurrentHashMap
import akka.AkkaException
import reflect.BeanProperty
import scala.reflect.BeanProperty
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import java.io.{PrintWriter, PrintStream}
trait RemoteModule {
val UUID_PREFIX = "uuid:"
@ -20,7 +22,6 @@ trait RemoteModule {
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef]
@ -28,7 +29,6 @@ trait RemoteModule {
private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef]
/** Lookup methods **/
private[akka] def findActorById(id: String) : ActorRef = actors.get(id)
@ -114,25 +114,38 @@ case class RemoteServerWriteFailed(
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String,
@BeanProperty val client: RemoteClientModule,
val remoteAddress: InetSocketAddress) extends AkkaException(message)
class RemoteClientException private[akka] (
message: String,
@BeanProperty val client: RemoteClientModule,
val remoteAddress: InetSocketAddress) extends AkkaException(message)
/**
* Returned when a remote exception cannot be instantiated or parsed
* Returned when a remote exception sent over the wire cannot be loaded and instantiated
*/
case class UnparsableException private[akka] (originalClassName: String,
originalMessage: String) extends AkkaException(originalMessage)
case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String)
extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]"
.format(cause.toString, originalClassName, originalMessage)) {
override def printStackTrace = cause.printStackTrace
override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream)
override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter)
}
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
lazy val eventHandler: ActorRef = {
val handler = Actor.actorOf[RemoteEventHandler].start
// add the remote client and server listener that pipes the events to the event handler system
addListener(handler)
handler
}
def shutdown {
removeListener(eventHandler)
this.shutdownClientModule
this.shutdownServerModule
clear
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor

View file

@ -5,46 +5,40 @@
package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator
import akka.config.Config._
import akka.remoteinterface._
import akka.actor.{PoisonPill, Index,
ActorInitializationException, LocalActorRef, newUuid,
ActorRegistry, Actor, RemoteActorRef,
import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.Actor._
import akka.config.Config._
import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap}
import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.{ HashMap }
import scala.reflect.BeanProperty
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.{ SocketAddress, InetSocketAddress }
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -157,29 +151,50 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
/**
* This is the abstract baseclass for netty remote clients,
* currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* This is the abstract baseclass for netty remote clients, currently there's only an
* ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* reuses an already established connection.
*/
abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true)
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private[remote] val runSwitch = new Switch()
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any); Unit
protected def currentChannel: Channel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
def shutdown: Boolean
/**
* Returns an array with the current pending messages not yet delivered.
*/
def pendingMessages: Array[Any] = {
var messages = Vector[Any]()
val iter = pendingRequests.iterator
while (iter.hasNext) {
val (_, _, message) = iter.next
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
}
messages.toArray
}
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
@ -213,37 +228,52 @@ abstract class RemoteClient private[akka] (
* Sends the message across the wire
*/
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
try {
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
}
} catch {
case e: Throwable =>
// add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog) pendingRequests.add((true, null, request))
else throw e
}
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
futures.remove(futureUuid) //Clean this up
//We don't care about that right now
} else if (!future.isSuccess) {
val f = futures.remove(futureUuid) //Clean this up
if (f ne null)
f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
}
})
Some(futureResult)
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
def handleRequestReplyError(future: ChannelFuture) = {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
if (useTransactionLog) {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
}
var future: ChannelFuture = null
try {
// try to send the original one
future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) handleRequestReplyError(future)
} catch {
case e: Exception => handleRequestReplyError(future)
}
Some(futureResult)
}
} else {
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
@ -252,6 +282,34 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) { // sendOneWay
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
throw future.getCause
}
} else { // sendRequestReply
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingRequests.remove(pendingRequest)
pendingRequest = pendingRequests.peek // try to grab next message
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
@ -272,6 +330,7 @@ class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _
@ -342,6 +401,7 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
pendingRequests.clear
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -456,8 +516,16 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
try {
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
}
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -486,7 +554,7 @@ class ActiveRemoteClientHandler(
} catch {
case problem: Throwable =>
EventHandler.error(problem, this, problem.getMessage)
UnparsableException(classname, exception.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
}
@ -495,7 +563,8 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
//Needed for remote testing and switching on/off under run
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
@ -564,7 +633,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
}
}
@ -597,7 +666,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this))
}
@ -988,7 +1057,7 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
@ -1206,4 +1275,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
}
}
}
}

View file

@ -1,13 +1,13 @@
package akka.actor.remote
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.remote.netty.NettyRemoteSupport
import akka.actor. {Actor, ActorRegistry}
import java.util.concurrent. {TimeUnit, CountDownLatch}
import org.scalatest.{Spec, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach}
import java.util.concurrent.atomic.AtomicBoolean
object AkkaRemoteTest {
class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor {
@ -59,4 +59,97 @@ class AkkaRemoteTest extends
/* Utilities */
def replyHandler(latch: CountDownLatch, expect: String) = Some(Actor.actorOf(new ReplyHandlerActor(latch, expect)).start)
}
}
trait NetworkFailureTest { self: WordSpec =>
import akka.actor.Actor._
import akka.util.Duration
// override is subclass if needed
val BYTES_PER_SECOND = "60KByte/s"
val DELAY_MILLIS = "350ms"
val PORT_RANGE = "1024-65535"
// FIXME add support for TCP FIN by hooking into Netty and do socket.close
def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = {
spawn {
try {
enableTcpReset()
println("===>>> Reply with [TCP RST] for [" + duration + "]")
Thread.sleep(duration.toMillis)
restoreIP
} catch {
case e =>
dead.set(true)
e.printStackTrace
}
}
}
def throttleNetworkFor(duration: Duration, dead: AtomicBoolean) = {
spawn {
try {
enableNetworkThrottling()
println("===>>> Throttling network with [" + BYTES_PER_SECOND + ", " + DELAY_MILLIS + "] for [" + duration + "]")
Thread.sleep(duration.toMillis)
restoreIP
} catch {
case e =>
dead.set(true)
e.printStackTrace
}
}
}
def dropNetworkFor(duration: Duration, dead: AtomicBoolean) = {
spawn {
try {
enableNetworkDrop()
println("===>>> Blocking network [TCP DENY] for [" + duration + "]")
Thread.sleep(duration.toMillis)
restoreIP
} catch {
case e =>
dead.set(true)
e.printStackTrace
}
}
}
def sleepFor(duration: Duration) = {
println("===>>> Sleeping for [" + duration + "]")
Thread sleep (duration.toMillis)
}
def enableNetworkThrottling() = {
restoreIP()
assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
}
def enableNetworkDrop() = {
restoreIP()
assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
}
def enableTcpReset() = {
restoreIP()
assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
}
def restoreIP() = {
println("===>>> Restoring network")
assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "1").start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "2").start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "flush").start.waitFor == 0)
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "flush").start.waitFor == 0)
}
def validateSudo() = {
println("===>>> Validating sudo")
assert(new ProcessBuilder("sudo", "-v").start.waitFor == 0)
}
}

View file

@ -0,0 +1,119 @@
package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor.Actor._
import akka.actor.{ActorRef, Actor}
import akka.util.duration._
import java.util.concurrent.atomic.AtomicBoolean
object RemoteErrorHandlingNetworkTest {
case class Send(actor: ActorRef)
class RemoteActorSpecActorUnidirectional extends Actor {
self.id = "network-drop:unidirectional"
def receive = {
case "Ping" => self.reply_?("Pong")
}
}
class Decrementer extends Actor {
def receive = {
case "done" => self.reply_?(false)
case i: Int if i > 0 =>
self.reply_?(i - 1)
case i: Int =>
self.reply_?(0)
this become {
case "done" => self.reply_?(true)
case _ => //Do Nothing
}
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
class RemoteActorSpecActorAsyncSender(latch: CountDownLatch) extends Actor {
def receive = {
case Send(actor: ActorRef) =>
actor ! "Hello"
case "World" => latch.countDown
}
}
}
class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureTest {
import RemoteErrorHandlingNetworkTest._
"Remote actors" should {
"be able to recover from network drop without loosing any messages" in {
validateSudo()
val latch = new CountDownLatch(10)
implicit val sender = replyHandler(latch, "Pong")
val service = actorOf[RemoteActorSpecActorUnidirectional]
remote.register(service.id, service)
val actor = remote.actorFor(service.id, 5000L, host, port)
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
val dead = new AtomicBoolean(false)
dropNetworkFor (10 seconds, dead) // drops the network - in another thread - so async
sleepFor (2 seconds) // wait until network drop is done before sending the other messages
try { actor ! "Ping" } catch { case e => () } // queue up messages
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
latch.await(15, TimeUnit.SECONDS) must be (true) // network should be restored and the messages delivered
dead.get must be (false)
}
"be able to recover from TCP RESET without loosing any messages" in {
validateSudo()
val latch = new CountDownLatch(10)
implicit val sender = replyHandler(latch, "Pong")
val service = actorOf[RemoteActorSpecActorUnidirectional]
remote.register(service.id, service)
val actor = remote.actorFor(service.id, 5000L, host, port)
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
actor ! "Ping"
val dead = new AtomicBoolean(false)
replyWithTcpResetFor (10 seconds, dead)
sleepFor (2 seconds)
try { actor ! "Ping" } catch { case e => () } // queue up messages
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
try { actor ! "Ping" } catch { case e => () } // ...
latch.await(15, TimeUnit.SECONDS) must be (true)
dead.get must be (false)
}
/*
"sendWithBangAndGetReplyThroughSenderRef" in {
remote.register(actorOf[RemoteActorSpecActorBidirectional])
implicit val timeout = 500000000L
val actor = remote.actorFor(
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port)
val latch = new CountDownLatch(1)
val sender = actorOf( new RemoteActorSpecActorAsyncSender(latch) ).start
sender ! Send(actor)
latch.await(1, TimeUnit.SECONDS) must be (true)
}
*/
}
}

View file

@ -11,7 +11,7 @@ akka {
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
@ -146,6 +146,7 @@ akka {
}
client {
retry-message-send-on-failure = on
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576

View file

@ -344,6 +344,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val junit = Dependencies.junit
val scalatest = Dependencies.scalatest
lazy val networkTestsEnabled = systemOptional[Boolean]("akka.test.network", false)
override def testOptions = super.testOptions ++ {
if (!networkTestsEnabled.value) Seq(TestFilter(test => !test.endsWith("NetworkTest")))
else Seq.empty
}
override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList
}

31
scripts/ip-mod.sh Executable file
View file

@ -0,0 +1,31 @@
#!/bin/sh
# flush rules
ipfw del pipe 1
ipfw del pipe 2
ipfw -q -f flush
ipfw -q -f pipe flush
if [ "$1" == "" ]; then
echo "Options: ip-mod.sh slow"
echo " ip-mod.sh block"
echo " ip-mod.sh reset"
echo " ip-mod.sh restore"
exit
elif [ "$1" == "restore" ]; then
echo "restoring normal network"
exit
elif [ "$1" == "slow" ]; then
# simulate slow connection <to specific hosts>
echo "enabling slow connection"
ipfw add pipe 1 ip from any to any
ipfw add pipe 2 ip from any to any
ipfw pipe 1 config bw 60KByte/s delay 350ms
ipfw pipe 2 config bw 60KByte/s delay 350ms
elif [ "$1" == "block" ]; then
echo "enabling blocked connections"
ipfw add 1 deny tcp from any to any 1024-65535
elif [ "$1" == "reset" ]; then
echo "enabling reset connections"
ipfw add 1 reset tcp from any to any 1024-65535
fi