Merge branch 'master' of git-proxy:jboner/akka into rpc_amqp

This commit is contained in:
momania 2010-08-11 15:30:01 +02:00
commit 6b8e20d599
11 changed files with 359 additions and 89 deletions

View file

@ -436,7 +436,7 @@ trait Actor extends Logging {
/**
* Is the actor able to handle the message passed in as arguments?
*/
def isDefinedAt(message: Any): Boolean = base.isDefinedAt(message)
def isDefinedAt(message: Any): Boolean = processingBehavior.isDefinedAt(message)
/** One of the fundamental methods of the ActorsModel
* Actor assumes a new behavior
@ -449,21 +449,26 @@ trait Actor extends Logging {
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def apply(msg: Any) = processingBehavior(msg)
private[akka] def base: Receive = try {
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalActorStateException(
"The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.")
}
private lazy val processingBehavior: Receive = {
lazy val defaultBehavior = receive
private val lifeCycles: Receive = {
case HotSwap(code) => become(code)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
val actorBehavior: Receive = {
case HotSwap(code) => become(code)
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Restart(reason) => throw reason
case msg if self.hotswap.isDefined &&
self.hotswap.get.isDefinedAt(msg) => self.hotswap.get.apply(msg)
case msg if self.hotswap.isEmpty &&
defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg)
}
actorBehavior
}
}

View file

@ -1132,11 +1132,11 @@ class LocalActorRef private[akka](
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor.base(message)
actor(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} else {
actor.base(message)
actor(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} catch {

View file

@ -23,18 +23,18 @@ trait Logging {
/**
* Scala SLF4J wrapper
*
* ex.
*
* Example:
* <pre>
* class Foo extends Logging {
* log.info("My foo is %s","alive")
* log.error(new Exception(),"My foo is %s","broken")
* }
* </pre>
*
* The logger uses String.format:
* http://download-llnw.oracle.com/javase/6/docs/api/java/lang/String.html#format(java.lang.String,%20java.lang.Object...)
*/
class Logger(val logger: SLFLogger)
{
class Logger(val logger: SLFLogger) {
def name = logger.getName
def trace_? = logger.isTraceEnabled
@ -44,91 +44,89 @@ class Logger(val logger: SLFLogger)
def error_? = logger.isErrorEnabled
//Trace
def trace(t: Throwable, fmt: => String, arg: Any, argN: Any*){
def trace(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
trace(t,message(fmt,arg,argN:_*))
}
def trace(t: Throwable, msg: => String){
if(trace_?) logger.trace(msg,t)
def trace(t: Throwable, msg: => String) {
if (trace_?) logger.trace(msg,t)
}
def trace(fmt: => String, arg: Any, argN: Any*){
def trace(fmt: => String, arg: Any, argN: Any*) {
trace(message(fmt,arg,argN:_*))
}
def trace(msg: => String){
if(trace_?) logger trace msg
def trace(msg: => String) {
if (trace_?) logger trace msg
}
//Debug
def debug(t: Throwable, fmt: => String, arg: Any, argN: Any*){
def debug(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
debug(t,message(fmt,arg,argN:_*))
}
def debug(t: Throwable, msg: => String){
if(debug_?) logger.debug(msg,t)
def debug(t: Throwable, msg: => String) {
if (debug_?) logger.debug(msg,t)
}
def debug(fmt: => String, arg: Any, argN: Any*){
def debug(fmt: => String, arg: Any, argN: Any*) {
debug(message(fmt,arg,argN:_*))
}
def debug(msg: => String){
if(debug_?) logger debug msg
def debug(msg: => String) {
if (debug_?) logger debug msg
}
//Info
def info(t: Throwable, fmt: => String, arg: Any, argN: Any*){
def info(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
info(t,message(fmt,arg,argN:_*))
}
def info(t: Throwable, msg: => String){
if(info_?) logger.info(msg,t)
def info(t: Throwable, msg: => String) {
if (info_?) logger.info(msg,t)
}
def info(fmt: => String, arg: Any, argN: Any*){
def info(fmt: => String, arg: Any, argN: Any*) {
info(message(fmt,arg,argN:_*))
}
def info(msg: => String){
if(info_?) logger info msg
def info(msg: => String) {
if (info_?) logger info msg
}
//Warning
def warning(t: Throwable, fmt: => String, arg: Any, argN: Any*){
def warning(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
warning(t,message(fmt,arg,argN:_*))
}
def warning(t: Throwable, msg: => String){
if(warning_?) logger.warn(msg,t)
def warning(t: Throwable, msg: => String) {
if (warning_?) logger.warn(msg,t)
}
def warning(fmt: => String, arg: Any, argN: Any*){
def warning(fmt: => String, arg: Any, argN: Any*) {
warning(message(fmt,arg,argN:_*))
}
def warning(msg: => String){
if(warning_?) logger warn msg
def warning(msg: => String) {
if (warning_?) logger warn msg
}
//Error
def error(t: Throwable, fmt: => String, arg: Any, argN: Any*){
def error(t: Throwable, fmt: => String, arg: Any, argN: Any*) {
error(t,message(fmt,arg,argN:_*))
}
def error(t: Throwable, msg: => String){
if(error_?) logger.error(msg,t)
def error(t: Throwable, msg: => String) {
if (error_?) logger.error(msg,t)
}
def error(fmt: => String, arg: Any, argN: Any*){
def error(fmt: => String, arg: Any, argN: Any*) {
error(message(fmt,arg,argN:_*))
}
def error(msg: => String){
if(error_?) logger error msg
def error(msg: => String) {
if (error_?) logger error msg
}
protected def message(fmt: String, arg: Any, argN: Any*) : String = {
if((argN eq null) || argN.isEmpty)
fmt.format(arg)
else
fmt.format((arg +: argN):_*)
if ((argN eq null) || argN.isEmpty) fmt.format(arg)
else fmt.format((arg +: argN):_*)
}
}
@ -142,17 +140,12 @@ class Logger(val logger: SLFLogger)
* val rootLogger = Logger.root
*
*/
object Logger
{
object Logger {
def apply(logger: String) : Logger = new Logger(SLFLoggerFactory getLogger logger)
def apply(clazz: Class[_]) : Logger = apply(clazz.getName)
def root : Logger = apply(SLFLogger.ROOT_LOGGER_NAME)
}
/**
* LoggableException is a subclass of Exception and can be used as the base exception
* for application specific exceptions.

View file

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<configuration>
<!-- Errors were reported during translation. -->
<!-- Could not find transformer for org.apache.log4j.SimpleLayout -->
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<logger name="se.scalablesolutions" level="INFO"/>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
</configuration>

View file

@ -49,7 +49,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
assert(SamplePojoImpl._pre)
assert(SamplePojoImpl._post)
assert(!SamplePojoImpl._down)
assert(AspectInitRegistry.initFor(obj) ne null)
// assert(AspectInitRegistry.initFor(obj) ne null)
}
}
}
@ -69,7 +69,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
assert(!SamplePojoImpl._pre)
assert(!SamplePojoImpl._post)
assert(SamplePojoImpl._down)
assert(AspectInitRegistry.initFor(obj) eq null)
// assert(AspectInitRegistry.initFor(obj) eq null)
}
}
}

View file

@ -0,0 +1,80 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import UntypedActor._
object ForwardUntypedActorSpec {
object ForwardState {
var sender: Option[UntypedActorRef] = None
}
class ReceiverUntypedActor extends UntypedActor {
//println(getClass + ":" + toString + " => " + getContext)
val latch = new CountDownLatch(1)
def onReceive(message: Any){
println(getClass.getName + " got " + message)
message match {
case "SendBang" => {
ForwardState.sender = getContext.getSender
latch.countDown
}
case "SendBangBang" => getContext.replyUnsafe("SendBangBang")
case x => throw new IllegalArgumentException("Unknown message: " + x);
}
}
}
class ForwardUntypedActor extends UntypedActor {
val receiverActor = actorOf(classOf[ReceiverUntypedActor]).start
def onReceive(message: Any){
message match {
case "SendBang" => receiverActor.forward("SendBang",getContext)
case "SendBangBang" => receiverActor.forward("SendBangBang",getContext)
}
}
}
class BangSenderUntypedActor extends UntypedActor {
val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
forwardActor.sendOneWay("SendBang",getContext)
def onReceive(message: Any) = ()
}
class BangBangSenderUntypedActor extends UntypedActor {
val latch: CountDownLatch = new CountDownLatch(1)
val forwardActor = actorOf(classOf[ForwardUntypedActor]).start
(forwardActor sendRequestReply "SendBangBang") match {
case _ => latch.countDown
}
def onReceive(message: Any) = ()
}
}
class ForwardUntypedActorSpec extends JUnitSuite {
import ForwardUntypedActorSpec._
@Test
def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBang {
val senderActor = actorOf(classOf[BangSenderUntypedActor])
val latch = senderActor.actorRef.actor.asInstanceOf[BangSenderUntypedActor]
.forwardActor.actorRef.actor.asInstanceOf[ForwardUntypedActor]
.receiverActor.actorRef.actor.asInstanceOf[ReceiverUntypedActor]
.latch
senderActor.start
assert(latch.await(5L, TimeUnit.SECONDS))
println(senderActor.actorRef.toString + " " + ForwardState.sender.get.actorRef.toString)
assert(ForwardState.sender ne null)
assert(senderActor.actorRef.toString === ForwardState.sender.get.actorRef.toString)
}
@Test
def shouldForwardUntypedActorReferenceWhenInvokingForwardOnBangBang {
val senderActor = actorOf(classOf[BangBangSenderUntypedActor]).start
val latch = senderActor.actorRef.actor.asInstanceOf[BangBangSenderUntypedActor].latch
assert(latch.await(1L, TimeUnit.SECONDS))
}
}

View file

@ -0,0 +1,59 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import org.multiverse.api.latches.StandardLatch
import UntypedActor._
object UntypedActorReceiveTimeoutSpec {
object Tick
class TestReceiveTimeoutActor extends UntypedActor {
val latch = new StandardLatch
def onReceive(message: Any):Unit = message match {
case ReceiveTimeout => latch.open
case Tick =>
}
}
class FiveOhOhTestReceiveTimeoutActor extends TestReceiveTimeoutActor {
getContext.setReceiveTimeout(500L)
}
}
class UntypedActorReceiveTimeoutSpec extends JUnitSuite {
import UntypedActorReceiveTimeoutSpec._
@Test def receiveShouldGetTimeout = {
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
}
@Test def swappedReceiveShouldAlsoGetTimout = {
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(3, TimeUnit.SECONDS))
val swappedLatch = new StandardLatch
timeoutActor sendOneWay HotSwap(Some{
case ReceiveTimeout => swappedLatch.open
})
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
}
@Test def timeoutShouldBeCancelledAfterRegularReceive = {
val timeoutActor = actorOf(classOf[FiveOhOhTestReceiveTimeoutActor]).start
timeoutActor sendOneWay Tick
assert(timeoutActor.actorRef.actor.asInstanceOf[TestReceiveTimeoutActor].latch.tryAwait(1, TimeUnit.SECONDS) == false)
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
val timeoutLatch = new StandardLatch
val timeoutActor = actorOf(classOf[TestReceiveTimeoutActor]).start
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
}
}

View file

@ -1,20 +0,0 @@
# for production, you should probably set the root to INFO
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
log4j.rootLogger=INFO,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
# rolling log file ("system.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
# Edit the next line to point to your logs directory
log4j.appender.R.File=./logs/akka.log
log4j.logger.se.scalablesolutions=INFO

35
config/logback.xml Executable file
View file

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- For assistance related to logback-translator or configuration -->
<!-- files in general, please contact the logback user mailing list -->
<!-- at http://www.qos.ch/mailman/listinfo/logback-user -->
<!-- -->
<!-- For professional support please see -->
<!-- http://www.qos.ch/shop/products/professionalSupport -->
<!-- -->
<configuration>
<!-- Errors were reported during translation. -->
<!-- Could not find transformer for org.apache.log4j.SimpleLayout -->
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
</appender>
<appender name="R" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!--See http://logback.qos.ch/manual/appenders.html#RollingFileAppender-->
<!--and http://logback.qos.ch/manual/appenders.html#TimeBasedRollingPolicy-->
<!--for further documentation-->
<File>./logs/akka.log</File>
<encoder>
<pattern>[%4p] [%d{ISO8601}] [%t] %c{1}: %m%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>./logs/akka.log.%d{yyyy-MM-dd-HH}</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="se.scalablesolutions" level="INFO"/>
<root level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="R"/>
</root>
</configuration>

View file

@ -75,6 +75,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo)
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
// -------------------------------------------------------------------------------------------------------------------
@ -91,7 +92,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val LIFT_VERSION = "2.1-M1"
lazy val MULTIVERSE_VERSION = "0.6-SNAPSHOT"
lazy val SCALATEST_VERSION = "1.2-for-scala-2.8.0.final-SNAPSHOT"
lazy val Slf4jVersion = "1.6.0"
lazy val LOGBACK_VERSION = "0.9.24"
lazy val SPRING_VERSION = "3.0.3.RELEASE"
lazy val WerkzVersion = "2.2.1"
@ -187,8 +188,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile"
lazy val slf4j = "org.slf4j" % "slf4j-api" % Slf4jVersion % "compile"
lazy val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % Slf4jVersion % "compile"
lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile"
lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile"
lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile"
lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile"
@ -265,7 +266,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version)
)
// Exclude slf4j1.5.11 from the classpath, it's conflicting...
//FIXME STILL NEEDED? => Exclude slf4j1.5.11 from the classpath, it's conflicting...
override def runClasspath = super.runClasspath +++
descendents(info.projectPath / "config", "*") ---
(super.runClasspath ** "slf4j*1.5.11.jar")
@ -348,9 +349,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val sjson = Dependencies.sjson
val werkz = Dependencies.werkz
val werkz_core = Dependencies.werkz_core
val slf4j = Dependencies.slf4j
val slf4j_log4j = Dependencies.slf4j_log4j
val log4j = Dependencies.log4j
val logback = Dependencies.logback
val logback_core = Dependencies.logback_core
// testing
val junit = Dependencies.junit

View file

@ -0,0 +1,96 @@
#! /bin/bash
#Original /etc/init.d/skeleton modified for http://mydebian.blogdns.org
# PATH should only include /usr/* if it runs after the mountnfs.sh
script
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="my cool akka app"
NAME="cool"
DAEMON=/usr/bin/java
export AKKA_HOME=/var/.../servers/akka
AKKA_JAR=$AKKA_HOME/akka.jar
LOG4J=$AKKA_HOME/config/log4j.properties
JVMFLAGS="-Xms512M -Xmx3072M -XX:+UseConcMarkSweepGC -
Dlog4j.configuration=file://"$LOG4J
DAEMON_ARGS=$JVMFLAGS" -jar "$AKKA_JAR
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
#the user that will run the script
USER=cool-user
VERBOSE=1
# NO NEED TO MODIFY THE LINES BELOW
# Load the VERBOSE setting and other rcS variables
. /lib/init/vars.sh
# Define LSB log_* functions.
# Depend on lsb-base (>= 3.0-6) to ensure that this file is present.
. /lib/lsb/init-functions
#
# Function that starts the daemon/service
#
do_start()
{
start-stop-daemon --start --quiet -b -m -p $PIDFILE --exec $DAEMON --
$DAEMON_ARGS \
|| return 2
}
#
# Function that stops the daemon/service
#
do_stop()
{
start-stop-daemon --stop --quiet --oknodo --pidfile $PIDFILE
RETVAL="$?"
rm -f $PIDFILE
return "$RETVAL"
}
case "$1" in
start)
[ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
do_start
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
stop)
[ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
do_stop
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
restart)
#
# If the "reload" option is implemented then remove the
# 'force-reload' alias
#
log_daemon_msg "Restarting $DESC" "$NAME"
do_stop
case "$?" in
0|1)
do_start
case "$?" in
0) log_end_msg 0 ;;
1) log_end_msg 1 ;; # Old process is still running
*) log_end_msg 1 ;; # Failed to start
esac
;;
*)
# Failed to stop
log_end_msg 1
;;
esac
;;
*)
echo "Usage: $SCRIPTNAME {start|stop|restart}" >&2
exit 3
;;
esac