Removed client-managed actors, a lot of deprecated methods and DataFlowVariable (superceded by Future)

This commit is contained in:
Viktor Klang 2011-04-07 12:48:30 +02:00
parent 46460a9616
commit 0dff50fa52
24 changed files with 53 additions and 1113 deletions

View file

@ -60,7 +60,11 @@ object SupervisorSpec {
class Master extends Actor { class Master extends Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt) self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt)
val temp = self.spawnLink[TemporaryActor] val temp = {
val a = actorOf[TemporaryActor]
self link a
a.start
}
override def receive = { override def receive = {
case Die => temp !! (Die, TimeoutMillis) case Die => temp !! (Die, TimeoutMillis)

View file

@ -1,165 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dataflow
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.dispatch.DefaultCompletableFuture
import java.util.concurrent.{TimeUnit, CountDownLatch}
import annotation.tailrec
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
import akka.actor.ActorRegistry
@RunWith(classOf[JUnitRunner])
class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
describe("DataflowVariable") {
it("should be able to set the value of one variable from other variables") {
import DataFlow._
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
val x, y, z = new DataFlowVariable[Int]
thread {
z << x() + y()
result.set(z())
latch.countDown
}
thread { x << 40 }
thread { y << 2 }
latch.await(10,TimeUnit.SECONDS) should equal (true)
result.get should equal (42)
List(x,y,z).foreach(_.shutdown)
}
it("should be able to sum a sequence of ints") {
import DataFlow._
def ints(n: Int, max: Int): List[Int] =
if (n == max) Nil
else n :: ints(n + 1, max)
def sum(s: Int, stream: List[Int]): List[Int] = stream match {
case Nil => s :: Nil
case h :: t => s :: sum(h + s, t)
}
val latch = new CountDownLatch(1)
val result = new AtomicReference[List[Int]](Nil)
val x = new DataFlowVariable[List[Int]]
val y = new DataFlowVariable[List[Int]]
val z = new DataFlowVariable[List[Int]]
thread { x << ints(0, 1000) }
thread { y << sum(0, x()) }
thread { z << y()
result.set(z())
latch.countDown
}
latch.await(10,TimeUnit.SECONDS) should equal (true)
result.get should equal (sum(0,ints(0,1000)))
List(x,y,z).foreach(_.shutdown)
}
/*
it("should be able to join streams") {
import DataFlow._
Actor.registry.shutdownAll
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
val t1 = thread { ints(0, 1000, producer) }
val t2 = thread {
Thread.sleep(1000)
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
latch.countDown
}
latch.await(3,TimeUnit.SECONDS) should equal (true)
result.get should equal (332833500)
}
it("should be able to sum streams recursively") {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val result = new AtomicLong(0)
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
val x = stream()
if(result.addAndGet(x) == 166666500)
latch.countDown
recurseSum(stream)
}
thread { ints(0, 1000, producer) }
thread { sum(0, producer, consumer) }
thread { recurseSum(consumer) }
latch.await(15,TimeUnit.SECONDS) should equal (true)
}
*/
/* Test not ready for prime time, causes some sort of deadlock */
/* it("should be able to conditionally set variables") {
import DataFlow._
Actor.registry.shutdownAll
val latch = new CountDownLatch(1)
val x, y, z, v = new DataFlowVariable[Int]
val main = thread {
x << 1
z << Math.max(x(),y())
latch.countDown
}
val setY = thread {
// Thread.sleep(2000)
y << 2
}
val setV = thread {
v << y
}
List(x,y,z,v) foreach (_.shutdown)
latch.await(2,TimeUnit.SECONDS) should equal (true)
}*/
}
}

View file

@ -134,81 +134,6 @@ class FutureSpec extends JUnitSuite {
actor.stop actor.stop
} }
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitEitherRight = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitEither(future1, future2)
assert(result.isDefined)
assert("World" === result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitOneLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
val future1 = actor1 !!! "NoReply"
val future2 = actor2 !!! "Hello"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitOneRight = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "NoReply"
val result = Futures.awaitOne(List(future1, future2))
assert(result.result.isDefined)
assert("World" === result.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFutureAwaitAll = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start
val future1 = actor1 !!! "Hello"
val future2 = actor2 !!! "Hello"
Futures.awaitAll(List(future1, future2))
assert(future1.result.isDefined)
assert("World" === future1.result.get)
assert(future2.result.isDefined)
assert("World" === future2.result.get)
actor1.stop
actor2.stop
}
@Test def shouldFuturesAwaitMapHandleEmptySequence {
assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil)
}
@Test def shouldFuturesAwaitMapHandleNonEmptySequence {
val latches = (1 to 3) map (_ => new StandardLatch)
val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start)
val futures = actors map (actor => (actor.!!![String]("Hello")))
latches foreach { _.open }
assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length))
}
@Test def shouldFoldResults { @Test def shouldFoldResults {
val actors = (1 to 10).toList map { _ => val actors = (1 to 10).toList map { _ =>
actorOf(new Actor { actorOf(new Actor {

View file

@ -18,7 +18,7 @@ import akka.japi. {Creator, Procedure}
/** /**
* Life-cycle messages for the Actors * Life-cycle messages for the Actors
*/ */
@serializable sealed trait LifeCycleMessage sealed trait LifeCycleMessage extends Serializable
/* Marker trait to show which Messages are automatically handled by Akka */ /* Marker trait to show which Messages are automatically handled by Akka */
sealed trait AutoReceivedMessage { self: LifeCycleMessage => } sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
@ -165,7 +165,7 @@ object Actor extends ListenerManagement {
"\nMake sure Actor is NOT defined inside a class/trait," + "\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," + "\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
}, None) })
/** /**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function * Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@ -185,7 +185,7 @@ object Actor extends ListenerManagement {
* val actor = actorOf(new MyActor).start * val actor = actorOf(new MyActor).start
* </pre> * </pre>
*/ */
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
/** /**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>) * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
@ -195,7 +195,7 @@ object Actor extends ListenerManagement {
* This function should <b>NOT</b> be used for remote actors. * This function should <b>NOT</b> be used for remote actors.
* JAVA API * JAVA API
*/ */
def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create)
/** /**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
@ -245,8 +245,7 @@ object Actor extends ListenerManagement {
* <p/> * <p/>
* Here you find functions like: * Here you find functions like:
* - !, !!, !!! and forward * - !, !!, !!! and forward
* - link, unlink, startLink, spawnLink etc * - link, unlink, startLink etc
* - makeRemote etc.
* - start, stop * - start, stop
* - etc. * - etc.
* *
@ -269,7 +268,6 @@ object Actor extends ListenerManagement {
* import self._ * import self._
* id = ... * id = ...
* dispatcher = ... * dispatcher = ...
* spawnLink[OtherActor]
* ... * ...
* } * }
* </pre> * </pre>

View file

@ -446,38 +446,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*/ */
def startLink(actorRef: ActorRef): Unit def startLink(actorRef: ActorRef): Unit
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, use Actor.actorOf instead")
def spawn(clazz: Class[_ <: Actor]): ActorRef
/**
* Atomically create (from actor class), make it remote and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, client managed actors will be removed")
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
/**
* Atomically create (from actor class), link and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success")
def spawnLink(clazz: Class[_ <: Actor]): ActorRef
/**
* Atomically create (from actor class), make it remote, link and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
@deprecated("Will be removed after 1.1, client managed actors will be removed")
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
/** /**
* Returns the mailbox size. * Returns the mailbox size.
*/ */
@ -593,10 +561,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class LocalActorRef private[akka] ( class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor)
private[this] val actorFactory: () => Actor,
val homeAddress: Option[InetSocketAddress],
val clientManaged: Boolean = false)
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {
@volatile @volatile
@ -626,9 +591,8 @@ class LocalActorRef private[akka] (
__lifeCycle: LifeCycle, __lifeCycle: LifeCycle,
__supervisor: Option[ActorRef], __supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]], __hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () => Actor, __factory: () => Actor) = {
__homeAddress: Option[InetSocketAddress]) = { this(__factory)
this(__factory, __homeAddress)
_uuid = __uuid _uuid = __uuid
id = __id id = __id
timeout = __timeout timeout = __timeout
@ -640,11 +604,6 @@ class LocalActorRef private[akka] (
start start
} }
/**
* Returns whether this actor ref is client-managed remote or not
*/
private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled
// ========= PUBLIC FUNCTIONS ========= // ========= PUBLIC FUNCTIONS =========
/** /**
@ -657,6 +616,8 @@ class LocalActorRef private[akka] (
*/ */
def actorClassName: String = actorClass.getName def actorClassName: String = actorClass.getName
final def homeAddress: Option[InetSocketAddress] = None
/** /**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started. * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/ */
@ -688,9 +649,6 @@ class LocalActorRef private[akka] (
if ((actorInstance ne null) && (actorInstance.get ne null)) if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance initializeActorInstance
if (isClientManaged_?)
Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
checkReceiveTimeout //Schedule the initial Receive timeout checkReceiveTimeout //Schedule the initial Receive timeout
} }
this this
@ -710,11 +668,9 @@ class LocalActorRef private[akka] (
} finally { } finally {
currentMessage = null currentMessage = null
Actor.registry.unregister(this) Actor.registry.unregister(this)
if (isRemotingEnabled) { if (isRemotingEnabled)
if (isClientManaged_?)
Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
Actor.remote.unregister(this) Actor.remote.unregister(this)
}
setActorSelfFields(actorInstance.get,null) setActorSelfFields(actorInstance.get,null)
} }
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
@ -764,52 +720,6 @@ class LocalActorRef private[akka] (
actorRef.start actorRef.start
} }
/**
* Atomically create (from actor class) and start an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawn(clazz: Class[_ <: Actor]): ActorRef =
Actor.actorOf(clazz).start
/**
* Atomically create (from actor class), start and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
ensureRemotingEnabled
val ref = Actor.remote.actorOf(clazz, hostname, port)
ref.timeout = timeout
ref.start
}
/**
* Atomically create (from actor class), start and link an actor.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = {
val actor = spawn(clazz)
link(actor)
actor.start
actor
}
/**
* Atomically create (from actor class), start, link and make an actor remote.
* <p/>
* To be invoked from within the actor itself.
*/
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
ensureRemotingEnabled
val actor = Actor.remote.actorOf(clazz, hostname, port)
actor.timeout = timeout
link(actor)
actor.start
actor
}
/** /**
* Returns the mailbox. * Returns the mailbox.
*/ */
@ -827,10 +737,6 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
if (isClientManaged_?) {
Actor.remote.send[Any](
message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None)
} else
dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
@ -838,17 +744,10 @@ class LocalActorRef private[akka] (
timeout: Long, timeout: Long,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
if (isClientManaged_?) { val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
val future = Actor.remote.send[T]( dispatcher dispatchMessage new MessageInvocation(
message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
if (future.isDefined) future.get future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
dispatcher dispatchMessage new MessageInvocation(
this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
future.get
}
} }
/** /**
@ -1004,11 +903,10 @@ class LocalActorRef private[akka] (
} }
} }
} }
//TODO KEEP THIS?
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled ensureRemotingEnabled
if (_supervisor.isDefined) { if (_supervisor.isDefined) {
if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this)
Some(_supervisor.get.uuid) Some(_supervisor.get.uuid)
} else None } else None
} }
@ -1179,10 +1077,6 @@ private[akka] case class RemoteActorRef private[akka] (
def link(actorRef: ActorRef): Unit = unsupported def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported def unlink(actorRef: ActorRef): Unit = unsupported
def startLink(actorRef: ActorRef): Unit = unsupported def startLink(actorRef: ActorRef): Unit = unsupported
def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox: AnyRef = unsupported
@ -1388,32 +1282,4 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
true true
} else false } else false
} }
/**
* Atomically create (from actor class) and start an actor.
*/
def spawn[T <: Actor: Manifest]: ActorRef =
spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Atomically create (from actor class), start and make an actor remote.
*/
def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled
spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
/**
* Atomically create (from actor class), start and link an actor.
*/
def spawnLink[T <: Actor: Manifest]: ActorRef =
spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Atomically create (from actor class), start, link and make an actor remote.
*/
def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = {
ensureRemotingEnabled
spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
}
} }

View file

@ -93,8 +93,7 @@ case class SupervisorFactory(val config: SupervisorConfig) {
* <p/> * <p/>
* The supervisor class is only used for the configuration system when configuring supervisor * The supervisor class is only used for the configuration system when configuring supervisor
* hierarchies declaratively. Should not be used as part of the regular programming API. Instead * hierarchies declaratively. Should not be used as part of the regular programming API. Instead
* wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the * wire the children together using 'link', 'startLink' etc.
* children that should trap error signals and trigger restart.
* <p/> * <p/>
* See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children. * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up children.
* *

View file

@ -101,32 +101,18 @@ object Supervision {
val target: Class[_], val target: Class[_],
val lifeCycle: LifeCycle, val lifeCycle: LifeCycle,
val timeout: Long, val timeout: Long,
_dispatcher: MessageDispatcher, // optional _dispatcher: MessageDispatcher // optional
_remoteAddress: RemoteAddress // optional
) extends Server { ) extends Server {
val intf: Option[Class[_]] = Option(_intf) val intf: Option[Class[_]] = Option(_intf)
val dispatcher: Option[MessageDispatcher] = Option(_dispatcher) val dispatcher: Option[MessageDispatcher] = Option(_dispatcher)
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) = def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) = def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress) this(intf, target, lifeCycle, timeout, null: MessageDispatcher)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) = def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress) this(null: Class[_], target, lifeCycle, timeout, dispatcher)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress)
} }
} }

View file

@ -1,165 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dataflow
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import akka.event.EventHandler
import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._
import akka.dispatch.CompletableFuture
import akka.AkkaException
import akka.japi.{ Function, Effect }
/**
* Implements Oz-style dataflow (single assignment) variables.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object DataFlow {
object Start
object Exit
class DataFlowVariableException(msg: String) extends AkkaException(msg)
/**
* Executes the supplied thunk in another thread.
*/
def thread(body: => Unit): Unit = spawn(body)
/**
* JavaAPI.
* Executes the supplied Effect in another thread.
*/
def thread(body: Effect): Unit = spawn(body.apply)
/**
* Executes the supplied function in another thread.
*/
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
actorOf(new ReactiveEventBasedThread(body)).start
/**
* JavaAPI.
* Executes the supplied Function in another thread.
*/
def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) =
actorOf(new ReactiveEventBasedThread(body.apply)).start
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
def receive = {
case Exit => self.stop
case message => self.reply(body(message.asInstanceOf[A]))
}
}
private object DataFlowVariable {
private sealed abstract class DataFlowVariableMessage
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
private object Get extends DataFlowVariableMessage
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@deprecated("Superceeded by Future and CompletableFuture as of 1.1")
sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
import DataFlowVariable._
def this() = this(1000 * 60)
private val value = new AtomicReference[Option[T]](None)
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
self.timeout = timeoutMs
def receive = {
case s@Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
while(dataFlow.blockedReaders.peek ne null)
dataFlow.blockedReaders.poll ! s
} else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
case Exit => self.stop
}
}
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
self.timeout = timeoutMs
private var readerFuture: Option[CompletableFuture[Any]] = None
def receive = {
case Get => dataFlow.value.get match {
case Some(value) => self reply value
case None => readerFuture = self.senderFuture
}
case Set(v:T) => readerFuture.map(_ completeWithResult v)
case Exit => self.stop
}
}
private[this] val in = actorOf(new In(this)).start
/**
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def <<(ref: DataFlowVariable[T]) {
if (this.value.get.isEmpty) in ! Set(ref())
else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
}
/**
* JavaAPI.
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def set(ref: DataFlowVariable[T]) { this << ref }
/**
* Sets the value of this variable (if unset).
*/
def <<(value: T) {
if (this.value.get.isEmpty) in ! Set(value)
else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
}
/**
* JavaAPI.
* Sets the value of this variable (if unset) with the value of the supplied variable.
*/
def set(value: T) { this << value }
/**
* Retrieves the value of variable, throws a DataFlowVariableException if it times out.
*/
def get(): T = this()
/**
* Retrieves the value of variable, throws a DataFlowVariableException if it times out.
*/
def apply(): T = {
value.get getOrElse {
val out = actorOf(new Out(this)).start
val result = try {
blockedReaders offer out
(out !! Get).as[T]
} catch {
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
out ! Exit
throw e
}
result.getOrElse(throw new DataFlowVariableException(
"Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
}
}
def shutdown = in ! Exit
}
}

View file

@ -175,36 +175,6 @@ object Futures {
val fb = fn(a.asInstanceOf[A]) val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b) for (r <- fr; b <-fb) yield (r += b)
}.map(_.result) }.map(_.result)
// =====================================
// Deprecations
// =====================================
/**
* (Blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)")
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await")
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await
/**
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
*/
@deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }")
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
in map { f => fun(f.await) }
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException")
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException
} }
object Future { object Future {

View file

@ -151,81 +151,6 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
clear clear
} }
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io", 2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
Actor.remote.clientManagedActorOf(() => factory, host, port)
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
createInstance[Actor](manifest[T].erasure.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")),
host, port)
}
protected override def manageLifeCycleOfListeners = false protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
@ -444,10 +369,6 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
@deprecated("Will be removed after 1.1")
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef
/** /**
* Clean-up all open connections. * Clean-up all open connections.
*/ */

View file

@ -591,19 +591,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
RemoteActorRef(serviceId, className, host, port, timeout, loader) RemoteActorRef(serviceId, className, host, port, timeout, loader)
} }
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef = {
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
return new LocalActorRef(factory, None) // Code is much simpler with return
}
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true)
//ref.timeout = timeout //removed because setting default timeout should be done after construction
ref
}
} }
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {

View file

@ -44,7 +44,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
* } * }
* </pre> * </pre>
*/ */
@serializable trait StatelessActorFormat[T <: Actor] extends Format[T] { trait StatelessActorFormat[T <: Actor] extends Format[T] with scala.Serializable{
def fromBinary(bytes: Array[Byte], act: T) = act def fromBinary(bytes: Array[Byte], act: T) = act
def toBinary(ac: T) = Array.empty[Byte] def toBinary(ac: T) = Array.empty[Byte]
@ -64,7 +64,7 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
* } * }
* </pre> * </pre>
*/ */
@serializable trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { trait SerializerBasedActorFormat[T <: Actor] extends Format[T] with scala.Serializable {
val serializer: Serializer val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
@ -205,10 +205,11 @@ object ActorSerialization {
else actorClass.newInstance.asInstanceOf[Actor] else actorClass.newInstance.asInstanceOf[Actor]
} }
/* TODO Can we remove originalAddress from the protocol?
val homeAddress = { val homeAddress = {
val address = protocol.getOriginalAddress val address = protocol.getOriginalAddress
Some(new InetSocketAddress(address.getHostname, address.getPort)) Some(new InetSocketAddress(address.getHostname, address.getPort))
} }*/
val ar = new LocalActorRef( val ar = new LocalActorRef(
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
@ -218,8 +219,7 @@ object ActorSerialization {
lifeCycle, lifeCycle,
supervisor, supervisor,
hotswap, hotswap,
factory, factory)
homeAddress)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))

View file

@ -17,7 +17,7 @@ import sjson.json.{Serializer => SJSONSerializer}
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@serializable trait Serializer { trait Serializer extends scala.Serializable {
@volatile var classLoader: Option[ClassLoader] = None @volatile var classLoader: Option[ClassLoader] = None
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))

View file

@ -1,142 +0,0 @@
package akka.actor.remote
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.dispatch.Dispatchers
import akka.actor.Actor._
import akka.actor._
class ExpectedRemoteProblem(msg: String) extends RuntimeException(msg)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" =>
RemoteActorSpecActorUnidirectional.latch.countDown
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" => throw new ExpectedRemoteProblem("expected")
}
}
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
class CountDownActor(latch: CountDownLatch) extends Actor {
def receive = {
case "World" => latch.countDown
}
}
/*
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown
}
}*/
class MyActorCustomConstructor extends Actor {
var prefix = "default-"
var count = 0
def receive = {
case "incrPrefix" => count += 1; prefix = "" + count + "-"
case msg: String => self.reply(prefix + msg)
}
}
class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
"ClientInitiatedRemoteActor" should {
"shouldSendOneWay" in {
val clientManaged = remote.actorOf[RemoteActorSpecActorUnidirectional](host,port).start
clientManaged must not be null
clientManaged.getClass must be (classOf[LocalActorRef])
clientManaged ! "OneWay"
RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS) must be (true)
clientManaged.stop
}
"shouldSendOneWayAndReceiveReply" in {
val latch = new CountDownLatch(1)
val actor = remote.actorOf[SendOneWayAndReplyReceiverActor](host,port).start
implicit val sender = Some(actorOf(new CountDownActor(latch)).start)
actor ! "Hello"
latch.await(3,TimeUnit.SECONDS) must be (true)
}
"shouldSendBangBangMessageAndReceiveReply" in {
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start
val result = actor !! ("Hello", 10000)
"World" must equal (result.get.asInstanceOf[String])
actor.stop
}
"shouldSendBangBangMessageAndReceiveReplyConcurrently" in {
val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList
actors.map(_ !!! ("Hello", 10000)) foreach { future =>
"World" must equal (future.await.result.asInstanceOf[Option[String]].get)
}
actors.foreach(_.stop)
}
"shouldRegisterActorByUuid" in {
val actor1 = remote.actorOf[MyActorCustomConstructor](host, port).start
val actor2 = remote.actorOf[MyActorCustomConstructor](host, port).start
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("1-test")
actor1 ! "incrPrefix"
(actor1 !! "test").get must equal ("2-test")
(actor2 !! "test").get must equal ("default-test")
actor1.stop
actor2.stop
}
"shouldSendAndReceiveRemoteException" in {
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host, port).start
try {
implicit val timeout = 500000000L
val f = (actor !!! "Failure").await.resultOrException
fail("Shouldn't get here!!!")
} catch {
case e: ExpectedRemoteProblem =>
}
actor.stop
}
}
}

View file

@ -19,11 +19,5 @@ class OptimizedLocalScopedSpec extends AkkaRemoteTest {
remote.actorFor("foo", host, port) must be (fooActor) remote.actorFor("foo", host, port) must be (fooActor)
} }
"Create local actor when client-managed is hosted locally" in {
val localClientManaged = Actor.remote.actorOf[TestActor](host, port)
localClientManaged.homeAddress must be (None)
}
} }
} }

View file

@ -23,7 +23,7 @@ object Log {
} }
} }
@serializable class RemotePingPong1Actor extends Actor { class RemotePingPong1Actor extends Actor with scala.Serializable {
def receive = { def receive = {
case "Ping" => case "Ping" =>
Log.messageLog.put("ping") Log.messageLog.put("ping")
@ -41,7 +41,7 @@ object Log {
} }
} }
@serializable class RemotePingPong2Actor extends Actor { class RemotePingPong2Actor extends Actor with scala.Serializable {
def receive = { def receive = {
case "Ping" => case "Ping" =>
Log.messageLog.put("ping") Log.messageLog.put("ping")
@ -55,7 +55,7 @@ object Log {
} }
} }
@serializable class RemotePingPong3Actor extends Actor { class RemotePingPong3Actor extends Actor with scala.Serializable {
def receive = { def receive = {
case "Ping" => case "Ping" =>
Log.messageLog.put("ping") Log.messageLog.put("ping")
@ -69,7 +69,7 @@ object Log {
} }
} }
class RemoteSupervisorSpec extends AkkaRemoteTest { /*class RemoteSupervisorSpec extends AkkaRemoteTest {
var pingpong1: ActorRef = _ var pingpong1: ActorRef = _
var pingpong2: ActorRef = _ var pingpong2: ActorRef = _
@ -324,7 +324,6 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
factory.newInstance factory.newInstance
} }
/*
// Uncomment when the same test passes in SupervisorSpec - pending bug // Uncomment when the same test passes in SupervisorSpec - pending bug
@Test def shouldKillMultipleActorsOneForOne2 = { @Test def shouldKillMultipleActorsOneForOne2 = {
clearMessageLogs clearMessageLogs
@ -338,9 +337,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
messageLog.poll(5, TimeUnit.SECONDS) messageLog.poll(5, TimeUnit.SECONDS)
} }
} }
*/
/*
@Test def shouldOneWayKillSingleActorOneForOne = { @Test def shouldOneWayKillSingleActorOneForOne = {
clearMessageLogs clearMessageLogs
@ -435,6 +432,4 @@ class RemoteSupervisorSpec extends AkkaRemoteTest {
messageLog.poll(5, TimeUnit.SECONDS) messageLog.poll(5, TimeUnit.SECONDS)
} }
} }
*/ }*/
}

View file

@ -4,13 +4,9 @@
package akka.actor.remote package akka.actor.remote
import akka.config.Supervision._
import akka.actor._
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import akka.config. {RemoteAddress, Config, TypedActorConfigurator} import akka.config. {RemoteAddress, Config, TypedActorConfigurator}
import akka.testing._
object RemoteTypedActorLog { object RemoteTypedActorLog {
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
@ -24,73 +20,11 @@ object RemoteTypedActorLog {
class RemoteTypedActorSpec extends AkkaRemoteTest { class RemoteTypedActorSpec extends AkkaRemoteTest {
import RemoteTypedActorLog._
private var conf: TypedActorConfigurator = _
override def beforeEach {
super.beforeEach
Config.config
conf = new TypedActorConfigurator
conf.configure(
new AllForOneStrategy(List(classOf[Exception]), 3, 5000),
List(
new SuperviseTypedActor(
classOf[RemoteTypedActorOne],
classOf[RemoteTypedActorOneImpl],
Permanent,
Testing.testTime(20000),
RemoteAddress(host,port)),
new SuperviseTypedActor(
classOf[RemoteTypedActorTwo],
classOf[RemoteTypedActorTwoImpl],
Permanent,
Testing.testTime(20000),
RemoteAddress(host,port))
).toArray).supervise
}
override def afterEach {
clearMessageLogs
conf.stop
super.afterEach
Thread.sleep(1000)
}
"Remote Typed Actor " should { "Remote Typed Actor " should {
/*"receives one-way message" in { "have unit tests" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
ta.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway")
} }
"responds to request-reply message" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
ta.requestReply("ping") must equal ("pong")
} */
"be restarted on failure" in {
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
try {
ta.requestReply("die")
fail("Shouldn't get here")
} catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
}
/* "restarts linked friends on failure" in {
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
val ta2 = conf.getInstance(classOf[RemoteTypedActorTwo])
try {
ta1.requestReply("die")
fail("Shouldn't get here")
} catch { case re: RuntimeException if re.getMessage == "Expected exception; to test fault-tolerance" => }
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
}*/
} }
} }

View file

@ -18,11 +18,5 @@ class UnOptimizedLocalScopedSpec extends AkkaRemoteTest {
remote.actorFor("foo", host, port) must not be (fooActor) remote.actorFor("foo", host, port) must not be (fooActor)
} }
"Create remote actor when client-managed is hosted locally" in {
val localClientManaged = Actor.remote.actorOf[TestActor](host, port)
localClientManaged.homeAddress must not be (None)
}
} }
} }

View file

@ -221,7 +221,7 @@ class MyActorWithDualCounter extends Actor {
} }
} }
@serializable class MyActor extends Actor { class MyActor extends Actor with scala.Serializable {
var count = 0 var count = 0
def receive = { def receive = {
@ -249,7 +249,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor {
} }
} }
@serializable class MyJavaSerializableActor extends Actor { class MyJavaSerializableActor extends Actor with scala.Serializable {
var count = 0 var count = 0
self.receiveTimeout = Some(1000) self.receiveTimeout = Some(1000)

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.ticket
import akka.actor._
import akka.actor.remote.AkkaRemoteTest
class Ticket519Spec extends AkkaRemoteTest {
"A remote TypedActor" should {
"should handle remote future replies" in {
val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,host,port)
val r = actor.someFutureString
r.await.result.get must equal ("foo")
}
}
}

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.remote
import akka.actor.Actor._
import akka.actor. {ActorRegistry, Actor}
import Actor.remote
class RemoteHelloWorldActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
object ClientManagedRemoteActorServer {
def run = {
remote.start("localhost", 2552)
}
def main(args: Array[String]) = run
}
object ClientManagedRemoteActorClient {
def run = {
val actor = remote.actorOf[RemoteHelloWorldActor]("localhost",2552).start
val result = actor !! "Hello"
}
def main(args: Array[String]) = run
}

View file

@ -11,7 +11,7 @@ import org.multiverse.transactional.refs.BasicRef
/** /**
* Common trait for all the transactional objects. * Common trait for all the transactional objects.
*/ */
@serializable trait Transactional { trait Transactional extends Serializable {
val uuid: String val uuid: String
} }

View file

@ -377,18 +377,12 @@ object TypedActorConfiguration {
new TypedActorConfiguration() new TypedActorConfiguration()
} }
def apply(timeout: Long) : TypedActorConfiguration = { def apply(timeoutMillis: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().timeout(Duration(timeout, "millis")) new TypedActorConfiguration().timeout(Duration(timeoutMillis, "millis"))
} }
@deprecated("Will be removed after 1.1") def apply(timeout: Duration) : TypedActorConfiguration = {
def apply(host: String, port: Int) : TypedActorConfiguration = { new TypedActorConfiguration().timeout(timeout)
new TypedActorConfiguration().makeRemote(host, port)
}
@deprecated("Will be removed after 1.1")
def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis"))
} }
} }
@ -399,7 +393,6 @@ object TypedActorConfiguration {
*/ */
final class TypedActorConfiguration { final class TypedActorConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
private[akka] var _threadBasedDispatcher: Option[Boolean] = None private[akka] var _threadBasedDispatcher: Option[Boolean] = None
private[akka] var _id: Option[String] = None private[akka] var _id: Option[String] = None
@ -416,15 +409,6 @@ final class TypedActorConfiguration {
this this
} }
@deprecated("Will be removed after 1.1")
def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port))
@deprecated("Will be removed after 1.1")
def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = {
_host = Some(remoteAddress)
this
}
def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = { def dispatcher(messageDispatcher: MessageDispatcher) : TypedActorConfiguration = {
if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException( if (_threadBasedDispatcher.isDefined) throw new IllegalArgumentException(
"Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'") "Cannot specify both 'threadBasedDispatcher()' and 'dispatcher()'")
@ -480,28 +464,6 @@ object TypedActor {
newInstance(intfClass, factory, TypedActorConfiguration()) newInstance(intfClass, factory, TypedActorConfiguration())
} }
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @param host hostanme of the remote server
* @param port port of the remote server
*/
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port))
}
/** /**
* Factory method for typed actor. * Factory method for typed actor.
* @param intfClass interface the typed actor implements * @param intfClass interface the typed actor implements
@ -522,32 +484,6 @@ object TypedActor {
newInstance(intfClass, factory, TypedActorConfiguration(timeout)) newInstance(intfClass, factory, TypedActorConfiguration(timeout))
} }
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout))
}
/**
* Factory method for remote typed actor.
* @param intfClass interface the typed actor implements
* @param factory factory method that constructs the typed actor
* @paramm timeout timeout for future
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout))
}
/** /**
* Factory method for typed actor. * Factory method for typed actor.
* @param intfClass interface the typed actor implements * @param intfClass interface the typed actor implements
@ -555,20 +491,7 @@ object TypedActor {
* @paramm config configuration object fo the typed actor * @paramm config configuration object fo the typed actor
*/ */
def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T = def newInstance[T](intfClass: Class[T], factory: => AnyRef, config: TypedActorConfiguration): T =
newInstance(intfClass, createActorRef(newTypedActor(factory),config), config) newInstance(intfClass, actorOf(newTypedActor(factory)), config)
/**
* Creates an ActorRef, can be local only or client-managed-remote
*/
@deprecated("Will be removed after 1.1")
private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = {
config match {
case null => actorOf(typedActor)
case c: TypedActorConfiguration if (c._host.isDefined) =>
Actor.remote.actorOf(typedActor, c._host.get.getAddress.getHostAddress, c._host.get.getPort)
case _ => actorOf(typedActor)
}
}
/** /**
* Factory method for typed actor. * Factory method for typed actor.
@ -577,7 +500,7 @@ object TypedActor {
* @paramm config configuration object fo the typed actor * @paramm config configuration object fo the typed actor
*/ */
def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T = def newInstance[T](intfClass: Class[T], targetClass: Class[_], config: TypedActorConfiguration): T =
newInstance(intfClass, createActorRef(newTypedActor(targetClass),config), config) newInstance(intfClass, actorOf(newTypedActor(targetClass)), config)
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
@ -585,11 +508,8 @@ object TypedActor {
} }
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = { remoteAddress: Option[InetSocketAddress], timeout: Long): T =
val config = TypedActorConfiguration(timeout) newInstance(intfClass, targetClass, TypedActorConfiguration(timeout))
if (remoteAddress.isDefined) config.makeRemote(remoteAddress.get)
newInstance(intfClass, targetClass, config)
}
private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = { private def newInstance[T](intfClass: Class[T], actorRef: ActorRef, config: TypedActorConfiguration) : T = {
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
@ -601,13 +521,7 @@ object TypedActor {
actorRef.timeout = config.timeout actorRef.timeout = config.timeout
val remoteAddress = actorRef match { AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout))
case remote: RemoteActorRef => remote.homeAddress
case local: LocalActorRef if local.clientManaged => local.homeAddress
case _ => None
}
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout))
actorRef.start actorRef.start
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
@ -633,20 +547,6 @@ object TypedActor {
def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T = def newInstance[T](intfClass: Class[T], factory: TypedActorFactory) : T =
newInstance(intfClass, factory.create) newInstance(intfClass, factory.create)
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, hostname, port)
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, timeout, hostname, port)
/** /**
* Java API. * Java API.
*/ */

View file

@ -106,14 +106,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
val implementationClass = component.target val implementationClass = component.target
val timeout = component.timeout val timeout = component.timeout
val (remoteAddress,actorRef) = val actorRef = Actor.actorOf(TypedActor.newTypedActor(implementationClass))
component.remoteAddress match {
case Some(a) =>
(Some(new InetSocketAddress(a.hostname, a.port)),
Actor.remote.actorOf(TypedActor.newTypedActor(implementationClass), a.hostname, a.port))
case None =>
(None, Actor.actorOf(TypedActor.newTypedActor(implementationClass)))
}
actorRef.timeout = timeout actorRef.timeout = timeout
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
@ -123,7 +116,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
AspectInitRegistry.register( AspectInitRegistry.register(
proxy, proxy,
AspectInit(interfaceClass, typedActor, actorRef, remoteAddress, timeout)) AspectInit(interfaceClass, typedActor, actorRef, None, timeout))
typedActor.initialize(proxy) typedActor.initialize(proxy)
actorRef.start actorRef.start