Added registration of remote actors in declarative supervisor config + Fixed bug in remote client reconnect + Added Redis as backend for Chat sample + Added UUID utility + Misc minor other fixes

This commit is contained in:
Jonas Bonér 2009-12-30 08:36:24 +01:00
parent fb98c64c0c
commit c4a78fb357
18 changed files with 157 additions and 88 deletions

View file

@ -32,16 +32,6 @@
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-nodeps-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>

View file

@ -14,9 +14,7 @@ import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging}
import org.codehaus.aspectwerkz.proxy.Uuid
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
import org.multiverse.api.ThreadLocalTransaction._
@ -238,7 +236,7 @@ trait Actor extends TransactionManagement {
implicit protected val transactionFamily: String = this.getClass.getName
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = Uuid.newUuid.toString
private[akka] var _uuid = UUID.newUuid.toString
def uuid = _uuid
@ -291,7 +289,7 @@ trait Actor extends TransactionManagement {
* Identifier for actor, does not have to be a unique one.
* Default is the class name.
*
* This field is used for logging, AspectRegistry.actorsFor etc.
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote actor in RemoteServer etc.
* But also as the identifier for persistence, which means that you can
* use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
@ -817,8 +815,8 @@ trait Actor extends TransactionManagement {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
@ -862,8 +860,8 @@ trait Actor extends TransactionManagement {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)

View file

@ -9,6 +9,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.util.Helpers._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.remote.RemoteServer
import java.util.concurrent.ConcurrentHashMap
@ -116,6 +117,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
actors.put(actor.getClass.getName, actor)
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
remoteAddress.foreach(address => println("----- ADDING actor for " + address.get.hostname + " - " + address.get.port))
remoteAddress.foreach(address => println("----- " + RemoteServer.Address(address.hostname, address.port).hashCode))
remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start

View file

@ -6,8 +6,9 @@ package se.scalablesolutions.akka.config
import com.google.inject._
import ScalaConfig._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
//import org.apache.camel.impl.{DefaultCamelContext}
@ -50,8 +51,8 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
val (proxy, targetInstance, component) =
activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException(
"Class [" + clazz.getName + "] has not been put under supervision " +
"(by passing in the config to the 'configure' and then invoking 'supervise') method"))
"Class [" + clazz.getName + "] has not been put under supervision" +
"\n(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
proxy.asInstanceOf[T]
}
@ -104,11 +105,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(
component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
@ -121,11 +125,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(
targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)

View file

@ -54,7 +54,7 @@ object ScalaConfig {
case object Permanent extends Scope
case object Temporary extends Scope
case class RemoteAddress(hostname: String, port: Int) extends ConfigElement
case class RemoteAddress(val hostname: String, val port: Int) extends ConfigElement
class Component(_intf: Class[_],
val target: Class[_],

View file

@ -85,13 +85,12 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val timer = new HashedWheelTimer
private val remoteAddress = new InetSocketAddress(hostname, port)
private[remote] var connection: ChannelFuture = _
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
private var connection: ChannelFuture = _
def connect = synchronized {
if (!isRunning) {
connection = bootstrap.connect(remoteAddress)
@ -99,9 +98,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
// Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly
if (!connection.isSuccess) {
log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
}
if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
isRunning = true
}
}
@ -148,7 +145,8 @@ class RemoteClientPipelineFactory(name: String,
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
timer: HashedWheelTimer) extends ChannelPipelineFactory {
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
@ -166,7 +164,7 @@ class RemoteClientPipelineFactory(name: String,
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer))
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client))
pipeline
}
}
@ -180,7 +178,8 @@ class RemoteClientHandler(val name: String,
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer)
val timer: HashedWheelTimer,
val client: RemoteClient)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
@ -225,7 +224,11 @@ class RemoteClientHandler(val name: String,
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", remoteAddress)
bootstrap.connect(remoteAddress)
client.connection = bootstrap.connect(remoteAddress)
// Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
}

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.remote
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
@ -62,6 +63,41 @@ object RemoteServer {
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
level
}
object Address {
def apply(hostname: String, port: Int) = new Address(hostname, port)
}
class Address(val hostname: String, val port: Int) {
override def hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Address] &&
that.asInstanceOf[Address].hostname == hostname &&
that.asInstanceOf[Address].port == port
}
}
class RemoteActorSet {
val actors = new ConcurrentHashMap[String, Actor]
val activeObjects = new ConcurrentHashMap[String, AnyRef]
}
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
val set = remoteActorSets.get(remoteServerAddress)
if (set ne null) set
else {
val remoteActorSet = new RemoteActorSet
remoteActorSets.put(remoteServerAddress, remoteActorSet)
remoteActorSet
}
}
}
/**
@ -96,7 +132,7 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-server")
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
def start: Unit = start(None)
@ -110,7 +146,12 @@ class RemoteServer extends Logging {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader))
println("======= ADDING actor for " + hostname + " - " + port)
println("======= " + RemoteServer.Address(hostname, port).hashCode)
println("======= " + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.size)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
@ -135,9 +176,11 @@ class RemoteServer extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(
name: String,
openChannels: ChannelGroup,
loader: Option[ClassLoader]) extends ChannelPipelineFactory {
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, Actor],
val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@ -156,7 +199,7 @@ class RemoteServerPipelineFactory(
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader))
pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader, actors, activeObjects))
pipeline
}
}
@ -167,13 +210,12 @@ class RemoteServerPipelineFactory(
@ChannelPipelineCoverage {val value = "all"}
class RemoteServerHandler(
val name: String,
openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, Actor],
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**

View file

@ -6,11 +6,10 @@ package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.UUID
import org.multiverse.datastructures.refs.manual.Ref;
import org.codehaus.aspectwerkz.proxy.Uuid
/**
* Example Scala usage:
* <pre>
@ -75,7 +74,7 @@ object TransactionalRef {
class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
import org.multiverse.api.ThreadLocalTransaction._
val uuid = Uuid.newUuid.toString
val uuid = UUID.newUuid.toString
private[this] val ref: Ref[T] = atomic { new Ref }
@ -127,7 +126,7 @@ object TransactionalMap {
*/
class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
protected[this] val ref = TransactionalRef[HashTrie[K, V]]
val uuid = Uuid.newUuid.toString
val uuid = UUID.newUuid.toString
ref.swap(new HashTrie[K, V])
@ -179,7 +178,7 @@ object TransactionalVector {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
val uuid = Uuid.newUuid.toString
val uuid = UUID.newUuid.toString
private[this] val ref = TransactionalRef[Vector[T]]

View file

@ -65,7 +65,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
String str = conf.getInstance(String.class);
fail("exception should have been thrown");
} catch (Exception e) {
assertEquals("Class [java.lang.String] has not been put under supervision (by passing in the config to the 'configure' and then invoking 'supervise') method", e.getMessage());
assertEquals(IllegalStateException.class, e.getClass());
}
}

View file

@ -47,6 +47,11 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-persistence-redis</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-comet</artifactId>
<groupId>${project.groupId}</groupId>

View file

@ -20,6 +20,11 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-persistence-redis</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View file

@ -5,10 +5,12 @@
package se.scalablesolutions.akka.sample.chat
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor}
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.{OneForOneStrategy}
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.state.RedisStorage
import scala.collection.mutable.HashMap
@ -69,17 +71,23 @@ class Session(user: String, storage: Actor) extends Actor {
*/
class Storage extends Actor {
lifeCycle = Some(LifeCycle(Permanent))
private var chatLog: List[String] = Nil
log.info("Chat storage is starting up...")
private val chatLog = RedisStorage.getVector("akka.chat.log")
log.info("Redis-based chat storage is starting up...")
def receive = {
case msg @ ChatMessage(from, message) =>
log.debug("New chat message [%s]", message)
chatLog ::= message
atomic {
chatLog + message.getBytes("UTF-8")
}
case GetChatLog(_) =>
reply(ChatLog(chatLog.reverse))
val messageList = atomic {
chatLog.map(bytes => new String(bytes, "UTF-8")).toList
}
reply(ChatLog(messageList))
}
}
@ -159,23 +167,6 @@ trait ChatServer extends Actor {
*/
object ChatService extends ChatServer with SessionManagement with ChatManagement
/**
* Boot class for running the ChatService in the Akka microkernel.
* <p/>
* Configures supervision of the ChatService for fault-tolerance.
*/
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
ChatService,
LifeCycle(Permanent),
RemoteAddress("localhost", 9999))
:: Nil))
factory.newInstance.start
}
/**
* Test runner emulating a chat session.
*/

View file

@ -20,6 +20,16 @@
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-nodeps-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>

View file

@ -53,9 +53,13 @@ object Config extends Logging {
log.info("Config loaded from the application classpath.")
} catch {
case e: ParseException => throw new IllegalStateException(
"'$AKKA_HOME/config/akka.conf' could not be found" +
"\n\tand no 'akka.conf' can be found on the classpath - aborting." +
"\n\tEither add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
"Can't find 'akka.conf' configuration file." +
"One of the three ways of locating the 'akka.conf' file needs to be defined:" +
"\n\t1. Define '$AKKA_HOME' to the root of the Akka distribution." +
"\n\t2. Define the '-Dakka.config=...' environment option." +
"\n\t3. Put the 'akka.conf' file on the classpath." +
"\n\tI have no way of finding the 'akka.conf' configuration file." +
"\nAborting.")
}
}
Configgy.config

View file

@ -0,0 +1,12 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
/**
* Factory object for very fast UUID generation.
*/
object UUID {
def newUuid: Long = org.codehaus.aspectwerkz.proxy.Uuid.newUuid
}

View file

@ -21,8 +21,7 @@
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.java.Boot",
"sample.scala.Boot",
"se.scalablesolutions.akka.security.samples.Boot",
"se.scalablesolutions.akka.sample.chat.Boot"]
"se.scalablesolutions.akka.security.samples.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations