diff --git a/akka-actor/src/main/scala/config/Config.scala b/akka-actor/src/main/scala/config/Config.scala index e97347754b..16ed6d76c4 100644 --- a/akka-actor/src/main/scala/config/Config.scala +++ b/akka-actor/src/main/scala/config/Config.scala @@ -27,10 +27,6 @@ object ConfigLogger extends Logging object Config { val VERSION = "1.0-SNAPSHOT" - // Set Multiverse options for max speed - System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") - System.setProperty("org.multiverse.api.GlobalStmInstance.factorymethod", "org.multiverse.stms.alpha.AlphaStm.createFast") - val HOME = { val envHome = System.getenv("AKKA_HOME") match { case null | "" | "." => None diff --git a/akka-actor/src/main/scala/util/Crypt.scala b/akka-actor/src/main/scala/util/Crypt.scala index e8cd2f82dd..fad9e4c281 100644 --- a/akka-actor/src/main/scala/util/Crypt.scala +++ b/akka-actor/src/main/scala/util/Crypt.scala @@ -9,28 +9,37 @@ import java.security.{MessageDigest, SecureRandom} /** * @author Jonas Bonér */ -object Crypt { +object Crypt extends Logging { + val hex = "0123456789ABCDEF" + val lineSeparator = System.getProperty("line.separator") + lazy val random = SecureRandom.getInstance("SHA1PRNG") + def md5(text: String): String = md5(unifyLineSeparator(text).getBytes("ASCII")) + + def md5(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("MD5")) + + def sha1(text: String): String = sha1(unifyLineSeparator(text).getBytes("ASCII")) + + def sha1(bytes: Array[Byte]): String = digest(bytes, MessageDigest.getInstance("SHA1")) + def generateSecureCookie: String = { - val bytes = Array.make(32, 0.byteValue) + log.info("Generating secure cookie...") + val bytes = Array.fill(32)(0.byteValue) random.nextBytes(bytes) - getMD5For(bytes) + sha1(bytes) } - def getMD5For(s: String): String = getMD5For(s.getBytes("ASCII")) - - def getMD5For(b: Array[Byte]): String = { - val digest = MessageDigest.getInstance("MD5") - digest.update(b) - val bytes = digest.digest - - val sb = new StringBuilder - val hex = "0123456789ABCDEF" - bytes.foreach { b => - val n = b.asInstanceOf[Int] - sb.append(hex.charAt((n & 0xF) >> 4)).append(hex.charAt(n & 0xF)) - } - sb.toString + def digest(bytes: Array[Byte], md: MessageDigest): String = { + md.update(bytes) + hexify(md.digest) + } + + def hexify(bytes: Array[Byte]): String = { + val builder = new StringBuilder + bytes.foreach { byte => builder.append(hex.charAt((byte & 0xF) >> 4)).append(hex.charAt(byte & 0xF)) } + builder.toString } -} \ No newline at end of file + + private def unifyLineSeparator(text: String): String = text.replaceAll(lineSeparator, "\n") +} diff --git a/akka-actor/src/main/scala/util/Helpers.scala b/akka-actor/src/main/scala/util/Helpers.scala index 75ba61620a..b9fa5d10b6 100644 --- a/akka-actor/src/main/scala/util/Helpers.scala +++ b/akka-actor/src/main/scala/util/Helpers.scala @@ -20,6 +20,10 @@ object Helpers extends Logging { bytes } + def bytesToInt(bytes: Array[Byte], offset: Int): Int = { + (0 until 4).foldLeft(0)((value, index) => value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) + } + /** * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException * if the actual type is not assignable from the given one. @@ -41,4 +45,56 @@ object Helpers extends Logging { log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName) None } + + /** + * Reference that can hold either a typed value or an exception. + * + * Usage: + *
+   * scala> ResultOrError(1)
+   * res0: ResultOrError[Int] = ResultOrError@a96606
+   *
+   * scala> res0()
+    res1: Int = 1
+   *
+   * scala> res0() = 3
+   *
+   * scala> res0()
+   * res3: Int = 3
+   * 
+   * scala> res0() = { println("Hello world"); 3}
+   * Hello world
+   *
+   * scala> res0()
+   * res5: Int = 3
+   *  
+   * scala> res0() = error("Lets see what happens here...")
+   *
+   * scala> res0()
+   * java.lang.RuntimeException: Lets see what happens here...
+   * 	at ResultOrError.apply(Helper.scala:11)
+   * 	at .(:6)
+   * 	at .()
+   * 	at Re...
+   * 
+ */ + class ResultOrError[R](result: R){ + private[this] var contents: Either[R, Throwable] = Left(result) + + def update(value: => R) = { + contents = try { + Left(value) + } catch { + case (error : Throwable) => Right(error) + } + } + + def apply() = contents match { + case Left(result) => result + case Right(error) => throw error.fillInStackTrace + } + } + object ResultOrError { + def apply[R](result: R) = new ResultOrError(result) + } } \ No newline at end of file diff --git a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index e5265ea396..60f5004861 100644 --- a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -3635,6 +3635,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -3686,6 +3693,9 @@ public final class RemoteProtocol { for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -3723,6 +3733,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3909,6 +3923,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3989,6 +4006,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4248,6 +4269,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteRequestProtocol) } @@ -4341,6 +4383,13 @@ public final class RemoteProtocol { return metadata_.get(index); } + // optional string cookie = 8; + public static final int COOKIE_FIELD_NUMBER = 8; + private boolean hasCookie; + private java.lang.String cookie_ = ""; + public boolean hasCookie() { return hasCookie; } + public java.lang.String getCookie() { return cookie_; } + private void initFields() { uuid_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); message_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance(); @@ -4391,6 +4440,9 @@ public final class RemoteProtocol { for (se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol element : getMetadataList()) { output.writeMessage(7, element); } + if (hasCookie()) { + output.writeString(8, getCookie()); + } getUnknownFields().writeTo(output); } @@ -4428,6 +4480,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(7, element); } + if (hasCookie()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(8, getCookie()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4614,6 +4670,9 @@ public final class RemoteProtocol { } result.metadata_.addAll(other.metadata_); } + if (other.hasCookie()) { + setCookie(other.getCookie()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4689,6 +4748,10 @@ public final class RemoteProtocol { addMetadata(subBuilder.buildPartial()); break; } + case 66: { + setCookie(input.readString()); + break; + } } } } @@ -4929,6 +4992,27 @@ public final class RemoteProtocol { return this; } + // optional string cookie = 8; + public boolean hasCookie() { + return result.hasCookie(); + } + public java.lang.String getCookie() { + return result.getCookie(); + } + public Builder setCookie(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasCookie = true; + result.cookie_ = value; + return this; + } + public Builder clearCookie() { + result.hasCookie = false; + result.cookie_ = getDefaultInstance().getCookie(); + return this; + } + // @@protoc_insertion_point(builder_scope:RemoteReplyProtocol) } @@ -6657,33 +6741,34 @@ public final class RemoteProtocol { "\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" + "dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" + "l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" + - "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" + + "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\232\002\n\025R" + "emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" + "dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt", "ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" + "col\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030" + "\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" + "emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" + - "\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" + - "rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" + - "message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" + - "tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" + - "isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" + - "\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata", - "\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" + - "otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" + - "adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" + - "\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" + - "\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" + - "l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" + - "eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" + - "sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" + - "\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" + - "ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR", - "Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" + - "OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" + - "\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolutions." + - "akka.remote.protocolH\001" + "\026.MetadataEntryProtocol\022\016\n\006cookie\030\010 \001(\t\"" + + "\204\002\n\023RemoteReplyProtocol\022\033\n\004uuid\030\001 \002(\0132\r." + + "UuidProtocol\022!\n\007message\030\002 \001(\0132\020.MessageP" + + "rotocol\022%\n\texception\030\003 \001(\0132\022.ExceptionPr" + + "otocol\022%\n\016supervisorUuid\030\004 \001(\0132\r.UuidPro" + + "tocol\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006", + " \002(\010\022(\n\010metadata\030\007 \003(\0132\026.MetadataEntryPr" + + "otocol\022\016\n\006cookie\030\010 \001(\t\")\n\014UuidProtocol\022\014" + + "\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEnt" + + "ryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6" + + "\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016" + + ".LifeCycleType\"1\n\017AddressProtocol\022\020\n\010hos" + + "tname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionPr" + + "otocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002" + + "(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n\nJAVA" + + "_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serializati", + "onSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nS" + + "CALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005" + + "*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMP" + + "ORARY\020\002B-\n)se.scalablesolutions.akka.rem" + + "ote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6751,7 +6836,7 @@ public final class RemoteProtocol { internal_static_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteRequestProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "ActorInfo", "IsOneWay", "SupervisorUuid", "Sender", "Metadata", "Cookie", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class); internal_static_RemoteReplyProtocol_descriptor = @@ -6759,7 +6844,7 @@ public final class RemoteProtocol { internal_static_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteReplyProtocol_descriptor, - new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", }, + new java.lang.String[] { "Uuid", "Message", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", "Metadata", "Cookie", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class); internal_static_UuidProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 40c5756e04..ce694141a0 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -102,6 +102,7 @@ message RemoteRequestProtocol { optional UuidProtocol supervisorUuid = 5; optional RemoteActorRefProtocol sender = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** @@ -115,6 +116,7 @@ message RemoteReplyProtocol { required bool isActor = 5; required bool isSuccessful = 6; repeated MetadataEntryProtocol metadata = 7; + optional string cookie = 8; } /** diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index e39b83a503..1ddf57869e 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -59,7 +59,13 @@ class RemoteClientException private[akka](message: String, @BeanProperty val cli * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] @@ -269,7 +275,7 @@ class RemoteClient private[akka] ( typedActorInfo: Option[Tuple2[String, String]], actorType: ActorType): Option[CompletableFuture[T]] = { send(createRemoteRequestProtocolBuilder( - actorRef, message, isOneWay, senderOption, typedActorInfo, actorType).build, senderFuture) + actorRef, message, isOneWay, senderOption, typedActorInfo, actorType, RemoteClient.SECURE_COOKIE).build, senderFuture) } def send[T]( diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index f3c29c62f9..0d39be263a 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -16,6 +16,7 @@ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorType._ import se.scalablesolutions.akka.config.Config._ +import se.scalablesolutions.akka.config.ConfigurationException import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.serialization.RemoteActorSerialization import se.scalablesolutions.akka.serialization.RemoteActorSerialization._ @@ -61,21 +62,30 @@ import scala.reflect.BeanProperty object RemoteNode extends RemoteServer /** - * For internal use only. - * Holds configuration variables, remote actors, remote typed actors and remote servers. + * For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers. * * @author Jonas Bonér */ -object -RemoteServer { +object RemoteServer { val UUID_PREFIX = "uuid:" - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val SECURE_COOKIE: Option[String] = { + val cookie = config.getString("akka.remote.secure-cookie", "") + if (cookie == "") None + else Some(cookie) + } + val REQUIRE_COOKIE = { + val requireCookie = config.getBool("akka.remote.server.require-cookie", true) + if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") + requireCookie + } + + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) - - val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") - val ZLIB_COMPRESSION_LEVEL = { + val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") + val ZLIB_COMPRESSION_LEVEL = { val level = config.getInt("akka.remote.zlib-compression-level", 6) if (level < 1 && level > 9) throw new IllegalArgumentException( "zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed") @@ -128,7 +138,6 @@ RemoteServer { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) } - } /** @@ -389,7 +398,7 @@ class RemoteServerPipelineFactory( val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case _ => (join(), join()) } @@ -411,6 +420,7 @@ class RemoteServerHandler( val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + val CHANNEL_INIT = "channel-init".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -437,6 +447,7 @@ class RemoteServerHandler( } else { server.notifyListeners(RemoteServerClientConnected(server)) } + if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -445,8 +456,7 @@ class RemoteServerHandler( } override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) } super.handleUpstream(ctx, event) @@ -456,7 +466,9 @@ class RemoteServerHandler( val message = event.getMessage if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequestProtocol]) { - handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) + val requestProtocol = message.asInstanceOf[RemoteRequestProtocol] + authenticateRemoteClient(requestProtocol, ctx) + handleRemoteRequestProtocol(requestProtocol, event.getChannel) } } @@ -491,8 +503,11 @@ class RemoteServerHandler( case RemoteActorSystemMessage.Stop => actorRef.stop case _ => // then match on user defined messages if (request.getIsOneWay) actorRef.!(message)(sender) - else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some( - new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ + else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout( + message, + request.getActorInfo.getTimeout, + None, + Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ override def onComplete(result: AnyRef) { log.debug("Returning result from actor invocation [%s]", result) val replyBuilder = RemoteReplyProtocol.newBuilder @@ -677,4 +692,20 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) replyBuilder.build } + + private def authenticateRemoteClient(request: RemoteRequestProtocol, ctx: ChannelHandlerContext) = { + if (RemoteServer.REQUIRE_COOKIE) { + val attachment = ctx.getAttachment + if ((attachment ne null) && + attachment.isInstanceOf[String] && + attachment.asInstanceOf[String] == CHANNEL_INIT) { + val clientAddress = ctx.getChannel.getRemoteAddress.toString + if (!request.hasCookie) throw new SecurityException( + "The remote client [" + clientAddress + "] does not have a secure cookie.") + if (!(request.getCookie == RemoteServer.SECURE_COOKIE.get)) throw new SecurityException( + "The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie") + log.info("Remote client [%s] successfully authenticated using secure cookie", clientAddress) + } + } + } } diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 922db92ad4..8383607f8c 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.dispatch.MessageInvocation -import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer} +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient, MessageSerializer} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ import se.scalablesolutions.akka.config.Supervision._ @@ -132,7 +132,8 @@ object ActorSerialization { false, actorRef.getSender, None, - ActorType.ScalaActor).build) + ActorType.ScalaActor, + RemoteClient.SECURE_COOKIE).build) requestProtocols.foreach(rp => builder.addMessages(rp)) } @@ -261,7 +262,8 @@ object RemoteActorSerialization { isOneWay: Boolean, senderOption: Option[ActorRef], typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): + actorType: ActorType, + secureCookie: Option[String]): RemoteRequestProtocol.Builder = { import actorRef._ @@ -292,6 +294,8 @@ object RemoteActorSerialization { .setActorInfo(actorInfo) .setIsOneWay(isOneWay) + secureCookie.foreach(requestBuilder.setCookie(_)) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid( UuidProtocol.newBuilder diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 0ca834b8d4..5017a18fed 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -7,7 +7,7 @@ akka { version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - + time-unit = "seconds" # Default timeout time unit for all timeout properties throughout the config # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up @@ -109,6 +109,9 @@ akka { } remote { + + secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_secure_cookie.sh' or using 'Crypt.generateSecureCookie' + compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 @@ -133,6 +136,7 @@ akka { hostname = "localhost" # The hostname or IP that clients should connect to port = 9999 # The port clients should connect to connection-timeout = 1 + require-cookie = on } client { diff --git a/scripts/generate_secure_cookie.sh b/scripts/generate_secure_cookie.sh index 12ca10a940..bd3813793e 100755 --- a/scripts/generate_secure_cookie.sh +++ b/scripts/generate_secure_cookie.sh @@ -9,10 +9,10 @@ import java.security.{MessageDigest, SecureRandom} lazy val random = SecureRandom.getInstance("SHA1PRNG") -val buffer = Array.make(32, 0.byteValue) +val buffer = Array.fill(32)(0.byteValue) random.nextBytes(buffer) -val digest = MessageDigest.getInstance("MD5") +val digest = MessageDigest.getInstance("SHA1") digest.update(buffer) val bytes = digest.digest