Merge with master
This commit is contained in:
commit
774debbf09
26 changed files with 936 additions and 501 deletions
|
|
@ -786,7 +786,7 @@ class LocalActorRef private[akka] (
|
|||
|
||||
_status = ActorRefInternals.RUNNING
|
||||
|
||||
//If we are not currently creating this ActorRef instance
|
||||
// If we are not currently creating this ActorRef instance
|
||||
if ((actorInstance ne null) && (actorInstance.get ne null))
|
||||
initializeActorInstance
|
||||
|
||||
|
|
@ -853,11 +853,8 @@ class LocalActorRef private[akka] (
|
|||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
def startLink(actorRef: ActorRef): Unit = guard.withGuard {
|
||||
try {
|
||||
link(actorRef)
|
||||
} finally {
|
||||
actorRef.start
|
||||
}
|
||||
link(actorRef)
|
||||
actorRef.start
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -867,12 +864,9 @@ class LocalActorRef private[akka] (
|
|||
*/
|
||||
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard {
|
||||
ensureRemotingEnabled
|
||||
try {
|
||||
actorRef.makeRemote(hostname, port)
|
||||
link(actorRef)
|
||||
} finally {
|
||||
actorRef.start
|
||||
}
|
||||
actorRef.makeRemote(hostname, port)
|
||||
link(actorRef)
|
||||
actorRef.start
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -904,11 +898,8 @@ class LocalActorRef private[akka] (
|
|||
*/
|
||||
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard {
|
||||
val actor = Actor.actorOf(clazz)
|
||||
try {
|
||||
link(actor)
|
||||
} finally {
|
||||
actor.start
|
||||
}
|
||||
link(actor)
|
||||
actor.start
|
||||
actor
|
||||
}
|
||||
|
||||
|
|
@ -920,12 +911,9 @@ class LocalActorRef private[akka] (
|
|||
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard {
|
||||
ensureRemotingEnabled
|
||||
val actor = Actor.actorOf(clazz)
|
||||
try {
|
||||
actor.makeRemote(hostname, port)
|
||||
link(actor)
|
||||
} finally {
|
||||
actor.start
|
||||
}
|
||||
actor.makeRemote(hostname, port)
|
||||
link(actor)
|
||||
actor.start
|
||||
actor
|
||||
}
|
||||
|
||||
|
|
@ -994,8 +982,7 @@ class LocalActorRef private[akka] (
|
|||
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
|
||||
*/
|
||||
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
|
||||
if (isShutdown)
|
||||
Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
|
||||
if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
|
||||
else {
|
||||
currentMessage = messageHandle
|
||||
try {
|
||||
|
|
@ -1004,8 +991,7 @@ class LocalActorRef private[akka] (
|
|||
case e =>
|
||||
Actor.log.error(e, "Could not invoke actor [%s]", this)
|
||||
throw e
|
||||
}
|
||||
finally {
|
||||
} finally {
|
||||
currentMessage = null //TODO: Don't reset this, we might want to resend the message
|
||||
}
|
||||
}
|
||||
|
|
@ -1031,8 +1017,7 @@ class LocalActorRef private[akka] (
|
|||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
|
||||
val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
|
||||
false
|
||||
}
|
||||
else if (withinTimeRange.isEmpty) { // restrict number of restarts
|
||||
} else if (withinTimeRange.isEmpty) { // restrict number of restarts
|
||||
maxNrOfRetriesCount += 1 //Increment number of retries
|
||||
maxNrOfRetriesCount > maxNrOfRetries.get
|
||||
} else { // cannot restart more than N within M timerange
|
||||
|
|
@ -1041,10 +1026,8 @@ class LocalActorRef private[akka] (
|
|||
val now = System.currentTimeMillis
|
||||
val retries = maxNrOfRetriesCount
|
||||
//We are within the time window if it isn't the first restart, or if the window hasn't closed
|
||||
val insideWindow = if (windowStart == 0)
|
||||
false
|
||||
else
|
||||
(now - windowStart) <= withinTimeRange.get
|
||||
val insideWindow = if (windowStart == 0) false
|
||||
else (now - windowStart) <= withinTimeRange.get
|
||||
|
||||
//The actor is dead if it dies X times within the window of restart
|
||||
val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1)
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.AkkaException
|
|||
import akka.japi.{ Function => JFunc, Procedure => JProc }
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.config.RemoteAddress
|
||||
|
||||
class AgentException private[akka](message: String) extends AkkaException(message)
|
||||
|
||||
|
|
@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag
|
|||
* @author Viktor Klang
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class Agent[T] private (initialValue: T) {
|
||||
sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) {
|
||||
|
||||
import Agent._
|
||||
import Actor._
|
||||
|
||||
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
|
||||
val dispatcher = remote match {
|
||||
case Some(address) =>
|
||||
val d = actorOf(new AgentDispatcher[T]())
|
||||
d.makeRemote(remote.get.hostname,remote.get.port)
|
||||
d.start
|
||||
d ! Value(initialValue)
|
||||
d
|
||||
case None =>
|
||||
actorOf(new AgentDispatcher(initialValue)).start
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits a request to read the internal state.
|
||||
|
|
@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) {
|
|||
if (dispatcher.isTransactionInScope) throw new AgentException(
|
||||
"Can't call Agent.get within an enclosing transaction."+
|
||||
"\n\tWould block indefinitely.\n\tPlease refactor your code.")
|
||||
val ref = new AtomicReference[T]
|
||||
val latch = new CountDownLatch(1)
|
||||
sendProc((v: T) => {ref.set(v); latch.countDown})
|
||||
latch.await
|
||||
ref.get
|
||||
val f = (dispatcher.!!).await
|
||||
if (f.exception.isDefined) throw f.exception.get
|
||||
else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out"))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) {
|
|||
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def map[B](f: (T) => B): Agent[B] = Agent(f(get))
|
||||
final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote)
|
||||
|
||||
/**
|
||||
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
|
||||
* Does not change the value of the agent (this).
|
||||
*/
|
||||
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)())
|
||||
final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote)
|
||||
|
||||
/**
|
||||
* Applies function with type 'T => B' to the agent's internal state.
|
||||
|
|
@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) {
|
|||
* Does not change the value of the agent (this).
|
||||
* Java API
|
||||
*/
|
||||
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get))
|
||||
final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote)
|
||||
|
||||
/**
|
||||
* Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result.
|
||||
* Does not change the value of the agent (this).
|
||||
* Java API
|
||||
*/
|
||||
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)())
|
||||
final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote)
|
||||
|
||||
/**
|
||||
* Applies procedure with type T to the agent's internal state.
|
||||
|
|
@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Agent {
|
||||
|
||||
import Actor._
|
||||
/*
|
||||
* The internal messages for passing around requests.
|
||||
*/
|
||||
private[akka] case class Value[T](value: T)
|
||||
private[akka] case class Function[T](fun: ((T) => T))
|
||||
private[akka] case class Procedure[T](fun: ((T) => Unit))
|
||||
private[akka] case object Read
|
||||
|
||||
/**
|
||||
* Creates a new Agent of type T with the initial value of value.
|
||||
*/
|
||||
def apply[T](value: T): Agent[T] = new Agent(value)
|
||||
def apply[T](value: T): Agent[T] =
|
||||
apply(value,None)
|
||||
|
||||
/**
|
||||
* Creates an Agent backed by a client managed Actor if Some(remoteAddress)
|
||||
* or a local agent if None
|
||||
*/
|
||||
def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] =
|
||||
new Agent[T](value,remoteAddress)
|
||||
|
||||
/**
|
||||
* Creates an Agent backed by a client managed Actor
|
||||
*/
|
||||
def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] =
|
||||
apply(value,Some(remoteAddress))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -254,12 +277,15 @@ object Agent {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor {
|
||||
final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor {
|
||||
import Agent._
|
||||
import Actor._
|
||||
log.debug("Starting up Agent [%s]", self.uuid)
|
||||
|
||||
private val value = Ref[T](initialValue)
|
||||
private[akka] def this(initialValue: T) = this(Ref(initialValue))
|
||||
private[akka] def this() = this(Ref[T]())
|
||||
|
||||
private val value = ref
|
||||
|
||||
log.debug("Starting up Agent [%s]", self.uuid)
|
||||
|
||||
/**
|
||||
* Periodically handles incoming messages.
|
||||
|
|
@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
|
|||
def receive = {
|
||||
case Value(v: T) =>
|
||||
swap(v)
|
||||
case Read => self.reply_?(value.get())
|
||||
case Function(fun: (T => T)) =>
|
||||
swap(fun(value.getOrWait))
|
||||
case Procedure(proc: (T => Unit)) =>
|
||||
|
|
|
|||
|
|
@ -4,58 +4,130 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import akka.stm.Ref
|
||||
import akka.stm.local._
|
||||
|
||||
import scala.collection.mutable
|
||||
import java.util.concurrent.{ScheduledFuture, TimeUnit}
|
||||
|
||||
trait FSM[S] { this: Actor =>
|
||||
trait FSM[S, D] {
|
||||
this: Actor =>
|
||||
|
||||
type StateFunction = scala.PartialFunction[Event, State]
|
||||
|
||||
var currentState: State = initialState
|
||||
var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
|
||||
/** DSL */
|
||||
protected final def inState(stateName: S)(stateFunction: StateFunction) = {
|
||||
register(stateName, stateFunction)
|
||||
}
|
||||
|
||||
def initialState: State
|
||||
protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = {
|
||||
setState(State(stateName, stateData, timeout))
|
||||
}
|
||||
|
||||
def handleEvent: StateFunction = {
|
||||
case event@Event(value, stateData) =>
|
||||
log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id)
|
||||
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
|
||||
protected final def goto(nextStateName: S): State = {
|
||||
State(nextStateName, currentState.stateData)
|
||||
}
|
||||
|
||||
protected final def stay(): State = {
|
||||
goto(currentState.stateName)
|
||||
}
|
||||
|
||||
protected final def stop(): State = {
|
||||
stop(Normal)
|
||||
}
|
||||
|
||||
protected final def stop(reason: Reason): State = {
|
||||
stop(reason, currentState.stateData)
|
||||
}
|
||||
|
||||
protected final def stop(reason: Reason, stateData: D): State = {
|
||||
self ! Stop(reason, stateData)
|
||||
stay
|
||||
}
|
||||
|
||||
def whenUnhandled(stateFunction: StateFunction) = {
|
||||
handleEvent = stateFunction
|
||||
}
|
||||
|
||||
def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = {
|
||||
terminateEvent = terminationHandler
|
||||
}
|
||||
|
||||
/** FSM State data and default handlers */
|
||||
private var currentState: State = _
|
||||
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
|
||||
|
||||
private val transitions = mutable.Map[S, StateFunction]()
|
||||
private def register(name: S, function: StateFunction) {
|
||||
if (transitions contains name) {
|
||||
transitions(name) = transitions(name) orElse function
|
||||
} else {
|
||||
transitions(name) = function
|
||||
}
|
||||
}
|
||||
|
||||
private var handleEvent: StateFunction = {
|
||||
case Event(value, stateData) =>
|
||||
log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName)
|
||||
stay
|
||||
}
|
||||
|
||||
private var terminateEvent: PartialFunction[Reason, Unit] = {
|
||||
case failure@Failure(_) => log.error("Stopping because of a %s", failure)
|
||||
case reason => log.info("Stopping because of reason: %s", reason)
|
||||
}
|
||||
|
||||
override final protected def receive: Receive = {
|
||||
case Stop(reason, stateData) =>
|
||||
terminateEvent.apply(reason)
|
||||
self.stop
|
||||
case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) =>
|
||||
log.trace("Ignoring StateTimeout - ")
|
||||
// state timeout when new message in queue, skip this timeout
|
||||
case value => {
|
||||
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}
|
||||
|
||||
val event = Event(value, currentState.stateData)
|
||||
val newState = (currentState.stateFunction orElse handleEvent).apply(event)
|
||||
val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event)
|
||||
setState(nextState)
|
||||
}
|
||||
}
|
||||
|
||||
currentState = newState
|
||||
|
||||
newState match {
|
||||
case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue)
|
||||
case _ => () // ignore for now
|
||||
}
|
||||
|
||||
newState.timeout.foreach {
|
||||
timeout =>
|
||||
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS))
|
||||
private def setState(nextState: State) = {
|
||||
if (!transitions.contains(nextState.stateName)) {
|
||||
stop(Failure("Next state %s does not exist".format(nextState.stateName)))
|
||||
} else {
|
||||
currentState = nextState
|
||||
currentState.timeout.foreach {
|
||||
t =>
|
||||
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class State(stateEvent: StateEvent,
|
||||
stateFunction: StateFunction,
|
||||
stateData: S,
|
||||
timeout: Option[Int] = None,
|
||||
replyValue: Option[Any] = None)
|
||||
case class Event(event: Any, stateData: D)
|
||||
|
||||
case class Event(event: Any, stateData: S)
|
||||
case class State(stateName: S, stateData: D, timeout: Option[Long] = None) {
|
||||
|
||||
sealed trait StateEvent
|
||||
object NextState extends StateEvent
|
||||
object Reply extends StateEvent
|
||||
def until(timeout: Long): State = {
|
||||
copy(timeout = Some(timeout))
|
||||
}
|
||||
|
||||
object StateTimeout
|
||||
def replying(replyValue:Any): State = {
|
||||
self.sender match {
|
||||
case Some(sender) => sender ! replyValue
|
||||
case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue)
|
||||
}
|
||||
this
|
||||
}
|
||||
|
||||
def using(nextStateDate: D): State = {
|
||||
copy(stateData = nextStateDate)
|
||||
}
|
||||
}
|
||||
|
||||
sealed trait Reason
|
||||
case object Normal extends Reason
|
||||
case object Shutdown extends Reason
|
||||
case class Failure(cause: Any) extends Reason
|
||||
|
||||
case object StateTimeout
|
||||
|
||||
private case class Stop(reason: Reason, stateData: D)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,10 +27,6 @@ object ConfigLogger extends Logging
|
|||
object Config {
|
||||
val VERSION = "1.0-SNAPSHOT"
|
||||
|
||||
// Set Multiverse options for max speed
|
||||
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")
|
||||
System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast")
|
||||
|
||||
val HOME = {
|
||||
val envHome = System.getenv("AKKA_HOME") match {
|
||||
case null | "" | "." => None
|
||||
|
|
|
|||
|
|
@ -37,10 +37,16 @@ object Futures {
|
|||
|
||||
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
|
||||
|
||||
def awaitOne(futures: List[Future[_]]): Future[_] = {
|
||||
/**
|
||||
* Returns the First Future that is completed
|
||||
* if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
|
||||
*/
|
||||
def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
|
||||
var future: Option[Future[_]] = None
|
||||
do {
|
||||
future = futures.find(_.isCompleted)
|
||||
if (sleepMs > 0 && future.isEmpty)
|
||||
Thread.sleep(sleepMs)
|
||||
} while (future.isEmpty)
|
||||
future.get
|
||||
}
|
||||
|
|
@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] {
|
|||
|
||||
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
||||
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
||||
private val TIME_UNIT = TimeUnit.MILLISECONDS
|
||||
import TimeUnit.{MILLISECONDS => TIME_UNIT}
|
||||
def this() = this(0)
|
||||
|
||||
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
|
||||
|
|
|
|||
45
akka-actor/src/main/scala/util/Crypt.scala
Normal file
45
akka-actor/src/main/scala/util/Crypt.scala
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
||||
import java.security.{MessageDigest, SecureRandom}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Crypt extends Logging {
|
||||
val hex = "0123456789ABCDEF"
|
||||
val lineSeparator = System.getProperty("line.separator")
|
||||
|
||||
lazy val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
||||
def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII"))
|
||||
|
||||
def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5"))
|
||||
|
||||
def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII"))
|
||||
|
||||
def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1"))
|
||||
|
||||
def generateSecureCookie: String = {
|
||||
log.info("Generating secure cookie...")
|
||||
val bytes = Array.fill(32)(0.byteValue)
|
||||
random.nextBytes(bytes)
|
||||
sha1(bytes)
|
||||
}
|
||||
|
||||
def digest(bytes: Array[Byte], md: MessageDigest): String = {
|
||||
md.update(bytes)
|
||||
hexify(md.digest)
|
||||
}
|
||||
|
||||
def hexify(bytes: Array[Byte]): String = {
|
||||
val builder = new StringBuilder
|
||||
bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) }
|
||||
builder.toString
|
||||
}
|
||||
|
||||
private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n")
|
||||
}
|
||||
|
|
@ -4,8 +4,6 @@
|
|||
|
||||
package akka.util
|
||||
|
||||
import java.security.MessageDigest
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -22,18 +20,8 @@ object Helpers extends Logging {
|
|||
bytes
|
||||
}
|
||||
|
||||
def getMD5For(s: String) = {
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
digest.update(s.getBytes("ASCII"))
|
||||
val bytes = digest.digest
|
||||
|
||||
val sb = new StringBuilder
|
||||
val hex = "0123456789ABCDEF"
|
||||
bytes.foreach(b => {
|
||||
val n = b.asInstanceOf[Int]
|
||||
sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF))
|
||||
})
|
||||
sb.toString
|
||||
def bytesToInt(bytes: Array[Byte], offset: Int): Int = {
|
||||
(0 until 4).foldLeft(0)((value, index) => value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -57,4 +45,56 @@ object Helpers extends Logging {
|
|||
log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName)
|
||||
None
|
||||
}
|
||||
|
||||
/**
|
||||
* Reference that can hold either a typed value or an exception.
|
||||
*
|
||||
* Usage:
|
||||
* <pre>
|
||||
* scala> ResultOrError(1)
|
||||
* res0: ResultOrError[Int] = ResultOrError@a96606
|
||||
*
|
||||
* scala> res0()
|
||||
res1: Int = 1
|
||||
*
|
||||
* scala> res0() = 3
|
||||
*
|
||||
* scala> res0()
|
||||
* res3: Int = 3
|
||||
*
|
||||
* scala> res0() = { println("Hello world"); 3}
|
||||
* Hello world
|
||||
*
|
||||
* scala> res0()
|
||||
* res5: Int = 3
|
||||
*
|
||||
* scala> res0() = error("Lets see what happens here...")
|
||||
*
|
||||
* scala> res0()
|
||||
* java.lang.RuntimeException: Lets see what happens here...
|
||||
* at ResultOrError.apply(Helper.scala:11)
|
||||
* at .<init>(<console>:6)
|
||||
* at .<clinit>(<console>)
|
||||
* at Re...
|
||||
* </pre>
|
||||
*/
|
||||
class ResultOrError[R](result: R){
|
||||
private[this] var contents: Either[R, Throwable] = Left(result)
|
||||
|
||||
def update(value: => R) = {
|
||||
contents = try {
|
||||
Left(value)
|
||||
} catch {
|
||||
case (error : Throwable) => Right(error)
|
||||
}
|
||||
}
|
||||
|
||||
def apply() = contents match {
|
||||
case Left(result) => result
|
||||
case Right(error) => throw error.fillInStackTrace
|
||||
}
|
||||
}
|
||||
object ResultOrError {
|
||||
def apply[R](result: R) = new ResultOrError(result)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class Switch(startAsOn: Boolean = false) {
|
|||
private val switch = new AtomicBoolean(startAsOn)
|
||||
|
||||
protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized {
|
||||
if (switch.compareAndSet(from,!from)) {
|
||||
if (switch.compareAndSet(from, !from)) {
|
||||
try {
|
||||
action
|
||||
} catch {
|
||||
|
|
@ -133,43 +133,35 @@ class Switch(startAsOn: Boolean = false) {
|
|||
}
|
||||
|
||||
def switchOff(action: => Unit): Boolean = transcend(from = true, action)
|
||||
def switchOn(action: => Unit): Boolean = transcend(from = false,action)
|
||||
def switchOn(action: => Unit): Boolean = transcend(from = false, action)
|
||||
|
||||
def switchOff: Boolean = synchronized { switch.compareAndSet(true,false) }
|
||||
def switchOn: Boolean = synchronized { switch.compareAndSet(false,true) }
|
||||
def switchOff: Boolean = synchronized { switch.compareAndSet(true, false) }
|
||||
def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) }
|
||||
|
||||
def ifOnYield[T](action: => T): Option[T] = {
|
||||
if (switch.get)
|
||||
Some(action)
|
||||
else
|
||||
None
|
||||
if (switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
def ifOffYield[T](action: => T): Option[T] = {
|
||||
if (switch.get)
|
||||
Some(action)
|
||||
else
|
||||
None
|
||||
if (switch.get) Some(action)
|
||||
else None
|
||||
}
|
||||
|
||||
def ifOn(action: => Unit): Boolean = {
|
||||
if (switch.get) {
|
||||
action
|
||||
true
|
||||
}
|
||||
else
|
||||
false
|
||||
} else false
|
||||
}
|
||||
|
||||
def ifOff(action: => Unit): Boolean = {
|
||||
if (!switch.get) {
|
||||
action
|
||||
true
|
||||
}
|
||||
else
|
||||
false
|
||||
} else false
|
||||
}
|
||||
|
||||
def isOn = switch.get
|
||||
def isOff = !isOn
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,37 +13,57 @@ import java.util.concurrent.TimeUnit
|
|||
|
||||
object FSMActorSpec {
|
||||
|
||||
class Lock(code: String,
|
||||
timeout: Int,
|
||||
unlockedLatch: StandardLatch,
|
||||
lockedLatch: StandardLatch) extends Actor with FSM[CodeState] {
|
||||
val unlockedLatch = new StandardLatch
|
||||
val lockedLatch = new StandardLatch
|
||||
val unhandledLatch = new StandardLatch
|
||||
val terminatedLatch = new StandardLatch
|
||||
|
||||
def initialState = State(NextState, locked, CodeState("", code))
|
||||
sealed trait LockState
|
||||
case object Locked extends LockState
|
||||
case object Open extends LockState
|
||||
|
||||
def locked: StateFunction = {
|
||||
class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] {
|
||||
|
||||
inState(Locked) {
|
||||
case Event(digit: Char, CodeState(soFar, code)) => {
|
||||
soFar + digit match {
|
||||
case incomplete if incomplete.length < code.length =>
|
||||
State(NextState, locked, CodeState(incomplete, code))
|
||||
stay using CodeState(incomplete, code)
|
||||
case codeTry if (codeTry == code) => {
|
||||
doUnlock
|
||||
State(NextState, open, CodeState("", code), Some(timeout))
|
||||
goto(Open) using CodeState("", code) until timeout
|
||||
}
|
||||
case wrong => {
|
||||
log.error("Wrong code %s", wrong)
|
||||
State(NextState, locked, CodeState("", code))
|
||||
stay using CodeState("", code)
|
||||
}
|
||||
}
|
||||
}
|
||||
case Event("hello", _) => stay replying "world"
|
||||
case Event("bye", _) => stop(Shutdown)
|
||||
}
|
||||
|
||||
def open: StateFunction = {
|
||||
inState(Open) {
|
||||
case Event(StateTimeout, stateData) => {
|
||||
doLock
|
||||
State(NextState, locked, stateData)
|
||||
goto(Locked)
|
||||
}
|
||||
}
|
||||
|
||||
setInitialState(Locked, CodeState("", code))
|
||||
|
||||
whenUnhandled {
|
||||
case Event(_, stateData) => {
|
||||
log.info("Unhandled")
|
||||
unhandledLatch.open
|
||||
stay
|
||||
}
|
||||
}
|
||||
|
||||
onTermination {
|
||||
case reason => terminatedLatch.open
|
||||
}
|
||||
|
||||
private def doLock() {
|
||||
log.info("Locked")
|
||||
lockedLatch.open
|
||||
|
|
@ -63,11 +83,9 @@ class FSMActorSpec extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def unlockTheLock = {
|
||||
val unlockedLatch = new StandardLatch
|
||||
val lockedLatch = new StandardLatch
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start
|
||||
val lock = Actor.actorOf(new Lock("33221", 1000)).start
|
||||
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
|
|
@ -77,6 +95,25 @@ class FSMActorSpec extends JUnitSuite {
|
|||
|
||||
assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
|
||||
lock ! "not_handled"
|
||||
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
|
||||
val answerLatch = new StandardLatch
|
||||
object Hello
|
||||
object Bye
|
||||
val tester = Actor.actorOf(new Actor {
|
||||
protected def receive = {
|
||||
case Hello => lock ! "hello"
|
||||
case "world" => answerLatch.open
|
||||
case Bye => lock ! "bye"
|
||||
}
|
||||
}).start
|
||||
tester ! Hello
|
||||
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
|
||||
tester ! Bye
|
||||
assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,21 +23,21 @@ private [akka] object CouchDBStorageBackend extends
|
|||
RefStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
|
||||
|
||||
import dispatch.json._
|
||||
|
||||
implicit object widgetWrites extends Writes[Map[String,Any]] {
|
||||
def writes(o: Map[String,Any]): JsValue = JsValue(o)
|
||||
}
|
||||
|
||||
import dispatch.json._
|
||||
|
||||
implicit object widgetWrites extends Writes[Map[String,Any]] {
|
||||
def writes(o: Map[String,Any]): JsValue = JsValue(o)
|
||||
}
|
||||
|
||||
lazy val URL = config.
|
||||
lazy val URL = config.
|
||||
getString("akka.storage.couchdb.url").
|
||||
getOrElse(throw new IllegalArgumentException("'akka.storage.couchdb.url' not found in config"))
|
||||
|
||||
def drop() = {
|
||||
val client = new HttpClient()
|
||||
val delete = new DeleteMethod(URL)
|
||||
client.executeMethod(delete)
|
||||
val delete = new DeleteMethod(URL)
|
||||
client.executeMethod(delete)
|
||||
}
|
||||
|
||||
def create() = {
|
||||
|
|
@ -45,60 +45,60 @@ private [akka] object CouchDBStorageBackend extends
|
|||
val put = new PutMethod(URL)
|
||||
put.setRequestEntity(new StringRequestEntity("", null, "utf-8"))
|
||||
put.setRequestHeader("Content-Type", "application/json")
|
||||
client.executeMethod(put)
|
||||
client.executeMethod(put)
|
||||
put.getResponseBodyAsString
|
||||
}
|
||||
|
||||
private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={
|
||||
var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix))
|
||||
val dataJson = JsonSerialization.tojson(m)
|
||||
postData(URL, dataJson.toString)
|
||||
}
|
||||
|
||||
private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={
|
||||
private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={
|
||||
var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix))
|
||||
val dataJson = JsonSerialization.tojson(m)
|
||||
postData(URL, dataJson.toString)
|
||||
}
|
||||
|
||||
private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={
|
||||
postData(URL, JsonSerialization.tojson(entries + ("_id" -> (name + postfix))).toString)
|
||||
}
|
||||
}
|
||||
|
||||
private def getResponseForNameAsMap(name: String, postfix: String): Option[Map[String, Any]] = {
|
||||
getResponse(URL + name + postfix).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]]
|
||||
}
|
||||
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={
|
||||
val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++
|
||||
entries.map(e => (new String(e._1) -> new String(e._2))).toMap
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={
|
||||
val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++
|
||||
entries.map(e => (new String(e._1) -> new String(e._2))).toMap
|
||||
storeMap(name, "_map", newDoc)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
|
||||
def removeMapStorageFor(name: String) {
|
||||
lazy val url = URL + name + "_map"
|
||||
findDocRev(name + "_map").foreach(deleteData(url, _))
|
||||
}
|
||||
|
||||
findDocRev(name + "_map").foreach(deleteData(url, _))
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
|
||||
lazy val sKey = new String(key)
|
||||
// if we can't find the map for name, then we don't need to delete it.
|
||||
getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey))
|
||||
}
|
||||
|
||||
getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey))
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
lazy val sKey = new String(key)
|
||||
getResponseForNameAsMap(name, "_map").flatMap(_.get(sKey)).asInstanceOf[Option[String]].map(_.getBytes)
|
||||
}
|
||||
}
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size
|
||||
|
||||
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
|
||||
val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]())
|
||||
m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes))
|
||||
}
|
||||
val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]())
|
||||
m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes))
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
|
||||
val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]())
|
||||
val keys = m.keys.toList.sortWith(_ < _)
|
||||
|
||||
|
|
@ -112,7 +112,7 @@ private [akka] object CouchDBStorageBackend extends
|
|||
// slice from keys: both ends inclusive
|
||||
val ks = keys.slice(keys.indexOf(s), scala.math.min(keys.indexOf(s) + c, keys.indexOf(f) + 1))
|
||||
ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes))
|
||||
}
|
||||
}
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
insertVectorStorageEntriesFor(name, List(element))
|
||||
|
|
@ -133,16 +133,16 @@ private [akka] object CouchDBStorageBackend extends
|
|||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={
|
||||
val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]]
|
||||
if (v.indices.contains(index))
|
||||
v(index).getBytes
|
||||
else
|
||||
Array[Byte]()
|
||||
}
|
||||
|
||||
val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]]
|
||||
if (v.indices.contains(index))
|
||||
v(index).getBytes
|
||||
else
|
||||
Array[Byte]()
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int ={
|
||||
getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0)
|
||||
}
|
||||
getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0)
|
||||
}
|
||||
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]())
|
||||
|
|
@ -151,60 +151,60 @@ private [akka] object CouchDBStorageBackend extends
|
|||
val c = if (count == 0) v.length else count
|
||||
v.slice(s, scala.math.min(s + c, f)).map(_.getBytes)
|
||||
}
|
||||
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) ={
|
||||
val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element))
|
||||
storeMap(name, "_ref", newDoc)
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] ={
|
||||
getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes)
|
||||
}
|
||||
val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element))
|
||||
storeMap(name, "_ref", newDoc)
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] ={
|
||||
getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes)
|
||||
}
|
||||
|
||||
private def findDocRev(name: String) = {
|
||||
getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]]
|
||||
.flatMap(_.get("_rev")).asInstanceOf[Option[String]]
|
||||
}
|
||||
private def findDocRev(name: String) = {
|
||||
getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]]
|
||||
.flatMap(_.get("_rev")).asInstanceOf[Option[String]]
|
||||
}
|
||||
|
||||
private def deleteData(url:String, rev:String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val delete = new DeleteMethod(url)
|
||||
delete.setRequestHeader("If-Match", rev)
|
||||
client.executeMethod(delete)
|
||||
|
||||
val response = delete.getResponseBodyAsString()
|
||||
if (response != null)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
private def deleteData(url:String, rev:String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val delete = new DeleteMethod(url)
|
||||
delete.setRequestHeader("If-Match", rev)
|
||||
client.executeMethod(delete)
|
||||
|
||||
val response = delete.getResponseBodyAsString()
|
||||
if (response != null)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
|
||||
private def postData(url: String, data: String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val post = new PostMethod(url)
|
||||
post.setRequestEntity(new StringRequestEntity(data, null, "utf-8"))
|
||||
post.setRequestHeader("Content-Type", "application/json")
|
||||
client.executeMethod(post)
|
||||
private def postData(url: String, data: String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val post = new PostMethod(url)
|
||||
post.setRequestEntity(new StringRequestEntity(data, null, "utf-8"))
|
||||
post.setRequestHeader("Content-Type", "application/json")
|
||||
client.executeMethod(post)
|
||||
val response = post.getResponseBodyAsString
|
||||
if (response != null)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
|
||||
private def getResponse(url: String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val method = new GetMethod(url)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
|
||||
private def getResponse(url: String): Option[String] = {
|
||||
val client = new HttpClient()
|
||||
val method = new GetMethod(url)
|
||||
|
||||
method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
|
||||
method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
|
||||
new DefaultHttpMethodRetryHandler(3, false))
|
||||
|
||||
client.executeMethod(method)
|
||||
client.executeMethod(method)
|
||||
val response = method.getResponseBodyAsString
|
||||
if (method.getStatusCode == 200 && response != null)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
if (method.getStatusCode == 200 && response != null)
|
||||
Some(response)
|
||||
else
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -82,12 +82,12 @@ BeforeAndAfterEach {
|
|||
("guido van rossum", "python"),
|
||||
("james strachan", "groovy"))
|
||||
val rl = List(
|
||||
("james gosling", "java"),
|
||||
("james strachan", "groovy"),
|
||||
("larry wall", "perl"),
|
||||
("martin odersky", "scala"),
|
||||
("ola bini", "ioke"), ("rich hickey", "clojure"),
|
||||
("slava pestov", "factor"))
|
||||
("james gosling", "java"),
|
||||
("james strachan", "groovy"),
|
||||
("larry wall", "perl"),
|
||||
("martin odersky", "scala"),
|
||||
("ola bini", "ioke"), ("rich hickey", "clojure"),
|
||||
("slava pestov", "factor"))
|
||||
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
|
||||
getMapStorageSizeFor("t1") should equal(l.size)
|
||||
getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1))
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ object CommonsCodec {
|
|||
|
||||
import CommonsCodec._
|
||||
import CommonsCodec.Base64StringEncoder._
|
||||
|
||||
|
||||
/**
|
||||
* A module for supporting Redis based persistence.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -3635,6 +3635,13 @@ public final class RemoteProtocol {
|
|||
return metadata_.get(index);
|
||||
}
|
||||
|
||||
// optional string cookie = 8;
|
||||
public static final int COOKIE_FIELD_NUMBER = 8;
|
||||
private boolean hasCookie;
|
||||
private java.lang.String cookie_ = "";
|
||||
public boolean hasCookie() { return hasCookie; }
|
||||
public java.lang.String getCookie() { return cookie_; }
|
||||
|
||||
private void initFields() {
|
||||
uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance();
|
||||
message_ = akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance();
|
||||
|
|
@ -3686,6 +3693,9 @@ public final class RemoteProtocol {
|
|||
for (akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) {
|
||||
output.writeMessage(7, element);
|
||||
}
|
||||
if (hasCookie()) {
|
||||
output.writeString(8, getCookie());
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
|
@ -3723,6 +3733,10 @@ public final class RemoteProtocol {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeMessageSize(7, element);
|
||||
}
|
||||
if (hasCookie()) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeStringSize(8, getCookie());
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
|
@ -3909,6 +3923,9 @@ public final class RemoteProtocol {
|
|||
}
|
||||
result.metadata_.addAll(other.metadata_);
|
||||
}
|
||||
if (other.hasCookie()) {
|
||||
setCookie(other.getCookie());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
|
@ -3989,6 +4006,10 @@ public final class RemoteProtocol {
|
|||
addMetadata(subBuilder.buildPartial());
|
||||
break;
|
||||
}
|
||||
case 66: {
|
||||
setCookie(input.readString());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4248,6 +4269,27 @@ public final class RemoteProtocol {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional string cookie = 8;
|
||||
public boolean hasCookie() {
|
||||
return result.hasCookie();
|
||||
}
|
||||
public java.lang.String getCookie() {
|
||||
return result.getCookie();
|
||||
}
|
||||
public Builder setCookie(java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
result.hasCookie = true;
|
||||
result.cookie_ = value;
|
||||
return this;
|
||||
}
|
||||
public Builder clearCookie() {
|
||||
result.hasCookie = false;
|
||||
result.cookie_ = getDefaultInstance().getCookie();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:RemoteRequestProtocol)
|
||||
}
|
||||
|
||||
|
|
@ -4341,6 +4383,13 @@ public final class RemoteProtocol {
|
|||
return metadata_.get(index);
|
||||
}
|
||||
|
||||
// optional string cookie = 8;
|
||||
public static final int COOKIE_FIELD_NUMBER = 8;
|
||||
private boolean hasCookie;
|
||||
private java.lang.String cookie_ = "";
|
||||
public boolean hasCookie() { return hasCookie; }
|
||||
public java.lang.String getCookie() { return cookie_; }
|
||||
|
||||
private void initFields() {
|
||||
uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance();
|
||||
message_ = akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance();
|
||||
|
|
@ -4391,6 +4440,9 @@ public final class RemoteProtocol {
|
|||
for (akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) {
|
||||
output.writeMessage(7, element);
|
||||
}
|
||||
if (hasCookie()) {
|
||||
output.writeString(8, getCookie());
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
|
@ -4428,6 +4480,10 @@ public final class RemoteProtocol {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeMessageSize(7, element);
|
||||
}
|
||||
if (hasCookie()) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeStringSize(8, getCookie());
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
|
@ -4614,6 +4670,9 @@ public final class RemoteProtocol {
|
|||
}
|
||||
result.metadata_.addAll(other.metadata_);
|
||||
}
|
||||
if (other.hasCookie()) {
|
||||
setCookie(other.getCookie());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
|
@ -4689,6 +4748,10 @@ public final class RemoteProtocol {
|
|||
addMetadata(subBuilder.buildPartial());
|
||||
break;
|
||||
}
|
||||
case 66: {
|
||||
setCookie(input.readString());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4929,6 +4992,27 @@ public final class RemoteProtocol {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional string cookie = 8;
|
||||
public boolean hasCookie() {
|
||||
return result.hasCookie();
|
||||
}
|
||||
public java.lang.String getCookie() {
|
||||
return result.getCookie();
|
||||
}
|
||||
public Builder setCookie(java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
result.hasCookie = true;
|
||||
result.cookie_ = value;
|
||||
return this;
|
||||
}
|
||||
public Builder clearCookie() {
|
||||
result.hasCookie = false;
|
||||
result.cookie_ = getDefaultInstance().getCookie();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:RemoteReplyProtocol)
|
||||
}
|
||||
|
||||
|
|
@ -6657,33 +6741,33 @@ public final class RemoteProtocol {
|
|||
"\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" +
|
||||
"dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" +
|
||||
"l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" +
|
||||
"\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" +
|
||||
"\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\232\002\n\025R" +
|
||||
"emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" +
|
||||
"dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt",
|
||||
"ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" +
|
||||
"col\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030" +
|
||||
"\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" +
|
||||
"emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" +
|
||||
"\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" +
|
||||
"rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" +
|
||||
"message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" +
|
||||
"tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" +
|
||||
"isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" +
|
||||
"\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata",
|
||||
"\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" +
|
||||
"otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" +
|
||||
"adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" +
|
||||
"\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" +
|
||||
"\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" +
|
||||
"l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" +
|
||||
"eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" +
|
||||
"sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" +
|
||||
"\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" +
|
||||
"ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR",
|
||||
"Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" +
|
||||
"OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" +
|
||||
"\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote.protocolH" +
|
||||
"\001"
|
||||
"\026.MetadataEntryProtocol\022\016\n\006cookie\030\010 \001(\t\"" +
|
||||
"\204\002\n\023RemoteReplyProtocol\022\033\n\004uuid\030\001 \002(\0132\r." +
|
||||
"UuidProtocol\022!\n\007message\030\002 \001(\0132\020.MessageP" +
|
||||
"rotocol\022%\n\texception\030\003 \001(\0132\022.ExceptionPr" +
|
||||
"otocol\022%\n\016supervisorUuid\030\004 \001(\0132\r.UuidPro" +
|
||||
"tocol\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006",
|
||||
" \002(\010\022(\n\010metadata\030\007 \003(\0132\026.MetadataEntryPr" +
|
||||
"otocol\022\016\n\006cookie\030\010 \001(\t\")\n\014UuidProtocol\022\014" +
|
||||
"\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEnt" +
|
||||
"ryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6" +
|
||||
"\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016" +
|
||||
".LifeCycleType\"1\n\017AddressProtocol\022\020\n\010hos" +
|
||||
"tname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionPr" +
|
||||
"otocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002" +
|
||||
"(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n\nJAVA" +
|
||||
"_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serializati",
|
||||
"onSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nS" +
|
||||
"CALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005" +
|
||||
"*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMP" +
|
||||
"ORARY\020\002B\030\n\024akka.remote.protocolH\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
@ -6751,7 +6835,7 @@ public final class RemoteProtocol {
|
|||
internal_static_RemoteRequestProtocol_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_RemoteRequestProtocol_descriptor,
|
||||
new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", },
|
||||
new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", "Cookie", },
|
||||
akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class,
|
||||
akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class);
|
||||
internal_static_RemoteReplyProtocol_descriptor =
|
||||
|
|
@ -6759,7 +6843,7 @@ public final class RemoteProtocol {
|
|||
internal_static_RemoteReplyProtocol_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_RemoteReplyProtocol_descriptor,
|
||||
new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", },
|
||||
new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", "Cookie", },
|
||||
akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class,
|
||||
akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class);
|
||||
internal_static_UuidProtocol_descriptor =
|
||||
|
|
|
|||
|
|
@ -91,6 +91,13 @@ message TypedActorInfoProtocol {
|
|||
required string method = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote connection handshake.
|
||||
*/
|
||||
//message HandshakeProtocol {
|
||||
// required string cookie = 1;
|
||||
//}
|
||||
|
||||
/**
|
||||
* Defines a remote message request.
|
||||
*/
|
||||
|
|
@ -102,6 +109,7 @@ message RemoteRequestProtocol {
|
|||
optional UuidProtocol supervisorUuid = 5;
|
||||
optional RemoteActorRefProtocol sender = 6;
|
||||
repeated MetadataEntryProtocol metadata = 7;
|
||||
optional string cookie = 8;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -115,6 +123,7 @@ message RemoteReplyProtocol {
|
|||
required bool isActor = 5;
|
||||
required bool isSuccessful = 6;
|
||||
repeated MetadataEntryProtocol metadata = 7;
|
||||
optional string cookie = 8;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -12,26 +12,29 @@ import akka.config.Config._
|
|||
import akka.serialization.RemoteActorSerialization._
|
||||
import akka.AkkaException
|
||||
import Actor._
|
||||
|
||||
import org.jboss.netty.channel._
|
||||
import group.DefaultChannelGroup
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap
|
||||
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
||||
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
|
||||
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
|
||||
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
|
||||
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
|
||||
import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
|
||||
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
import java.net.{SocketAddress, InetSocketAddress}
|
||||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet}
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.net.{ SocketAddress, InetSocketAddress }
|
||||
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
|
||||
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
|
||||
|
||||
import scala.collection.mutable.{HashSet, HashMap}
|
||||
import scala.collection.mutable.{ HashSet, HashMap }
|
||||
import scala.reflect.BeanProperty
|
||||
|
||||
import akka.actor._
|
||||
import akka.util._
|
||||
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteClient.
|
||||
*/
|
||||
|
|
@ -51,7 +54,7 @@ case class RemoteClientShutdown(
|
|||
/**
|
||||
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
|
||||
*/
|
||||
class RemoteClientException private[akka](message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
|
||||
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
|
||||
|
|
@ -59,11 +62,18 @@ class RemoteClientException private[akka](message: String, @BeanProperty val cli
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteClient extends Logging {
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
|
||||
|
||||
val SECURE_COOKIE: Option[String] = {
|
||||
val cookie = config.getString("akka.remote.secure-cookie", "")
|
||||
if (cookie == "") None
|
||||
else Some(cookie)
|
||||
}
|
||||
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
|
||||
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
|
||||
|
||||
private val remoteClients = new HashMap[String, RemoteClient]
|
||||
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
|
||||
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
|
||||
|
||||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
|
||||
|
|
@ -86,23 +96,23 @@ object RemoteClient extends Logging {
|
|||
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = {
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None)
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = {
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = {
|
||||
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
|
||||
}
|
||||
|
||||
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = {
|
||||
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = {
|
||||
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
|
||||
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
|
||||
}
|
||||
|
|
@ -158,7 +168,7 @@ object RemoteClient extends Logging {
|
|||
* Clean-up all open connections.
|
||||
*/
|
||||
def shutdownAll = synchronized {
|
||||
remoteClients.foreach({case (addr, client) => client.shutdown})
|
||||
remoteClients.foreach({ case (addr, client) => client.shutdown })
|
||||
remoteClients.clear
|
||||
}
|
||||
|
||||
|
|
@ -200,34 +210,40 @@ class RemoteClient private[akka] (
|
|||
private val remoteAddress = new InetSocketAddress(hostname, port)
|
||||
|
||||
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||
@volatile private var bootstrap: ClientBootstrap = _
|
||||
@volatile private[remote] var connection: ChannelFuture = _
|
||||
@volatile private[remote] var openChannels: DefaultChannelGroup = _
|
||||
@volatile private var timer: HashedWheelTimer = _
|
||||
@volatile
|
||||
private var bootstrap: ClientBootstrap = _
|
||||
@volatile
|
||||
private[remote] var connection: ChannelFuture = _
|
||||
@volatile
|
||||
private[remote] var openChannels: DefaultChannelGroup = _
|
||||
@volatile
|
||||
private var timer: HashedWheelTimer = _
|
||||
private[remote] val runSwitch = new Switch()
|
||||
|
||||
private[remote] val isAuthenticated = new AtomicBoolean(false)
|
||||
|
||||
private[remote] def isRunning = runSwitch.isOn
|
||||
|
||||
private val reconnectionTimeWindow = Duration(config.getInt(
|
||||
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||
@volatile private var reconnectionTimeWindowStart = 0L
|
||||
@volatile
|
||||
private var reconnectionTimeWindowStart = 0L
|
||||
|
||||
def connect = runSwitch switchOn {
|
||||
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
|
||||
timer = new HashedWheelTimer
|
||||
bootstrap = new ClientBootstrap(
|
||||
new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,Executors.newCachedThreadPool
|
||||
)
|
||||
)
|
||||
|
||||
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
|
||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
|
||||
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
||||
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
|
||||
if (!connection.isSuccess) {
|
||||
notifyListeners(RemoteClientError(connection.getCause, this))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
|
|
@ -268,31 +284,34 @@ class RemoteClient private[akka] (
|
|||
actorRef: ActorRef,
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType): Option[CompletableFuture[T]] = {
|
||||
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
|
||||
else None
|
||||
send(createRemoteRequestProtocolBuilder(
|
||||
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType).build, senderFuture)
|
||||
}
|
||||
actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, cookie).build, senderFuture)
|
||||
}
|
||||
|
||||
def send[T](
|
||||
request: RemoteRequestProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]):
|
||||
Option[CompletableFuture[T]] = if (isRunning) {
|
||||
if (request.getIsOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
None
|
||||
} else {
|
||||
futures.synchronized {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
futures.put(uuidFrom(request.getUuid.getHigh,request.getUuid.getLow), futureResult)
|
||||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
if (isRunning) {
|
||||
if (request.getIsOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
Some(futureResult)
|
||||
None
|
||||
} else {
|
||||
futures.synchronized {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult)
|
||||
connection.getChannel.write(request)
|
||||
Some(futureResult)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
val exception = new RemoteClientException(
|
||||
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
|
||||
notifyListeners(RemoteClientError(exception, this))
|
||||
throw exception
|
||||
}
|
||||
} else {
|
||||
val exception = new RemoteClientException(
|
||||
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
|
||||
notifyListeners(RemoteClientError(exception, this))
|
||||
throw exception
|
||||
}
|
||||
|
||||
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
|
||||
|
|
@ -325,13 +344,13 @@ class RemoteClient private[akka] (
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClientPipelineFactory(
|
||||
name: String,
|
||||
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
supervisors: ConcurrentMap[Uuid, ActorRef],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: SocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
client: RemoteClient) extends ChannelPipelineFactory {
|
||||
name: String,
|
||||
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
supervisors: ConcurrentMap[Uuid, ActorRef],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: SocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
client: RemoteClient) extends ChannelPipelineFactory {
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
|
||||
|
|
@ -343,15 +362,15 @@ class RemoteClientPipelineFactory(
|
|||
e
|
||||
}
|
||||
|
||||
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
case _ => (join(), join())
|
||||
}
|
||||
|
||||
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
|
||||
|
|
@ -365,18 +384,18 @@ class RemoteClientPipelineFactory(
|
|||
*/
|
||||
@ChannelHandler.Sharable
|
||||
class RemoteClientHandler(
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
val supervisors: ConcurrentMap[Uuid, ActorRef],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: SocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: RemoteClient)
|
||||
extends SimpleChannelUpstreamHandler with Logging {
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
val supervisors: ConcurrentMap[Uuid, ActorRef],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: SocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: RemoteClient)
|
||||
extends SimpleChannelUpstreamHandler with Logging {
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
log.debug(event.toString)
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
|
|
@ -387,7 +406,7 @@ class RemoteClientHandler(
|
|||
val result = event.getMessage
|
||||
if (result.isInstanceOf[RemoteReplyProtocol]) {
|
||||
val reply = result.asInstanceOf[RemoteReplyProtocol]
|
||||
val replyUuid = uuidFrom(reply.getUuid.getHigh,reply.getUuid.getLow)
|
||||
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
|
||||
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
|
||||
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
if (reply.getIsSuccessful) {
|
||||
|
|
@ -395,7 +414,7 @@ class RemoteClientHandler(
|
|||
future.completeWithResult(message)
|
||||
} else {
|
||||
if (reply.hasSupervisorUuid()) {
|
||||
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh,reply.getSupervisorUuid.getLow)
|
||||
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
|
||||
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
|
||||
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
|
||||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
|
|
@ -424,6 +443,7 @@ class RemoteClientHandler(
|
|||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
client.openChannels.remove(event.getChannel)
|
||||
client.isAuthenticated.set(false)
|
||||
log.debug("Remote client reconnecting to [%s]", remoteAddress)
|
||||
client.connection = bootstrap.connect(remoteAddress)
|
||||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||
|
|
@ -469,9 +489,9 @@ class RemoteClientHandler(
|
|||
val exception = reply.getException
|
||||
val classname = exception.getClassname
|
||||
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
|
||||
else Class.forName(classname)
|
||||
else Class.forName(classname)
|
||||
exceptionClass
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import org.jboss.netty.handler.ssl.SslHandler
|
|||
|
||||
import scala.collection.mutable.Map
|
||||
import scala.reflect.BeanProperty
|
||||
import akka.config.ConfigurationException
|
||||
|
||||
/**
|
||||
* Use this object if you need a single remote server on a specific node.
|
||||
|
|
@ -61,21 +62,30 @@ import scala.reflect.BeanProperty
|
|||
object RemoteNode extends RemoteServer
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
* Holds configuration variables, remote actors, remote typed actors and remote servers.
|
||||
* For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object
|
||||
RemoteServer {
|
||||
object RemoteServer {
|
||||
val UUID_PREFIX = "uuid:"
|
||||
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
||||
val PORT = config.getInt("akka.remote.server.port", 9999)
|
||||
|
||||
val SECURE_COOKIE: Option[String] = {
|
||||
val cookie = config.getString("akka.remote.secure-cookie", "")
|
||||
if (cookie == "") None
|
||||
else Some(cookie)
|
||||
}
|
||||
val REQUIRE_COOKIE = {
|
||||
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
|
||||
if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
|
||||
requireCookie
|
||||
}
|
||||
|
||||
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
||||
val PORT = config.getInt("akka.remote.server.port", 9999)
|
||||
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
|
||||
|
||||
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
|
||||
val ZLIB_COMPRESSION_LEVEL = {
|
||||
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
|
||||
val ZLIB_COMPRESSION_LEVEL = {
|
||||
val level = config.getInt("akka.remote.zlib-compression-level", 6)
|
||||
if (level < 1 && level > 9) throw new IllegalArgumentException(
|
||||
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
|
||||
|
|
@ -128,7 +138,6 @@ RemoteServer {
|
|||
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -203,13 +212,14 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
address = Address(_hostname,_port)
|
||||
log.info("Starting remote server at [%s:%s]", hostname, port)
|
||||
RemoteServer.register(hostname, port, this)
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(
|
||||
name, openChannels, loader, this)
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
bootstrap.setOption("child.keepAlive", true)
|
||||
bootstrap.setOption("child.reuseAddress", true)
|
||||
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
|
||||
|
||||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||
_isRunning = true
|
||||
Cluster.registerLocalNode(hostname, port)
|
||||
|
|
@ -251,11 +261,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
*/
|
||||
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
||||
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
|
||||
if (id.startsWith(UUID_PREFIX)) {
|
||||
registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
|
||||
} else {
|
||||
registerTypedActor(id, typedActor, typedActors())
|
||||
}
|
||||
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
|
||||
else registerTypedActor(id, typedActor, typedActors)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -270,28 +277,19 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
*/
|
||||
def register(id: String, actorRef: ActorRef): Unit = synchronized {
|
||||
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
||||
if (id.startsWith(UUID_PREFIX)) {
|
||||
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
|
||||
} else {
|
||||
register(id, actorRef, actors())
|
||||
}
|
||||
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
|
||||
else register(id, actorRef, actors)
|
||||
}
|
||||
|
||||
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
|
||||
if (_isRunning) {
|
||||
if (!registry.contains(id)) {
|
||||
if (!actorRef.isRunning) actorRef.start
|
||||
registry.put(id, actorRef)
|
||||
}
|
||||
if (_isRunning && !registry.contains(id)) {
|
||||
if (!actorRef.isRunning) actorRef.start
|
||||
registry.put(id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
|
||||
if (_isRunning) {
|
||||
if (!registry.contains(id)) {
|
||||
registry.put(id, typedActor)
|
||||
}
|
||||
}
|
||||
if (_isRunning && !registry.contains(id)) registry.put(id, typedActor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -300,8 +298,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
def unregister(actorRef: ActorRef):Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
|
||||
actors().remove(actorRef.id,actorRef)
|
||||
actorsByUuid().remove(actorRef.uuid,actorRef)
|
||||
actors.remove(actorRef.id, actorRef)
|
||||
actorsByUuid.remove(actorRef.uuid, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -313,12 +311,11 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
def unregister(id: String):Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote actor with id [%s]", id)
|
||||
if (id.startsWith(UUID_PREFIX)) {
|
||||
actorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
||||
} else {
|
||||
val actorRef = actors() get id
|
||||
actorsByUuid().remove(actorRef.uuid,actorRef)
|
||||
actors().remove(id,actorRef)
|
||||
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||
else {
|
||||
val actorRef = actors get id
|
||||
actorsByUuid.remove(actorRef.uuid, actorRef)
|
||||
actors.remove(id,actorRef)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -331,11 +328,8 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
def unregisterTypedActor(id: String):Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
||||
if (id.startsWith(UUID_PREFIX)) {
|
||||
typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
||||
} else {
|
||||
typedActors().remove(id)
|
||||
}
|
||||
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||
else typedActors.remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -343,10 +337,10 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
|
||||
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
||||
|
||||
private[akka] def actors() = ActorRegistry.actors(address)
|
||||
private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address)
|
||||
private[akka] def typedActors() = ActorRegistry.typedActors(address)
|
||||
private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address)
|
||||
private[akka] def actors = ActorRegistry.actors(address)
|
||||
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
|
||||
private[akka] def typedActors = ActorRegistry.typedActors(address)
|
||||
private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address)
|
||||
}
|
||||
|
||||
object RemoteServerSslContext {
|
||||
|
|
@ -389,7 +383,7 @@ class RemoteServerPipelineFactory(
|
|||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
}
|
||||
|
|
@ -410,7 +404,9 @@ class RemoteServerHandler(
|
|||
val applicationLoader: Option[ClassLoader],
|
||||
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
||||
import RemoteServer._
|
||||
|
||||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
val CHANNEL_INIT = "channel-init".intern
|
||||
|
||||
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
||||
|
||||
|
|
@ -434,9 +430,8 @@ class RemoteServerHandler(
|
|||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else {
|
||||
server.notifyListeners(RemoteServerClientConnected(server))
|
||||
}
|
||||
} else server.notifyListeners(RemoteServerClientConnected(server))
|
||||
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
|
|
@ -445,8 +440,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
log.debug(event.toString)
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
|
|
@ -456,7 +450,9 @@ class RemoteServerHandler(
|
|||
val message = event.getMessage
|
||||
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
||||
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
||||
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
|
||||
val requestProtocol = message.asInstanceOf[RemoteRequestProtocol]
|
||||
if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
|
||||
handleRemoteRequestProtocol(requestProtocol, event.getChannel)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -491,8 +487,11 @@ class RemoteServerHandler(
|
|||
case RemoteActorSystemMessage.Stop => actorRef.stop
|
||||
case _ => // then match on user defined messages
|
||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some(
|
||||
new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
override def onComplete(result: AnyRef) {
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
|
|
@ -506,8 +505,7 @@ class RemoteServerHandler(
|
|||
try {
|
||||
channel.write(replyBuilder.build)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -515,8 +513,7 @@ class RemoteServerHandler(
|
|||
try {
|
||||
channel.write(createErrorReplyMessage(exception, request, true))
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -528,8 +525,8 @@ class RemoteServerHandler(
|
|||
val actorInfo = request.getActorInfo
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||
val typedActor = createTypedActor(actorInfo)
|
||||
|
||||
val typedActor = createTypedActor(actorInfo)
|
||||
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
||||
val argClasses = args.map(_.getClass)
|
||||
|
||||
|
|
@ -551,49 +548,39 @@ class RemoteServerHandler(
|
|||
case e: InvocationTargetException =>
|
||||
channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable =>
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, false))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
|
||||
private def findActorById(id: String) : ActorRef = {
|
||||
server.actors().get(id)
|
||||
server.actors.get(id)
|
||||
}
|
||||
|
||||
private def findActorByUuid(uuid: String) : ActorRef = {
|
||||
server.actorsByUuid().get(uuid)
|
||||
server.actorsByUuid.get(uuid)
|
||||
}
|
||||
|
||||
private def findTypedActorById(id: String) : AnyRef = {
|
||||
server.typedActors().get(id)
|
||||
server.typedActors.get(id)
|
||||
}
|
||||
|
||||
private def findTypedActorByUuid(uuid: String) : AnyRef = {
|
||||
server.typedActorsByUuid().get(uuid)
|
||||
server.typedActorsByUuid.get(uuid)
|
||||
}
|
||||
|
||||
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
||||
findActorByUuid(id.substring(UUID_PREFIX.length))
|
||||
} else {
|
||||
findActorById(id)
|
||||
}
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = findActorByUuid(uuid)
|
||||
}
|
||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length))
|
||||
else findActorById(id)
|
||||
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
|
||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
||||
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
|
||||
} else {
|
||||
findTypedActorById(id)
|
||||
}
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = findTypedActorByUuid(uuid)
|
||||
}
|
||||
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length))
|
||||
else findTypedActorById(id)
|
||||
if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid)
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
|
|
@ -677,4 +664,19 @@ class RemoteServerHandler(
|
|||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
replyBuilder.build
|
||||
}
|
||||
|
||||
private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = {
|
||||
val attachment = ctx.getAttachment
|
||||
if ((attachment ne null) &&
|
||||
attachment.isInstanceOf[String] &&
|
||||
attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization
|
||||
ctx.setAttachment(null)
|
||||
val clientAddress = ctx.getChannel.getRemoteAddress.toString
|
||||
if (!request.hasCookie) throw new SecurityException(
|
||||
"The remote client [" + clientAddress + "] does not have a secure cookie.")
|
||||
if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException(
|
||||
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
|
||||
log.info("Remote client [%s] successfully authenticated using secure cookie", clientAddress)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,8 +8,9 @@ import akka.stm.global._
|
|||
import akka.stm.TransactionManagement._
|
||||
import akka.stm.TransactionManagement
|
||||
import akka.dispatch.MessageInvocation
|
||||
import akka.remote.{RemoteServer, MessageSerializer}
|
||||
import akka.remote.{RemoteServer, RemoteClient, MessageSerializer}
|
||||
import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
|
||||
|
||||
import ActorTypeProtocol._
|
||||
import akka.config.Supervision._
|
||||
import akka.actor.{uuidFrom,newUuid}
|
||||
|
|
@ -132,7 +133,8 @@ object ActorSerialization {
|
|||
false,
|
||||
actorRef.getSender,
|
||||
None,
|
||||
ActorType.ScalaActor).build)
|
||||
ActorType.ScalaActor,
|
||||
RemoteClient.SECURE_COOKIE).build)
|
||||
|
||||
requestProtocols.foreach(rp => builder.addMessages(rp))
|
||||
}
|
||||
|
|
@ -248,11 +250,11 @@ object RemoteActorSerialization {
|
|||
ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar)
|
||||
|
||||
RemoteActorRefProtocol.newBuilder
|
||||
.setClassOrServiceName(uuid.toString)
|
||||
.setActorClassname(actorClassName)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
.setClassOrServiceName(uuid.toString)
|
||||
.setActorClassname(actorClassName)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
}
|
||||
|
||||
def createRemoteRequestProtocolBuilder(
|
||||
|
|
@ -261,8 +263,8 @@ object RemoteActorSerialization {
|
|||
isOneWay: Boolean,
|
||||
senderOption: Option[ActorRef],
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType):
|
||||
RemoteRequestProtocol.Builder = {
|
||||
actorType: ActorType,
|
||||
secureCookie: Option[String]): RemoteRequestProtocol.Builder = {
|
||||
import actorRef._
|
||||
|
||||
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
||||
|
|
@ -271,13 +273,12 @@ object RemoteActorSerialization {
|
|||
.setTarget(actorClassName)
|
||||
.setTimeout(timeout)
|
||||
|
||||
typedActorInfo.foreach {
|
||||
typedActor =>
|
||||
actorInfoBuilder.setTypedActorInfo(
|
||||
TypedActorInfoProtocol.newBuilder
|
||||
.setInterface(typedActor._1)
|
||||
.setMethod(typedActor._2)
|
||||
.build)
|
||||
typedActorInfo.foreach { typedActor =>
|
||||
actorInfoBuilder.setTypedActorInfo(
|
||||
TypedActorInfoProtocol.newBuilder
|
||||
.setInterface(typedActor._1)
|
||||
.setMethod(typedActor._2)
|
||||
.build)
|
||||
}
|
||||
|
||||
actorType match {
|
||||
|
|
@ -292,6 +293,8 @@ object RemoteActorSerialization {
|
|||
.setActorInfo(actorInfo)
|
||||
.setIsOneWay(isOneWay)
|
||||
|
||||
secureCookie.foreach(requestBuilder.setCookie(_))
|
||||
|
||||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(
|
||||
UuidProtocol.newBuilder
|
||||
|
|
@ -306,8 +309,6 @@ object RemoteActorSerialization {
|
|||
}
|
||||
requestBuilder
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -404,5 +405,4 @@ object RemoteTypedActorSerialization {
|
|||
.setInterfaceName(init.interfaceClass.getName)
|
||||
.build
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
37
akka-remote/src/test/scala/remote/RemoteAgentSpec.scala
Normal file
37
akka-remote/src/test/scala/remote/RemoteAgentSpec.scala
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.{Test, Before, After}
|
||||
import akka.config.RemoteAddress
|
||||
import akka.actor.Agent
|
||||
import akka.remote. {RemoteClient, RemoteServer}
|
||||
|
||||
|
||||
class RemoteAgentSpec extends JUnitSuite {
|
||||
var server: RemoteServer = _
|
||||
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9992
|
||||
|
||||
@Before def startServer {
|
||||
val s = new RemoteServer()
|
||||
s.start(HOSTNAME, PORT)
|
||||
server = s
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
@After def stopServer {
|
||||
val s = server
|
||||
server = null
|
||||
s.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
}
|
||||
|
||||
@Test def remoteAgentShouldInitializeProperly {
|
||||
val a = Agent(10,RemoteAddress(HOSTNAME,PORT))
|
||||
assert(a() == 10, "Remote agent should have the proper initial value")
|
||||
a(20)
|
||||
assert(a() == 20, "Remote agent should be updated properly")
|
||||
a.close
|
||||
}
|
||||
}
|
||||
|
|
@ -201,18 +201,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
def shouldRegisterAndUnregister {
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
server.register("my-service-1", actor1)
|
||||
assert(server.actors().get("my-service-1") ne null, "actor registered")
|
||||
assert(server.actors.get("my-service-1") ne null, "actor registered")
|
||||
server.unregister("my-service-1")
|
||||
assert(server.actors().get("my-service-1") eq null, "actor unregistered")
|
||||
assert(server.actors.get("my-service-1") eq null, "actor unregistered")
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldRegisterAndUnregisterByUuid {
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
server.register("uuid:" + actor1.uuid, actor1)
|
||||
assert(server.actorsByUuid().get(actor1.uuid.toString) ne null, "actor registered")
|
||||
assert(server.actorsByUuid.get(actor1.uuid.toString) ne null, "actor registered")
|
||||
server.unregister("uuid:" + actor1.uuid)
|
||||
assert(server.actorsByUuid().get(actor1.uuid) eq null, "actor unregistered")
|
||||
assert(server.actorsByUuid.get(actor1.uuid) eq null, "actor unregistered")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,9 +103,9 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
|||
it("should register and unregister typed actors") {
|
||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||
server.registerTypedActor("my-test-service", typedActor)
|
||||
assert(server.typedActors().get("my-test-service") ne null, "typed actor registered")
|
||||
assert(server.typedActors.get("my-test-service") ne null, "typed actor registered")
|
||||
server.unregisterTypedActor("my-test-service")
|
||||
assert(server.typedActors().get("my-test-service") eq null, "typed actor unregistered")
|
||||
assert(server.typedActors.get("my-test-service") eq null, "typed actor unregistered")
|
||||
}
|
||||
|
||||
it("should register and unregister typed actors by uuid") {
|
||||
|
|
@ -113,9 +113,9 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
|||
val init = AspectInitRegistry.initFor(typedActor)
|
||||
val uuid = "uuid:" + init.actorRef.uuid
|
||||
server.registerTypedActor(uuid, typedActor)
|
||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||
server.unregisterTypedActor(uuid)
|
||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
|
||||
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
|
||||
}
|
||||
|
||||
it("should find typed actors by uuid") {
|
||||
|
|
@ -123,7 +123,7 @@ class ServerInitiatedRemoteTypedActorSpec extends
|
|||
val init = AspectInitRegistry.initFor(typedActor)
|
||||
val uuid = "uuid:" + init.actorRef.uuid
|
||||
server.registerTypedActor(uuid, typedActor)
|
||||
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||
assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered")
|
||||
|
||||
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
|
||||
expect("oneway") {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,13 @@ object Put extends ChopstickMessage
|
|||
case class Taken(chopstick: ActorRef) extends ChopstickMessage
|
||||
case class Busy(chopstick: ActorRef) extends ChopstickMessage
|
||||
|
||||
/**
|
||||
* Some states the chopstick can be in
|
||||
*/
|
||||
sealed trait ChopstickState
|
||||
case object Available extends ChopstickState
|
||||
case object Taken extends ChopstickState
|
||||
|
||||
/**
|
||||
* Some state container for the chopstick
|
||||
*/
|
||||
|
|
@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef])
|
|||
/*
|
||||
* A chopstick is an actor, it can be taken, and put back
|
||||
*/
|
||||
class Chopstick(name: String) extends Actor with FSM[TakenBy] {
|
||||
class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] {
|
||||
self.id = name
|
||||
|
||||
// A chopstick begins its existence as available and taken by no one
|
||||
def initialState = State(NextState, available, TakenBy(None))
|
||||
|
||||
// When a chopstick is available, it can be taken by a some hakker
|
||||
def available: StateFunction = {
|
||||
inState(Available) {
|
||||
case Event(Take, _) =>
|
||||
State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self)))
|
||||
goto(Taken) using TakenBy(self.sender) replying Taken(self)
|
||||
}
|
||||
|
||||
// When a chopstick is taken by a hakker
|
||||
// It will refuse to be taken by other hakkers
|
||||
// But the owning hakker can put it back
|
||||
def taken: StateFunction = {
|
||||
inState(Taken) {
|
||||
case Event(Take, currentState) =>
|
||||
State(Reply, taken, currentState, replyValue = Some(Busy(self)))
|
||||
stay replying Busy(self)
|
||||
case Event(Put, TakenBy(hakker)) if self.sender == hakker =>
|
||||
State(NextState, available, TakenBy(None))
|
||||
goto(Available) using TakenBy(None)
|
||||
}
|
||||
|
||||
// A chopstick begins its existence as available and taken by no one
|
||||
setInitialState(Available, TakenBy(None))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[TakenBy] {
|
|||
sealed trait FSMHakkerMessage
|
||||
object Think extends FSMHakkerMessage
|
||||
|
||||
/**
|
||||
* Some fsm hakker states
|
||||
*/
|
||||
sealed trait FSMHakkerState
|
||||
case object Waiting extends FSMHakkerState
|
||||
case object Thinking extends FSMHakkerState
|
||||
case object Hungry extends FSMHakkerState
|
||||
case object WaitForOtherChopstick extends FSMHakkerState
|
||||
case object FirstChopstickDenied extends FSMHakkerState
|
||||
case object Eating extends FSMHakkerState
|
||||
|
||||
/**
|
||||
* Some state container to keep track of which chopsticks we have
|
||||
*/
|
||||
|
|
@ -57,13 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef])
|
|||
/*
|
||||
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
|
||||
*/
|
||||
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] {
|
||||
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] {
|
||||
self.id = name
|
||||
|
||||
//All hakkers start waiting
|
||||
def initialState = State(NextState, waiting, TakenChopsticks(None, None))
|
||||
|
||||
def waiting: StateFunction = {
|
||||
inState(Waiting) {
|
||||
case Event(Think, _) =>
|
||||
log.info("%s starts to think", name)
|
||||
startThinking(5000)
|
||||
|
|
@ -71,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
|
||||
//When a hakker is thinking it can become hungry
|
||||
//and try to pick up its chopsticks and eat
|
||||
def thinking: StateFunction = {
|
||||
case Event(StateTimeout, current) =>
|
||||
inState(Thinking) {
|
||||
case Event(StateTimeout, _) =>
|
||||
left ! Take
|
||||
right ! Take
|
||||
State(NextState, hungry, current)
|
||||
goto(Hungry)
|
||||
}
|
||||
|
||||
// When a hakker is hungry it tries to pick up its chopsticks and eat
|
||||
// When it picks one up, it goes into wait for the other
|
||||
// If the hakkers first attempt at grabbing a chopstick fails,
|
||||
// it starts to wait for the response of the other grab
|
||||
def hungry: StateFunction = {
|
||||
inState(Hungry) {
|
||||
case Event(Taken(`left`), _) =>
|
||||
State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None))
|
||||
goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None)
|
||||
case Event(Taken(`right`), _) =>
|
||||
State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right)))
|
||||
case Event(Busy(_), current) =>
|
||||
State(NextState, firstChopstickDenied, current)
|
||||
goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right))
|
||||
case Event(Busy(_), _) =>
|
||||
goto(FirstChopstickDenied)
|
||||
}
|
||||
|
||||
// When a hakker is waiting for the last chopstick it can either obtain it
|
||||
// and start eating, or the other chopstick was busy, and the hakker goes
|
||||
// back to think about how he should obtain his chopsticks :-)
|
||||
def waitForOtherChopstick: StateFunction = {
|
||||
inState(WaitForOtherChopstick) {
|
||||
case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right)
|
||||
case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right)
|
||||
case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) =>
|
||||
|
|
@ -105,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
|
||||
private def startEating(left: ActorRef, right: ActorRef): State = {
|
||||
log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
|
||||
State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000))
|
||||
goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000
|
||||
}
|
||||
|
||||
// When the results of the other grab comes back,
|
||||
// he needs to put it back if he got the other one.
|
||||
// Then go back and think and try to grab the chopsticks again
|
||||
def firstChopstickDenied: StateFunction = {
|
||||
inState(FirstChopstickDenied) {
|
||||
case Event(Taken(secondChopstick), _) =>
|
||||
secondChopstick ! Put
|
||||
startThinking(10)
|
||||
|
|
@ -121,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
|
||||
// When a hakker is eating, he can decide to start to think,
|
||||
// then he puts down his chopsticks and starts to think
|
||||
def eating: StateFunction = {
|
||||
inState(Eating) {
|
||||
case Event(StateTimeout, _) =>
|
||||
log.info("%s puts down his chopsticks and starts to think", name)
|
||||
left ! Put
|
||||
|
|
@ -130,15 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
}
|
||||
|
||||
private def startThinking(period: Int): State = {
|
||||
State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period))
|
||||
goto(Thinking) using TakenChopsticks(None, None) until period
|
||||
}
|
||||
|
||||
//All hakkers start waiting
|
||||
setInitialState(Waiting, TakenChopsticks(None, None))
|
||||
}
|
||||
|
||||
/*
|
||||
* Alright, here's our test-harness
|
||||
*/
|
||||
object DiningHakkersOnFSM {
|
||||
def run {
|
||||
object DiningHakkersOnFsm {
|
||||
|
||||
def run = {
|
||||
// Create 5 chopsticks
|
||||
val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start
|
||||
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ object AkkaRepositories {
|
|||
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
|
||||
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
|
||||
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
akka {
|
||||
version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
|
||||
|
||||
|
||||
time-unit = "seconds" # Default timeout time unit for all timeout properties throughout the config
|
||||
|
||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||
|
|
@ -109,6 +109,9 @@ akka {
|
|||
}
|
||||
|
||||
remote {
|
||||
|
||||
secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
|
||||
|
||||
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
|
||||
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
||||
|
||||
|
|
@ -133,6 +136,7 @@ akka {
|
|||
hostname = "localhost" # The hostname or IP that clients should connect to
|
||||
port = 9999 # The port clients should connect to
|
||||
connection-timeout = 1
|
||||
require-cookie = on
|
||||
}
|
||||
|
||||
client {
|
||||
|
|
|
|||
|
|
@ -38,29 +38,29 @@
|
|||
<Set name="Acceptors">2</Set>
|
||||
<Set name="statsOn">false</Set>
|
||||
<Set name="confidentialPort">8443</Set>
|
||||
<Set name="lowResourcesConnections">20000</Set>
|
||||
<Set name="lowResourcesMaxIdleTime">5000</Set>
|
||||
<Set name="lowResourcesConnections">20000</Set>
|
||||
<Set name="lowResourcesMaxIdleTime">5000</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
|
||||
<!-- Uncomment this and enter your SSL config/credentials to enable https
|
||||
<Call name="addConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
|
||||
<Arg>
|
||||
<New class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
|
||||
<Set name="Port">8443</Set>
|
||||
<Set name="maxIdleTime">30000</Set>
|
||||
<Set name="Acceptors">2</Set>
|
||||
<Set name="AcceptQueueSize">100</Set>
|
||||
<Set name="Keystore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="Password">PASSWORD</Set>
|
||||
<Set name="KeyPassword">KEYPASSWORD</Set>
|
||||
<Set name="truststore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="trustPassword">TRUSTPASSWORD</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
-->
|
||||
<Set name="maxIdleTime">30000</Set>
|
||||
<Set name="Acceptors">2</Set>
|
||||
<Set name="AcceptQueueSize">100</Set>
|
||||
<Set name="Keystore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="Password">PASSWORD</Set>
|
||||
<Set name="KeyPassword">KEYPASSWORD</Set>
|
||||
<Set name="truststore"><SystemProperty name="jetty.home" default="." />/etc/keystore</Set>
|
||||
<Set name="trustPassword">TRUSTPASSWORD</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
-->
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- Set handler Collection Structure -->
|
||||
|
|
@ -94,4 +94,4 @@
|
|||
<Set name="sendDateHeader">true</Set>
|
||||
<Set name="gracefulShutdown">1000</Set>
|
||||
|
||||
</Configure>
|
||||
</Configure>
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
|
||||
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
|
||||
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
|
||||
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
|
|
@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
object Dependencies {
|
||||
|
||||
// Compile
|
||||
lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile"
|
||||
lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile"
|
||||
|
||||
lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
|
||||
|
||||
|
|
@ -606,7 +606,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
</dependency>
|
||||
|
||||
<dependency org="org.apache.hadoop" name="hadoop-test" rev="0.20.2" conf="test">
|
||||
<exclude module="slf4j-api"/>
|
||||
<exclude module="slf4j-api"/>
|
||||
</dependency>
|
||||
<dependency org="org.slf4j" name="slf4j-api" rev={SLF4J_VERSION} conf="test">
|
||||
</dependency>
|
||||
|
|
@ -655,8 +655,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
}
|
||||
|
||||
class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
|
||||
val couch = Dependencies.commonsHttpClient
|
||||
val spec = Dependencies.specs
|
||||
val couch = Dependencies.commonsHttpClient
|
||||
val spec = Dependencies.specs
|
||||
|
||||
override def testOptions = createTestFilter( _.endsWith("Test"))
|
||||
}
|
||||
|
|
|
|||
62
scripts/generate_config_with_secure_cookie.sh
Executable file
62
scripts/generate_config_with_secure_cookie.sh
Executable file
|
|
@ -0,0 +1,62 @@
|
|||
#!/bin/sh
|
||||
exec scala "$0" "$@"
|
||||
!#
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
import java.security.{MessageDigest, SecureRandom}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Crypt {
|
||||
val hex = "0123456789ABCDEF"
|
||||
val lineSeparator = System.getProperty("line.separator")
|
||||
|
||||
lazy val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
||||
def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII"))
|
||||
|
||||
def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5"))
|
||||
|
||||
def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII"))
|
||||
|
||||
def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1"))
|
||||
|
||||
def generateSecureCookie: String = {
|
||||
val bytes = Array.fill(32)(0.byteValue)
|
||||
random.nextBytes(bytes)
|
||||
sha1(bytes)
|
||||
}
|
||||
|
||||
def digest(bytes: Array[Byte], md: MessageDigest): String = {
|
||||
md.update(bytes)
|
||||
hexify(md.digest)
|
||||
}
|
||||
|
||||
def hexify(bytes: Array[Byte]): String = {
|
||||
val builder = new StringBuilder
|
||||
bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) }
|
||||
builder.toString
|
||||
}
|
||||
|
||||
private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n")
|
||||
}
|
||||
|
||||
print("""
|
||||
# This config imports the Akka reference configuration.
|
||||
include "akka-reference.conf"
|
||||
|
||||
# In this file you can override any option defined in the 'akka-reference.conf' file.
|
||||
# Copy in all or parts of the 'akka-reference.conf' file and modify as you please.
|
||||
|
||||
akka {
|
||||
remote {
|
||||
secure-cookie = """")
|
||||
print(Crypt.generateSecureCookie)
|
||||
print(""""
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue