Rebased from master

This commit is contained in:
Jonas Bonér 2011-04-27 00:38:10 +02:00
parent b041329b98
commit 2e7c76dd98
22 changed files with 69 additions and 793 deletions

View file

@ -60,7 +60,11 @@ object SupervisorSpec {
class Master extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt)
val temp = self.spawnLink[TemporaryActor]
val temp = {
val a = actorOf[TemporaryActor]
self link a
a.start
}
override def receive = {
case Die => temp !! (Die, TimeoutMillis)

View file

@ -134,81 +134,6 @@ class FutureSpec extends JUnitSuite {
actor.stop()
}
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf[TestActor].start()
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop()
actor2.stop()
}
@Test def shouldFutureAwaitEitherRight = {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf[TestActor].start()
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop()
actor2.stop()
}
@Test def shouldFutureAwaitOneLeft = {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf[TestActor].start()
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop()
actor2.stop()
}
@Test def shouldFutureAwaitOneRight = {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf[TestActor].start()
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop()
actor2.stop()
}
@Test def shouldFutureAwaitAll = {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf[TestActor].start()
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "Hello"
Futures.awaitAll(List(future1, future2))
assert(future1.result.isDefined)
assert("World" === future1.result.get)
assert(future2.result.isDefined)
assert("World" === future2.result.get)
actor1.stop()
actor2.stop()
}
@Test def shouldFuturesAwaitMapHandleEmptySequence {
assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil)
}
@Test def shouldFuturesAwaitMapHandleNonEmptySequence {
val latches = (1 to 3) map (_ => new StandardLatch)
val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start())
val futures = actors map (actor => (actor.!!![String]("Hello")))
latches foreach { _.open }
assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length))
}
@Test def shouldFoldResults {
val actors = (1 to 10).toList map { _ =>
actorOf(new Actor {

View file

@ -165,7 +165,7 @@ object Actor extends ListenerManagement {
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
}, None)
})
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@ -185,7 +185,7 @@ object Actor extends ListenerManagement {
* val actor = actorOf(new MyActor).start()
* </pre>
*/
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None)
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
@ -195,7 +195,7 @@ object Actor extends ListenerManagement {
* This function should <b>NOT</b> be used for remote actors.
* JAVA API
*/
def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None)
def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create)
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
@ -245,8 +245,7 @@ object Actor extends ListenerManagement {
* <p/>
* Here you find functions like:
* - !, !!, !!! and forward
* - link, unlink, startLink, spawnLink etc
* - makeRemote etc.
* - link, unlink, startLink etc
* - start, stop
* - etc.
*
@ -269,7 +268,6 @@ object Actor extends ListenerManagement {
* import self._
* id = ...
* dispatcher = ...
* spawnLink[OtherActor]
* ...
* }
* </pre>

View file

@ -449,39 +449,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
/**
* Atomically start and link an actor.
*/
def startLink(actorRef: ActorRef): Unit
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, use Actor.actorOf instead")
def spawn(clazz: Class[_ <: Actor]): ActorRef
/**
* Atomically create (from actor class), make it remote and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, client managed actors will be removed")
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
/**
* Atomically create (from actor class), link and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success")
def spawnLink(clazz: Class[_ <: Actor]): ActorRef
/**
* Atomically create (from actor class), make it remote, link and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, client managed actors will be removed")
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
def startLink(actorRef: ActorRef): ActorRef
/**
* Returns the mailbox size.
@ -584,10 +552,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (
private[this] val actorFactory: () => Actor,
val homeAddress: Option[InetSocketAddress],
val clientManaged: Boolean = false)
class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@ -620,9 +585,8 @@ class LocalActorRef private[akka] (
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () => Actor,
__homeAddress: Option[InetSocketAddress]) = {
this(__factory, __homeAddress)
__factory: () => Actor) = {
this(__factory)
_uuid = __uuid
id = __id
timeout = __timeout
@ -634,11 +598,6 @@ class LocalActorRef private[akka] (
start
}
/**
* Returns whether this actor ref is client-managed remote or not
*/
private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled
// ========= PUBLIC FUNCTIONS =========
/**
@ -653,6 +612,8 @@ class LocalActorRef private[akka] (
@deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`")
def actorClassName: String = actorClass.getName
final def homeAddress: Option[InetSocketAddress] = None
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
@ -684,9 +645,6 @@ class LocalActorRef private[akka] (
if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance
if (isClientManaged_?)
Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
checkReceiveTimeout //Schedule the initial Receive timeout
}
this
@ -706,11 +664,9 @@ class LocalActorRef private[akka] (
} finally {
currentMessage = null
Actor.registry.unregister(this)
if (isRemotingEnabled) {
if (isClientManaged_?)
Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
if (isRemotingEnabled)
Actor.remote.unregister(this)
}
setActorSelfFields(actorInstance.get,null)
}
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
@ -755,55 +711,10 @@ class LocalActorRef private[akka] (
* <p/>
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef): Unit = guard.withGuard {
def startLink(actorRef: ActorRef): ActorRef = guard.withGuard {
link(actorRef)
actorRef.start()
}
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawn(clazz: Class[_ <: Actor]): ActorRef =
Actor.actorOf(clazz).start()
/**
* Atomically create (from actor class), start and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
ensureRemotingEnabled
val ref = Actor.remote.actorOf(clazz, hostname, port)
ref.timeout = timeout
ref.start()
}
/**
* Atomically create (from actor class), start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = {
val actor = spawn(clazz)
link(actor)
actor.start()
actor
}
/**
* Atomically create (from actor class), start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
ensureRemotingEnabled
val actor = Actor.remote.actorOf(clazz, hostname, port)
actor.timeout = timeout
link(actor)
actor.start()
actor
actorRef
}
/**
@ -823,10 +734,6 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
if (isClientManaged_?) {
Actor.remote.send[Any](
message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None)
} else
dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
@ -834,17 +741,10 @@ class LocalActorRef private[akka] (
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
if (isClientManaged_?) {
val future = Actor.remote.send[T](
message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
dispatcher dispatchMessage new MessageInvocation(
this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
future.get
}
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
dispatcher dispatchMessage new MessageInvocation(
this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
future.get
}
/**
@ -1000,11 +900,10 @@ class LocalActorRef private[akka] (
}
}
}
//TODO KEEP THIS?
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled
if (_supervisor.isDefined) {
if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
}
@ -1190,11 +1089,7 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported
def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
@ -1400,32 +1295,4 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
true
} else false
}
/**
* Atomically create (from actor class) and start an actor.
*/
def spawn[T <: Actor: Manifest]: ActorRef =
spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Atomically create (from actor class), start and make an actor remote.
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
/**
* Atomically create (from actor class), start and link an actor.
*/
def spawnLink[T <: Actor: Manifest]: ActorRef =
spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Atomically create (from actor class), start, link and make an actor remote.
*/
def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
}

View file

@ -92,8 +92,7 @@ case class SupervisorFactory(val config: SupervisorConfig) {
* <p/>
* The supervisor class is only used for the configuration system when configuring supervisor
* hierarchies declaratively. Should not be used as part of the regular programming API. Instead
* wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the
* children that should trap error signals and trigger restart.
* wire the children together using 'link', 'startLink' etc.
* <p/>
* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children.
*

View file

@ -74,7 +74,7 @@ class Configuration(val map: Map[String, Any]) {
def getAny(key: String, defaultValue: Any): Any = getAny(key).getOrElse(defaultValue)
def getSeqAny(key: String): Seq[Any] = {
def getListAny(key: String): Seq[Any] = {
try {
map(key).asInstanceOf[Seq[Any]]
} catch {

View file

@ -103,32 +103,18 @@ object Supervision {
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Long,
_dispatcher: MessageDispatcher, // optional
_remoteAddress: RemoteAddress // optional
_dispatcher: MessageDispatcher // optional
) extends Server {
val intf: Option[Class[_]] = Option(_intf)
val dispatcher: Option[MessageDispatcher] = Option(_dispatcher)
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress)
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress)
this(intf, target, lifeCycle, timeout, null: MessageDispatcher)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress)
this(null: Class[_], target, lifeCycle, timeout, dispatcher)
}
}

View file

@ -16,7 +16,10 @@ import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILL
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
import java.lang.{Iterable => JIterable}
import java.util.{LinkedList => JLinkedList}
import annotation.tailrec
import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import scala.collection.mutable.Builder
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -194,37 +197,11 @@ object Futures {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], fn: JFunc[A,Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn)
// =====================================
// Deprecations
// =====================================
/**
* (Blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)")
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await")
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await
/**
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }")
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
in map { f => fun(f.await) }
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException")
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b)
}.map(_.result)
}
object Future {
@ -456,7 +433,7 @@ sealed trait Future[+T] {
fa complete (try {
Right(f(v.right.get))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
Left(e)
})
@ -492,7 +469,7 @@ sealed trait Future[+T] {
try {
fa.completeWith(f(v.right.get))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
fa completeWithException e
}
@ -522,7 +499,7 @@ sealed trait Future[+T] {
if (p(r)) Right(r)
else Left(new MatchError(r))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
Left(e)
})

View file

@ -151,81 +151,6 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
clear
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io", 2552)
* actor.start()
* actor ! message
* actor.stop()
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start()
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
Actor.remote.clientManagedActorOf(() => factory, host, port)
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
* actor.start()
* actor ! message
* actor.stop()
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start()
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start()
* actor ! message
* actor.stop()
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start()
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
@ -444,10 +369,6 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
@deprecated("Will be removed after 1.1")
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef
/**
* Clean-up all open connections.
*/

View file

@ -591,19 +591,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
RemoteActorRef(serviceId, className, host, port, timeout, loader)
}
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = {
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
return new LocalActorRef(factory, None) // Code is much simpler with return
}
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true)
//ref.timeout = timeout //removed because setting default timeout should be done after construction
ref
}
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
@ -1299,4 +1286,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
}
}
}
}

View file

@ -157,11 +157,11 @@ object ActorSerialization {
}
private def fromBinaryToLocalActorRef[T <: Actor](
bytes: Array[Byte],
homeAddress: Option[InetSocketAddress],
bytes: Array[Byte],
homeAddress: Option[InetSocketAddress],
format: Format[T]): ActorRef = {
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
homeAddress.foreach { addr =>
homeAddress.foreach { addr =>
val addressProtocol = AddressProtocol.newBuilder.setHostname(addr.getAddress.getHostAddress).setPort(addr.getPort).build
builder.setOriginalAddress(addressProtocol)
}
@ -205,10 +205,11 @@ object ActorSerialization {
else actorClass.newInstance.asInstanceOf[Actor]
}
/* TODO Can we remove originalAddress from the protocol?
val homeAddress = {
val address = protocol.getOriginalAddress
Some(new InetSocketAddress(address.getHostname, address.getPort))
}
}*/
val ar = new LocalActorRef(
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
@ -218,8 +219,7 @@ object ActorSerialization {
lifeCycle,
supervisor,
hotswap,
factory,
homeAddress)
factory)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))

View file

@ -1,142 +0,0 @@
package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.dispatch.Dispatchers
import akka.actor.Actor._
import akka.actor._
class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" =>
RemoteActorSpecActorUnidirectional.latch.countDown()
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" => throw new ExpectedRemoteProblem("expected")
}
}
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
class CountDownActor(latch: CountDownLatch) extends Actor {
def receive = {
case "World" => latch.countDown()
}
}
/*
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown()
}
}*/
class MyActorCustomConstructor extends Actor {
var prefix = "default-"
var count = 0
def receive = {
case "incrPrefix" => count += 1; prefix = "" + count + "-"
case msg: String => self.reply(prefix + msg)
}
}
class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
"ClientInitiatedRemoteActor" should {
"shouldSendOneWay" in {
val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start()
clientManaged must not be null
clientManaged.getClass must be (classOf[LocalActorRef])
clientManaged ! "OneWay"
RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true)
clientManaged.stop()
}
"shouldSendOneWayAndReceiveReply" in {
val latch = new CountDownLatch(1)
val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start()
implicit val sender = Some(actorOf(new CountDownActor(latch)).start())
actor ! "Hello"
latch.await(3,TimeUnit.SECONDS) must be (true)
}
"shouldSendBangBangMessageAndReceiveReply" in {
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start()
val result = actor !! ("Hello", 10000)
"World" must equal (result.get.asInstanceOf[String])
actor.stop()
}
"shouldSendBangBangMessageAndReceiveReplyConcurrently" in {
val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start() }).toList
actors.map(_ !!! ("Hello", 10000)) foreach { future =>
"World" must equal (future.await.result.asInstanceOf[Option[String]].get)
}
actors.foreach(_.stop())
}
"shouldRegisterActorByUuid" in {
val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start()
val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start()
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("1-test")
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("2-test")
(actor2 !! "test").get must equal ("default-test")
actor1.stop()
actor2.stop()
}
"shouldSendAndReceiveRemoteException" in {
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start()
try {
implicit val timeout = 500000000L
val f = (actor !!! "Failure").await.resultOrException
fail("Shouldn't get here!!!")
} catch {
case e: ExpectedRemoteProblem =>
}
actor.stop()
}
}
}

View file

@ -19,11 +19,5 @@ class OptimizedLocalScopedSpec extends AkkaRemoteTest {
remote.actorFor("foo", host, port) must be (fooActor)
}
"Create local actor when client-managed is hosted locally" in {
val localClientManaged = Actor.remote.actorOf[TestActor](host, port)
localClientManaged.homeAddress must be (None)
}
}
}

View file

@ -69,7 +69,7 @@ class RemotePingPong3Actor extends Actor with Serializable {
}
}
class RemoteSupervisorSpec extends AkkaRemoteTest {
/*class RemoteSupervisorSpec extends AkkaRemoteTest {
var pingpong1: ActorRef = _
var pingpong2: ActorRef = _
@ -324,7 +324,6 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
factory.newInstance
}
/*
// Uncomment when the same test passes in SupervisorSpec - pending bug
@Test def shouldKillMultipleActorsOneForOne2 = {
clearMessageLogs
@ -338,9 +337,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
messageLog.poll(5, TimeUnit.SECONDS)
}
}
*/
/*
@Test def shouldOneWayKillSingleActorOneForOne = {
clearMessageLogs
@ -435,6 +432,4 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
messageLog.poll(5, TimeUnit.SECONDS)
}
}
*/
}
}*/

View file

@ -4,13 +4,9 @@
package akka.actor.remote
import akka.config.Supervision._
import akka.actor._
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
import akka.testing._
object RemoteTypedActorLog {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
@ -24,73 +20,11 @@ object RemoteTypedActorLog {
class RemoteTypedActorSpec extends AkkaRemoteTest {
import RemoteTypedActorLog._
private var conf: TypedActorConfigurator = _
override def beforeEach {
super.beforeEach
Config.config
conf = new TypedActorConfigurator
conf.configure(
new AllForOneStrategy(List(classOf[Exception]), 3, 5000),
List(
new SuperviseTypedActor(
classOf[RemoteTypedActorOne],
classOf[RemoteTypedActorOneImpl],
Permanent,
Testing.testTime(20000),
RemoteAddress(host,port)),
new SuperviseTypedActor(
classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl],
Permanent,
Testing.testTime(20000),
RemoteAddress(host,port))
).toArray).supervise
}
override def afterEach {
clearMessageLogs
conf.stop
super.afterEach
Thread.sleep(1000)
}
"Remote Typed Actor " should {
/*"receives one-way message" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
"have unit tests" in {
ta.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
}
"responds to request-reply message" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
ta.requestReply("ping") must equal ("pong")
} */
"be restarted on failure" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
try {
ta.requestReply("die")
fail("Shouldn't get here")
} catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
}
/* "restarts linked friends on failure" in {
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo])
try {
ta1.requestReply("die")
fail("Shouldn't get here")
} catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
}*/
}
}

View file

@ -18,11 +18,5 @@ class UnOptimizedLocalScopedSpec extends AkkaRemoteTest {
remote.actorFor("foo", host, port) must not be (fooActor)
}
"Create remote actor when client-managed is hosted locally" in {
val localClientManaged = Actor.remote.actorOf[TestActor](host, port)
localClientManaged.homeAddress must not be (None)
}
}
}

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.ticket
import akka.actor._
import akka.actor.remote.AkkaRemoteTest
class Ticket519Spec extends AkkaRemoteTest {
"A remote TypedActor" should {
"should handle remote future replies" in {
val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,host,port)
val r = actor.someFutureString
r.await.result.get must equal ("foo")
}
}
}

View file

@ -14,7 +14,7 @@
/******************************************************************************
Akka Chat Client/Server Sample Application
How to run the sample:
1. Fire up two shells. For each of them:
@ -149,7 +149,7 @@
protected def chatManagement: Receive = {
case msg @ ChatMessage(from, _) => getSession(from).foreach(_ ! msg)
case msg @ GetChatLog(from) => getSession(from).foreach(_ forward msg)
case msg @ GetChatLog(from) => getSession(from).foreach(_ forward msg)
}
private def getSession(from: String) : Option[ActorRef] = {
@ -166,7 +166,7 @@
* Creates and links a MemoryChatStorage.
*/
trait MemoryChatStorageFactory { this: Actor =>
val storage = this.self.spawnLink[MemoryChatStorage] // starts and links ChatStorage
val storage = this.self.startLink(actorOf[MemoryChatStorage]) // starts and links ChatStorage
}
/**

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.remote
import akka.actor.Actor._
import akka.actor. {ActorRegistry, Actor}
import Actor.remote
class RemoteHelloWorldActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
object ClientManagedRemoteActorServer {
def run = {
remote.start("localhost", 2552)
}
def main(args: Array[String]) = run
}
object ClientManagedRemoteActorClient {
def run = {
val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start()
val result = actor !! "Hello"
}
def main(args: Array[String]) = run
}

View file

@ -13,7 +13,7 @@ import akka.event.EventHandler
* @author Roland Kuhn
* @since 1.1
*/
class TestActorRef[T <: Actor](factory: () => T) extends LocalActorRef(factory, None) {
class TestActorRef[T <: Actor](factory: () => T) extends LocalActorRef(factory) {
dispatcher = CallingThreadDispatcher.global
receiveTimeout = None

View file

@ -408,18 +408,12 @@ object TypedActorConfiguration {
new TypedActorConfiguration()
}
def apply(timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
def apply(timeoutMillis: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().timeout(Duration(timeoutMillis, "millis"))
}
@deprecated("Will be removed after 1.1")
def apply(host: String, port: Int) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port)
}
@deprecated("Will be removed after 1.1")
def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis"))
def apply(timeout: Duration) : TypedActorConfiguration = {
new TypedActorConfiguration().timeout(timeout)
}
}
@ -430,7 +424,6 @@ object TypedActorConfiguration {
*/
final class TypedActorConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
private[akka] var _threadBasedDispatcher: Option[Boolean] = None
private[akka] var _id: Option[String] = None
@ -447,15 +440,6 @@ final class TypedActorConfiguration {
this
}
@deprecated("Will be removed after 1.1")
def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port))
@deprecated("Will be removed after 1.1")
def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = {
_host = Some(remoteAddress)
this
}
def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = {
if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException(
"Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'")
@ -511,30 +495,6 @@ object TypedActor {
newInstance(intfClass, factory, TypedActorConfiguration())
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port))
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
@ -555,32 +515,6 @@ object TypedActor {
newInstance(intfClass, factory, TypedActorConfiguration(timeout))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout))
}
/**
* Factory method for typed actor.
* @param intfClass interface the typed actor implements
@ -588,20 +522,7 @@ object TypedActor {
* @paramm config configuration object fo the typed actor
*/
def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T =
newInstance(intfClass, createActorRef(newTypedActor(factory),config), config)
/**
* Creates an ActorRef, can be local only or client-managed-remote
*/
@deprecated("Will be removed after 1.1")
private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = {
config match {
case null => actorOf(typedActor)
case c: TypedActorConfiguration if (c._host.isDefined) =>
Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort)
case _ => actorOf(typedActor)
}
}
newInstance(intfClass, actorOf(newTypedActor(factory)), config)
/**
* Factory method for typed actor.
@ -610,7 +531,7 @@ object TypedActor {
* @paramm config configuration object fo the typed actor
*/
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T =
newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config)
newInstance(intfClass, actorOf(newTypedActor(targetClass)), config)
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
@ -618,11 +539,8 @@ object TypedActor {
}
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val config = TypedActorConfiguration(timeout)
if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get)
newInstance(intfClass, targetClass, config)
}
remoteAddress: Option[InetSocketAddress], timeout: Long): T =
newInstance(intfClass, targetClass, TypedActorConfiguration(timeout))
private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = {
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
@ -634,14 +552,8 @@ object TypedActor {
actorRef.timeout = config.timeout
val remoteAddress = actorRef match {
case remote: RemoteActorRef => remote.homeAddress
case local: LocalActorRef if local.clientManaged => local.homeAddress
case _ => None
}
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout))
actorRef.start()
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
@ -666,20 +578,6 @@ object TypedActor {
def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T =
newInstance(intfClass, factory.create)
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, hostname, port)
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, timeout, hostname, port)
/**
* Java API.
*/

View file

@ -106,14 +106,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
val implementationClass = component.target
val timeout = component.timeout
val (remoteAddress,actorRef) =
component.remoteAddress match {
case Some(a) =>
(Some(new InetSocketAddress(a.hostname, a.port)),
Actor.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port))
case None =>
(None, Actor.actorOf(TypedActor.newTypedActor(implementationClass)))
}
val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass))
actorRef.timeout = timeout
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
@ -123,7 +116,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
AspectInitRegistry.register(
proxy,
AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout))
AspectInit(interfaceClass, typedActor, actorRef, None, timeout))
typedActor.initialize(proxy)
actorRef.start()