diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index cfedcf9256..41d0fcce1c 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -786,7 +786,7 @@ class LocalActorRef private[akka] ( _status = ActorRefInternals.RUNNING - //If we are not currently creating this ActorRef instance + // If we are not currently creating this ActorRef instance if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance @@ -853,11 +853,8 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. */ def startLink(actorRef: ActorRef): Unit = guard.withGuard { - try { - link(actorRef) - } finally { - actorRef.start - } + link(actorRef) + actorRef.start } /** @@ -867,12 +864,9 @@ class LocalActorRef private[akka] ( */ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard { ensureRemotingEnabled - try { - actorRef.makeRemote(hostname, port) - link(actorRef) - } finally { - actorRef.start - } + actorRef.makeRemote(hostname, port) + link(actorRef) + actorRef.start } /** @@ -904,11 +898,8 @@ class LocalActorRef private[akka] ( */ def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { val actor = Actor.actorOf(clazz) - try { - link(actor) - } finally { - actor.start - } + link(actor) + actor.start actor } @@ -920,12 +911,9 @@ class LocalActorRef private[akka] ( def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { ensureRemotingEnabled val actor = Actor.actorOf(clazz) - try { - actor.makeRemote(hostname, port) - link(actor) - } finally { - actor.start - } + actor.makeRemote(hostname, port) + link(actor) + actor.start actor } @@ -994,8 +982,7 @@ class LocalActorRef private[akka] ( * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { - if (isShutdown) - Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) + if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { currentMessage = messageHandle try { @@ -1004,8 +991,7 @@ class LocalActorRef private[akka] ( case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e - } - finally { + } finally { currentMessage = null //TODO: Don't reset this, we might want to resend the message } } @@ -1031,8 +1017,7 @@ class LocalActorRef private[akka] ( protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal false - } - else if (withinTimeRange.isEmpty) { // restrict number of restarts + } else if (withinTimeRange.isEmpty) { // restrict number of restarts maxNrOfRetriesCount += 1 //Increment number of retries maxNrOfRetriesCount > maxNrOfRetries.get } else { // cannot restart more than N within M timerange @@ -1041,10 +1026,8 @@ class LocalActorRef private[akka] ( val now = System.currentTimeMillis val retries = maxNrOfRetriesCount //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) - false - else - (now - windowStart) <= withinTimeRange.get + val insideWindow = if (windowStart == 0) false + else (now - windowStart) <= withinTimeRange.get //The actor is dead if it dies X times within the window of restart val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) diff --git a/akka-actor/src/main/scala/actor/Agent.scala b/akka-actor/src/main/scala/actor/Agent.scala index 89acfb6e4c..c9b91b3ca8 100644 --- a/akka-actor/src/main/scala/actor/Agent.scala +++ b/akka-actor/src/main/scala/actor/Agent.scala @@ -9,6 +9,7 @@ import akka.AkkaException import akka.japi.{ Function => JFunc, Procedure => JProc } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.CountDownLatch +import akka.config.RemoteAddress class AgentException private[akka](message: String) extends AkkaException(message) @@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag * @author Viktor Klang * @author Jonas Bonér */ -sealed class Agent[T] private (initialValue: T) { +sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) { + import Agent._ import Actor._ - - private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start + val dispatcher = remote match { + case Some(address) => + val d = actorOf(new AgentDispatcher[T]()) + d.makeRemote(remote.get.hostname,remote.get.port) + d.start + d ! Value(initialValue) + d + case None => + actorOf(new AgentDispatcher(initialValue)).start + } /** * Submits a request to read the internal state. @@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) { if (dispatcher.isTransactionInScope) throw new AgentException( "Can't call Agent.get within an enclosing transaction."+ "\n\tWould block indefinitely.\n\tPlease refactor your code.") - val ref = new AtomicReference[T] - val latch = new CountDownLatch(1) - sendProc((v: T) => {ref.set(v); latch.countDown}) - latch.await - ref.get + val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await + if (f.exception.isDefined) throw f.exception.get + else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out")) } /** @@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) { * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def map[B](f: (T) => B): Agent[B] = Agent(f(get)) + final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote) /** * Applies function with type 'T => B' to the agent's internal state. @@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) { * Does not change the value of the agent (this). * Java API */ - final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get)) + final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). * Java API */ - final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote) /** * Applies procedure with type T to the agent's internal state. @@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) { * @author Jonas Bonér */ object Agent { - + import Actor._ /* * The internal messages for passing around requests. */ private[akka] case class Value[T](value: T) private[akka] case class Function[T](fun: ((T) => T)) private[akka] case class Procedure[T](fun: ((T) => Unit)) + private[akka] case object Read /** * Creates a new Agent of type T with the initial value of value. */ - def apply[T](value: T): Agent[T] = new Agent(value) + def apply[T](value: T): Agent[T] = + apply(value,None) + + /** + * Creates an Agent backed by a client managed Actor if Some(remoteAddress) + * or a local agent if None + */ + def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] = + new Agent[T](value,remoteAddress) + + /** + * Creates an Agent backed by a client managed Actor + */ + def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] = + apply(value,Some(remoteAddress)) } /** @@ -254,12 +277,15 @@ object Agent { * * @author Jonas Bonér */ -final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor { +final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor { import Agent._ - import Actor._ - log.debug("Starting up Agent [%s]", self.uuid) - private val value = Ref[T](initialValue) + private[akka] def this(initialValue: T) = this(Ref(initialValue)) + private[akka] def this() = this(Ref[T]()) + + private val value = ref + + log.debug("Starting up Agent [%s]", self.uuid) /** * Periodically handles incoming messages. @@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto def receive = { case Value(v: T) => swap(v) + case Read => self.reply_?(value.get()) case Function(fun: (T => T)) => swap(fun(value.getOrWait)) case Procedure(proc: (T => Unit)) => diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index 43eeae2f56..92496489e1 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -4,58 +4,130 @@ package akka.actor -import akka.stm.Ref -import akka.stm.local._ - +import scala.collection.mutable import java.util.concurrent.{ScheduledFuture, TimeUnit} -trait FSM[S] { this: Actor => +trait FSM[S, D] { + this: Actor => type StateFunction = scala.PartialFunction[Event, State] - var currentState: State = initialState - var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + /** DSL */ + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) + } - def initialState: State + protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { + setState(State(stateName, stateData, timeout)) + } - def handleEvent: StateFunction = { - case event@Event(value, stateData) => - log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) - State(NextState, currentState.stateFunction, stateData, currentState.timeout) + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } + + protected final def stay(): State = { + goto(currentState.stateName) + } + + protected final def stop(): State = { + stop(Normal) + } + + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + protected final def stop(reason: Reason, stateData: D): State = { + self ! Stop(reason, stateData) + stay + } + + def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction + } + + def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = { + terminateEvent = terminationHandler + } + + /** FSM State data and default handlers */ + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + private val transitions = mutable.Map[S, StateFunction]() + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + + private var handleEvent: StateFunction = { + case Event(value, stateData) => + log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName) + stay + } + + private var terminateEvent: PartialFunction[Reason, Unit] = { + case failure@Failure(_) => log.error("Stopping because of a %s", failure) + case reason => log.info("Stopping because of reason: %s", reason) } override final protected def receive: Receive = { + case Stop(reason, stateData) => + terminateEvent.apply(reason) + self.stop + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => + log.trace("Ignoring StateTimeout - ") + // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) + val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) + setState(nextState) + } + } - currentState = newState - - newState match { - case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) - case _ => () // ignore for now - } - - newState.timeout.foreach { - timeout => - timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + private def setState(nextState: State) = { + if (!transitions.contains(nextState.stateName)) { + stop(Failure("Next state %s does not exist".format(nextState.stateName))) + } else { + currentState = nextState + currentState.timeout.foreach { + t => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS)) } } } - case class State(stateEvent: StateEvent, - stateFunction: StateFunction, - stateData: S, - timeout: Option[Int] = None, - replyValue: Option[Any] = None) + case class Event(event: Any, stateData: D) - case class Event(event: Any, stateData: S) + case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { - sealed trait StateEvent - object NextState extends StateEvent - object Reply extends StateEvent + def until(timeout: Long): State = { + copy(timeout = Some(timeout)) + } - object StateTimeout + def replying(replyValue:Any): State = { + self.sender match { + case Some(sender) => sender ! replyValue + case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue) + } + this + } + + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + } + + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason + + case object StateTimeout + + private case class Stop(reason: Reason, stateData: D) } diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index fe762aefdf..b88a222279 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -27,10 +27,6 @@ object ConfigLogger extends Logging object Config { val VERSION = "1.0-SNAPSHOT" - // Set Multiverse options for max speed - System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") - System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast") - val HOME = { val envHome = System.getenv("AKKA_HOME") match { case null | "" | "." => None diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 5d4ebe5440..873c5e9f55 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -37,10 +37,16 @@ object Futures { def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - def awaitOne(futures: List[Future[_]]): Future[_] = { + /** + * Returns the First Future that is completed + * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans + */ + def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = { var future: Option[Future[_]] = None do { future = futures.find(_.isCompleted) + if (sleepMs > 0 && future.isEmpty) + Thread.sleep(sleepMs) } while (future.isEmpty) future.get } @@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] { // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { - private val TIME_UNIT = TimeUnit.MILLISECONDS + import TimeUnit.{MILLISECONDS => TIME_UNIT} def this() = this(0) val timeoutInNanos = TIME_UNIT.toNanos(timeout) diff --git a/akka-actor/src/main/scala/util/Crypt.scala b/akka-actor/src/main/scala/util/Crypt.scala new file mode 100644 index 0000000000..65f62f3f27 --- /dev/null +++ b/akka-actor/src/main/scala/util/Crypt.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.util + +import java.security.{MessageDigest, SecureRandom} + +/** + * @author Jonas Bonér + */ +object Crypt extends Logging { + val hex = "0123456789ABCDEF" + val lineSeparator = System.getProperty("line.separator") + + lazy val random = SecureRandom.getInstance("SHA1PRNG") + + def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII")) + + def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5")) + + def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII")) + + def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) + + def generateSecureCookie: String = { + log.info("Generating secure cookie...") + val bytes = Array.fill(32)(0.byteValue) + random.nextBytes(bytes) + sha1(bytes) + } + + def digest(bytes: Array[Byte], md: MessageDigest): String = { + md.update(bytes) + hexify(md.digest) + } + + def hexify(bytes: Array[Byte]): String = { + val builder = new StringBuilder + bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) } + builder.toString + } + + private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n") +} diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index 4154bfadba..e903f50556 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -4,8 +4,6 @@ package akka.util -import java.security.MessageDigest - /** * @author Jonas Bonér */ @@ -22,18 +20,8 @@ object Helpers extends Logging { bytes } - def getMD5For(s: String) = { - val digest = MessageDigest.getInstance("MD5") - digest.update(s.getBytes("ASCII")) - val bytes = digest.digest - - val sb = new StringBuilder - val hex = "0123456789ABCDEF" - bytes.foreach(b => { - val n = b.asInstanceOf[Int] - sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) - }) - sb.toString + def bytesToInt(bytes: Array[Byte], offset: Int): Int = { + (0 until 4).foldLeft(0)((value, index) => value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) } /** @@ -57,4 +45,56 @@ object Helpers extends Logging { log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName) None } + + /** + * Reference that can hold either a typed value or an exception. + * + * Usage: + *
+   * scala> ResultOrError(1)
+   * res0: ResultOrError[Int] = ResultOrError@a96606
+   *
+   * scala> res0()
+    res1: Int = 1
+   *
+   * scala> res0() = 3
+   *
+   * scala> res0()
+   * res3: Int = 3
+   * 
+   * scala> res0() = { println("Hello world"); 3}
+   * Hello world
+   *
+   * scala> res0()
+   * res5: Int = 3
+   *  
+   * scala> res0() = error("Lets see what happens here...")
+   *
+   * scala> res0()
+   * java.lang.RuntimeException: Lets see what happens here...
+   *    at ResultOrError.apply(Helper.scala:11)
+   *    at .(:6)
+   *    at .()
+   *    at Re...
+   * 
+ */ + class ResultOrError[R](result: R){ + private[this] var contents: Either[R, Throwable] = Left(result) + + def update(value: => R) = { + contents = try { + Left(value) + } catch { + case (error : Throwable) => Right(error) + } + } + + def apply() = contents match { + case Left(result) => result + case Right(error) => throw error.fillInStackTrace + } + } + object ResultOrError { + def apply[R](result: R) = new ResultOrError(result) + } } diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index c6ae768bad..1a90377ff1 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -120,7 +120,7 @@ class Switch(startAsOn: Boolean = false) { private val switch = new AtomicBoolean(startAsOn) protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized { - if (switch.compareAndSet(from,!from)) { + if (switch.compareAndSet(from, !from)) { try { action } catch { @@ -133,43 +133,35 @@ class Switch(startAsOn: Boolean = false) { } def switchOff(action: => Unit): Boolean = transcend(from = true, action) - def switchOn(action: => Unit): Boolean = transcend(from = false,action) + def switchOn(action: => Unit): Boolean = transcend(from = false, action) - def switchOff: Boolean = synchronized { switch.compareAndSet(true,false) } - def switchOn: Boolean = synchronized { switch.compareAndSet(false,true) } + def switchOff: Boolean = synchronized { switch.compareAndSet(true, false) } + def switchOn: Boolean = synchronized { switch.compareAndSet(false, true) } def ifOnYield[T](action: => T): Option[T] = { - if (switch.get) - Some(action) - else - None + if (switch.get) Some(action) + else None } def ifOffYield[T](action: => T): Option[T] = { - if (switch.get) - Some(action) - else - None + if (switch.get) Some(action) + else None } def ifOn(action: => Unit): Boolean = { if (switch.get) { action true - } - else - false + } else false } def ifOff(action: => Unit): Boolean = { if (!switch.get) { action true - } - else - false + } else false } def isOn = switch.get def isOff = !isOn -} \ No newline at end of file +} diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index 8d0a404da0..16a897fb04 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -13,37 +13,57 @@ import java.util.concurrent.TimeUnit object FSMActorSpec { - class Lock(code: String, - timeout: Int, - unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends Actor with FSM[CodeState] { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + val unhandledLatch = new StandardLatch + val terminatedLatch = new StandardLatch - def initialState = State(NextState, locked, CodeState("", code)) + sealed trait LockState + case object Locked extends LockState + case object Open extends LockState - def locked: StateFunction = { + class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] { + + inState(Locked) { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) + stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) + goto(Open) using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) + stay using CodeState("", code) } } } + case Event("hello", _) => stay replying "world" + case Event("bye", _) => stop(Shutdown) } - def open: StateFunction = { + inState(Open) { case Event(StateTimeout, stateData) => { doLock - State(NextState, locked, stateData) + goto(Locked) } } + setInitialState(Locked, CodeState("", code)) + + whenUnhandled { + case Event(_, stateData) => { + log.info("Unhandled") + unhandledLatch.open + stay + } + } + + onTermination { + case reason => terminatedLatch.open + } + private def doLock() { log.info("Locked") lockedLatch.open @@ -63,11 +83,9 @@ class FSMActorSpec extends JUnitSuite { @Test def unlockTheLock = { - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000)).start lock ! '3' lock ! '3' @@ -77,6 +95,25 @@ class FSMActorSpec extends JUnitSuite { assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + + lock ! "not_handled" + assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + + val answerLatch = new StandardLatch + object Hello + object Bye + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Hello => lock ! "hello" + case "world" => answerLatch.open + case Bye => lock ! "bye" + } + }).start + tester ! Hello + assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + + tester ! Bye + assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) } } diff --git a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala index b80af0aa22..1515cb7210 100644 --- a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala +++ b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala @@ -23,21 +23,21 @@ private [akka] object CouchDBStorageBackend extends RefStorageBackend[Array[Byte]] with Logging { - - import dispatch.json._ - - implicit object widgetWrites extends Writes[Map[String,Any]] { - def writes(o: Map[String,Any]): JsValue = JsValue(o) - } + + import dispatch.json._ + + implicit object widgetWrites extends Writes[Map[String,Any]] { + def writes(o: Map[String,Any]): JsValue = JsValue(o) + } - lazy val URL = config. + lazy val URL = config. getString("akka.storage.couchdb.url"). getOrElse(throw new IllegalArgumentException("'akka.storage.couchdb.url' not found in config")) def drop() = { val client = new HttpClient() - val delete = new DeleteMethod(URL) - client.executeMethod(delete) + val delete = new DeleteMethod(URL) + client.executeMethod(delete) } def create() = { @@ -45,60 +45,60 @@ private [akka] object CouchDBStorageBackend extends val put = new PutMethod(URL) put.setRequestEntity(new StringRequestEntity("", null, "utf-8")) put.setRequestHeader("Content-Type", "application/json") - client.executeMethod(put) + client.executeMethod(put) put.getResponseBodyAsString } - private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={ - var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix)) - val dataJson = JsonSerialization.tojson(m) - postData(URL, dataJson.toString) - } - - private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={ + private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={ + var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix)) + val dataJson = JsonSerialization.tojson(m) + postData(URL, dataJson.toString) + } + + private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={ postData(URL, JsonSerialization.tojson(entries + ("_id" -> (name + postfix))).toString) - } + } private def getResponseForNameAsMap(name: String, postfix: String): Option[Map[String, Any]] = { getResponse(URL + name + postfix).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] } - def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ - val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++ - entries.map(e => (new String(e._1) -> new String(e._2))).toMap + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++ + entries.map(e => (new String(e._1) -> new String(e._2))).toMap storeMap(name, "_map", newDoc) - } - + } + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={ - insertMapStorageEntriesFor(name, List((key, value))) - } - - + insertMapStorageEntriesFor(name, List((key, value))) + } + + def removeMapStorageFor(name: String) { lazy val url = URL + name + "_map" - findDocRev(name + "_map").foreach(deleteData(url, _)) - } - + findDocRev(name + "_map").foreach(deleteData(url, _)) + } + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { lazy val sKey = new String(key) // if we can't find the map for name, then we don't need to delete it. - getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey)) - } - + getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey)) + } + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { lazy val sKey = new String(key) getResponseForNameAsMap(name, "_map").flatMap(_.get(sKey)).asInstanceOf[Option[String]].map(_.getBytes) - } + } - def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size - + def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { - val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) - m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes)) - } + val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes)) + } - def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) val keys = m.keys.toList.sortWith(_ < _) @@ -112,7 +112,7 @@ private [akka] object CouchDBStorageBackend extends // slice from keys: both ends inclusive val ks = keys.slice(keys.indexOf(s), scala.math.min(keys.indexOf(s) + c, keys.indexOf(f) + 1)) ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes)) - } + } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { insertVectorStorageEntriesFor(name, List(element)) @@ -133,16 +133,16 @@ private [akka] object CouchDBStorageBackend extends } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={ - val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] - if (v.indices.contains(index)) - v(index).getBytes - else - Array[Byte]() - } - + val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) + v(index).getBytes + else + Array[Byte]() + } + def getVectorStorageSizeFor(name: String): Int ={ - getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) - } + getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) + } def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]()) @@ -151,60 +151,60 @@ private [akka] object CouchDBStorageBackend extends val c = if (count == 0) v.length else count v.slice(s, scala.math.min(s + c, f)).map(_.getBytes) } - + def insertRefStorageFor(name: String, element: Array[Byte]) ={ - val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element)) - storeMap(name, "_ref", newDoc) - } - - def getRefStorageFor(name: String): Option[Array[Byte]] ={ - getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) - } + val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element)) + storeMap(name, "_ref", newDoc) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] ={ + getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) + } - private def findDocRev(name: String) = { - getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] - .flatMap(_.get("_rev")).asInstanceOf[Option[String]] - } + private def findDocRev(name: String) = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + .flatMap(_.get("_rev")).asInstanceOf[Option[String]] + } - private def deleteData(url:String, rev:String): Option[String] = { - val client = new HttpClient() - val delete = new DeleteMethod(url) - delete.setRequestHeader("If-Match", rev) - client.executeMethod(delete) - - val response = delete.getResponseBodyAsString() - if (response != null) - Some(response) - else - None - } + private def deleteData(url:String, rev:String): Option[String] = { + val client = new HttpClient() + val delete = new DeleteMethod(url) + delete.setRequestHeader("If-Match", rev) + client.executeMethod(delete) + + val response = delete.getResponseBodyAsString() + if (response != null) + Some(response) + else + None + } - private def postData(url: String, data: String): Option[String] = { - val client = new HttpClient() - val post = new PostMethod(url) - post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) - post.setRequestHeader("Content-Type", "application/json") - client.executeMethod(post) + private def postData(url: String, data: String): Option[String] = { + val client = new HttpClient() + val post = new PostMethod(url) + post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) + post.setRequestHeader("Content-Type", "application/json") + client.executeMethod(post) val response = post.getResponseBodyAsString if (response != null) - Some(response) - else - None - } - - private def getResponse(url: String): Option[String] = { - val client = new HttpClient() - val method = new GetMethod(url) + Some(response) + else + None + } + + private def getResponse(url: String): Option[String] = { + val client = new HttpClient() + val method = new GetMethod(url) - method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false)) - client.executeMethod(method) + client.executeMethod(method) val response = method.getResponseBodyAsString - if (method.getStatusCode == 200 && response != null) - Some(response) - else - None - } + if (method.getStatusCode == 200 && response != null) + Some(response) + else + None + } } diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala index 253eeba2b0..f19fbe9271 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala @@ -82,12 +82,12 @@ BeforeAndAfterEach { ("guido van rossum", "python"), ("james strachan", "groovy")) val rl = List( - ("james gosling", "java"), - ("james strachan", "groovy"), - ("larry wall", "perl"), - ("martin odersky", "scala"), - ("ola bini", "ioke"), ("rich hickey", "clojure"), - ("slava pestov", "factor")) + ("james gosling", "java"), + ("james strachan", "groovy"), + ("larry wall", "perl"), + ("martin odersky", "scala"), + ("ola bini", "ioke"), ("rich hickey", "clojure"), + ("slava pestov", "factor")) insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) }) getMapStorageSizeFor("t1") should equal(l.size) getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1)) diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 6d0dc52eea..f255e469a2 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -32,7 +32,7 @@ object CommonsCodec { import CommonsCodec._ import CommonsCodec.Base64StringEncoder._ - + /** * A module for supporting Redis based persistence. *

diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java index dc44a063cf..1be793a2aa 100644 --- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java @@ -3635,6 +3635,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -3686,6 +3693,9 @@ public final class RemoteProtocol { for (akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -3723,6 +3733,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3909,6 +3923,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3989,6 +4006,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4248,6 +4269,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteRequestProtocol) } @@ -4341,6 +4383,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -4391,6 +4440,9 @@ public final class RemoteProtocol { for (akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -4428,6 +4480,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4614,6 +4670,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4689,6 +4748,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4929,6 +4992,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteReplyProtocol) } @@ -6657,33 +6741,33 @@ public final class RemoteProtocol { "\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" + "dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" + "l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" + - "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" + + "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\232\002\n\025R" + "emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" + "dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt", "ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" + "col\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030" + "\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" + "emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" + - "\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" + - "rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" + - "message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" + - "tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" + - "isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" + - "\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata", - "\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" + - "otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" + - "adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" + - "\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" + - "\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" + - "l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" + - "eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" + - "sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" + - "\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" + - "ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR", - "Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" + - "OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" + - "\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote.protocolH" + - "\001" + "\026.MetadataEntryProtocol\022\016\n\006cookie\030\010 \001(\t\"" + + "\204\002\n\023RemoteReplyProtocol\022\033\n\004uuid\030\001 \002(\0132\r." + + "UuidProtocol\022!\n\007message\030\002 \001(\0132\020.MessageP" + + "rotocol\022%\n\texception\030\003 \001(\0132\022.ExceptionPr" + + "otocol\022%\n\016supervisorUuid\030\004 \001(\0132\r.UuidPro" + + "tocol\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006", + " \002(\010\022(\n\010metadata\030\007 \003(\0132\026.MetadataEntryPr" + + "otocol\022\016\n\006cookie\030\010 \001(\t\")\n\014UuidProtocol\022\014" + + "\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEnt" + + "ryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6" + + "\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016" + + ".LifeCycleType\"1\n\017AddressProtocol\022\020\n\010hos" + + "tname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionPr" + + "otocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002" + + "(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n\nJAVA" + + "_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serializati", + "onSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nS" + + "CALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005" + + "*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMP" + + "ORARY\020\002B\030\n\024akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6751,7 +6835,7 @@ public final class RemoteProtocol { internal_static_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteRequestProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", "Cookie", }, akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class, akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class); internal_static_RemoteReplyProtocol_descriptor = @@ -6759,7 +6843,7 @@ public final class RemoteProtocol { internal_static_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteReplyProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", "Cookie", }, akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class, akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class); internal_static_UuidProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index ad32c83660..e7ee0129d4 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -91,6 +91,13 @@ message TypedActorInfoProtocol { required string method = 2; } +/** + * Defines a remote connection handshake. + */ +//message HandshakeProtocol { +// required string cookie = 1; +//} + /** * Defines a remote message request. */ @@ -102,6 +109,7 @@ message RemoteRequestProtocol { optional UuidProtocol supervisorUuid = 5; optional RemoteActorRefProtocol sender = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** @@ -115,6 +123,7 @@ message RemoteReplyProtocol { required bool isActor = 5; required bool isSuccessful = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 3a9278267c..e42a38900c 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -12,26 +12,29 @@ import akka.config.Config._ import akka.serialization.RemoteActorSerialization._ import akka.AkkaException import Actor._ + import org.jboss.netty.channel._ import group.DefaultChannelGroup import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} -import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder} -import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} +import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } +import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.ReadTimeoutHandler -import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} +import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler -import java.net.{SocketAddress, InetSocketAddress} -import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet} -import java.util.concurrent.atomic.AtomicLong +import java.net.{ SocketAddress, InetSocketAddress } +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } +import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } -import scala.collection.mutable.{HashSet, HashMap} +import scala.collection.mutable.{ HashSet, HashMap } import scala.reflect.BeanProperty + import akka.actor._ import akka.util._ + /** * Life-cycle events for RemoteClient. */ @@ -51,7 +54,7 @@ case class RemoteClientShutdown( /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka](message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) +class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. @@ -59,11 +62,18 @@ class RemoteClientException private[akka](message: String, @BeanProperty val cli * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] - private val remoteActors = new HashMap[Address, HashSet[Uuid]] + private val remoteActors = new HashMap[Address, HashSet[Uuid]] def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) @@ -86,23 +96,23 @@ object RemoteClient extends Logging { def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(serviceId, className, hostname, port, timeout, None) - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = { + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = { typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None) } - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = { + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = { typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) } - def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = { typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) } - def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = { + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = { typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) } - private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = { + private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = { val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor) TypedActor.createProxyForRemoteActorRef(intfClass, actorRef) } @@ -158,7 +168,7 @@ object RemoteClient extends Logging { * Clean-up all open connections. */ def shutdownAll = synchronized { - remoteClients.foreach({case (addr, client) => client.shutdown}) + remoteClients.foreach({ case (addr, client) => client.shutdown }) remoteClients.clear } @@ -200,34 +210,40 @@ class RemoteClient private[akka] ( private val remoteAddress = new InetSocketAddress(hostname, port) //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) - @volatile private var bootstrap: ClientBootstrap = _ - @volatile private[remote] var connection: ChannelFuture = _ - @volatile private[remote] var openChannels: DefaultChannelGroup = _ - @volatile private var timer: HashedWheelTimer = _ + @volatile + private var bootstrap: ClientBootstrap = _ + @volatile + private[remote] var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var timer: HashedWheelTimer = _ private[remote] val runSwitch = new Switch() - + private[remote] val isAuthenticated = new AtomicBoolean(false) + private[remote] def isRunning = runSwitch.isOn private val reconnectionTimeWindow = Duration(config.getInt( "akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis - @volatile private var reconnectionTimeWindowStart = 0L + @volatile + private var reconnectionTimeWindowStart = 0L def connect = runSwitch switchOn { openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) timer = new HashedWheelTimer - bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool,Executors.newCachedThreadPool - ) - ) + + bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) - connection = bootstrap.connect(remoteAddress) + log.info("Starting remote client connection to [%s:%s]", hostname, port) + // Wait until the connection attempt succeeds or fails. + connection = bootstrap.connect(remoteAddress) val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) + if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) @@ -268,31 +284,34 @@ class RemoteClient private[akka] ( actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] = { + val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE + else None send(createRemoteRequestProtocolBuilder( - actorRef, message, isOneWay, senderOption, typedActorInfo, actorType).build, senderFuture) - } + actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, cookie).build, senderFuture) + } def send[T]( request: RemoteRequestProtocol, - senderFuture: Option[CompletableFuture[T]]): - Option[CompletableFuture[T]] = if (isRunning) { - if (request.getIsOneWay) { - connection.getChannel.write(request) - None - } else { - futures.synchronized { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - futures.put(uuidFrom(request.getUuid.getHigh,request.getUuid.getLow), futureResult) + senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + if (isRunning) { + if (request.getIsOneWay) { connection.getChannel.write(request) - Some(futureResult) + None + } else { + futures.synchronized { + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) + connection.getChannel.write(request) + Some(futureResult) + } } + } else { + val exception = new RemoteClientException( + "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) + notifyListeners(RemoteClientError(exception, this)) + throw exception } - } else { - val exception = new RemoteClientException( - "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - notifyListeners(RemoteClientError(exception, this)) - throw exception } private[akka] def registerSupervisorForActor(actorRef: ActorRef) = @@ -325,13 +344,13 @@ class RemoteClient private[akka] ( * @author Jonas Bonér */ class RemoteClientPipelineFactory( - name: String, - futures: ConcurrentMap[Uuid, CompletableFuture[_]], - supervisors: ConcurrentMap[Uuid, ActorRef], - bootstrap: ClientBootstrap, - remoteAddress: SocketAddress, - timer: HashedWheelTimer, - client: RemoteClient) extends ChannelPipelineFactory { + name: String, + futures: ConcurrentMap[Uuid, CompletableFuture[_]], + supervisors: ConcurrentMap[Uuid, ActorRef], + bootstrap: ClientBootstrap, + remoteAddress: SocketAddress, + timer: HashedWheelTimer, + client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) @@ -343,15 +362,15 @@ class RemoteClientPipelineFactory( e } - val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) + val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) + case _ => (join(), join()) } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) @@ -365,18 +384,18 @@ class RemoteClientPipelineFactory( */ @ChannelHandler.Sharable class RemoteClientHandler( - val name: String, - val futures: ConcurrentMap[Uuid, CompletableFuture[_]], - val supervisors: ConcurrentMap[Uuid, ActorRef], - val bootstrap: ClientBootstrap, - val remoteAddress: SocketAddress, - val timer: HashedWheelTimer, - val client: RemoteClient) - extends SimpleChannelUpstreamHandler with Logging { + val name: String, + val futures: ConcurrentMap[Uuid, CompletableFuture[_]], + val supervisors: ConcurrentMap[Uuid, ActorRef], + val bootstrap: ClientBootstrap, + val remoteAddress: SocketAddress, + val timer: HashedWheelTimer, + val client: RemoteClient) + extends SimpleChannelUpstreamHandler with Logging { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) } super.handleUpstream(ctx, event) @@ -387,7 +406,7 @@ class RemoteClientHandler( val result = event.getMessage if (result.isInstanceOf[RemoteReplyProtocol]) { val reply = result.asInstanceOf[RemoteReplyProtocol] - val replyUuid = uuidFrom(reply.getUuid.getHigh,reply.getUuid.getLow) + val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { @@ -395,7 +414,7 @@ class RemoteClientHandler( future.completeWithResult(message) } else { if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh,reply.getSupervisorUuid.getLow) + val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) @@ -424,6 +443,7 @@ class RemoteClientHandler( timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { client.openChannels.remove(event.getChannel) + client.isAuthenticated.set(false) log.debug("Remote client reconnecting to [%s]", remoteAddress) client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. @@ -469,9 +489,9 @@ class RemoteClientHandler( val exception = reply.getException val classname = exception.getClassname val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) - else Class.forName(classname) + else Class.forName(classname) exceptionClass - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exception.getMessage).asInstanceOf[Throwable] + .getConstructor(Array[Class[_]](classOf[String]): _*) + .newInstance(exception.getMessage).asInstanceOf[Throwable] } } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 2fa16e7b32..ed4a7beacf 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -31,6 +31,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map import scala.reflect.BeanProperty +import akka.config.ConfigurationException /** * Use this object if you need a single remote server on a specific node. @@ -61,21 +62,30 @@ import scala.reflect.BeanProperty object RemoteNode extends RemoteServer /** - * For internal use only. - * Holds configuration variables, remote actors, remote typed actors and remote servers. + * For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers. * * @author Jonas Bonér */ -object -RemoteServer { +object RemoteServer { val UUID_PREFIX = "uuid:" - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + val REQUIRE_COOKIE = { + val requireCookie = config.getBool("akka.remote.server.require-cookie", true) + if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") + requireCookie + } + + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) - - val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") - val ZLIB_COMPRESSION_LEVEL = { + val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") + val ZLIB_COMPRESSION_LEVEL = { val level = config.getInt("akka.remote.zlib-compression-level", 6) if (level < 1 && level > 9) throw new IllegalArgumentException( "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") @@ -128,7 +138,6 @@ RemoteServer { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) } - } /** @@ -203,13 +212,14 @@ class RemoteServer extends Logging with ListenerManagement { address = Address(_hostname,_port) log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) - val pipelineFactory = new RemoteServerPipelineFactory( - name, openChannels, loader, this) + + val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, this) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) + openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) @@ -251,11 +261,8 @@ class RemoteServer extends Logging with ListenerManagement { */ def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id) - if (id.startsWith(UUID_PREFIX)) { - registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid()) - } else { - registerTypedActor(id, typedActor, typedActors()) - } + if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid) + else registerTypedActor(id, typedActor, typedActors) } /** @@ -270,28 +277,19 @@ class RemoteServer extends Logging with ListenerManagement { */ def register(id: String, actorRef: ActorRef): Unit = synchronized { log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) - if (id.startsWith(UUID_PREFIX)) { - register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid()) - } else { - register(id, actorRef, actors()) - } + if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid) + else register(id, actorRef, actors) } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning) { - if (!registry.contains(id)) { - if (!actorRef.isRunning) actorRef.start - registry.put(id, actorRef) - } + if (_isRunning && !registry.contains(id)) { + if (!actorRef.isRunning) actorRef.start + registry.put(id, actorRef) } } private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { - if (_isRunning) { - if (!registry.contains(id)) { - registry.put(id, typedActor) - } - } + if (_isRunning && !registry.contains(id)) registry.put(id, typedActor) } /** @@ -300,8 +298,8 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(actorRef: ActorRef):Unit = synchronized { if (_isRunning) { log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) - actors().remove(actorRef.id,actorRef) - actorsByUuid().remove(actorRef.uuid,actorRef) + actors.remove(actorRef.id, actorRef) + actorsByUuid.remove(actorRef.uuid, actorRef) } } @@ -313,12 +311,11 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote actor with id [%s]", id) - if (id.startsWith(UUID_PREFIX)) { - actorsByUuid().remove(id.substring(UUID_PREFIX.length)) - } else { - val actorRef = actors() get id - actorsByUuid().remove(actorRef.uuid,actorRef) - actors().remove(id,actorRef) + if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) + else { + val actorRef = actors get id + actorsByUuid.remove(actorRef.uuid, actorRef) + actors.remove(id,actorRef) } } } @@ -331,11 +328,8 @@ class RemoteServer extends Logging with ListenerManagement { def unregisterTypedActor(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote typed actor with id [%s]", id) - if (id.startsWith(UUID_PREFIX)) { - typedActorsByUuid().remove(id.substring(UUID_PREFIX.length)) - } else { - typedActors().remove(id) - } + if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length)) + else typedActors.remove(id) } } @@ -343,10 +337,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() = ActorRegistry.actors(address) - private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address) - private[akka] def typedActors() = ActorRegistry.typedActors(address) - private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address) + private[akka] def actors = ActorRegistry.actors(address) + private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address) + private[akka] def typedActors = ActorRegistry.typedActors(address) + private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address) } object RemoteServerSslContext { @@ -389,7 +383,7 @@ class RemoteServerPipelineFactory( val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case _ => (join(), join()) } @@ -410,7 +404,9 @@ class RemoteServerHandler( val applicationLoader: Option[ClassLoader], val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { import RemoteServer._ + val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + val CHANNEL_INIT = "channel-init".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -434,9 +430,8 @@ class RemoteServerHandler( } else future.getChannel.close } }) - } else { - server.notifyListeners(RemoteServerClientConnected(server)) - } + } else server.notifyListeners(RemoteServerClientConnected(server)) + if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -445,8 +440,7 @@ class RemoteServerHandler( } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) } super.handleUpstream(ctx, event) @@ -456,7 +450,9 @@ class RemoteServerHandler( val message = event.getMessage if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequestProtocol]) { - handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) + val requestProtocol = message.asInstanceOf[RemoteRequestProtocol] + if (RemoteServer.REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx) + handleRemoteRequestProtocol(requestProtocol, event.getChannel) } } @@ -491,8 +487,11 @@ class RemoteServerHandler( case RemoteActorSystemMessage.Stop => actorRef.stop case _ => // then match on user defined messages if (request.getIsOneWay) actorRef.!(message)(sender) - else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some( - new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ + else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout( + message, + request.getActorInfo.getTimeout, + None, + Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ override def onComplete(result: AnyRef) { log.debug("Returning result from actor invocation [%s]", result) val replyBuilder = RemoteReplyProtocol.newBuilder @@ -506,8 +505,7 @@ class RemoteServerHandler( try { channel.write(replyBuilder.build) } catch { - case e: Throwable => - server.notifyListeners(RemoteServerError(e, server)) + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) } } @@ -515,8 +513,7 @@ class RemoteServerHandler( try { channel.write(createErrorReplyMessage(exception, request, true)) } catch { - case e: Throwable => - server.notifyListeners(RemoteServerError(e, server)) + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) } } } @@ -528,8 +525,8 @@ class RemoteServerHandler( val actorInfo = request.getActorInfo val typedActorInfo = actorInfo.getTypedActorInfo log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface) - val typedActor = createTypedActor(actorInfo) + val typedActor = createTypedActor(actorInfo) val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList val argClasses = args.map(_.getClass) @@ -551,49 +548,39 @@ class RemoteServerHandler( case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) server.notifyListeners(RemoteServerError(e, server)) - case e: Throwable => + case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) server.notifyListeners(RemoteServerError(e, server)) } } private def findActorById(id: String) : ActorRef = { - server.actors().get(id) + server.actors.get(id) } private def findActorByUuid(uuid: String) : ActorRef = { - server.actorsByUuid().get(uuid) + server.actorsByUuid.get(uuid) } private def findTypedActorById(id: String) : AnyRef = { - server.typedActors().get(id) + server.typedActors.get(id) } private def findTypedActorByUuid(uuid: String) : AnyRef = { - server.typedActorsByUuid().get(uuid) + server.typedActorsByUuid.get(uuid) } private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) { - findActorByUuid(id.substring(UUID_PREFIX.length)) - } else { - findActorById(id) - } - if (actorRefOrNull eq null) { - actorRefOrNull = findActorByUuid(uuid) - } + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length)) + else findActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) actorRefOrNull } private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = { - var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) { - findTypedActorByUuid(id.substring(UUID_PREFIX.length)) - } else { - findTypedActorById(id) - } - if (actorRefOrNull eq null) { - actorRefOrNull = findTypedActorByUuid(uuid) - } + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + else findTypedActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid) actorRefOrNull } @@ -677,4 +664,19 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) replyBuilder.build } + + private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = { + val attachment = ctx.getAttachment + if ((attachment ne null) && + attachment.isInstanceOf[String] && + attachment.asInstanceOf[String] == CHANNEL_INIT) { // is first time around, channel initialization + ctx.setAttachment(null) + val clientAddress = ctx.getChannel.getRemoteAddress.toString + if (!request.hasCookie) throw new SecurityException( + "The remote client [" + clientAddress + "] does not have a secure cookie.") + if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException( + "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie") + log.info("Remote client [%s] successfully authenticated using secure cookie", clientAddress) + } + } } diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 887048c995..a5873f4dc3 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -8,8 +8,9 @@ import akka.stm.global._ import akka.stm.TransactionManagement._ import akka.stm.TransactionManagement import akka.dispatch.MessageInvocation -import akka.remote.{RemoteServer, MessageSerializer} +import akka.remote.{RemoteServer, RemoteClient, MessageSerializer} import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} + import ActorTypeProtocol._ import akka.config.Supervision._ import akka.actor.{uuidFrom,newUuid} @@ -132,7 +133,8 @@ object ActorSerialization { false, actorRef.getSender, None, - ActorType.ScalaActor).build) + ActorType.ScalaActor, + RemoteClient.SECURE_COOKIE).build) requestProtocols.foreach(rp => builder.addMessages(rp)) } @@ -248,11 +250,11 @@ object RemoteActorSerialization { ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar) RemoteActorRefProtocol.newBuilder - .setClassOrServiceName(uuid.toString) - .setActorClassname(actorClassName) - .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) - .setTimeout(timeout) - .build + .setClassOrServiceName(uuid.toString) + .setActorClassname(actorClassName) + .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) + .setTimeout(timeout) + .build } def createRemoteRequestProtocolBuilder( @@ -261,8 +263,8 @@ object RemoteActorSerialization { isOneWay: Boolean, senderOption: Option[ActorRef], typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): - RemoteRequestProtocol.Builder = { + actorType: ActorType, + secureCookie: Option[String]): RemoteRequestProtocol.Builder = { import actorRef._ val actorInfoBuilder = ActorInfoProtocol.newBuilder @@ -271,13 +273,12 @@ object RemoteActorSerialization { .setTarget(actorClassName) .setTimeout(timeout) - typedActorInfo.foreach { - typedActor => - actorInfoBuilder.setTypedActorInfo( - TypedActorInfoProtocol.newBuilder - .setInterface(typedActor._1) - .setMethod(typedActor._2) - .build) + typedActorInfo.foreach { typedActor => + actorInfoBuilder.setTypedActorInfo( + TypedActorInfoProtocol.newBuilder + .setInterface(typedActor._1) + .setMethod(typedActor._2) + .build) } actorType match { @@ -292,6 +293,8 @@ object RemoteActorSerialization { .setActorInfo(actorInfo) .setIsOneWay(isOneWay) + secureCookie.foreach(requestBuilder.setCookie(_)) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid( UuidProtocol.newBuilder @@ -306,8 +309,6 @@ object RemoteActorSerialization { } requestBuilder } - - } @@ -404,5 +405,4 @@ object RemoteTypedActorSerialization { .setInterfaceName(init.interfaceClass.getName) .build } - } diff --git a/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala new file mode 100644 index 0000000000..df82850d0d --- /dev/null +++ b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala @@ -0,0 +1,37 @@ +package akka.actor.remote + +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} +import akka.config.RemoteAddress +import akka.actor.Agent +import akka.remote. {RemoteClient, RemoteServer} + + +class RemoteAgentSpec extends JUnitSuite { + var server: RemoteServer = _ + + val HOSTNAME = "localhost" + val PORT = 9992 + + @Before def startServer { + val s = new RemoteServer() + s.start(HOSTNAME, PORT) + server = s + Thread.sleep(1000) + } + + @After def stopServer { + val s = server + server = null + s.shutdown + RemoteClient.shutdownAll + } + + @Test def remoteAgentShouldInitializeProperly { + val a = Agent(10,RemoteAddress(HOSTNAME,PORT)) + assert(a() == 10, "Remote agent should have the proper initial value") + a(20) + assert(a() == 20, "Remote agent should be updated properly") + a.close + } +} \ No newline at end of file diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 0f532af1b8..c91b3745de 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -201,18 +201,18 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { def shouldRegisterAndUnregister { val actor1 = actorOf[RemoteActorSpecActorUnidirectional] server.register("my-service-1", actor1) - assert(server.actors().get("my-service-1") ne null, "actor registered") + assert(server.actors.get("my-service-1") ne null, "actor registered") server.unregister("my-service-1") - assert(server.actors().get("my-service-1") eq null, "actor unregistered") + assert(server.actors.get("my-service-1") eq null, "actor unregistered") } @Test def shouldRegisterAndUnregisterByUuid { val actor1 = actorOf[RemoteActorSpecActorUnidirectional] server.register("uuid:" + actor1.uuid, actor1) - assert(server.actorsByUuid().get(actor1.uuid.toString) ne null, "actor registered") + assert(server.actorsByUuid.get(actor1.uuid.toString) ne null, "actor registered") server.unregister("uuid:" + actor1.uuid) - assert(server.actorsByUuid().get(actor1.uuid) eq null, "actor unregistered") + assert(server.actorsByUuid.get(actor1.uuid) eq null, "actor unregistered") } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index e2f7f0c2fc..c918c7e842 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -103,9 +103,9 @@ class ServerInitiatedRemoteTypedActorSpec extends it("should register and unregister typed actors") { val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) server.registerTypedActor("my-test-service", typedActor) - assert(server.typedActors().get("my-test-service") ne null, "typed actor registered") + assert(server.typedActors.get("my-test-service") ne null, "typed actor registered") server.unregisterTypedActor("my-test-service") - assert(server.typedActors().get("my-test-service") eq null, "typed actor unregistered") + assert(server.typedActors.get("my-test-service") eq null, "typed actor unregistered") } it("should register and unregister typed actors by uuid") { @@ -113,9 +113,9 @@ class ServerInitiatedRemoteTypedActorSpec extends val init = AspectInitRegistry.initFor(typedActor) val uuid = "uuid:" + init.actorRef.uuid server.registerTypedActor(uuid, typedActor) - assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered") + assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered") server.unregisterTypedActor(uuid) - assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) eq null, "typed actor unregistered") + assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) eq null, "typed actor unregistered") } it("should find typed actors by uuid") { @@ -123,7 +123,7 @@ class ServerInitiatedRemoteTypedActorSpec extends val init = AspectInitRegistry.initFor(typedActor) val uuid = "uuid:" + init.actorRef.uuid server.registerTypedActor(uuid, typedActor) - assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered") + assert(server.typedActorsByUuid.get(init.actorRef.uuid.toString) ne null, "typed actor registered") val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT) expect("oneway") { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 979545bfeb..536fda4cef 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -12,6 +12,13 @@ object Put extends ChopstickMessage case class Taken(chopstick: ActorRef) extends ChopstickMessage case class Busy(chopstick: ActorRef) extends ChopstickMessage +/** + * Some states the chopstick can be in + */ +sealed trait ChopstickState +case object Available extends ChopstickState +case object Taken extends ChopstickState + /** * Some state container for the chopstick */ @@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[TakenBy] { +class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { self.id = name - // A chopstick begins its existence as available and taken by no one - def initialState = State(NextState, available, TakenBy(None)) - // When a chopstick is available, it can be taken by a some hakker - def available: StateFunction = { + inState(Available) { case Event(Take, _) => - State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + goto(Taken) using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - def taken: StateFunction = { + inState(Taken) { case Event(Take, currentState) => - State(Reply, taken, currentState, replyValue = Some(Busy(self))) + stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - State(NextState, available, TakenBy(None)) + goto(Available) using TakenBy(None) } + + // A chopstick begins its existence as available and taken by no one + setInitialState(Available, TakenBy(None)) } /** @@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[TakenBy] { sealed trait FSMHakkerMessage object Think extends FSMHakkerMessage +/** + * Some fsm hakker states + */ +sealed trait FSMHakkerState +case object Waiting extends FSMHakkerState +case object Thinking extends FSMHakkerState +case object Hungry extends FSMHakkerState +case object WaitForOtherChopstick extends FSMHakkerState +case object FirstChopstickDenied extends FSMHakkerState +case object Eating extends FSMHakkerState + /** * Some state container to keep track of which chopsticks we have */ @@ -57,13 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { self.id = name - //All hakkers start waiting - def initialState = State(NextState, waiting, TakenChopsticks(None, None)) - - def waiting: StateFunction = { + inState(Waiting) { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -71,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - def thinking: StateFunction = { - case Event(StateTimeout, current) => + inState(Thinking) { + case Event(StateTimeout, _) => left ! Take right ! Take - State(NextState, hungry, current) + goto(Hungry) } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - def hungry: StateFunction = { + inState(Hungry) { case Event(Taken(`left`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) - case Event(Busy(_), current) => - State(NextState, firstChopstickDenied, current) + goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right)) + case Event(Busy(_), _) => + goto(FirstChopstickDenied) } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - def waitForOtherChopstick: StateFunction = { + inState(WaitForOtherChopstick) { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -105,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - def firstChopstickDenied: StateFunction = { + inState(FirstChopstickDenied) { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -121,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - def eating: StateFunction = { + inState(Eating) { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -130,15 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + goto(Thinking) using TakenChopsticks(None, None) until period } + + //All hakkers start waiting + setInitialState(Waiting, TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ -object DiningHakkersOnFSM { - def run { +object DiningHakkersOnFsm { + + def run = { // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index e91f73f84f..b89b159c41 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -4,7 +4,7 @@ object AkkaRepositories { val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 025a1a8552..7bdfd58728 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -7,7 +7,7 @@ akka { version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - + time-unit = "seconds" # Default timeout time unit for all timeout properties throughout the config # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up @@ -109,6 +109,9 @@ akka { } remote { + + secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' + compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 @@ -133,6 +136,7 @@ akka { hostname = "localhost" # The hostname or IP that clients should connect to port = 9999 # The port clients should connect to connection-timeout = 1 + require-cookie = on } client { diff --git a/config/microkernel-server.xml b/config/microkernel-server.xml index 9efec8e287..c9c97e5c95 100644 --- a/config/microkernel-server.xml +++ b/config/microkernel-server.xml @@ -38,29 +38,29 @@ 2 false 8443 - 20000 - 5000 + 20000 + 5000 + 30000 + 2 + 100 + /etc/keystore + PASSWORD + KEYPASSWORD + /etc/keystore + TRUSTPASSWORD + + + + --> @@ -94,4 +94,4 @@ true 1000 - + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e18b421dc2..456da23960 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") @@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { object Dependencies { // Compile - lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" + lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile" @@ -606,7 +606,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { - + @@ -655,8 +655,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val couch = Dependencies.commonsHttpClient - val spec = Dependencies.specs + val couch = Dependencies.commonsHttpClient + val spec = Dependencies.specs override def testOptions = createTestFilter( _.endsWith("Test")) } diff --git a/scripts/generate_config_with_secure_cookie.sh b/scripts/generate_config_with_secure_cookie.sh new file mode 100755 index 0000000000..899ec22025 --- /dev/null +++ b/scripts/generate_config_with_secure_cookie.sh @@ -0,0 +1,62 @@ +#!/bin/sh +exec scala "$0" "$@" +!# + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import java.security.{MessageDigest, SecureRandom} + +/** + * @author Jonas Bonér + */ +object Crypt { + val hex = "0123456789ABCDEF" + val lineSeparator = System.getProperty("line.separator") + + lazy val random = SecureRandom.getInstance("SHA1PRNG") + + def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII")) + + def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5")) + + def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII")) + + def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) + + def generateSecureCookie: String = { + val bytes = Array.fill(32)(0.byteValue) + random.nextBytes(bytes) + sha1(bytes) + } + + def digest(bytes: Array[Byte], md: MessageDigest): String = { + md.update(bytes) + hexify(md.digest) + } + + def hexify(bytes: Array[Byte]): String = { + val builder = new StringBuilder + bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) } + builder.toString + } + + private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n") +} + +print(""" +# This config imports the Akka reference configuration. +include "akka-reference.conf" + +# In this file you can override any option defined in the 'akka-reference.conf' file. +# Copy in all or parts of the 'akka-reference.conf' file and modify as you please. + +akka { + remote { + secure-cookie = """") +print(Crypt.generateSecureCookie) +print("""" + } +} +""") +