From 1c804846af4a3e7c3f813fb26293622e29e01b54 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 8 Aug 2011 22:47:59 +0200 Subject: [PATCH 01/10] first stab at release notes for release 1.2 - including commit stats and the script which generated them --- akka-docs/project/release-notes.rst | 119 ++++++++++++++++++++++++++-- scripts/authors.pl | 35 ++++++++ 2 files changed, 148 insertions(+), 6 deletions(-) create mode 100755 scripts/authors.pl diff --git a/akka-docs/project/release-notes.rst b/akka-docs/project/release-notes.rst index 20553a15a8..cf5fed572b 100644 --- a/akka-docs/project/release-notes.rst +++ b/akka-docs/project/release-notes.rst @@ -1,15 +1,122 @@ Release Notes ============== -Changes listed in no particular order. +Release 1.2 +----------- -1.1 ----------------------------------------- +This release, while containing several substantial improvements, focuses on +paving the way for the upcoming 2.0 release. A selection of changes is +presented in the following, for the full list of tickets closed during the +development cycle please refer to +`the issue tracker `_. -- **UPD** - improve FSM DSL: make onTransition syntax nicer (Roland Kuhn) +- **Actor:** -Release 1.1-M1 --------------------- + - pluggable serializers for remote actors (Java, Protobuf, ScalaJSON, JavaJSON) + + - unified :class:`Channel` abstraction for :class:`Promise` & :class:`Actor` + + - reintegrate invocation tracing (to be enabled per class and globally) + + - make last message available during :meth:`preRestart()` + + - experimental :meth:`freshInstance()` life-cycle hook for priming the new instance during restart + + - new textual primitives :meth:`tell` (``!``) and :meth:`ask` (``?``, formerly ``!!!``) + + - timeout for :meth:`ask` Futures taken from implicit argument (currently with fallback to deprecated ``ActorRef.timeout`` + +- **durable mailboxes:** + + - beanstalk, file, mongo, redis + +- **Future:** + + - :meth:`onTimeout` callback + + - select dispatcher for execution by implicit argument + + - add safer cast methods :meth:`as[T]: T` and :meth:`mapTo[T]: Future[T]` + +- **TestKit:** + + - add :class:`TestProbe` (can receive, reply and forward messages, supports all :class:`TestKit` assertions) + + - add :meth:`TestKit.awaitCond` + + - support global time-factor for all timing assertions (for running on busy CI servers) + +- **FSM:** + + - add :class:`TestFSMRef` + + - add :class:`LoggingFSM` (transition tracing, rolling event log) + +- updated dependencies: + + - Jackson 1.8.0 + + - Netty 3.2.5 + + - Protobuf 2.4.1 + + - ScalaTest 1.6.1 + +- various fixes, small improvements and documentation updates + +- several **deprecations** in preparation for 2.0 + + ================================ ===================== + Method Replacement + ================================ ===================== + Actor.preRestart(cause) Actor.preRestart(cause, lastMsg) + ActorRef.sendOneWay ActorRef.tell + ActorRef.sendOneWaySafe ActorRef.tryTell + ActorRef.sendRequestReply ActorRef.ask(...).get() + ActorRef.sendRequestReplyFuture ActorRef.ask(...).get() + ActorRef.replyUnsafe ActorRef.reply + ActorRef.replySafe ActorRef.tryReply + ActorRef.mailboxSize ActorRef.dispatcher.mailboxSize(actorRef) + ActorRef.sender/senderFuture ActorRef.channel + ActorRef.!! ActorRef.?(...).as[T] + ActorRef.!!! ActorRef.? + ActorRef.reply\_? ActorRef.tryReply + Future.receive Future.onResult + Future.collect Future.map + Future.failure Future.recover + MessageDispatcher.pendingFutures MessageDispatcher.tasks + RemoteClientModule.*Listener(s) EventHandler. + TestKit.expectMsg(pf) TestKit.expectMsgPF + TestKit.receiveWhile(pf) TestKit.receiveWhile()(pf) + ================================ ===================== + +Trivia +^^^^^^ + +This release contains changes to 213 files, with 16053 insertions and 3624 +deletions. The authorship of the corresponding commits is distributed as shown +below. + +======= ========== ========= ========= +Commits Insertions Deletions Author +======= ========== ========= ========= + 69 11805 170 Viktor Klang + 34 9694 97 Patrik Nordwall + 72 3563 179 Roland Kuhn + 27 1749 115 Peter Vlugter + 7 238 22 Derek Williams + 4 86 25 Peter Veentjer + 1 17 5 Debasish Ghosh + 2 15 5 Jonas Bonér +======= ========== ========= ========= + +.. note:: + + Release notes of previous releases consisted of ticket or change listings in + no particular order + +Release 1.1 +----------- - **ADD** - #647 Extract an akka-camel-typed module out of akka-camel for optional typed actor support (Martin Krasser) - **ADD** - #654 Allow consumer actors to acknowledge in-only message exchanges (Martin Krasser) diff --git a/scripts/authors.pl b/scripts/authors.pl new file mode 100755 index 0000000000..d1f98ecaad --- /dev/null +++ b/scripts/authors.pl @@ -0,0 +1,35 @@ +#!/usr/bin/perl + +# +# This script can generate commit statistics from 'git log --shortstat -z tag1..tag2' output +# + +use strict; +use warnings; + +local $/ = "\x0"; + +my %auth; +our $commits; +our $insertions; +our $deletions; +our $author; + +while (<>) { + ($author) = /Author: (.*) [0] <=> $auth{$a}->[0] } keys %auth) { + ($commits, $insertions, $deletions) = @{$auth{$author}}; + write; +} + +format STDOUT = +@#### @###### @###### @* +$commits, $insertions, $deletions, $author +. From ccfc6aa66aec77efff6bf13d7ab737626d83c135 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 9 Aug 2011 08:54:15 +0200 Subject: [PATCH 02/10] remove pluggable serializers from release notes, since they are only in 2.0 --- akka-docs/project/release-notes.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-docs/project/release-notes.rst b/akka-docs/project/release-notes.rst index cf5fed572b..20fb33a5cd 100644 --- a/akka-docs/project/release-notes.rst +++ b/akka-docs/project/release-notes.rst @@ -12,8 +12,6 @@ development cycle please refer to - **Actor:** - - pluggable serializers for remote actors (Java, Protobuf, ScalaJSON, JavaJSON) - - unified :class:`Channel` abstraction for :class:`Promise` & :class:`Actor` - reintegrate invocation tracing (to be enabled per class and globally) From f894ae18185446fb0a05e999675b5ec398985da6 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 9 Aug 2011 09:41:49 +0200 Subject: [PATCH 03/10] clarify origin of stats in release notes --- akka-docs/project/release-notes.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/akka-docs/project/release-notes.rst b/akka-docs/project/release-notes.rst index 20fb33a5cd..a80c394194 100644 --- a/akka-docs/project/release-notes.rst +++ b/akka-docs/project/release-notes.rst @@ -93,7 +93,11 @@ Trivia This release contains changes to 213 files, with 16053 insertions and 3624 deletions. The authorship of the corresponding commits is distributed as shown -below. +below; the listing should not be taken too seriously, though, it has just been +done using ``git log --shortstat`` and summing up the numbers, so it certainly +misses details like who originally authored changes which were then back-ported +from the master branch (do not fear, you will be correctly attributed when the +stats for 2.0 are made). ======= ========== ========= ========= Commits Insertions Deletions Author From cdae21e07cf5e848f3a227d9e6315471d6955a20 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 9 Aug 2011 21:37:39 +0200 Subject: [PATCH 04/10] restore behavior of Future.as[T] and .asSilently[T] (fixes #1088) - implement both methods equivalently directly on Future - remove implicit conversion futureToAnyOptionAsTypedOption - update docs accordingly --- .../src/main/scala/akka/dispatch/Future.scala | 31 +++++++++++++++++++ akka-actor/src/main/scala/akka/package.scala | 11 ------- akka-docs/scala/actors.rst | 7 +++-- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 96c8f5e252..11e64e2683 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -377,6 +377,37 @@ sealed trait Future[+T] extends japi.Future[T] { */ def await(atMost: Duration): Future[T] + /** + * Await completion of this Future and return its value if it conforms to A's + * erased type. Will throw a ClassCastException if the value does not + * conform, or any exception the Future was completed with. Will return None + * in case of a timeout. + */ + def as[A](implicit m: Manifest[A]): Option[A] = { + try await catch { case _: FutureTimeoutException ⇒ } + value match { + case None ⇒ None + case Some(Left(ex)) ⇒ throw ex + case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + } + } + + /** + * Await completion of this Future and return its value if it conforms to A's + * erased type, None otherwise. Will throw any exception the Future was + * completed with. Will return None in case of a timeout. + */ + def asSilently[A](implicit m: Manifest[A]): Option[A] = { + try await catch { case _: FutureTimeoutException ⇒ } + value match { + case None ⇒ None + case Some(Left(ex)) ⇒ throw ex + case Some(Right(v)) ⇒ + try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + catch { case _: ClassCastException ⇒ None } + } + } + /** * Tests whether this Future has been completed. */ diff --git a/akka-actor/src/main/scala/akka/package.scala b/akka-actor/src/main/scala/akka/package.scala index e96a1eddbf..b122d9297b 100644 --- a/akka-actor/src/main/scala/akka/package.scala +++ b/akka-actor/src/main/scala/akka/package.scala @@ -12,17 +12,6 @@ package object akka { */ implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption) - /** - * Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method as[T] - * to convert an Option[Any] to an Option[T]. - * This means that the following code is equivalent: - * (actor ? "foo").as[Int] (Recommended) - */ - implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({ - try { anyFuture.await } catch { case t: FutureTimeoutException ⇒ } - anyFuture.resultOrException - }) - private[akka] class AnyOptionAsTypedOption(anyOption: Option[Any]) { /** * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 69ebc4edbc..068575a04a 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -287,8 +287,11 @@ processing another message on this actor). For this purpose, there is the method :meth:`Future.as[T]` which waits until either the future is completed or its timeout expires, whichever comes first. The result is then inspected and returned as :class:`Some[T]` if it was -normally completed and the answer’s runtime type matches the desired type; in -all other cases :class:`None` is returned. +normally completed and the answer’s runtime type matches the desired type; if +the future contains an exception or the value cannot be cast to the desired +type, it will throw the exception or a :class:`ClassCastException` (if you want +to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In +case of a timeout, :obj:`None` is returned. .. code-block:: scala From c9e8e4503e9b7ea392852ecdc0c01464d0952713 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 9 Aug 2011 22:33:44 +0200 Subject: [PATCH 05/10] remove now-unused "package object akka" it contained only AnyOptionAsTypedOption and corresponding implicit conversions, which were used in conjunction with ActorRef.!! (gone) and Future[Any] (has .as[T] and .asSilently[T] now) --- akka-actor/src/main/scala/akka/package.scala | 28 -------------------- 1 file changed, 28 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/package.scala diff --git a/akka-actor/src/main/scala/akka/package.scala b/akka-actor/src/main/scala/akka/package.scala deleted file mode 100644 index b122d9297b..0000000000 --- a/akka-actor/src/main/scala/akka/package.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -import akka.dispatch.{ FutureTimeoutException, Future } -import akka.util.Helpers.{ narrow, narrowSilently } - -package object akka { - /** - * Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method as[T] - * to convert an Option[Any] to an Option[T]. - */ - implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption) - - private[akka] class AnyOptionAsTypedOption(anyOption: Option[Any]) { - /** - * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException - * if the actual type is not assignable from the given one. - */ - def as[T]: Option[T] = narrow[T](anyOption) - - /** - * Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible - * ClassCastException and return None in that case. - */ - def asSilently[T: Manifest]: Option[T] = narrowSilently[T](anyOption) - } -} From 817634cbea288a3fe2add94507fcc43b7b9a4949 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sun, 14 Aug 2011 10:12:04 -0600 Subject: [PATCH 06/10] IO: Include cause of closed socket --- akka-actor/src/main/scala/akka/actor/IO.scala | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 9b0f53180f..8e3a79404f 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -5,6 +5,7 @@ package akka.actor import akka.util.ByteString import akka.dispatch.MessageInvocation +import akka.event.EventHandler import java.net.InetSocketAddress import java.io.IOException @@ -90,7 +91,7 @@ object IO { case class Connect(socket: SocketHandle, address: InetSocketAddress) extends IOMessage case class Connected(socket: SocketHandle) extends IOMessage case class Close(handle: Handle) extends IOMessage - case class Closed(handle: Handle) extends IOMessage + case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage @@ -168,7 +169,7 @@ trait IO { case Connected(socket) ⇒ state(socket).connected = true run() - case msg @ Closed(handle) ⇒ + case msg @ Closed(handle, _) ⇒ _state -= handle // TODO: clean up better if (_receiveIO.isDefinedAt(msg)) { _next = reset { _receiveIO(msg); Idle } @@ -322,7 +323,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { private val buffer = ByteBuffer.allocate(bufferSize) - private val thread = new Thread() { + private val thread = new Thread("io-worker") { override def run(): Unit = { while (selector.isOpen) { selector select () @@ -349,7 +350,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { writes += (handle -> queue.enqueue(data)) } case Close(handle) ⇒ - cleanup(handle) + cleanup(handle, None) case Shutdown ⇒ channels.values foreach (_.close) selector.close @@ -375,17 +376,18 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { try { write(handle.asWritable, channel) } catch { - case e: IOException ⇒ // ignore, let it fail on read to ensure - // nothing left in read buffer. + case e: IOException ⇒ + // ignore, let it fail on read to ensure nothing left in read buffer. } } } catch { - case e: CancelledKeyException ⇒ cleanup(handle) - case e: IOException ⇒ cleanup(handle) + case e: CancelledKeyException ⇒ cleanup(handle, Some(e)) + case e: IOException ⇒ cleanup(handle, Some(e)) + case e: ActorInitializationException ⇒ cleanup(handle, Some(e)) } } - private def cleanup(handle: IO.Handle): Unit = { + private def cleanup(handle: IO.Handle, cause: Option[Exception]): Unit = { handle match { case server: IO.ServerHandle ⇒ accepted -= server case writable: IO.WriteHandle ⇒ writes -= writable @@ -394,7 +396,12 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { case Some(channel) ⇒ channel.close channels -= handle - handle.owner ! IO.Closed(handle) + try { + handle.owner ! IO.Closed(handle, cause) + } catch { + case e: ActorInitializationException ⇒ + EventHandler debug (this, "IO.Handle's owner not running") + } case None ⇒ } } @@ -415,9 +422,12 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { } private def connect(socket: IO.SocketHandle, channel: SocketChannel): Unit = { - channel.finishConnect - removeOps(socket, OP_CONNECT) - socket.owner ! IO.Connected(socket) + if (channel.finishConnect) { + removeOps(socket, OP_CONNECT) + socket.owner ! IO.Connected(socket) + } else { + cleanup(socket, None) // TODO: Add a cause + } } @tailrec @@ -436,7 +446,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { buffer.clear val readLen = channel read buffer if (readLen == -1) { - cleanup(handle) + cleanup(handle, None) // TODO: Add a cause } else if (readLen > 0) { buffer.flip handle.owner ! IO.Read(handle, ByteString(buffer)) From 18b5033a5349474a67b7c07525f7af8e66c4b613 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 15 Aug 2011 00:17:12 +0200 Subject: [PATCH 07/10] fix cancellation of FSM state timeout by named timer messages, fixes #1108 - actually cancel state timeout timer - increment generation timer in case it had already fired --- .../akka/actor/actor/FSMTimingSpec.scala | 31 +++++++++++++++++++ .../src/main/scala/akka/actor/FSM.scala | 9 ++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index d0656a70fb..dc8f6a570a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -65,6 +65,20 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { expectMsg(Transition(fsm, TestCancelTimer, Initial)) } + "not get confused between named and state timers" in { + fsm ! TestCancelStateTimerInNamedTimerMessage + fsm ! Tick + expectMsg(100 millis, Tick) + Thread.sleep(200) + fsm.dispatcher resume fsm + expectMsg(100 millis, Transition(fsm, TestCancelStateTimerInNamedTimerMessage, TestCancelStateTimerInNamedTimerMessage2)) + fsm ! Cancel + within(100 millis) { + expectMsg(Cancel) + expectMsg(Transition(fsm, TestCancelStateTimerInNamedTimerMessage2, Initial)) + } + } + "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer val seq = receiveWhile(600 millis) { @@ -113,6 +127,8 @@ object FSMTimingSpec { case object TestRepeatedTimer extends State case object TestUnhandled extends State case object TestCancelTimer extends State + case object TestCancelStateTimerInNamedTimerMessage extends State + case object TestCancelStateTimerInNamedTimerMessage2 extends State case object Tick case object Tock @@ -170,6 +186,21 @@ object FSMTimingSpec { stay using (remaining - 1) } } + when(TestCancelStateTimerInNamedTimerMessage) { + // FSM is suspended after processing this message and resumed 200ms later + case Ev(Tick) ⇒ + self.dispatcher suspend self + setTimer("named", Tock, 10 millis, false) + stay forMax (100 millis) replying Tick + case Ev(Tock) ⇒ + goto(TestCancelStateTimerInNamedTimerMessage2) + } + when(TestCancelStateTimerInNamedTimerMessage2) { + case Ev(StateTimeout) ⇒ + goto(Initial) + case Ev(Cancel) ⇒ + goto(Initial) replying Cancel + } when(TestUnhandled) { case Ev(SetHandler) ⇒ whenUnhandled { diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 78d03fe1e7..0f1e00453f 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -454,8 +454,13 @@ trait FSM[S, D] extends ListenerManagement { if (generation == gen) { processMsg(StateTimeout, "state timeout") } - case t @ Timer(name, msg, repeat, generation) ⇒ - if ((timers contains name) && (timers(name).generation == generation)) { + case t @ Timer(name, msg, repeat, gen) ⇒ + if ((timers contains name) && (timers(name).generation == gen)) { + if (timeoutFuture.isDefined) { + timeoutFuture.get.cancel(true) + timeoutFuture = None + } + generation += 1 processMsg(msg, t) if (!repeat) { timers -= name From 48ee9deb3a129003f8b22d3d0baf4e3be2837bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 15 Aug 2011 10:42:09 +0200 Subject: [PATCH 08/10] Split up the TransactionLogSpec into two tests: AsynchronousTransactionLogSpec and SynchronousTransactionLogSpec, also did various minor edits and comments. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 21 +- .../remote/netty/NettyRemoteSupport.scala | 3 +- .../ClusterActorRefCleanupMultiJvmSpec.scala | 2 +- ...a => AsynchronousTransactionLogSpec.scala} | 166 +-------------- .../SynchronousTransactionLogSpec.scala | 190 ++++++++++++++++++ 5 files changed, 214 insertions(+), 168 deletions(-) rename akka-cluster/src/test/scala/akka/cluster/{TransactionLogSpec.scala => AsynchronousTransactionLogSpec.scala} (55%) create mode 100644 akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 6a39806a31..d28f4e91e4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -403,13 +403,6 @@ class DefaultClusterNode private[akka] ( def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]] - private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = { - val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]] - if (includeRefNodeInReplicaSet) - conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor - conns - } - // zookeeper listeners private val stateListener = new StateListener(this) private val membershipListener = new MembershipChildListener(this) @@ -420,6 +413,17 @@ class DefaultClusterNode private[akka] ( // Address -> ClusterActorRef private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef] + // ============================================================================================================ + // ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY ============= + // ============================================================================================================ + + lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = { + val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]] + if (includeRefNodeInReplicaSet) + conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor + conns + } + // ZooKeeper client lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer) @@ -441,6 +445,8 @@ class DefaultClusterNode private[akka] ( LEADER_ELECTION_PATH, null, leaderElectionCallback) + // ============================================================================================================ + if (enableJMX) createMBean // ======================================= @@ -476,6 +482,7 @@ class DefaultClusterNode private[akka] ( } def shutdown() { + def shutdownNode() { ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b5ebf69062..8007ef012c 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -407,7 +407,7 @@ class ActiveRemoteClient private[akka] ( //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { - EventHandler.info(this, "Shutting down [%s]".format(name)) + EventHandler.info(this, "Shutting down remote client [%s]".format(name)) notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() @@ -655,6 +655,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, serverModule.notifyListeners(RemoteServerStarted(serverModule)) def shutdown() { + EventHandler.info(this, "Shutting down remote server [%s]".format(name)) try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala index 7958bbd38e..e8ed5f2992 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala @@ -78,7 +78,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode { case e: RoutingException ⇒ } - //since the call to the node failed, the node must have been removed from the list. + //since the call to the node failed, the node must have been removed from the list. clusteredRef.connectionsSize must be(1) //send a message to this node, diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala similarity index 55% rename from akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala index 47d531f23d..11310ec106 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala @@ -8,170 +8,14 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll +import akka.actor._ + import com.eaio.uuid.UUID -class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { +class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { private var bookKeeper: BookKeeper = _ private var localBookKeeper: LocalBookKeeper = _ - "A synchronous used Transaction Log" should { - - "be able to be deleted - synchronous" in { - val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog.recordEntry(entry) - - txlog.delete() - txlog.close() - - val zkClient = TransactionLog.zkClient - assert(zkClient.readData(txlog.snapshotPath, true) == null) - assert(zkClient.readData(txlog.txLogPath, true) == null) - } - - "fail to be opened if non existing - synchronous" in { - val uuid = (new UUID).toString - intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) - } - - "be able to be checked for existence - synchronous" in { - val uuid = (new UUID).toString - TransactionLog.exists(uuid) must be(false) - - TransactionLog.newLogFor(uuid, false, null) - TransactionLog.exists(uuid) must be(true) - } - - "be able to record entries - synchronous" in { - val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog.recordEntry(entry) - } - - "be able to overweite an existing txlog if one already exists - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txLog2 = TransactionLog.newLogFor(uuid, false, null) - txLog2.latestSnapshotId.isDefined must be(false) - txLog2.latestEntryId must be(-1) - } - - "be able to record and delete entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.delete - txlog1.close - intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) - } - - "be able to record entries and read entries with 'entriesInRange' - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(2) - entries(0) must equal("hello") - entries(1) must equal("hello") - txlog2.close - } - - "be able to record entries and read entries with 'entries' - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - // txlog1.close // should work without txlog.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(4) - entries(0) must equal("hello") - entries(1) must equal("hello") - entries(2) must equal("hello") - entries(3) must equal("hello") - txlog2.close - } - - "be able to record a snapshot - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - txlog1.close - } - - "be able to record and read a snapshot and following entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") - - val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(4) - entries(0) must equal("hello") - entries(1) must equal("hello") - entries(2) must equal("hello") - entries(3) must equal("hello") - txlog2.close - } - - "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { - val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, false, null) - - val entry = "hello".getBytes("UTF-8") - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - - val snapshot = "snapshot".getBytes("UTF-8") - txlog1.recordSnapshot(snapshot) - - txlog1.recordEntry(entry) - txlog1.recordEntry(entry) - txlog1.close - - val txlog2 = TransactionLog.logFor(uuid, false, null) - val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") - - val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) - entries.size must equal(2) - entries(0) must equal("hello") - entries(1) must equal("hello") - txlog2.close - } - } - "An asynchronous Transaction Log" should { "be able to record entries - asynchronous" in { val uuid = (new UUID).toString @@ -373,7 +217,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA } override def afterAll() = { + Cluster.node.shutdown() + Cluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() + Actor.registry.local.shutdownAll() + Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala new file mode 100644 index 0000000000..ec6fa1d92f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala @@ -0,0 +1,190 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.cluster + +import org.apache.bookkeeper.client.BookKeeper +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +import akka.actor._ + +import com.eaio.uuid.UUID + +class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { + private var bookKeeper: BookKeeper = _ + private var localBookKeeper: LocalBookKeeper = _ + + "A synchronous used Transaction Log" should { + + "be able to be deleted - synchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + + txlog.delete() + txlog.close() + + val zkClient = TransactionLog.zkClient + assert(zkClient.readData(txlog.snapshotPath, true) == null) + assert(zkClient.readData(txlog.txLogPath, true) == null) + } + + "fail to be opened if non existing - synchronous" in { + val uuid = (new UUID).toString + intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + } + + "be able to be checked for existence - synchronous" in { + val uuid = (new UUID).toString + TransactionLog.exists(uuid) must be(false) + + TransactionLog.newLogFor(uuid, false, null) + TransactionLog.exists(uuid) must be(true) + } + + "be able to record entries - synchronous" in { + val uuid = (new UUID).toString + val txlog = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog.recordEntry(entry) + } + + "be able to overweite an existing txlog if one already exists - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txLog2 = TransactionLog.newLogFor(uuid, false, null) + txLog2.latestSnapshotId.isDefined must be(false) + txLog2.latestEntryId must be(-1) + } + + "be able to record and delete entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.delete + txlog1.close + // intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + } + + "be able to record entries and read entries with 'entriesInRange' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + + "be able to record entries and read entries with 'entries' - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close // should work without txlog.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record a snapshot - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + txlog1.close + } + + "be able to record and read a snapshot and following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) + entries(0) must equal("hello") + entries(1) must equal("hello") + entries(2) must equal("hello") + entries(3) must equal("hello") + txlog2.close + } + + "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { + val uuid = (new UUID).toString + val txlog1 = TransactionLog.newLogFor(uuid, false, null) + + val entry = "hello".getBytes("UTF-8") + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + + val snapshot = "snapshot".getBytes("UTF-8") + txlog1.recordSnapshot(snapshot) + + txlog1.recordEntry(entry) + txlog1.recordEntry(entry) + txlog1.close + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) + entries(0) must equal("hello") + entries(1) must equal("hello") + txlog2.close + } + } + + override def beforeAll() = { + Cluster.startLocalCluster() + LocalBookKeeperEnsemble.start() + } + + override def afterAll() = { + Cluster.node.shutdown() + Cluster.shutdownLocalCluster() + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + Actor.registry.local.shutdownAll() + Scheduler.shutdown() + } +} From 5138fa97647d89c5dfe6764b9db33833576df925 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 15 Aug 2011 11:15:56 +0200 Subject: [PATCH 09/10] Removing global dispatcher --- .../test/scala/akka/config/ConfigSpec.scala | 6 ++-- .../scala/akka/dispatch/DispatchersSpec.scala | 3 +- .../scala/akka/dispatch/Dispatchers.scala | 31 +++++++------------ config/akka-reference.conf | 12 +++---- 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index e7234509d7..3aa42858c8 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -20,10 +20,10 @@ class ConfigSpec extends WordSpec with MustMatchers { getString("akka.time-unit") must equal(Some("seconds")) getString("akka.version") must equal(Some("2.0-SNAPSHOT")) - getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalDispatcher")) + getString("akka.actor.default-dispatcher.type") must equal(Some("Dispatcher")) getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60)) - getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0)) - getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0)) + getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0)) + getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0)) getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1)) getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true)) getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs")) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala index 4ef2fc08f7..4a028b0478 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala @@ -28,8 +28,7 @@ object DispatchersSpec { def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map( "BalancingDispatcher" -> ofType[BalancingDispatcher], - "Dispatcher" -> ofType[Dispatcher], - "GlobalDispatcher" -> instance(globalDispatcher)) + "Dispatcher" -> ofType[Dispatcher]) def validTypes = typesAndValidators.keys.toList diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 623490a88e..b818652a84 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -56,11 +56,8 @@ object Dispatchers { val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox() - lazy val defaultGlobalDispatcher = { - config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalDispatcher) - } - - object globalDispatcher extends Dispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) + lazy val defaultGlobalDispatcher = + config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -171,11 +168,11 @@ object Dispatchers { * Creates of obtains a dispatcher from a ConfigMap according to the format below * * default-dispatcher { - * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - * # GlobalExecutorBasedEventDriven + * type = "Dispatcher" # Must be one of the following + * # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor - * keep-alive-time = 60 # Keep alive time for threads + * name = "MyDispatcher" # Optional, will be a generated UUID if omitted + * keep-alive-time = 60 # Keep alive time for threads in akka.time-unit * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded @@ -188,18 +185,18 @@ object Dispatchers { * Gotcha: Only configures the dispatcher if possible * Returns: None if "type" isn't specified in the config * Throws: IllegalArgumentException if the value of "type" is not valid - * IllegalArgumentException if it cannot + * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator */ def from(cfg: Configuration): Option[MessageDispatcher] = { - cfg.getString("type") map { - case "Dispatcher" ⇒ new DispatcherConfigurator() - case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator() - case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator + cfg.getString("type") flatMap { + case "Dispatcher" ⇒ Some(new DispatcherConfigurator()) + case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator()) + case "GlobalDispatcher" ⇒ None //TODO FIXME remove this case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { case Right(clazz) ⇒ ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match { - case Right(configurator) ⇒ configurator + case Right(configurator) ⇒ Some(configurator) case Left(exception) ⇒ throw new IllegalArgumentException( "Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception) @@ -213,10 +210,6 @@ object Dispatchers { } } -object GlobalDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = Dispatchers.globalDispatcher -} - class DispatcherConfigurator extends MessageDispatcherConfigurator { def configure(config: Configuration): MessageDispatcher = { configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher( diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 5adfb2112a..fff25dd5c1 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -81,13 +81,13 @@ akka { } default-dispatcher { - type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable - # - Dispatcher - # - BalancingDispatcher - # - GlobalDispatcher + type = "Dispatcher" # Must be one of the following + # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), + # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor + name = "MyDispatcher" # Optional, will be a generated UUID if omitted keep-alive-time = 60 # Keep alive time for threads - core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) - max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) + max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded allow-core-timeout = on # Allow core threads to time out rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard From ce2df3b05e3003b7543e57e7120ed410a807c67c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 15 Aug 2011 17:19:22 +0200 Subject: [PATCH 10/10] Making TypedActor use actorOf(props) --- .../akka/actor/actor/TypedActorSpec.scala | 22 ++-- .../src/main/scala/akka/actor/Actor.scala | 21 ++++ .../src/main/scala/akka/actor/ActorRef.scala | 58 +++++---- .../main/scala/akka/actor/ActorRegistry.scala | 7 +- .../main/scala/akka/actor/TypedActor.scala | 119 +++++++++--------- .../akka/camel/TypedConsumerJavaTestBase.java | 6 +- .../TypedConsumerPublishRequestorTest.scala | 9 +- .../akka/camel/TypedConsumerScalaTest.scala | 5 +- .../TypedActorComponentFeatureTest.scala | 7 +- .../src/main/scala/sample/camel/Boot.scala | 4 +- .../sample/camel/ServerApplication.scala | 5 +- .../sample/camel/StandaloneApplication.scala | 7 +- 12 files changed, 149 insertions(+), 121 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 66f9a09f71..4906b225c3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -144,14 +144,14 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach def newFooBar: Foo = newFooBar(Duration(2, "s")) - def newFooBar(timeout: Duration): Foo = - newFooBar(Configuration(timeout)) + def newFooBar(d: Duration): Foo = + newFooBar(Props().withTimeout(Timeout(d))) - def newFooBar(config: Configuration): Foo = - typedActorOf(classOf[Foo], classOf[Bar], config) + def newFooBar(props: Props): Foo = + typedActorOf(classOf[Foo], classOf[Bar], props) - def newStacked(config: Configuration = Configuration(Duration(2, "s"))): Stacked = - typedActorOf(classOf[Stacked], classOf[StackedImpl], config) + def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked = + typedActorOf(classOf[Stacked], classOf[StackedImpl], props) def mustStop(typedActor: AnyRef) = stop(typedActor) must be(true) @@ -295,7 +295,7 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach } "be able to support implementation only typed actors" in { - val t = typedActorOf[Foo, Bar](Configuration()) + val t = typedActorOf[Foo, Bar](Props()) val f = t.futurePigdog(200) val f2 = t.futurePigdog(0) f2.isCompleted must be(false) @@ -312,15 +312,15 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach } "be able to use work-stealing dispatcher" in { - val config = Configuration( - Duration(6600, "ms"), - Dispatchers.newBalancingDispatcher("pooled-dispatcher") + val props = Props( + timeout = Timeout(6600), + dispatcher = Dispatchers.newBalancingDispatcher("pooled-dispatcher") .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(60) .setMaxPoolSize(60) .build) - val thais = for (i ← 1 to 60) yield newFooBar(config) + val thais = for (i ← 1 to 60) yield newFooBar(props) val iterator = new CyclicIterator(thais) val results = for (i ← 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b3459ca925..95dd8a18b9 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -371,6 +371,27 @@ object Actor { createActor(address, () ⇒ new LocalActorRef(() ⇒ creator.create, address)) } + //TODO FIXME + def actorOf(props: Props): ActorRef = { + //TODO Implement support for configuring by deployment ID etc + //TODO If localOnly = true, never use the config file deployment and always create a new actor + //TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor + //TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?) + + val address = props.deployId match { //TODO handle deployId separately from address? + case "" | null ⇒ newUuid().toString + case other ⇒ other + } + val newActor = new LocalActorRef(props.creator, address) + newActor.dispatcher = props.dispatcher + newActor.faultHandler = props.faultHandler + newActor.lifeCycle = props.lifeCycle + newActor.timeout = props.timeout.duration.toMillis + newActor.receiveTimeout = props.receiveTimeout.map(_.toMillis) + props.supervisor.foreach(newActor.link(_)) + newActor.start + } + def localActorOf[T <: Actor: Manifest]: ActorRef = { newLocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index fb57fd0275..258d2d17d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -47,12 +47,19 @@ private[akka] object ActorRefInternals { * TODO document me */ object Props { - val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) - def defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher - val noCreatorSpecified: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actorFactoryProvided!") + object Default { + val creator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") + val deployId: String = "" + val dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher + val timeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) + val receiveTimeout: Option[Duration] = None + val lifeCycle: LifeCycle = Permanent + val faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy + val supervisor: Option[ActorRef] = None + val localOnly: Boolean = false + } - val default = new Props(creator = noCreatorSpecified) - def apply(): Props = default + val default = new Props() def apply[T <: Actor: Manifest]: Props = default.withCreator(() ⇒ implicitly[Manifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance) @@ -64,15 +71,26 @@ object Props { /** * ActorRef configuration object, this is thread safe and fully sharable */ -case class Props(creator: () ⇒ Actor, - deployId: String = "", - dispatcher: MessageDispatcher = Props.defaultDispatcher, - timeout: Timeout = Props.defaultTimeout, - receiveTimeout: Option[Duration] = None, - lifeCycle: LifeCycle = Permanent, - faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy, - supervisor: Option[ActorRef] = None, - localOnly: Boolean = false) { +case class Props(creator: () ⇒ Actor = Props.Default.creator, + deployId: String = Props.Default.deployId, + dispatcher: MessageDispatcher = Props.Default.dispatcher, + timeout: Timeout = Props.Default.timeout, + receiveTimeout: Option[Duration] = Props.Default.receiveTimeout, + lifeCycle: LifeCycle = Props.Default.lifeCycle, + faultHandler: FaultHandlingStrategy = Props.Default.faultHandler, + supervisor: Option[ActorRef] = Props.Default.supervisor, + localOnly: Boolean = Props.Default.localOnly) { + + def this() = this( + creator = Props.Default.creator, + deployId = Props.Default.deployId, + dispatcher = Props.Default.dispatcher, + timeout = Props.Default.timeout, + receiveTimeout = Props.Default.receiveTimeout, + lifeCycle = Props.Default.lifeCycle, + faultHandler = Props.Default.faultHandler, + supervisor = Props.Default.supervisor, + localOnly = Props.Default.localOnly) /** * Returns a new Props with the specified creator set * Scala API @@ -581,13 +599,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, new AtomicReference[Actor](newActor) } - def serializerErrorDueTo(reason: String) = - throw new akka.config.ConfigurationException( - "Could not create Serializer object [" + this.getClass.getName + - "]") - - private val serializer: Serializer = - try { Serialization.serializerFor(this.getClass) } catch { case e: Exception ⇒ serializerErrorDueTo(e.toString) } + private def serializer: Serializer = //TODO Is this used or needed? + try { Serialization.serializerFor(this.getClass) } catch { + case e: Exception ⇒ throw new akka.config.ConfigurationException( + "Could not create Serializer object for [" + this.getClass.getName + "]") + } private lazy val replicationStorage: Option[TransactionLog] = { import DeploymentConfig._ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 314b2a5076..8766b28cf8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -61,10 +61,11 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag notifyListeners(ActorRegistered(address, actor, Option(typedActorsByUuid get actor.uuid))) } - private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef) { + private[akka] def registerTypedActor(actorRef: ActorRef, interface: AnyRef): Unit = typedActorsByUuid.put(actorRef.uuid, interface) - actorRef.start // register actorRef - } + + private[akka] def unregisterTypedActor(actorRef: ActorRef, interface: AnyRef): Unit = + typedActorsByUuid.remove(actorRef.uuid, interface) /** * Unregisters an actor in the ActorRegistry. diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 54076886f2..88c377b7a2 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -6,11 +6,11 @@ package akka.actor import akka.japi.{ Creator, Option ⇒ JOption } import akka.actor.Actor._ -import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutException } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Duration } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serializer, Serialization } +import akka.dispatch._ //TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala /** @@ -55,15 +55,6 @@ object TypedActor { case some ⇒ some } - @deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!") - object Configuration { //TODO: Replace this with the new ActorConfiguration when it exists - val defaultTimeout = Duration(Actor.TIMEOUT, "millis") - val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher) - def apply(): Configuration = defaultConfiguration - } - @deprecated("This should be replaced with the same immutable configuration that will be used for ActorRef.actorOf", "!!!") - case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) - /** * This class represents a Method call, and has a reference to the Method to be called and the parameters to supply * It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized @@ -122,46 +113,46 @@ object TypedActor { } /** - * Creates a new TypedActor proxy using the supplied configuration, + * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R = - createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = + createProxyAndTypedActor(interface, impl.newInstance, props, interface.getClassLoader) /** - * Creates a new TypedActor proxy using the supplied configuration, + * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R = - createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = + createProxyAndTypedActor(interface, impl.create, props, interface.getClassLoader) - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R = - createProxyAndTypedActor(interface, impl.newInstance, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.newInstance, props, loader) /** - * Creates a new TypedActor proxy using the supplied configuration, + * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * all interfaces (Class.getInterfaces) if it's not an interface class */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R = - createProxyAndTypedActor(interface, impl.create, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.create, props, loader) /** - * Creates a new TypedActor proxy using the supplied configuration, + * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ - def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R = - createProxyAndTypedActor(impl, impl.newInstance, config, loader) + def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = + createProxyAndTypedActor(impl, impl.newInstance, props, loader) /** - * Creates a new TypedActor proxy using the supplied configuration, + * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ - def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { + def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) + createProxyAndTypedActor(clazz, clazz.newInstance, props, if (loader eq null) clazz.getClassLoader else loader) } /** @@ -186,25 +177,25 @@ object TypedActor { def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null /** - * Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = - createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = + createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, if (loader eq null) m.erasure.getClassLoader else loader) /** - * Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R = - createProxy(interfaces, (ref: AtomVar[R]) ⇒ constructor.create, config, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = + createProxy(interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, loader) /** - * Creates a proxy given the supplied configuration, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = - createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, config, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = + createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, props, loader) /* Internal API */ @@ -219,44 +210,51 @@ object TypedActor { } else null - private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomVar[R] - configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader) + private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, loader: ClassLoader): R = { + val proxyVar = new AtomVar[R] + configureAndProxyLocalActorRef[R](interfaces, proxyVar, props.withCreator(() ⇒ constructor(proxyVar)), loader) } - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = - createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, props: Props, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, loader) - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { - - val ref = actorOf(actor) - - ref.timeout = config.timeout.toMillis - ref.dispatcher = config.dispatcher - - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T] - proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(ref, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop - proxy + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = { + //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling + val actorVar = new AtomVar[ActorRef](null) + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)).asInstanceOf[T] + proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + val ref = actorOf(props) + actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet + proxyVar.get } private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces - private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: ⇒ T) extends Actor { + private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor { + + override def preStart = Actor.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor + override def postStop = Actor.registry.unregisterTypedActor(self, proxyVar.get) + val me = createInstance def receive = { case m: MethodCall ⇒ - selfReference set proxyRef.get + selfReference set proxyVar.get try { if (m.isOneWay) m(me) - else if (m.returnsFuture_?) self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - else self reply m(me) + else if (m.returnsFuture_?) { + self.channel match { + case p: ActorPromise ⇒ p completeWith m(me).asInstanceOf[Future[Any]] + case _ ⇒ throw new IllegalStateException("Future-returning TypedActor didn't use ?/ask so cannot reply") + } + } else self reply m(me) } finally { selfReference set null } } } - private[akka] case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { + private[akka] class TypedActorInvocationHandler(actorVar: AtomVar[ActorRef]) extends InvocationHandler { + def actor = actorVar.get + def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean @@ -264,11 +262,8 @@ object TypedActor { case _ ⇒ implicit val timeout = Timeout(actor.timeout) MethodCall(method, args) match { - case m if m.isOneWay ⇒ - actor ! m - null - case m if m.returnsFuture_? ⇒ - actor ? m + case m if m.isOneWay ⇒ actor ! m; null //Null return value + case m if m.returnsFuture_? ⇒ actor ? m case m if m.returnsJOption_? || m.returnsOption_? ⇒ val f = actor ? m try { f.await } catch { case _: FutureTimeoutException ⇒ } diff --git a/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java b/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java index 64aa29ed54..232ef0d2df 100644 --- a/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java +++ b/akka-camel-typed/src/test/java/akka/camel/TypedConsumerJavaTestBase.java @@ -2,7 +2,8 @@ package akka.camel; import akka.actor.Actor; import akka.actor.TypedActor; -import akka.actor.TypedActor.Configuration; +import akka.actor.Props; +import akka.actor.Timeout; import akka.dispatch.Dispatchers; import akka.japi.SideEffect; import akka.util.FiniteDuration; @@ -42,8 +43,7 @@ public class TypedConsumerJavaTestBase { consumer = TypedActor.typedActorOf( SampleErrorHandlingTypedConsumer.class, SampleErrorHandlingTypedConsumerImpl.class, - new Configuration(new FiniteDuration(5000, "millis"), Dispatchers.defaultGlobalDispatcher() - )); + (new Props()).withTimeout(new Timeout(new FiniteDuration(5000, "millis")))); } }); String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class); diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala index 03c727834b..8d1669f9d8 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerPublishRequestorTest.scala @@ -7,7 +7,6 @@ import org.scalatest.junit.JUnitSuite import akka.actor._ import akka.actor.Actor._ -import akka.actor.TypedActor.Configuration._ import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount ⇒ SetExpectedTestMessageCount, _ } class TypedConsumerPublishRequestorTest extends JUnitSuite { @@ -41,7 +40,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { def shouldReceiveOneConsumerMethodRegisteredEvent = { Actor.registry.addListener(requestor) val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get - val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration) + val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props()) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val event = (publisher ? GetRetainedMessage).as[ConsumerMethodRegistered].get assert(event.endpointUri === "direct:foo") @@ -51,7 +50,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveOneConsumerMethodUnregisteredEvent = { - val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], defaultConfiguration) + val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props()) val latch = (publisher ? SetExpectedTestMessageCount(1)).as[CountDownLatch].get Actor.registry.addListener(requestor) TypedActor.stop(obj) @@ -66,7 +65,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { def shouldReceiveThreeConsumerMethodRegisteredEvents = { Actor.registry.addListener(requestor) val latch = (publisher ? SetExpectedTestMessageCount(3)).as[CountDownLatch].get - val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) + val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props()) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered]) val events = (publisher ? request).as[List[ConsumerMethodRegistered]].get @@ -75,7 +74,7 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite { @Test def shouldReceiveThreeConsumerMethodUnregisteredEvents = { - val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) + val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props()) val latch = (publisher ? SetExpectedTestMessageCount(3)).as[CountDownLatch].get Actor.registry.addListener(requestor) TypedActor.stop(obj) diff --git a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala index 1692c8e6fc..20703bdffe 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/TypedConsumerScalaTest.scala @@ -7,7 +7,6 @@ import org.scalatest.matchers.MustMatchers import akka.actor.Actor._ import akka.actor._ -import akka.actor.TypedActor.Configuration._ /** * @author Martin Krasser @@ -33,7 +32,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa "started" must { "support in-out message exchanges via its endpoints" in { service.awaitEndpointActivation(3) { - actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], defaultConfiguration) + actor = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props()) } must be(true) mandatoryTemplate.requestBodyAndHeader("direct:m2", "x", "test", "y") must equal("m2: x y") mandatoryTemplate.requestBodyAndHeader("direct:m3", "x", "test", "y") must equal("m3: x y") @@ -63,7 +62,7 @@ class TypedConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMa "started" must { "support in-out message exchanges via its endpoints" in { service.awaitEndpointActivation(2) { - actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], defaultConfiguration) + actor = TypedActor.typedActorOf(classOf[TestTypedConsumer], classOf[TestTypedConsumerImpl], Props()) } must be(true) mandatoryTemplate.requestBody("direct:publish-test-3", "x") must equal("foo: x") mandatoryTemplate.requestBody("direct:publish-test-4", "x") must equal("bar: x") diff --git a/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala b/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala index 91058e3109..d1084f2719 100644 --- a/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala +++ b/akka-camel-typed/src/test/scala/akka/camel/component/TypedActorComponentFeatureTest.scala @@ -5,8 +5,7 @@ import org.apache.camel.builder.RouteBuilder import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec } -import akka.actor.{ Actor, TypedActor } -import akka.actor.TypedActor.Configuration._ +import akka.actor.{ Actor, TypedActor, Props } import akka.camel._ /** @@ -21,10 +20,10 @@ class TypedActorComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll override protected def beforeAll = { val typedActor = TypedActor.typedActorOf( classOf[SampleTypedActor], - classOf[SampleTypedActorImpl], defaultConfiguration) // not a consumer + classOf[SampleTypedActorImpl], Props()) // not a consumer val typedConsumer = TypedActor.typedActorOf( classOf[SampleTypedConsumer], - classOf[SampleTypedConsumerImpl], defaultConfiguration) + classOf[SampleTypedConsumerImpl], Props()) typedConsumerUuid = TypedActor.getActorRefFor(typedConsumer).uuid.toString diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala index 32628ea04f..bb2026f7ba 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/Boot.scala @@ -7,8 +7,8 @@ import org.apache.camel.spring.spi.ApplicationContextRegistry import org.springframework.context.support.ClassPathXmlApplicationContext import akka.actor.Actor._ +import akka.actor.Props import akka.actor.TypedActor -import akka.actor.TypedActor.Configuration._ import akka.camel.CamelContextManager import akka.config.Supervision._ @@ -91,7 +91,7 @@ class Boot { // ----------------------------------------------------------------------- // TODO: investigate why this consumer is not published - TypedActor.typedActorOf(classOf[TypedConsumer1], classOf[TypedConsumer1Impl], defaultConfiguration) + TypedActor.typedActorOf(classOf[TypedConsumer1], classOf[TypedConsumer1Impl], Props()) } /** diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala index af3d97eb13..9e943b4d30 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/ServerApplication.scala @@ -2,8 +2,7 @@ package sample.camel import akka.actor.Actor._ import akka.camel.CamelServiceManager -import akka.actor.TypedActor -import akka.actor.TypedActor.Configuration._ +import akka.actor.{ TypedActor, Props } /** * @author Martin Krasser @@ -18,7 +17,7 @@ object ServerApplication extends App { val ua = actorOf[RemoteActor2].start val ta = TypedActor.typedActorOf( classOf[RemoteTypedConsumer2], - classOf[RemoteTypedConsumer2Impl], defaultConfiguration) + classOf[RemoteTypedConsumer2Impl], Props()) remote.start("localhost", 7777) remote.register("remote2", ua) diff --git a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala index 9e252208af..f922d9544f 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/sample/camel/StandaloneApplication.scala @@ -5,8 +5,7 @@ import org.apache.camel.builder.RouteBuilder import org.apache.camel.spring.spi.ApplicationContextRegistry import org.springframework.context.support.ClassPathXmlApplicationContext -import akka.actor.{ Actor, TypedActor } -import akka.actor.TypedActor.Configuration._ +import akka.actor.{ Actor, TypedActor, Props } import akka.camel._ /** @@ -18,7 +17,7 @@ object StandaloneApplication extends App { // 'externally' register typed actors val registry = new SimpleRegistry - registry.put("sample", TypedActor.typedActorOf(classOf[BeanIntf], classOf[BeanImpl], defaultConfiguration)) + registry.put("sample", TypedActor.typedActorOf(classOf[BeanIntf], classOf[BeanImpl], Props())) // customize CamelContext CamelContextManager.init(new DefaultCamelContext(registry)) @@ -31,7 +30,7 @@ object StandaloneApplication extends App { mandatoryService.awaitEndpointActivation(1) { // 'internally' register typed actor (requires CamelService) - TypedActor.typedActorOf(classOf[TypedConsumer2], classOf[TypedConsumer2Impl], defaultConfiguration) + TypedActor.typedActorOf(classOf[TypedConsumer2], classOf[TypedConsumer2Impl], Props()) } // access 'internally' (automatically) registered typed-actors