cleaned up logging and error reporting

This commit is contained in:
jboner 2009-11-22 15:25:16 +01:00
parent f41c1ac7c7
commit f98184ff34
6 changed files with 101 additions and 264 deletions

View file

@ -10,12 +10,13 @@ import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import se.scalablesolutions.akka.serialization.Serializer
import java.lang.reflect.{InvocationTargetException, Method}
object Annotations {

View file

@ -25,10 +25,18 @@ import org.multiverse.utils.ThreadLocalTransaction._
/**
* Mix in this trait to give an actor TransactionRequired semantics.
* Equivalent to invoking the 'makeTransactionRequired' method in the actor.
* Equivalent to invoking the 'makeTransactionRequired' method in the actor.
*/
trait TransactionRequired { this: Actor =>
makeTransactionRequired
makeTransactionRequired
}
/**
* Extend this abstract class to create a remote actor.
* Equivalent to invoking the 'makeRemote(..)' method in or on the actor.
*/
abstract class RemoteActor(hostname: String, port: Int) extends Actor {
makeRemote(hostname, port)
}
@serializable sealed trait LifeCycleMessage
@ -64,22 +72,81 @@ object Actor {
implicit val any: AnyRef = this
/**
* Use to create an anonymous event-driven actor.
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() {
start
def receive = body
}
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor {
* ... // init stuff
* } {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() {
start
body
def receive = matcher
}
/**
* Use to create an anonymous event-driven actor with a life-cycle configuration.
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor(LifeCycle(Temporary)) {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() {
lifeCycle = lifeCycleConfig
start
def receive = body
}
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block
* as well as a life-cycle configuration.
* The actor is started when created.
* Example:
* <pre>
* import Actor._
*
* val a = actor(LifeCycle(Temporary)) {
* ... // init stuff
* } {
* case msg => ... // handle message
* }
* </pre>
*/
def actor(lifeCycleConfig: LifeCycle)(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() {
start
body
def receive = matcher
}
}
/**
@ -607,7 +674,8 @@ trait Actor extends Logging with TransactionManagement {
}
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -636,8 +704,14 @@ trait Actor extends Logging with TransactionManagement {
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
case e =>
log.error(e, e.getMessage) // for logging the exception to log file
throw e
}
}
private def dispatch[T](messageHandle: MessageInvocation) = {
@ -652,10 +726,10 @@ trait Actor extends Logging with TransactionManagement {
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
log.error(e, e.getMessage)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
else e.printStackTrace
} finally {
clearTransaction
}
@ -692,9 +766,8 @@ trait Actor extends Logging with TransactionManagement {
} else proceed
} catch {
case e =>
e.printStackTrace
log.error(e, e.getMessage)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
else e.printStackTrace
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)

View file

@ -80,8 +80,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
// Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly
if (!connection.isSuccess) {
log.error("Remote connection to [%s:%s] has failed due to [%s]", hostname, port, connection.getCause)
connection.getCause.printStackTrace
log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
}
@ -212,8 +211,7 @@ class RemoteClientHandler(val name: String,
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress);
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error("Unexpected exception from downstream in remote client: %s", event.getCause)
event.getCause.printStackTrace
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -105,6 +105,8 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@ -126,8 +128,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error("Unexpected exception from remote downstream: %s", event.getCause)
event.getCause.printStackTrace
log.error(event.getCause, "Unexpected exception from remote downstream")
event.getChannel.close
}
@ -159,8 +160,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
channel.write(replyMessage)
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
e.printStackTrace
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
@ -200,10 +200,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
}
} catch {
case e: InvocationTargetException =>
log.error(
"Could not invoke remote active object [%s :: %s] due to: %s",
request.getMethod, request.getTarget, e.getCause)
e.getCause.printStackTrace
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
@ -213,10 +210,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable =>
log.error(
"Could not invoke remote active object [%s :: %s] due to: %s",
request.getMethod, request.getTarget, e)
e.printStackTrace
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReply.newBuilder
.setId(request.getId)
.setException(e.getClass.getName + "$" + e.getMessage)
@ -247,9 +241,9 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
val escapedArgs = for (i <- 0 until args.size) {
val arg = args(i)
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith("$$ProxiedByAW")) {
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
val argString = arg.asInstanceOf[String]
val proxyName = argString.replace("$$ProxiedByAW", "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length)
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName)
@ -273,8 +267,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
newInstance
} catch {
case e =>
log.debug("Could not create remote active object instance due to: %s", e)
e.printStackTrace
log.error(e, "Could not create remote active object instance")
throw e
}
} else activeObjectOrNull
@ -295,8 +288,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
newInstance
} catch {
case e =>
log.error(e, "Could not create remote actor instance due to: " + e.getMessage)
e.printStackTrace
log.error(e, "Could not create remote actor object instance")
throw e
}
} else actorOrNull

View file

@ -10,8 +10,6 @@ import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.Config.config
import org.apache.cassandra.service._
import org.apache.thrift.transport._
import org.apache.thrift.protocol._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -92,7 +90,7 @@ object CassandraStorage extends MapStorage
else None
} catch {
case e =>
e.printStackTrace
log.error(e, "Could not retreive Ref from storage")
None
}
}
@ -111,7 +109,7 @@ object CassandraStorage extends MapStorage
}
}
// FIXME implement
// FIXME implement insertVectorStorageEntriesFor
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet")
}
@ -189,7 +187,7 @@ object CassandraStorage extends MapStorage
else None
} catch {
case e =>
e.printStackTrace
log.error(e, "Could not retreive Map from storage")
None
}
}
@ -205,11 +203,8 @@ object CassandraStorage extends MapStorage
}
}
def getMapStorageSizeFor(name: String): Int = {
sessions.withSession {
_ |# (name, MAP_COLUMN_PARENT)
}
def getMapStorageSizeFor(name: String): Int = sessions.withSession {
_ |# (name, MAP_COLUMN_PARENT)
}
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
@ -234,225 +229,3 @@ object CassandraStorage extends MapStorage
columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None)))
}
}
/**
* NOTE: requires command line options:
* <br/>
* <code>-Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid</code>
* <p/>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*
object EmbeddedCassandraStorage extends Logging {
val KEYSPACE = "akka"
val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
val IS_ASCENDING = true
val RUN_THRIFT_SERVICE = akka.akka.config.getBool("akka.storage.cassandra.thrift-server.service", false)
val CONSISTENCY_LEVEL = {
if (akka.akka.config.getBool("akka.storage.cassandra.blocking", true)) 0
else 1 }
@volatile private[this] var isRunning = false
private[this] val serializer: Serializer = {
akka.akka.config.getString("akka.storage.cassandra.storage-format", "java") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
case "protobuf" => Serializer.Protobuf
case "java" => Serializer.Java
case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
// TODO: is this server thread-safe or needed to be wrapped up in an actor?
private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
private[this] var thriftServer: CassandraThriftServer = _
def start = synchronized {
if (!isRunning) {
try {
server.start
log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
log.error("Could not start up Cassandra persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
thriftServer = new CassandraThriftServer(server)
thriftServer.start
}
isRunning
}
}
def stop = if (isRunning) {
//server.storageService.shutdown
if (RUN_THRIFT_SERVICE) thriftServer.stop
}
// ===============================================================
// For Ref
// ===============================================================
def insertRefStorageFor(name: String, element: AnyRef) = {
server.insert(
KEYSPACE,
name,
REF_COLUMN_FAMILY,
element,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
def getRefStorageFor(name: String): Option[AnyRef] = {
try {
val column = server.get_column(KEYSPACE, name, REF_COLUMN_FAMILY)
Some(serializer.in(column.value, None))
} catch {
case e =>
e.printStackTrace
None }
}
// ===============================================================
// For Vector
// ===============================================================
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
server.insert(
KEYSPACE,
name,
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
element,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val column = server.get_column(KEYSPACE, name, VECTOR_COLUMN_FAMILY + ":" + index)
serializer.in(column.value, None)
} catch {
case e =>
e.printStackTrace
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
server.get_slice(KEYSPACE, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
def getVectorStorageSizeFor(name: String): Int =
server.get_column_count(KEYSPACE, name, VECTOR_COLUMN_FAMILY)
// ===============================================================
// For Map
// ===============================================================
def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = {
server.insert(
KEYSPACE, name,
MAP_COLUMN_FAMILY + ":" + key,
serializer.out(value),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
import java.util.{ Map, HashMap, List, ArrayList }
val columns: Map[String, List[column_t]] = new HashMap
for (entry <- entries) {
val cls: List[column_t] = new ArrayList
cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis))
columns.put(MAP_COLUMN_FAMILY, cls)
}
server.batch_insert(new BatchMutation(
KEYSPACE, name,
columns),
CONSISTENCY_LEVEL)
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
val column = server.get_column(KEYSPACE, name, MAP_COLUMN_FAMILY + ":" + key)
Some(serializer.in(column.value, None))
} catch {
case e =>
e.printStackTrace
None
}
}
def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {
val columns = server.get_columns_since(KEYSPACE, name, MAP_COLUMN_FAMILY, -1)
.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]]
for {
column <- columns
col = (column.columnName, serializer.in(column.value, None))
} yield col
}
def getMapStorageSizeFor(name: String): Int =
server.get_column_count(KEYSPACE, name, MAP_COLUMN_FAMILY)
def removeMapStorageFor(name: String) =
server.remove(KEYSPACE, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, CONSISTENCY_LEVEL)
def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
server.get_slice(KEYSPACE, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
}
class CassandraThriftServer(server: CassandraServer) extends Logging {
case object Start
case object Stop
private[this] val serverEngine: TThreadPoolServer = try {
val pidFile = akka.akka.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
if (pidFile != null) new File(pidFile).deleteOnExit();
val listenPort = DatabaseDescriptor.getThriftPort
val processor = new Cassandra.Processor(server)
val tServerSocket = new TServerSocket(listenPort)
val tProtocolFactory = new TBinaryProtocol.Factory
val options = new TThreadPoolServer.Options
options.minWorkerThreads = 64
new TThreadPoolServer(new TProcessorFactory(processor),
tServerSocket,
new TTransportFactory,
new TTransportFactory,
tProtocolFactory,
tProtocolFactory,
options)
} catch {
case e =>
log.error("Could not start up Cassandra thrift service")
throw e
}
import scala.actors.Actor._
private[this] val serverDaemon = actor {
receive {
case Start =>
serverEngine.serve
log.info("Cassandra thrift service has starting up successfully")
case Stop =>
log.info("Cassandra thrift service is shutting down...")
serverEngine.stop
}
}
def start = serverDaemon ! Start
def stop = serverDaemon ! Stop
}
*/

View file

@ -367,7 +367,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] {
Some(UserInfo(user, null, rolesFor(user)))
} catch {
case e: PrivilegedActionException => {
e.printStackTrace
log.error(e, "Action not allowed")
return None
}
}