diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index 566e2a9f41..3395b6ac59 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -10,12 +10,13 @@ import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} -import se.scalablesolutions.akka.serialization.Serializer + import java.lang.reflect.{InvocationTargetException, Method} object Annotations { diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index bc7a8806f3..3616868cbe 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -25,10 +25,18 @@ import org.multiverse.utils.ThreadLocalTransaction._ /** * Mix in this trait to give an actor TransactionRequired semantics. - * Equivalent to invoking the 'makeTransactionRequired' method in the actor. + * Equivalent to invoking the 'makeTransactionRequired' method in the actor. */ trait TransactionRequired { this: Actor => - makeTransactionRequired + makeTransactionRequired +} + +/** + * Extend this abstract class to create a remote actor. + * Equivalent to invoking the 'makeRemote(..)' method in or on the actor. + */ +abstract class RemoteActor(hostname: String, port: Int) extends Actor { + makeRemote(hostname, port) } @serializable sealed trait LifeCycleMessage @@ -64,22 +72,81 @@ object Actor { implicit val any: AnyRef = this + /** + * Use to create an anonymous event-driven actor. + * The actor is started when created. + * Example: + *
+   * import Actor._
+   *
+   * val a = actor {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { start def receive = body } + /** + * Use to create an anonymous event-driven actor with both an init block and a message loop block. + * The actor is started when created. + * Example: + *
+   * import Actor._
+   *
+   * val a = actor {
+   *   ... // init stuff
+   * } {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ def actor(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() { start body def receive = matcher } + /** + * Use to create an anonymous event-driven actor with a life-cycle configuration. + * The actor is started when created. + * Example: + *
+   * import Actor._
+   *
+   * val a = actor(LifeCycle(Temporary)) {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() { lifeCycle = lifeCycleConfig start def receive = body } + + /** + * Use to create an anonymous event-driven actor with both an init block and a message loop block + * as well as a life-cycle configuration. + * The actor is started when created. + * Example: + *
+   * import Actor._
+   *
+   * val a = actor(LifeCycle(Temporary)) {
+   *   ... // init stuff
+   * } {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ + def actor(lifeCycleConfig: LifeCycle)(body: => Unit)(matcher: PartialFunction[Any, Unit]): Actor = new Actor() { + start + body + def receive = matcher + } } /** @@ -607,7 +674,8 @@ trait Actor extends Logging with TransactionManagement { } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): + CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) @@ -636,8 +704,14 @@ trait Actor extends Logging with TransactionManagement { * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods */ private[akka] def invoke(messageHandle: MessageInvocation) = synchronized { - if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) - else dispatch(messageHandle) + try { + if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) + else dispatch(messageHandle) + } catch { + case e => + log.error(e, e.getMessage) // for logging the exception to log file + throw e + } } private def dispatch[T](messageHandle: MessageInvocation) = { @@ -652,10 +726,10 @@ trait Actor extends Logging with TransactionManagement { else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) } catch { case e => + log.error(e, e.getMessage) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) - else e.printStackTrace } finally { clearTransaction } @@ -692,9 +766,8 @@ trait Actor extends Logging with TransactionManagement { } else proceed } catch { case e => - e.printStackTrace + log.error(e, e.getMessage) if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) - else e.printStackTrace clearTransaction // need to clear currentTransaction before call to supervisor // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index 99872beb33..7851286902 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -80,8 +80,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { // Wait until the connection attempt succeeds or fails. connection.awaitUninterruptibly if (!connection.isSuccess) { - log.error("Remote connection to [%s:%s] has failed due to [%s]", hostname, port, connection.getCause) - connection.getCause.printStackTrace + log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port) } isRunning = true } @@ -212,8 +211,7 @@ class RemoteClientHandler(val name: String, log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress); override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - log.error("Unexpected exception from downstream in remote client: %s", event.getCause) - event.getCause.printStackTrace + log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index c9528dc6a4..ba3338f39a 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -105,6 +105,8 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) @ChannelPipelineCoverage { val value = "all" } class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging { + val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + private val activeObjects = new ConcurrentHashMap[String, AnyRef] private val actors = new ConcurrentHashMap[String, Actor] @@ -126,8 +128,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - log.error("Unexpected exception from remote downstream: %s", event.getCause) - event.getCause.printStackTrace + log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close } @@ -159,8 +160,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL channel.write(replyMessage) } catch { case e: Throwable => - log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e) - e.printStackTrace + log.error(e, "Could not invoke remote actor [%s]", request.getTarget) val replyBuilder = RemoteReply.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) @@ -200,10 +200,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL } } catch { case e: InvocationTargetException => - log.error( - "Could not invoke remote active object [%s :: %s] due to: %s", - request.getMethod, request.getTarget, e.getCause) - e.getCause.printStackTrace + log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) val replyBuilder = RemoteReply.newBuilder .setId(request.getId) .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage) @@ -213,10 +210,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL val replyMessage = replyBuilder.build channel.write(replyMessage) case e: Throwable => - log.error( - "Could not invoke remote active object [%s :: %s] due to: %s", - request.getMethod, request.getTarget, e) - e.printStackTrace + log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) val replyBuilder = RemoteReply.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) @@ -247,9 +241,9 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL val escapedArgs = for (i <- 0 until args.size) { val arg = args(i) - if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith("$$ProxiedByAW")) { + if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) { val argString = arg.asInstanceOf[String] - val proxyName = argString.replace("$$ProxiedByAW", "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length) + val proxyName = argString.replace(AW_PROXY_PREFIX, "") //argString.substring(argString.indexOf("$$ProxiedByAW"), argString.length) val activeObject = createActiveObject(proxyName, timeout) unescapedArgs(i) = activeObject unescapedArgClasses(i) = Class.forName(proxyName) @@ -273,8 +267,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL newInstance } catch { case e => - log.debug("Could not create remote active object instance due to: %s", e) - e.printStackTrace + log.error(e, "Could not create remote active object instance") throw e } } else activeObjectOrNull @@ -295,8 +288,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL newInstance } catch { case e => - log.error(e, "Could not create remote actor instance due to: " + e.getMessage) - e.printStackTrace + log.error(e, "Could not create remote actor object instance") throw e } } else actorOrNull diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorage.scala index 0769053909..74f06d3a9d 100644 --- a/akka-persistence/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/src/main/scala/CassandraStorage.scala @@ -10,8 +10,6 @@ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.Config.config import org.apache.cassandra.service._ -import org.apache.thrift.transport._ -import org.apache.thrift.protocol._ /** * @author Jonas Bonér @@ -92,7 +90,7 @@ object CassandraStorage extends MapStorage else None } catch { case e => - e.printStackTrace + log.error(e, "Could not retreive Ref from storage") None } } @@ -111,7 +109,7 @@ object CassandraStorage extends MapStorage } } - // FIXME implement + // FIXME implement insertVectorStorageEntriesFor def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet") } @@ -189,7 +187,7 @@ object CassandraStorage extends MapStorage else None } catch { case e => - e.printStackTrace + log.error(e, "Could not retreive Map from storage") None } } @@ -205,11 +203,8 @@ object CassandraStorage extends MapStorage } } - - def getMapStorageSizeFor(name: String): Int = { - sessions.withSession { - _ |# (name, MAP_COLUMN_PARENT) - } + def getMapStorageSizeFor(name: String): Int = sessions.withSession { + _ |# (name, MAP_COLUMN_PARENT) } def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) @@ -234,225 +229,3 @@ object CassandraStorage extends MapStorage columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None))) } } - -/** - * NOTE: requires command line options: - *
- * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid - *

- * @author Jonas Bonér - * -object EmbeddedCassandraStorage extends Logging { -val KEYSPACE = "akka" -val MAP_COLUMN_FAMILY = "map" -val VECTOR_COLUMN_FAMILY = "vector" -val REF_COLUMN_FAMILY = "ref:item" - -val IS_ASCENDING = true - -val RUN_THRIFT_SERVICE = akka.akka.config.getBool("akka.storage.cassandra.thrift-server.service", false) -val CONSISTENCY_LEVEL = { -if (akka.akka.config.getBool("akka.storage.cassandra.blocking", true)) 0 -else 1 } - -@volatile private[this] var isRunning = false -private[this] val serializer: Serializer = { -akka.akka.config.getString("akka.storage.cassandra.storage-format", "java") match { -case "scala-json" => Serializer.ScalaJSON -case "java-json" => Serializer.JavaJSON -case "protobuf" => Serializer.Protobuf -case "java" => Serializer.Java -case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage") -case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage") -case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") -} -} - -// TODO: is this server thread-safe or needed to be wrapped up in an actor? -private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] - -private[this] var thriftServer: CassandraThriftServer = _ - -def start = synchronized { -if (!isRunning) { -try { -server.start -log.info("Cassandra persistent storage has started up successfully"); -} catch { -case e => -log.error("Could not start up Cassandra persistent storage") -throw e -} -if (RUN_THRIFT_SERVICE) { -thriftServer = new CassandraThriftServer(server) -thriftServer.start -} -isRunning -} -} - -def stop = if (isRunning) { -//server.storageService.shutdown -if (RUN_THRIFT_SERVICE) thriftServer.stop -} - -// =============================================================== -// For Ref -// =============================================================== - -def insertRefStorageFor(name: String, element: AnyRef) = { -server.insert( -KEYSPACE, -name, -REF_COLUMN_FAMILY, -element, -System.currentTimeMillis, -CONSISTENCY_LEVEL) -} - -def getRefStorageFor(name: String): Option[AnyRef] = { -try { -val column = server.get_column(KEYSPACE, name, REF_COLUMN_FAMILY) -Some(serializer.in(column.value, None)) -} catch { -case e => -e.printStackTrace -None } -} - -// =============================================================== -// For Vector -// =============================================================== - -def insertVectorStorageEntryFor(name: String, element: AnyRef) = { -server.insert( -KEYSPACE, -name, -VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), -element, -System.currentTimeMillis, -CONSISTENCY_LEVEL) -} - -def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { -try { -val column = server.get_column(KEYSPACE, name, VECTOR_COLUMN_FAMILY + ":" + index) -serializer.in(column.value, None) -} catch { -case e => -e.printStackTrace -throw new Predef.NoSuchElementException(e.getMessage) -} -} - -def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] = -server.get_slice(KEYSPACE, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count) -.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2) - -def getVectorStorageSizeFor(name: String): Int = -server.get_column_count(KEYSPACE, name, VECTOR_COLUMN_FAMILY) - -// =============================================================== -// For Map -// =============================================================== - -def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = { -server.insert( -KEYSPACE, name, -MAP_COLUMN_FAMILY + ":" + key, -serializer.out(value), -System.currentTimeMillis, -CONSISTENCY_LEVEL) -} - -def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = { -import java.util.{ Map, HashMap, List, ArrayList } -val columns: Map[String, List[column_t]] = new HashMap -for (entry <- entries) { -val cls: List[column_t] = new ArrayList -cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis)) -columns.put(MAP_COLUMN_FAMILY, cls) -} -server.batch_insert(new BatchMutation( -KEYSPACE, name, -columns), -CONSISTENCY_LEVEL) -} - -def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { -try { -val column = server.get_column(KEYSPACE, name, MAP_COLUMN_FAMILY + ":" + key) -Some(serializer.in(column.value, None)) -} catch { -case e => -e.printStackTrace -None -} -} - -def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = { -val columns = server.get_columns_since(KEYSPACE, name, MAP_COLUMN_FAMILY, -1) -.toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]] -for { -column <- columns -col = (column.columnName, serializer.in(column.value, None)) -} yield col -} - -def getMapStorageSizeFor(name: String): Int = -server.get_column_count(KEYSPACE, name, MAP_COLUMN_FAMILY) - -def removeMapStorageFor(name: String) = -server.remove(KEYSPACE, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, CONSISTENCY_LEVEL) - -def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = { -server.get_slice(KEYSPACE, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count) -.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]] -} -} - - -class CassandraThriftServer(server: CassandraServer) extends Logging { -case object Start -case object Stop - -private[this] val serverEngine: TThreadPoolServer = try { -val pidFile = akka.akka.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid") -if (pidFile != null) new File(pidFile).deleteOnExit(); -val listenPort = DatabaseDescriptor.getThriftPort - -val processor = new Cassandra.Processor(server) -val tServerSocket = new TServerSocket(listenPort) -val tProtocolFactory = new TBinaryProtocol.Factory - -val options = new TThreadPoolServer.Options -options.minWorkerThreads = 64 -new TThreadPoolServer(new TProcessorFactory(processor), -tServerSocket, -new TTransportFactory, -new TTransportFactory, -tProtocolFactory, -tProtocolFactory, -options) -} catch { -case e => -log.error("Could not start up Cassandra thrift service") -throw e -} - -import scala.actors.Actor._ -private[this] val serverDaemon = actor { -receive { -case Start => -serverEngine.serve -log.info("Cassandra thrift service has starting up successfully") -case Stop => -log.info("Cassandra thrift service is shutting down...") -serverEngine.stop -} -} - -def start = serverDaemon ! Start -def stop = serverDaemon ! Stop -} - */ diff --git a/akka-security/src/main/scala/Security.scala b/akka-security/src/main/scala/Security.scala index 384af1ea20..6efa5bdcce 100644 --- a/akka-security/src/main/scala/Security.scala +++ b/akka-security/src/main/scala/Security.scala @@ -367,7 +367,7 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] { Some(UserInfo(user, null, rolesFor(user))) } catch { case e: PrivilegedActionException => { - e.printStackTrace + log.error(e, "Action not allowed") return None } }