From d87a71506633531fb369bb8667fe4521fd4f8cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 26 Oct 2010 09:29:04 +0200 Subject: [PATCH 01/11] Fixed bug in startLink and friends + Added cryptographically secure cookie generator --- .../src/main/scala/actor/ActorRef.scala | 49 ++++++------------- akka-actor/src/main/scala/util/Crypt.scala | 36 ++++++++++++++ akka-actor/src/main/scala/util/Helpers.scala | 18 +------ .../src/main/scala/RedisStorageBackend.scala | 2 +- scripts/generate_secure_cookie.sh | 27 ++++++++++ 5 files changed, 81 insertions(+), 51 deletions(-) create mode 100644 akka-actor/src/main/scala/util/Crypt.scala create mode 100755 scripts/generate_secure_cookie.sh diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 7a6e493f43..48552a6a6f 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -787,7 +787,7 @@ class LocalActorRef private[akka] ( _status = ActorRefInternals.RUNNING - //If we are not currently creating this ActorRef instance + // If we are not currently creating this ActorRef instance if ((actorInstance ne null) && (actorInstance.get ne null)) initializeActorInstance @@ -854,11 +854,8 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. */ def startLink(actorRef: ActorRef): Unit = guard.withGuard { - try { - link(actorRef) - } finally { - actorRef.start - } + link(actorRef) + actorRef.start } /** @@ -868,12 +865,9 @@ class LocalActorRef private[akka] ( */ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard { ensureRemotingEnabled - try { - actorRef.makeRemote(hostname, port) - link(actorRef) - } finally { - actorRef.start - } + actorRef.makeRemote(hostname, port) + link(actorRef) + actorRef.start } /** @@ -905,11 +899,8 @@ class LocalActorRef private[akka] ( */ def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { val actor = Actor.actorOf(clazz) - try { - link(actor) - } finally { - actor.start - } + link(actor) + actor.start actor } @@ -921,12 +912,9 @@ class LocalActorRef private[akka] ( def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int): ActorRef = guard.withGuard { ensureRemotingEnabled val actor = Actor.actorOf(clazz) - try { - actor.makeRemote(hostname, port) - link(actor) - } finally { - actor.start - } + actor.makeRemote(hostname, port) + link(actor) + actor.start actor } @@ -995,8 +983,7 @@ class LocalActorRef private[akka] ( * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { - if (isShutdown) - Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) + if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { currentMessage = messageHandle try { @@ -1005,8 +992,7 @@ class LocalActorRef private[akka] ( case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e - } - finally { + } finally { currentMessage = null //TODO: Don't reset this, we might want to resend the message } } @@ -1032,8 +1018,7 @@ class LocalActorRef private[akka] ( protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal false - } - else if (withinTimeRange.isEmpty) { // restrict number of restarts + } else if (withinTimeRange.isEmpty) { // restrict number of restarts maxNrOfRetriesCount += 1 //Increment number of retries maxNrOfRetriesCount > maxNrOfRetries.get } else { // cannot restart more than N within M timerange @@ -1042,10 +1027,8 @@ class LocalActorRef private[akka] ( val now = System.currentTimeMillis val retries = maxNrOfRetriesCount //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) - false - else - (now - windowStart) <= withinTimeRange.get + val insideWindow = if (windowStart == 0) false + else (now - windowStart) <= withinTimeRange.get //The actor is dead if it dies X times within the window of restart val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) diff --git a/akka-actor/src/main/scala/util/Crypt.scala b/akka-actor/src/main/scala/util/Crypt.scala new file mode 100644 index 0000000000..e8cd2f82dd --- /dev/null +++ b/akka-actor/src/main/scala/util/Crypt.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.util + +import java.security.{MessageDigest, SecureRandom} + +/** + * @author Jonas Bonér + */ +object Crypt { + lazy val random = SecureRandom.getInstance("SHA1PRNG") + + def generateSecureCookie: String = { + val bytes = Array.make(32, 0.byteValue) + random.nextBytes(bytes) + getMD5For(bytes) + } + + def getMD5For(s: String): String = getMD5For(s.getBytes("ASCII")) + + def getMD5For(b: Array[Byte]): String = { + val digest = MessageDigest.getInstance("MD5") + digest.update(b) + val bytes = digest.digest + + val sb = new StringBuilder + val hex = "0123456789ABCDEF" + bytes.foreach { b => + val n = b.asInstanceOf[Int] + sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) + } + sb.toString + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index 394b39e101..75ba61620a 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -4,8 +4,6 @@ package se.scalablesolutions.akka.util -import java.security.MessageDigest - /** * @author Jonas Bonér */ @@ -22,20 +20,6 @@ object Helpers extends Logging { bytes } - def getMD5For(s: String) = { - val digest = MessageDigest.getInstance("MD5") - digest.update(s.getBytes("ASCII")) - val bytes = digest.digest - - val sb = new StringBuilder - val hex = "0123456789ABCDEF" - bytes.foreach(b => { - val n = b.asInstanceOf[Int] - sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) - }) - sb.toString - } - /** * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException * if the actual type is not assignable from the given one. @@ -57,4 +41,4 @@ object Helpers extends Logging { log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName) None } -} +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 6f2052f0bd..dbb63f972b 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -32,7 +32,7 @@ object CommonsCodec { import CommonsCodec._ import CommonsCodec.Base64StringEncoder._ - + /** * A module for supporting Redis based persistence. *

diff --git a/scripts/generate_secure_cookie.sh b/scripts/generate_secure_cookie.sh new file mode 100755 index 0000000000..12ca10a940 --- /dev/null +++ b/scripts/generate_secure_cookie.sh @@ -0,0 +1,27 @@ +#!/bin/sh +exec scala "$0" "$@" +!# + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import java.security.{MessageDigest, SecureRandom} + +lazy val random = SecureRandom.getInstance("SHA1PRNG") + +val buffer = Array.make(32, 0.byteValue) +random.nextBytes(buffer) + +val digest = MessageDigest.getInstance("MD5") +digest.update(buffer) +val bytes = digest.digest + +val sb = new StringBuilder +val hex = "0123456789ABCDEF" +bytes.foreach { b => + val n = b.asInstanceOf[Int] + sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) +} + +println("Cryptographically secure cookie:") +println(sb.toString) From 4a43c933a1eb9d88af1f8e40642d7f2ec77cd9bc Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 14:23:55 +0200 Subject: [PATCH 02/11] Switching to non-SSL repo for jBoss --- akka-sbt-plugin/src/main/scala/AkkaProject.scala | 2 +- project/build/AkkaProject.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index 2bde073df8..82ccbe401a 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -4,7 +4,7 @@ object AkkaRepositories { val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5b93e30044..6add9542ee 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -72,7 +72,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/") lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") From e300b76fbe61c5939bd52940727d7fbb7f859e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 26 Oct 2010 12:04:32 +0200 Subject: [PATCH 03/11] Added Erlang-style secure cookie authentication for remote client/server --- akka-actor/src/main/scala/config/Config.scala | 4 - akka-actor/src/main/scala/util/Crypt.scala | 45 +++--- akka-actor/src/main/scala/util/Helpers.scala | 56 ++++++++ .../akka/remote/protocol/RemoteProtocol.java | 131 +++++++++++++++--- .../src/main/protocol/RemoteProtocol.proto | 2 + .../src/main/scala/remote/RemoteClient.scala | 10 +- .../src/main/scala/remote/RemoteServer.scala | 63 ++++++--- .../serialization/SerializationProtocol.scala | 10 +- config/akka-reference.conf | 6 +- scripts/generate_secure_cookie.sh | 4 +- 10 files changed, 262 insertions(+), 69 deletions(-) diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index e97347754b..16ed6d76c4 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -27,10 +27,6 @@ object ConfigLogger extends Logging object Config { val VERSION = "1.0-SNAPSHOT" - // Set Multiverse options for max speed - System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") - System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast") - val HOME = { val envHome = System.getenv("AKKA_HOME") match { case null | "" | "." => None diff --git a/akka-actor/src/main/scala/util/Crypt.scala b/akka-actor/src/main/scala/util/Crypt.scala index e8cd2f82dd..fad9e4c281 100644 --- a/akka-actor/src/main/scala/util/Crypt.scala +++ b/akka-actor/src/main/scala/util/Crypt.scala @@ -9,28 +9,37 @@ import java.security.{MessageDigest, SecureRandom} /** * @author Jonas Bonér */ -object Crypt { +object Crypt extends Logging { + val hex = "0123456789ABCDEF" + val lineSeparator = System.getProperty("line.separator") + lazy val random = SecureRandom.getInstance("SHA1PRNG") + def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII")) + + def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5")) + + def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII")) + + def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) + def generateSecureCookie: String = { - val bytes = Array.make(32, 0.byteValue) + log.info("Generating secure cookie...") + val bytes = Array.fill(32)(0.byteValue) random.nextBytes(bytes) - getMD5For(bytes) + sha1(bytes) } - def getMD5For(s: String): String = getMD5For(s.getBytes("ASCII")) - - def getMD5For(b: Array[Byte]): String = { - val digest = MessageDigest.getInstance("MD5") - digest.update(b) - val bytes = digest.digest - - val sb = new StringBuilder - val hex = "0123456789ABCDEF" - bytes.foreach { b => - val n = b.asInstanceOf[Int] - sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) - } - sb.toString + def digest(bytes: Array[Byte], md: MessageDigest): String = { + md.update(bytes) + hexify(md.digest) + } + + def hexify(bytes: Array[Byte]): String = { + val builder = new StringBuilder + bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) } + builder.toString } -} \ No newline at end of file + + private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n") +} diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index 75ba61620a..b9fa5d10b6 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -20,6 +20,10 @@ object Helpers extends Logging { bytes } + def bytesToInt(bytes: Array[Byte], offset: Int): Int = { + (0 until 4).foldLeft(0)((value, index) => value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) + } + /** * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException * if the actual type is not assignable from the given one. @@ -41,4 +45,56 @@ object Helpers extends Logging { log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName) None } + + /** + * Reference that can hold either a typed value or an exception. + * + * Usage: + *

+   * scala> ResultOrError(1)
+   * res0: ResultOrError[Int] = ResultOrError@a96606
+   *
+   * scala> res0()
+    res1: Int = 1
+   *
+   * scala> res0() = 3
+   *
+   * scala> res0()
+   * res3: Int = 3
+   * 
+   * scala> res0() = { println("Hello world"); 3}
+   * Hello world
+   *
+   * scala> res0()
+   * res5: Int = 3
+   *  
+   * scala> res0() = error("Lets see what happens here...")
+   *
+   * scala> res0()
+   * java.lang.RuntimeException: Lets see what happens here...
+   * 	at ResultOrError.apply(Helper.scala:11)
+   * 	at .(:6)
+   * 	at .()
+   * 	at Re...
+   * 
+ */ + class ResultOrError[R](result: R){ + private[this] var contents: Either[R, Throwable] = Left(result) + + def update(value: => R) = { + contents = try { + Left(value) + } catch { + case (error : Throwable) => Right(error) + } + } + + def apply() = contents match { + case Left(result) => result + case Right(error) => throw error.fillInStackTrace + } + } + object ResultOrError { + def apply[R](result: R) = new ResultOrError(result) + } } \ No newline at end of file diff --git a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index e5265ea396..60f5004861 100644 --- a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -3635,6 +3635,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -3686,6 +3693,9 @@ public final class RemoteProtocol { for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -3723,6 +3733,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3909,6 +3923,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3989,6 +4006,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4248,6 +4269,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteRequestProtocol) } @@ -4341,6 +4383,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -4391,6 +4440,9 @@ public final class RemoteProtocol { for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -4428,6 +4480,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4614,6 +4670,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4689,6 +4748,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4929,6 +4992,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteReplyProtocol) } @@ -6657,33 +6741,34 @@ public final class RemoteProtocol { "\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" + "dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" + "l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" + - "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" + + "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\232\002\n\025R" + "emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" + "dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt", "ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" + "col\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030" + "\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" + "emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" + - "\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" + - "rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" + - "message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" + - "tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" + - "isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" + - "\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata", - "\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" + - "otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" + - "adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" + - "\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" + - "\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" + - "l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" + - "eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" + - "sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" + - "\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" + - "ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR", - "Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" + - "OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" + - "\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolutions." + - "akka.remote.protocolH\001" + "\026.MetadataEntryProtocol\022\016\n\006cookie\030\010 \001(\t\"" + + "\204\002\n\023RemoteReplyProtocol\022\033\n\004uuid\030\001 \002(\0132\r." + + "UuidProtocol\022!\n\007message\030\002 \001(\0132\020.MessageP" + + "rotocol\022%\n\texception\030\003 \001(\0132\022.ExceptionPr" + + "otocol\022%\n\016supervisorUuid\030\004 \001(\0132\r.UuidPro" + + "tocol\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006", + " \002(\010\022(\n\010metadata\030\007 \003(\0132\026.MetadataEntryPr" + + "otocol\022\016\n\006cookie\030\010 \001(\t\")\n\014UuidProtocol\022\014" + + "\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEnt" + + "ryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6" + + "\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016" + + ".LifeCycleType\"1\n\017AddressProtocol\022\020\n\010hos" + + "tname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionPr" + + "otocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002" + + "(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n\nJAVA" + + "_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serializati", + "onSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nS" + + "CALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005" + + "*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMP" + + "ORARY\020\002B-\n)se.scalablesolutions.akka.rem" + + "ote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6751,7 +6836,7 @@ public final class RemoteProtocol { internal_static_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteRequestProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", "Cookie", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class); internal_static_RemoteReplyProtocol_descriptor = @@ -6759,7 +6844,7 @@ public final class RemoteProtocol { internal_static_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteReplyProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", "Cookie", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class); internal_static_UuidProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 40c5756e04..ce694141a0 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -102,6 +102,7 @@ message RemoteRequestProtocol { optional UuidProtocol supervisorUuid = 5; optional RemoteActorRefProtocol sender = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** @@ -115,6 +116,7 @@ message RemoteReplyProtocol { required bool isActor = 5; required bool isSuccessful = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index e39b83a503..1ddf57869e 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -59,7 +59,13 @@ class RemoteClientException private[akka](message: String, @BeanProperty val cli * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] @@ -269,7 +275,7 @@ class RemoteClient private[akka] ( typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] = { send(createRemoteRequestProtocolBuilder( - actorRef, message, isOneWay, senderOption, typedActorInfo, actorType).build, senderFuture) + actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, RemoteClient.SECURE_COOKIE).build, senderFuture) } def send[T]( diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index f3c29c62f9..0d39be263a 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -16,6 +16,7 @@ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorType._ import se.scalablesolutions.akka.config.Config._ +import se.scalablesolutions.akka.config.ConfigurationException import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.serialization.RemoteActorSerialization import se.scalablesolutions.akka.serialization.RemoteActorSerialization._ @@ -61,21 +62,30 @@ import scala.reflect.BeanProperty object RemoteNode extends RemoteServer /** - * For internal use only. - * Holds configuration variables, remote actors, remote typed actors and remote servers. + * For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers. * * @author Jonas Bonér */ -object -RemoteServer { +object RemoteServer { val UUID_PREFIX = "uuid:" - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + val REQUIRE_COOKIE = { + val requireCookie = config.getBool("akka.remote.server.require-cookie", true) + if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") + requireCookie + } + + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) - - val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") - val ZLIB_COMPRESSION_LEVEL = { + val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") + val ZLIB_COMPRESSION_LEVEL = { val level = config.getInt("akka.remote.zlib-compression-level", 6) if (level < 1 && level > 9) throw new IllegalArgumentException( "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") @@ -128,7 +138,6 @@ RemoteServer { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) } - } /** @@ -389,7 +398,7 @@ class RemoteServerPipelineFactory( val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case _ => (join(), join()) } @@ -411,6 +420,7 @@ class RemoteServerHandler( val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + val CHANNEL_INIT = "channel-init".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -437,6 +447,7 @@ class RemoteServerHandler( } else { server.notifyListeners(RemoteServerClientConnected(server)) } + if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -445,8 +456,7 @@ class RemoteServerHandler( } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) } super.handleUpstream(ctx, event) @@ -456,7 +466,9 @@ class RemoteServerHandler( val message = event.getMessage if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequestProtocol]) { - handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) + val requestProtocol = message.asInstanceOf[RemoteRequestProtocol] + authenticateRemoteClient(requestProtocol, ctx) + handleRemoteRequestProtocol(requestProtocol, event.getChannel) } } @@ -491,8 +503,11 @@ class RemoteServerHandler( case RemoteActorSystemMessage.Stop => actorRef.stop case _ => // then match on user defined messages if (request.getIsOneWay) actorRef.!(message)(sender) - else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some( - new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ + else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout( + message, + request.getActorInfo.getTimeout, + None, + Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ override def onComplete(result: AnyRef) { log.debug("Returning result from actor invocation [%s]", result) val replyBuilder = RemoteReplyProtocol.newBuilder @@ -677,4 +692,20 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) replyBuilder.build } + + private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = { + if (RemoteServer.REQUIRE_COOKIE) { + val attachment = ctx.getAttachment + if ((attachment ne null) && + attachment.isInstanceOf[String] && + attachment.asInstanceOf[String] == CHANNEL_INIT) { + val clientAddress = ctx.getChannel.getRemoteAddress.toString + if (!request.hasCookie) throw new SecurityException( + "The remote client [" + clientAddress + "] does not have a secure cookie.") + if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException( + "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie") + log.info("Remote client [%s] successfully authenticated using secure cookie", clientAddress) + } + } + } } diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 922db92ad4..8383607f8c 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.dispatch.MessageInvocation -import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer} +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient, MessageSerializer} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ import se.scalablesolutions.akka.config.Supervision._ @@ -132,7 +132,8 @@ object ActorSerialization { false, actorRef.getSender, None, - ActorType.ScalaActor).build) + ActorType.ScalaActor, + RemoteClient.SECURE_COOKIE).build) requestProtocols.foreach(rp => builder.addMessages(rp)) } @@ -261,7 +262,8 @@ object RemoteActorSerialization { isOneWay: Boolean, senderOption: Option[ActorRef], typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): + actorType: ActorType, + secureCookie: Option[String]): RemoteRequestProtocol.Builder = { import actorRef._ @@ -292,6 +294,8 @@ object RemoteActorSerialization { .setActorInfo(actorInfo) .setIsOneWay(isOneWay) + secureCookie.foreach(requestBuilder.setCookie(_)) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid( UuidProtocol.newBuilder diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 0ca834b8d4..5017a18fed 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -7,7 +7,7 @@ akka { version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - + time-unit = "seconds" # Default timeout time unit for all timeout properties throughout the config # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up @@ -109,6 +109,9 @@ akka { } remote { + + secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' + compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 @@ -133,6 +136,7 @@ akka { hostname = "localhost" # The hostname or IP that clients should connect to port = 9999 # The port clients should connect to connection-timeout = 1 + require-cookie = on } client { diff --git a/scripts/generate_secure_cookie.sh b/scripts/generate_secure_cookie.sh index 12ca10a940..bd3813793e 100755 --- a/scripts/generate_secure_cookie.sh +++ b/scripts/generate_secure_cookie.sh @@ -9,10 +9,10 @@ import java.security.{MessageDigest, SecureRandom} lazy val random = SecureRandom.getInstance("SHA1PRNG") -val buffer = Array.make(32, 0.byteValue) +val buffer = Array.fill(32)(0.byteValue) random.nextBytes(buffer) -val digest = MessageDigest.getInstance("MD5") +val digest = MessageDigest.getInstance("SHA1") digest.update(buffer) val bytes = digest.digest From 2b46fcee5066a01afb7667f7bd712a6fe4033e87 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 15:53:34 +0200 Subject: [PATCH 04/11] Added support for remote agent --- akka-actor/src/main/scala/actor/Agent.scala | 63 +++++++++++++------ .../test/scala/remote/RemoteAgentSpec.scala | 37 +++++++++++ 2 files changed, 82 insertions(+), 18 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/RemoteAgentSpec.scala diff --git a/akka-actor/src/main/scala/actor/Agent.scala b/akka-actor/src/main/scala/actor/Agent.scala index 6b9385ca4e..00dceba21c 100644 --- a/akka-actor/src/main/scala/actor/Agent.scala +++ b/akka-actor/src/main/scala/actor/Agent.scala @@ -9,6 +9,7 @@ import se.scalablesolutions.akka.AkkaException import se.scalablesolutions.akka.japi.{ Function => JFunc, Procedure => JProc } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.config.RemoteAddress class AgentException private[akka](message: String) extends AkkaException(message) @@ -100,11 +101,20 @@ class AgentException private[akka](message: String) extends AkkaException(messag * @author Viktor Klang * @author Jonas Bonér */ -sealed class Agent[T] private (initialValue: T) { +sealed class Agent[T] private (initialValue: T, remote: Option[RemoteAddress] = None) { + import Agent._ import Actor._ - - private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start + val dispatcher = remote match { + case Some(address) => + val d = actorOf(new AgentDispatcher[T]()) + d.makeRemote(remote.get.hostname,remote.get.port) + d.start + d ! Value(initialValue) + d + case None => + actorOf(new AgentDispatcher(initialValue)).start + } /** * Submits a request to read the internal state. @@ -117,11 +127,9 @@ sealed class Agent[T] private (initialValue: T) { if (dispatcher.isTransactionInScope) throw new AgentException( "Can't call Agent.get within an enclosing transaction."+ "\n\tWould block indefinitely.\n\tPlease refactor your code.") - val ref = new AtomicReference[T] - val latch = new CountDownLatch(1) - sendProc((v: T) => {ref.set(v); latch.countDown}) - latch.await - ref.get + val f = (dispatcher.!!![T](Read,java.lang.Long.MAX_VALUE)).await + if (f.exception.isDefined) throw f.exception.get + else f.result.getOrElse(throw new IllegalStateException("Agent remote request timed out")) } /** @@ -185,13 +193,13 @@ sealed class Agent[T] private (initialValue: T) { * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def map[B](f: (T) => B): Agent[B] = Agent(f(get)) + final def map[B](f: (T) => B): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). */ - final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: (T) => Agent[B]): Agent[B] = Agent(f(get)(),remote) /** * Applies function with type 'T => B' to the agent's internal state. @@ -204,14 +212,14 @@ sealed class Agent[T] private (initialValue: T) { * Does not change the value of the agent (this). * Java API */ - final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get)) + final def map[B](f: JFunc[T,B]): Agent[B] = Agent(f(get),remote) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. * Does not change the value of the agent (this). * Java API */ - final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)()) + final def flatMap[B](f: JFunc[T,Agent[B]]): Agent[B] = Agent(f(get)(),remote) /** * Applies procedure with type T to the agent's internal state. @@ -235,18 +243,33 @@ sealed class Agent[T] private (initialValue: T) { * @author Jonas Bonér */ object Agent { - + import Actor._ /* * The internal messages for passing around requests. */ private[akka] case class Value[T](value: T) private[akka] case class Function[T](fun: ((T) => T)) private[akka] case class Procedure[T](fun: ((T) => Unit)) + private[akka] case object Read /** * Creates a new Agent of type T with the initial value of value. */ - def apply[T](value: T): Agent[T] = new Agent(value) + def apply[T](value: T): Agent[T] = + apply(value,None) + + /** + * Creates an Agent backed by a client managed Actor if Some(remoteAddress) + * or a local agent if None + */ + def apply[T](value: T, remoteAddress: Option[RemoteAddress]): Agent[T] = + new Agent[T](value,remoteAddress) + + /** + * Creates an Agent backed by a client managed Actor + */ + def apply[T](value: T, remoteAddress: RemoteAddress): Agent[T] = + apply(value,Some(remoteAddress)) } /** @@ -254,12 +277,15 @@ object Agent { * * @author Jonas Bonér */ -final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transactor { +final class AgentDispatcher[T] private (ref: Ref[T]) extends Transactor { import Agent._ - import Actor._ - log.debug("Starting up Agent [%s]", self.uuid) - private val value = Ref[T](initialValue) + private[akka] def this(initialValue: T) = this(Ref(initialValue)) + private[akka] def this() = this(Ref[T]()) + + private val value = ref + + log.debug("Starting up Agent [%s]", self.uuid) /** * Periodically handles incoming messages. @@ -267,6 +293,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto def receive = { case Value(v: T) => swap(v) + case Read => self.reply_?(value.get()) case Function(fun: (T => T)) => swap(fun(value.getOrWait)) case Procedure(proc: (T => Unit)) => diff --git a/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala new file mode 100644 index 0000000000..02a93e949b --- /dev/null +++ b/akka-remote/src/test/scala/remote/RemoteAgentSpec.scala @@ -0,0 +1,37 @@ +package se.scalablesolutions.akka.actor.remote + +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} +import se.scalablesolutions.akka.config.RemoteAddress +import se.scalablesolutions.akka.actor.Agent +import se.scalablesolutions.akka.remote. {RemoteClient, RemoteServer} + + +class RemoteAgentSpec extends JUnitSuite { + var server: RemoteServer = _ + + val HOSTNAME = "localhost" + val PORT = 9992 + + @Before def startServer { + val s = new RemoteServer() + s.start(HOSTNAME, PORT) + server = s + Thread.sleep(1000) + } + + @After def stopServer { + val s = server + server = null + s.shutdown + RemoteClient.shutdownAll + } + + @Test def remoteAgentShouldInitializeProperly { + val a = Agent(10,RemoteAddress(HOSTNAME,PORT)) + assert(a() == 10, "Remote agent should have the proper initial value") + a(20) + assert(a() == 20, "Remote agent should be updated properly") + a.close + } +} \ No newline at end of file From 9b59bffa9bc7cb0c78342f0a7360c7671cf34594 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 26 Oct 2010 16:40:09 +0200 Subject: [PATCH 05/11] Adding possibility to take naps between scans for finished future, closing ticket #449 --- akka-actor/src/main/scala/dispatch/Future.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index ea06ebb4ec..57ea36d5d0 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -37,10 +37,16 @@ object Futures { def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - def awaitOne(futures: List[Future[_]]): Future[_] = { + /** + * Returns the First Future that is completed + * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans + */ + def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = { var future: Option[Future[_]] = None do { future = futures.find(_.isCompleted) + if (sleepMs > 0 && future.isEmpty) + Thread.sleep(sleepMs) } while (future.isEmpty) future.get } @@ -110,7 +116,7 @@ trait CompletableFuture[T] extends Future[T] { // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { - private val TIME_UNIT = TimeUnit.MILLISECONDS + import TimeUnit.{MILLISECONDS => TIME_UNIT} def this() = this(0) val timeoutInNanos = TIME_UNIT.toNanos(timeout) From ff81cfbf264f4b0d787e5ffad7d7bb81c3995f04 Mon Sep 17 00:00:00 2001 From: imn Date: Tue, 26 Oct 2010 19:58:14 +0200 Subject: [PATCH 06/11] refactoring the FSM part --- akka-actor/src/main/scala/actor/FSM.scala | 135 +++++++++++++----- .../test/scala/actor/actor/FSMActorSpec.scala | 51 +++++-- .../src/main/scala/DiningHakkersOnFsm.scala | 57 ++++---- 3 files changed, 167 insertions(+), 76 deletions(-) diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index 0bdc04fc48..c5eb00a6fd 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -4,58 +4,125 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.stm.Ref -import se.scalablesolutions.akka.stm.local._ - +import scala.collection.mutable import java.util.concurrent.{ScheduledFuture, TimeUnit} -trait FSM[S] { this: Actor => +trait FSM[S, D] { + this: Actor => type StateFunction = scala.PartialFunction[Event, State] - var currentState: State = initialState - var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None - def initialState: State + private val transitions = mutable.Map[S, StateFunction]() - def handleEvent: StateFunction = { - case event@Event(value, stateData) => - log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) - State(NextState, currentState.stateFunction, stateData, currentState.timeout) + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + + protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { + setState(State(stateName, stateData, timeout)) + } + + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) + } + + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } + + protected final def stay(): State = { + goto(currentState.stateName) + } + + protected final def reply(replyValue: Any): State = { + self.sender.foreach(_ ! replyValue) + stay() + } + + /** + * Stop + */ + protected final def stop(): State = { + stop(Normal) + } + + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + protected final def stop(reason: Reason, stateData: D): State = { + log.info("Stopped because of reason: %s", reason) + terminate(reason, currentState.stateName, stateData) + self.stop + State(currentState.stateName, stateData) + } + + def terminate(reason: Reason, stateName: S, stateData: D) = () + + def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction + } + + private var handleEvent: StateFunction = { + case Event(value, stateData) => + log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData) + currentState } override final protected def receive: Receive = { + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => () + // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) - - currentState = newState - - newState match { - case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) - case _ => () // ignore for now - } - - newState.timeout.foreach { - timeout => - timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) + if (self.isRunning) { + setState(nextState) } } } - case class State(stateEvent: StateEvent, - stateFunction: StateFunction, - stateData: S, - timeout: Option[Int] = None, - replyValue: Option[Any] = None) + private def setState(nextState: State) = { + if (!transitions.contains(nextState.stateName)) { + stop(Failure("Next state %s not available".format(nextState.stateName))) + } else { + currentState = nextState + currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))} + } + } - case class Event(event: Any, stateData: S) + case class Event(event: Any, stateData: D) - sealed trait StateEvent - object NextState extends StateEvent - object Reply extends StateEvent + case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { + def until(timeout: Long): State = { + copy(timeout = Some(timeout)) + } - object StateTimeout + def then(nextStateName: S): State = { + copy(stateName = nextStateName) + } + + def replying(replyValue:Any): State = { + self.sender.foreach(_ ! replyValue) + this + } + + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + } + + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason + + case object StateTimeout } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index e4515bd3da..496d9e9e01 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -13,34 +13,44 @@ import java.util.concurrent.TimeUnit object FSMActorSpec { - class Lock(code: String, - timeout: Int, - unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends Actor with FSM[CodeState] { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + val unhandledLatch = new StandardLatch - def initialState = State(NextState, locked, CodeState("", code)) + class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] { - def locked: StateFunction = { + inState("locked") { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) + stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) + goto("open") using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) + stay using CodeState("", code) } } } + case Event("hello", _) => stay replying "world" } - def open: StateFunction = { + inState("open") { case Event(StateTimeout, stateData) => { doLock - State(NextState, locked, stateData) + goto("locked") + } + } + + setInitialState("locked", CodeState("", code)) + + whenUnhandled { + case Event(_, stateData) => { + log.info("Unhandled") + unhandledLatch.open + stay } } @@ -63,11 +73,9 @@ class FSMActorSpec extends JUnitSuite { @Test def unlockTheLock = { - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000)).start lock ! '3' lock ! '3' @@ -77,6 +85,21 @@ class FSMActorSpec extends JUnitSuite { assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + + lock ! "not_handled" + assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + + val answerLatch = new StandardLatch + object Go + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Go => lock ! "hello" + case "world" => answerLatch.open + + } + }).start + tester ! Go + assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 8348de2134..9ab27d4fbb 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -20,27 +20,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[TakenBy] { +class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { self.id = name - // A chopstick begins its existence as available and taken by no one - def initialState = State(NextState, available, TakenBy(None)) - // When a chopstick is available, it can be taken by a some hakker - def available: StateFunction = { + inState("available") { case Event(Take, _) => - State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + goto("taken") using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - def taken: StateFunction = { + inState("taken") { case Event(Take, currentState) => - State(Reply, taken, currentState, replyValue = Some(Busy(self))) + stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - State(NextState, available, TakenBy(None)) + goto("available") using TakenBy(None) } + + // A chopstick begins its existence as available and taken by no one + setInitialState("available", TakenBy(None)) } /** @@ -57,13 +57,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] { self.id = name - //All hakkers start waiting - def initialState = State(NextState, waiting, TakenChopsticks(None, None)) - - def waiting: StateFunction = { + inState("waiting") { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -71,30 +68,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - def thinking: StateFunction = { - case Event(StateTimeout, current) => + inState("thinking") { + case Event(StateTimeout, _) => left ! Take right ! Take - State(NextState, hungry, current) + goto("hungry") } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - def hungry: StateFunction = { + inState("hungry") { case Event(Taken(`left`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) - case Event(Busy(_), current) => - State(NextState, firstChopstickDenied, current) + goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right)) + case Event(Busy(_), _) => + goto("firstChopstickDenied") } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - def waitForOtherChopstick: StateFunction = { + inState("waitForOtherChopstick") { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -105,13 +102,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - def firstChopstickDenied: StateFunction = { + inState("firstChopstickDenied") { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -121,7 +118,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - def eating: StateFunction = { + inState("eating") { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -130,15 +127,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + goto("thinking") using TakenChopsticks(None, None) until period } + + //All hakkers start waiting + setInitialState("waiting", TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ object DiningHakkersOnFSM { - def run { + def main(args: Array[String]) { + // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick From 90642f830352cc7d3b81ba9cb0a0c1d0683e2745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 26 Oct 2010 23:43:16 +0200 Subject: [PATCH 07/11] Changed the script to spit out a full akka.conf file with the secure cookie --- ...h => generate_config_with_secure_cookie.sh} | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) rename scripts/{generate_secure_cookie.sh => generate_config_with_secure_cookie.sh} (62%) diff --git a/scripts/generate_secure_cookie.sh b/scripts/generate_config_with_secure_cookie.sh similarity index 62% rename from scripts/generate_secure_cookie.sh rename to scripts/generate_config_with_secure_cookie.sh index bd3813793e..bd7a83eda2 100755 --- a/scripts/generate_secure_cookie.sh +++ b/scripts/generate_config_with_secure_cookie.sh @@ -23,5 +23,19 @@ bytes.foreach { b => sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) } -println("Cryptographically secure cookie:") -println(sb.toString) +print(""" +# This config imports the Akka reference configuration. +include "akka-reference.conf" + +# In this file you can override any option defined in the 'akka-reference.conf' file. +# Copy in all or parts of the 'akka-reference.conf' file and modify as you please. + +akka { + remote { + secure-cookie = """") +print(sb.toString) +print("""" + } +} +""") + From 83ab962bdac839f97a78cb2831ecf700e262b163 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 26 Oct 2010 23:44:13 +0200 Subject: [PATCH 08/11] converted tabs to spaces --- akka-actor/src/main/scala/util/Helpers.scala | 8 +- .../main/scala/CouchDBStorageBackend.scala | 190 +++++++++--------- .../HbaseStorageSpecTestIntegration.scala | 12 +- config/microkernel-server.xml | 42 ++-- project/build/AkkaProject.scala | 8 +- 5 files changed, 130 insertions(+), 130 deletions(-) diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index b9fa5d10b6..bc468fff3c 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -72,10 +72,10 @@ object Helpers extends Logging { * * scala> res0() * java.lang.RuntimeException: Lets see what happens here... - * at ResultOrError.apply(Helper.scala:11) - * at .(:6) - * at .() - * at Re... + * at ResultOrError.apply(Helper.scala:11) + * at .(:6) + * at .() + * at Re... * */ class ResultOrError[R](result: R){ diff --git a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala index 3be2657540..dcda8e8d3d 100644 --- a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala +++ b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala @@ -23,21 +23,21 @@ private [akka] object CouchDBStorageBackend extends RefStorageBackend[Array[Byte]] with Logging { - - import dispatch.json._ - - implicit object widgetWrites extends Writes[Map[String,Any]] { - def writes(o: Map[String,Any]): JsValue = JsValue(o) - } + + import dispatch.json._ + + implicit object widgetWrites extends Writes[Map[String,Any]] { + def writes(o: Map[String,Any]): JsValue = JsValue(o) + } - lazy val URL = config. + lazy val URL = config. getString("akka.storage.couchdb.url"). getOrElse(throw new IllegalArgumentException("'akka.storage.couchdb.url' not found in config")) def drop() = { val client = new HttpClient() - val delete = new DeleteMethod(URL) - client.executeMethod(delete) + val delete = new DeleteMethod(URL) + client.executeMethod(delete) } def create() = { @@ -45,60 +45,60 @@ private [akka] object CouchDBStorageBackend extends val put = new PutMethod(URL) put.setRequestEntity(new StringRequestEntity("", null, "utf-8")) put.setRequestHeader("Content-Type", "application/json") - client.executeMethod(put) + client.executeMethod(put) put.getResponseBodyAsString } - private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={ - var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix)) - val dataJson = JsonSerialization.tojson(m) - postData(URL, dataJson.toString) - } - - private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={ + private def storeMap(name: String, postfix: String, entries: List[(Array[Byte], Array[Byte])]) ={ + var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> (name + postfix)) + val dataJson = JsonSerialization.tojson(m) + postData(URL, dataJson.toString) + } + + private def storeMap(name: String, postfix: String, entries: Map[String, Any]) ={ postData(URL, JsonSerialization.tojson(entries + ("_id" -> (name + postfix))).toString) - } + } private def getResponseForNameAsMap(name: String, postfix: String): Option[Map[String, Any]] = { getResponse(URL + name + postfix).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] } - def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ - val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++ - entries.map(e => (new String(e._1) -> new String(e._2))).toMap + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + val newDoc = getResponseForNameAsMap(name, "_map").getOrElse(Map[String, Any]()) ++ + entries.map(e => (new String(e._1) -> new String(e._2))).toMap storeMap(name, "_map", newDoc) - } - + } + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={ - insertMapStorageEntriesFor(name, List((key, value))) - } - - + insertMapStorageEntriesFor(name, List((key, value))) + } + + def removeMapStorageFor(name: String) { lazy val url = URL + name + "_map" - findDocRev(name + "_map").foreach(deleteData(url, _)) - } - + findDocRev(name + "_map").foreach(deleteData(url, _)) + } + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { lazy val sKey = new String(key) // if we can't find the map for name, then we don't need to delete it. - getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey)) - } - + getResponseForNameAsMap(name, "_map").foreach(doc => storeMap(name, "_map", doc - sKey)) + } + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { lazy val sKey = new String(key) getResponseForNameAsMap(name, "_map").flatMap(_.get(sKey)).asInstanceOf[Option[String]].map(_.getBytes) - } + } - def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size - + def getMapStorageSizeFor(name: String): Int = getMapStorageFor(name).size + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { - val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) - m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes)) - } + val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes)) + } - def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { val m = getResponseForNameAsMap(name, "_map").map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) val keys = m.keys.toList.sortWith(_ < _) @@ -112,7 +112,7 @@ private [akka] object CouchDBStorageBackend extends // slice from keys: both ends inclusive val ks = keys.slice(keys.indexOf(s), scala.math.min(keys.indexOf(s) + c, keys.indexOf(f) + 1)) ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes)) - } + } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { insertVectorStorageEntriesFor(name, List(element)) @@ -133,16 +133,16 @@ private [akka] object CouchDBStorageBackend extends } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={ - val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] - if (v.indices.contains(index)) - v(index).getBytes - else - Array[Byte]() - } - + val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) + v(index).getBytes + else + Array[Byte]() + } + def getVectorStorageSizeFor(name: String): Int ={ - getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) - } + getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) + } def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val v = getResponseForNameAsMap(name, "_vector").flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]()) @@ -151,60 +151,60 @@ private [akka] object CouchDBStorageBackend extends val c = if (count == 0) v.length else count v.slice(s, scala.math.min(s + c, f)).map(_.getBytes) } - + def insertRefStorageFor(name: String, element: Array[Byte]) ={ - val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element)) - storeMap(name, "_ref", newDoc) - } - - def getRefStorageFor(name: String): Option[Array[Byte]] ={ - getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) - } + val newDoc = getResponseForNameAsMap(name, "_ref").getOrElse(Map[String, Any]()) + ("ref" -> new String(element)) + storeMap(name, "_ref", newDoc) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] ={ + getResponseForNameAsMap(name, "_ref").flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) + } - private def findDocRev(name: String) = { - getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] - .flatMap(_.get("_rev")).asInstanceOf[Option[String]] - } + private def findDocRev(name: String) = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + .flatMap(_.get("_rev")).asInstanceOf[Option[String]] + } - private def deleteData(url:String, rev:String): Option[String] = { - val client = new HttpClient() - val delete = new DeleteMethod(url) - delete.setRequestHeader("If-Match", rev) - client.executeMethod(delete) - - val response = delete.getResponseBodyAsString() - if (response != null) - Some(response) - else - None - } + private def deleteData(url:String, rev:String): Option[String] = { + val client = new HttpClient() + val delete = new DeleteMethod(url) + delete.setRequestHeader("If-Match", rev) + client.executeMethod(delete) + + val response = delete.getResponseBodyAsString() + if (response != null) + Some(response) + else + None + } - private def postData(url: String, data: String): Option[String] = { - val client = new HttpClient() - val post = new PostMethod(url) - post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) - post.setRequestHeader("Content-Type", "application/json") - client.executeMethod(post) + private def postData(url: String, data: String): Option[String] = { + val client = new HttpClient() + val post = new PostMethod(url) + post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) + post.setRequestHeader("Content-Type", "application/json") + client.executeMethod(post) val response = post.getResponseBodyAsString if (response != null) - Some(response) - else - None - } - - private def getResponse(url: String): Option[String] = { - val client = new HttpClient() - val method = new GetMethod(url) + Some(response) + else + None + } + + private def getResponse(url: String): Option[String] = { + val client = new HttpClient() + val method = new GetMethod(url) - method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false)) - client.executeMethod(method) + client.executeMethod(method) val response = method.getResponseBodyAsString - if (method.getStatusCode == 200 && response != null) - Some(response) - else - None - } + if (method.getStatusCode == 200 && response != null) + Some(response) + else + None + } } diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala index 4d118850f0..02f3c04172 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpecTestIntegration.scala @@ -82,12 +82,12 @@ BeforeAndAfterEach { ("guido van rossum", "python"), ("james strachan", "groovy")) val rl = List( - ("james gosling", "java"), - ("james strachan", "groovy"), - ("larry wall", "perl"), - ("martin odersky", "scala"), - ("ola bini", "ioke"), ("rich hickey", "clojure"), - ("slava pestov", "factor")) + ("james gosling", "java"), + ("james strachan", "groovy"), + ("larry wall", "perl"), + ("martin odersky", "scala"), + ("ola bini", "ioke"), ("rich hickey", "clojure"), + ("slava pestov", "factor")) insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) }) getMapStorageSizeFor("t1") should equal(l.size) getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1)) diff --git a/config/microkernel-server.xml b/config/microkernel-server.xml index d7b8087428..71160a6e54 100644 --- a/config/microkernel-server.xml +++ b/config/microkernel-server.xml @@ -38,29 +38,29 @@ 2 false 8443 - 20000 - 5000 + 20000 + 5000 + 30000 + 2 + 100 + /etc/keystore + PASSWORD + KEYPASSWORD + /etc/keystore + TRUSTPASSWORD + + + + --> @@ -73,10 +73,10 @@ / - se.scalablesolutions.akka.comet.AkkaServlet - /* - - + se.scalablesolutions.akka.comet.AkkaServlet + /* + + @@ -94,4 +94,4 @@ true 1000 - + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 6add9542ee..440c838d5c 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { object Dependencies { // Compile - lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" + lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile" @@ -606,7 +606,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { - + @@ -655,8 +655,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val couch = Dependencies.commonsHttpClient - val spec = Dependencies.specs + val couch = Dependencies.commonsHttpClient + val spec = Dependencies.specs override def testOptions = createTestFilter( _.endsWith("Test")) } From 40ddac666c2010325cb8e6ac0fd5706995d6e4fb Mon Sep 17 00:00:00 2001 From: imn Date: Wed, 27 Oct 2010 11:41:35 +0200 Subject: [PATCH 09/11] use nice case objects for the states :-) --- .../test/scala/actor/actor/FSMActorSpec.scala | 16 +++-- .../src/main/scala/DiningHakkersOnFsm.scala | 62 ++++++++++++------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index 496d9e9e01..8646dd5561 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -17,16 +17,20 @@ object FSMActorSpec { val lockedLatch = new StandardLatch val unhandledLatch = new StandardLatch - class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] { + sealed trait LockState + case object Locked extends LockState + case object Open extends LockState - inState("locked") { + class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] { + + inState(Locked) { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - goto("open") using CodeState("", code) until timeout + goto(Open) using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) @@ -37,14 +41,14 @@ object FSMActorSpec { case Event("hello", _) => stay replying "world" } - inState("open") { + inState(Open) { case Event(StateTimeout, stateData) => { doLock - goto("locked") + goto(Locked) } } - setInitialState("locked", CodeState("", code)) + setInitialState(Locked, CodeState("", code)) whenUnhandled { case Event(_, stateData) => { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 9ab27d4fbb..ecb4d82ba0 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -12,6 +12,13 @@ object Put extends ChopstickMessage case class Taken(chopstick: ActorRef) extends ChopstickMessage case class Busy(chopstick: ActorRef) extends ChopstickMessage +/** + * Some states the chopstick can be in + */ +sealed trait ChopstickState +case object Available extends ChopstickState +case object Taken extends ChopstickState + /** * Some state container for the chopstick */ @@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { +class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { self.id = name // When a chopstick is available, it can be taken by a some hakker - inState("available") { + inState(Available) { case Event(Take, _) => - goto("taken") using TakenBy(self.sender) replying Taken(self) + goto(Taken) using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - inState("taken") { + inState(Taken) { case Event(Take, currentState) => stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - goto("available") using TakenBy(None) + goto(Available) using TakenBy(None) } // A chopstick begins its existence as available and taken by no one - setInitialState("available", TakenBy(None)) + setInitialState(Available, TakenBy(None)) } /** @@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { sealed trait FSMHakkerMessage object Think extends FSMHakkerMessage +/** + * Some fsm hakker states + */ +sealed trait FSMHakkerState +case object Waiting extends FSMHakkerState +case object Thinking extends FSMHakkerState +case object Hungry extends FSMHakkerState +case object WaitForOtherChopstick extends FSMHakkerState +case object FirstChopstickDenied extends FSMHakkerState +case object Eating extends FSMHakkerState + /** * Some state container to keep track of which chopsticks we have */ @@ -57,10 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { self.id = name - inState("waiting") { + inState(Waiting) { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -68,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - inState("thinking") { + inState(Thinking) { case Event(StateTimeout, _) => left ! Take right ! Take - goto("hungry") + goto(Hungry) } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - inState("hungry") { + inState(Hungry) { case Event(Taken(`left`), _) => - goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None) + goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right)) + goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right)) case Event(Busy(_), _) => - goto("firstChopstickDenied") + goto(FirstChopstickDenied) } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - inState("waitForOtherChopstick") { + inState(WaitForOtherChopstick) { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -102,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000 + goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - inState("firstChopstickDenied") { + inState(FirstChopstickDenied) { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -118,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - inState("eating") { + inState(Eating) { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -127,19 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - goto("thinking") using TakenChopsticks(None, None) until period + goto(Thinking) using TakenChopsticks(None, None) until period } //All hakkers start waiting - setInitialState("waiting", TakenChopsticks(None, None)) + setInitialState(Waiting, TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ -object DiningHakkersOnFSM { - def main(args: Array[String]) { +object DiningHakkersOnFsm { + def run = { // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick From 658b073ace5c60f681045d2c48a94fa331555e82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 27 Oct 2010 13:22:38 +0200 Subject: [PATCH 10/11] Improved secure cookie generation script --- scripts/generate_config_with_secure_cookie.sh | 45 ++++++++++++++----- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/scripts/generate_config_with_secure_cookie.sh b/scripts/generate_config_with_secure_cookie.sh index bd7a83eda2..899ec22025 100755 --- a/scripts/generate_config_with_secure_cookie.sh +++ b/scripts/generate_config_with_secure_cookie.sh @@ -7,20 +7,41 @@ exec scala "$0" "$@" */ import java.security.{MessageDigest, SecureRandom} -lazy val random = SecureRandom.getInstance("SHA1PRNG") +/** + * @author Jonas Bonér + */ +object Crypt { + val hex = "0123456789ABCDEF" + val lineSeparator = System.getProperty("line.separator") -val buffer = Array.fill(32)(0.byteValue) -random.nextBytes(buffer) + lazy val random = SecureRandom.getInstance("SHA1PRNG") -val digest = MessageDigest.getInstance("SHA1") -digest.update(buffer) -val bytes = digest.digest + def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII")) -val sb = new StringBuilder -val hex = "0123456789ABCDEF" -bytes.foreach { b => - val n = b.asInstanceOf[Int] - sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) + def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5")) + + def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII")) + + def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) + + def generateSecureCookie: String = { + val bytes = Array.fill(32)(0.byteValue) + random.nextBytes(bytes) + sha1(bytes) + } + + def digest(bytes: Array[Byte], md: MessageDigest): String = { + md.update(bytes) + hexify(md.digest) + } + + def hexify(bytes: Array[Byte]): String = { + val builder = new StringBuilder + bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) } + builder.toString + } + + private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n") } print(""" @@ -33,7 +54,7 @@ include "akka-reference.conf" akka { remote { secure-cookie = """") -print(sb.toString) +print(Crypt.generateSecureCookie) print("""" } } From 440d36aed9dc1c5f3ef0b1a390007dd999bbd4d8 Mon Sep 17 00:00:00 2001 From: imn Date: Wed, 27 Oct 2010 15:45:30 +0200 Subject: [PATCH 11/11] polishing up code --- akka-actor/src/main/scala/actor/FSM.scala | 89 ++++++++++--------- .../test/scala/actor/actor/FSMActorSpec.scala | 18 +++- 2 files changed, 61 insertions(+), 46 deletions(-) diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index c5eb00a6fd..eac861d358 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -12,27 +12,15 @@ trait FSM[S, D] { type StateFunction = scala.PartialFunction[Event, State] - private var currentState: State = _ - private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None - - private val transitions = mutable.Map[S, StateFunction]() - - private def register(name: S, function: StateFunction) { - if (transitions contains name) { - transitions(name) = transitions(name) orElse function - } else { - transitions(name) = function - } + /** DSL */ + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) } protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { setState(State(stateName, stateData, timeout)) } - protected final def inState(stateName: S)(stateFunction: StateFunction) = { - register(stateName, stateFunction) - } - protected final def goto(nextStateName: S): State = { State(nextStateName, currentState.stateData) } @@ -41,14 +29,6 @@ trait FSM[S, D] { goto(currentState.stateName) } - protected final def reply(replyValue: Any): State = { - self.sender.foreach(_ ! replyValue) - stay() - } - - /** - * Stop - */ protected final def stop(): State = { stop(Normal) } @@ -58,59 +38,82 @@ trait FSM[S, D] { } protected final def stop(reason: Reason, stateData: D): State = { - log.info("Stopped because of reason: %s", reason) - terminate(reason, currentState.stateName, stateData) - self.stop - State(currentState.stateName, stateData) + self ! Stop(reason, stateData) + stay } - def terminate(reason: Reason, stateName: S, stateData: D) = () - def whenUnhandled(stateFunction: StateFunction) = { handleEvent = stateFunction } + def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = { + terminateEvent = terminationHandler + } + + /** FSM State data and default handlers */ + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + private val transitions = mutable.Map[S, StateFunction]() + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + private var handleEvent: StateFunction = { case Event(value, stateData) => - log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData) - currentState + log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName) + stay + } + + private var terminateEvent: PartialFunction[Reason, Unit] = { + case failure@Failure(_) => log.error("Stopping because of a %s", failure) + case reason => log.info("Stopping because of reason: %s", reason) } override final protected def receive: Receive = { - case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => () + case Stop(reason, stateData) => + terminateEvent.apply(reason) + self.stop + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => + log.trace("Ignoring StateTimeout - ") // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} val event = Event(value, currentState.stateData) val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) - if (self.isRunning) { - setState(nextState) - } + setState(nextState) } } private def setState(nextState: State) = { if (!transitions.contains(nextState.stateName)) { - stop(Failure("Next state %s not available".format(nextState.stateName))) + stop(Failure("Next state %s does not exist".format(nextState.stateName))) } else { currentState = nextState - currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))} + currentState.timeout.foreach { + t => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS)) + } } } case class Event(event: Any, stateData: D) case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { + def until(timeout: Long): State = { copy(timeout = Some(timeout)) } - def then(nextStateName: S): State = { - copy(stateName = nextStateName) - } - def replying(replyValue:Any): State = { - self.sender.foreach(_ ! replyValue) + self.sender match { + case Some(sender) => sender ! replyValue + case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue) + } this } @@ -125,4 +128,6 @@ trait FSM[S, D] { case class Failure(cause: Any) extends Reason case object StateTimeout + + private case class Stop(reason: Reason, stateData: D) } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index 8646dd5561..dc6893c820 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -16,6 +16,7 @@ object FSMActorSpec { val unlockedLatch = new StandardLatch val lockedLatch = new StandardLatch val unhandledLatch = new StandardLatch + val terminatedLatch = new StandardLatch sealed trait LockState case object Locked extends LockState @@ -39,6 +40,7 @@ object FSMActorSpec { } } case Event("hello", _) => stay replying "world" + case Event("bye", _) => stop(Shutdown) } inState(Open) { @@ -58,6 +60,10 @@ object FSMActorSpec { } } + onTermination { + case reason => terminatedLatch.open + } + private def doLock() { log.info("Locked") lockedLatch.open @@ -94,16 +100,20 @@ class FSMActorSpec extends JUnitSuite { assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) val answerLatch = new StandardLatch - object Go + object Hello + object Bye val tester = Actor.actorOf(new Actor { protected def receive = { - case Go => lock ! "hello" + case Hello => lock ! "hello" case "world" => answerLatch.open - + case Bye => lock ! "bye" } }).start - tester ! Go + tester ! Hello assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + + tester ! Bye + assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) } }