Refactored and changed boot of Cluster and ClusterDeployer.
Fixed problems with ClusterDeployerSpec and ClusterMultiJvmSpec. Removed all akka-remote tests and samples (needs to be rewritten later). Added Actor.cluster member field. Removed Actor.remote member field. Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
parent
b95382c3e2
commit
f0be165a07
34 changed files with 97 additions and 3339 deletions
|
|
@ -99,6 +99,15 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception
|
|||
*/
|
||||
object Actor extends ListenerManagement {
|
||||
|
||||
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
|
||||
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
|
||||
|
||||
/**
|
||||
* A Receive is a convenience type that defines actor message behavior currently modeled as
|
||||
* a PartialFunction[Any, Unit].
|
||||
*/
|
||||
type Receive = PartialFunction[Any, Unit]
|
||||
|
||||
/**
|
||||
* Add shutdown cleanups
|
||||
*/
|
||||
|
|
@ -116,29 +125,29 @@ object Actor extends ListenerManagement {
|
|||
hook
|
||||
}
|
||||
|
||||
val registry = new ActorRegistry
|
||||
|
||||
lazy val remote: RemoteSupport = {
|
||||
ReflectiveAccess
|
||||
.RemoteModule
|
||||
.defaultRemoteSupport
|
||||
.map(_())
|
||||
.getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath"))
|
||||
}
|
||||
|
||||
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
|
||||
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
|
||||
|
||||
/**
|
||||
* A Receive is a convenience type that defines actor message behavior currently modeled as
|
||||
* a PartialFunction[Any, Unit].
|
||||
*/
|
||||
type Receive = PartialFunction[Any, Unit]
|
||||
|
||||
private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] {
|
||||
override def initialValue = None
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle to the ActorRegistry.
|
||||
*/
|
||||
val registry = new ActorRegistry
|
||||
|
||||
/**
|
||||
* Handle to the ClusterNode. API for the cluster client.
|
||||
*/
|
||||
lazy val cluster: ClusterModule.ClusterNode = ClusterModule.node
|
||||
|
||||
/**
|
||||
* Handle to the RemoteSupport. API for the remote client/server.
|
||||
* Only for internal use.
|
||||
*/
|
||||
private[akka] lazy val remote: RemoteSupport = cluster.remoteService
|
||||
|
||||
// start up a cluster node to join the ZooKeeper cluster
|
||||
if (ClusterModule.isEnabled) cluster.start()
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor with type T.
|
||||
* <pre>
|
||||
|
|
|
|||
|
|
@ -614,6 +614,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
|
|||
} finally {
|
||||
currentMessage = null
|
||||
Actor.registry.unregister(this)
|
||||
|
||||
if (isRemotingEnabled)
|
||||
Actor.remote.unregister(this)
|
||||
|
||||
|
|
|
|||
|
|
@ -6,11 +6,10 @@ package akka.util
|
|||
|
||||
import akka.dispatch.{ Future, CompletableFuture, MessageInvocation }
|
||||
import akka.config.{ Config, ModuleNotAvailableException }
|
||||
|
||||
import akka.remoteinterface.RemoteSupport
|
||||
import akka.actor._
|
||||
import DeploymentConfig.Deploy
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.DeploymentConfig.Deploy
|
||||
import akka.serialization.Format
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
|
@ -82,6 +81,8 @@ object ReflectiveAccess {
|
|||
def start()
|
||||
def shutdown()
|
||||
|
||||
def remoteService: RemoteSupport
|
||||
|
||||
def store(address: String, actorClass: Class[_ <: Actor], replicas: Int, serializeMailbox: Boolean, format: Serializer)
|
||||
def store(actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean, format: Serializer)
|
||||
|
||||
|
|
|
|||
|
|
@ -312,15 +312,6 @@ object Cluster {
|
|||
def shutdownLocalCluster() {
|
||||
withPrintStackTraceOnError {
|
||||
EventHandler.info(this, "Shuts down local cluster")
|
||||
|
||||
node.disconnect()
|
||||
node.remoteService.shutdown()
|
||||
|
||||
implicit val zkClient = newZkClient
|
||||
ignore[ZkNoNodeException](zkClient.deleteRecursive("/" + name))
|
||||
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
|
||||
zkClient.close()
|
||||
|
||||
_zkServer.get.foreach(_.shutdown())
|
||||
_zkServer.set(None)
|
||||
}
|
||||
|
|
@ -489,7 +480,7 @@ class ClusterNode private[akka] (
|
|||
this
|
||||
}
|
||||
|
||||
def stop() {
|
||||
def shutdown() {
|
||||
isConnected switchOff {
|
||||
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
|
||||
|
||||
|
|
@ -880,9 +871,7 @@ class ClusterNode private[akka] (
|
|||
|
||||
// FIXME remove?
|
||||
def refByUuid(uuid: UUID): ActorRef = {
|
||||
val actor = Router newRouter (router, addresses,
|
||||
uuidToString(uuid),
|
||||
Actor.TIMEOUT)
|
||||
val actor = Router newRouter (router, addresses, uuidToString(uuid), Actor.TIMEOUT)
|
||||
registerClusterActorRefForAddress(actor, addresses)
|
||||
actor
|
||||
}
|
||||
|
|
@ -890,9 +879,7 @@ class ClusterNode private[akka] (
|
|||
def refByAddress(actorAddress: String): ActorRef = {
|
||||
//FIXME: unused uuids
|
||||
val uuids = uuidsForActorAddress(actorAddress)
|
||||
val actor = Router newRouter (router, addresses,
|
||||
actorAddress,
|
||||
Actor.TIMEOUT)
|
||||
val actor = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
|
||||
registerClusterActorRefForAddress(actor, addresses)
|
||||
actor
|
||||
}
|
||||
|
|
@ -1208,6 +1195,7 @@ class ClusterNode private[akka] (
|
|||
|
||||
private[cluster] def initializeNode() {
|
||||
EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress))
|
||||
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
|
||||
createRootClusterNode()
|
||||
val isLeader = joinLeaderElection
|
||||
if (isLeader) createNodeStructureIfNeeded()
|
||||
|
|
@ -1441,7 +1429,7 @@ class ClusterNode private[akka] (
|
|||
}
|
||||
|
||||
override def stop() {
|
||||
self.stop()
|
||||
self.shutdown()
|
||||
}
|
||||
|
||||
override def disconnect() = self.disconnect()
|
||||
|
|
@ -1628,7 +1616,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
|
||||
case START ⇒ cluster.start()
|
||||
|
||||
case STOP ⇒ cluster.stop()
|
||||
case STOP ⇒ cluster.shutdown()
|
||||
|
||||
case DISCONNECT ⇒ cluster.disconnect()
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import scala.collection.JavaConversions.collectionAsScalaIterable
|
|||
import com.eaio.uuid.UUID
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
/**
|
||||
* A ClusterDeployer is responsible for deploying a Deploy.
|
||||
|
|
@ -42,14 +43,12 @@ object ClusterDeployer {
|
|||
private val isConnected = new Switch(false)
|
||||
private val deploymentCompleted = new CountDownLatch(1)
|
||||
|
||||
private lazy val zkClient = {
|
||||
val zk = new AkkaZkClient(
|
||||
Cluster.zooKeeperServers,
|
||||
Cluster.sessionTimeout,
|
||||
Cluster.connectionTimeout,
|
||||
Cluster.defaultSerializer)
|
||||
EventHandler.info(this, "ClusterDeployer started")
|
||||
zk
|
||||
private val _zkClient = new AtomicReference[AkkaZkClient](null)
|
||||
|
||||
private def zkClient: AkkaZkClient = ensureRunning {
|
||||
val zk = _zkClient.get
|
||||
if (zk eq null) handleError(new IllegalStateException("No ZooKeeper client connection available"))
|
||||
else zk
|
||||
}
|
||||
|
||||
private val clusterDeploymentLockListener = new LockListener {
|
||||
|
|
@ -74,7 +73,13 @@ object ClusterDeployer {
|
|||
private val systemDeployments: List[Deploy] = Nil
|
||||
|
||||
private[akka] def init(deployments: List[Deploy]) {
|
||||
isConnected.switchOn {
|
||||
isConnected switchOn {
|
||||
_zkClient.compareAndSet(null, new AkkaZkClient(
|
||||
Cluster.zooKeeperServers,
|
||||
Cluster.sessionTimeout,
|
||||
Cluster.connectionTimeout,
|
||||
Cluster.defaultSerializer))
|
||||
|
||||
baseNodes.foreach { path ⇒
|
||||
try {
|
||||
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
|
||||
|
|
@ -101,51 +106,44 @@ object ClusterDeployer {
|
|||
}
|
||||
|
||||
def shutdown() {
|
||||
val zk = zkClient
|
||||
isConnected switchOff {
|
||||
undeployAll()
|
||||
zkClient.close()
|
||||
// undeploy all
|
||||
try {
|
||||
for {
|
||||
child ← collectionAsScalaIterable(zk.getChildren(deploymentPath))
|
||||
deployment ← zk.readData(deploymentAddressPath.format(child)).asInstanceOf[Deploy]
|
||||
} zk.delete(deploymentAddressPath.format(deployment.address))
|
||||
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
handleError(new DeploymentException("Could not undeploy all deployment data in ZooKeeper due to: " + e))
|
||||
}
|
||||
|
||||
// shut down ZooKeeper client
|
||||
zk.close()
|
||||
EventHandler.info(this, "ClusterDeployer shut down successfully")
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def deploy(deployment: Deploy) {
|
||||
val path = deploymentAddressPath.format(deployment.address)
|
||||
try {
|
||||
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
|
||||
zkClient.writeData(path, deployment)
|
||||
ensureRunning {
|
||||
val path = deploymentAddressPath.format(deployment.address)
|
||||
try {
|
||||
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
|
||||
zkClient.writeData(path, deployment)
|
||||
|
||||
// FIXME trigger cluster-wide deploy action
|
||||
} catch {
|
||||
case e: NullPointerException ⇒
|
||||
handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
|
||||
case e: Exception ⇒
|
||||
handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
|
||||
// FIXME trigger cluster-wide deploy action
|
||||
} catch {
|
||||
case e: NullPointerException ⇒
|
||||
handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
|
||||
case e: Exception ⇒
|
||||
handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def undeploy(deployment: Deploy) {
|
||||
try {
|
||||
zkClient.delete(deploymentAddressPath.format(deployment.address))
|
||||
|
||||
// FIXME trigger cluster-wide undeployment action
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
handleError(new DeploymentException("Could not undeploy deployment [" + deployment + "] in ZooKeeper due to: " + e))
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def undeployAll() {
|
||||
try {
|
||||
for {
|
||||
child ← collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
|
||||
deployment ← lookupDeploymentFor(child)
|
||||
} undeploy(deployment)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
handleError(new DeploymentException("Could not undeploy all deployment data in ZooKeeper due to: " + e))
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
|
||||
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = ensureRunning {
|
||||
try {
|
||||
Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy])
|
||||
} catch {
|
||||
|
|
@ -156,6 +154,11 @@ object ClusterDeployer {
|
|||
}
|
||||
}
|
||||
|
||||
private def ensureRunning[T](body: ⇒ T): T = {
|
||||
if (isConnected.isOn) body
|
||||
else throw new IllegalStateException("ClusterDeployer is not running")
|
||||
}
|
||||
|
||||
private[akka] def handleError(e: Throwable): Nothing = {
|
||||
EventHandler.error(e, this, e.toString)
|
||||
throw e
|
||||
|
|
|
|||
|
|
@ -7,8 +7,10 @@ import org.I0Itec.zkclient._
|
|||
import org.I0Itec.zkclient.serialize._
|
||||
import org.I0Itec.zkclient.exception._
|
||||
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* todo: what is the purpose of this class?
|
||||
* ZooKeeper client. Holds the ZooKeeper connection and manages its session.
|
||||
*/
|
||||
class AkkaZkClient(zkServers: String,
|
||||
sessionTimeout: Int,
|
||||
|
|
@ -16,6 +18,8 @@ class AkkaZkClient(zkServers: String,
|
|||
zkSerializer: ZkSerializer = new SerializableSerializer)
|
||||
extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
|
||||
|
||||
EventHandler.debug(this, "Connecting to ZooKeeper ensamble [%s]" format zkServers)
|
||||
|
||||
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
|
||||
|
||||
def reconnect() {
|
||||
|
|
|
|||
|
|
@ -49,18 +49,14 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
|
|||
try {
|
||||
zkServer = Cluster.startLocalCluster(dataPath, logPath)
|
||||
Thread.sleep(5000)
|
||||
Cluster.node.start()
|
||||
Actor.cluster.start()
|
||||
} catch {
|
||||
case e ⇒ e.printStackTrace()
|
||||
}
|
||||
}
|
||||
|
||||
override def beforeEach() {
|
||||
// Cluster.reset()
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
Cluster.node.stop()
|
||||
Actor.cluster.shutdown()
|
||||
ClusterDeployer.shutdown()
|
||||
Cluster.shutdownLocalCluster()
|
||||
Actor.registry.local.shutdownAll()
|
||||
|
|
|
|||
|
|
@ -835,10 +835,6 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with
|
|||
zkServer = Cluster.startLocalCluster(dataPath, logPath)
|
||||
}
|
||||
|
||||
override def beforeEach() = {
|
||||
Cluster.reset
|
||||
}
|
||||
|
||||
override def afterAll() = {
|
||||
Cluster.shutdownLocalCluster
|
||||
Actor.registry.local.shutdownAll
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -1,125 +0,0 @@
|
|||
package akka.serialization;
|
||||
|
||||
import org.junit.Test;
|
||||
import akka.actor.*;
|
||||
import akka.actor.serialization.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static akka.serialization.ActorSerialization.*;
|
||||
|
||||
class SerializationTestActorFormat implements StatelessActorFormat<SerializationTestActor> {
|
||||
@Override
|
||||
public SerializationTestActor fromBinary(byte[] bytes, SerializationTestActor act) {
|
||||
return (SerializationTestActor) StatelessActorFormat$class.fromBinary(this, bytes, act);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBinary(SerializationTestActor ac) {
|
||||
return StatelessActorFormat$class.toBinary(this, ac);
|
||||
}
|
||||
}
|
||||
|
||||
class MyUntypedActorFormat implements Format<MyUntypedActor> {
|
||||
@Override
|
||||
public MyUntypedActor fromBinary(byte[] bytes, MyUntypedActor act) {
|
||||
ProtobufProtocol.Counter p =
|
||||
(ProtobufProtocol.Counter) new SerializerFactory().getProtobuf().fromBinary(bytes, ProtobufProtocol.Counter.class);
|
||||
act.count_$eq(p.getCount());
|
||||
return act;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBinary(MyUntypedActor ac) {
|
||||
return ProtobufProtocol.Counter.newBuilder().setCount(ac.count()).build().toByteArray();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public class SerializationTest {
|
||||
/*
|
||||
@Test public void mustBeAbleToSerializeAfterCreateActorRefFromClass() {
|
||||
ActorRef ref = Actors.actorOf(SerializationTestActor.class);
|
||||
assertNotNull(ref);
|
||||
ref.start();
|
||||
try {
|
||||
Object result = ref.sendRequestReply("Hello");
|
||||
assertEquals("got it!", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
|
||||
Format<SerializationTestActor> f = new SerializationTestActorFormat();
|
||||
byte[] bytes = toBinaryJ(ref, f, false);
|
||||
ActorRef r = fromBinaryJ(bytes, f);
|
||||
assertNotNull(r);
|
||||
r.start();
|
||||
try {
|
||||
Object result = r.sendRequestReply("Hello");
|
||||
assertEquals("got it!", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
ref.stop();
|
||||
r.stop();
|
||||
}
|
||||
|
||||
@Test public void mustBeAbleToSerializeAfterCreateActorRefFromFactory() {
|
||||
ActorRef ref = Actors.actorOf(new UntypedActorFactory() {
|
||||
public Actor create() {
|
||||
return new SerializationTestActor();
|
||||
}
|
||||
});
|
||||
assertNotNull(ref);
|
||||
ref.start();
|
||||
try {
|
||||
Object result = ref.sendRequestReply("Hello");
|
||||
assertEquals("got it!", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
|
||||
Format<SerializationTestActor> f = new SerializationTestActorFormat();
|
||||
byte[] bytes = toBinaryJ(ref, f, false);
|
||||
ActorRef r = fromBinaryJ(bytes, f);
|
||||
assertNotNull(r);
|
||||
r.start();
|
||||
try {
|
||||
Object result = r.sendRequestReply("Hello");
|
||||
assertEquals("got it!", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
ref.stop();
|
||||
r.stop();
|
||||
}
|
||||
|
||||
@Test public void mustBeAbleToSerializeAStatefulActor() {
|
||||
ActorRef ref = Actors.actorOf(MyUntypedActor.class);
|
||||
assertNotNull(ref);
|
||||
ref.start();
|
||||
try {
|
||||
Object result = ref.sendRequestReply("hello");
|
||||
assertEquals("world 1", result);
|
||||
result = ref.sendRequestReply("hello");
|
||||
assertEquals("world 2", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
|
||||
Format<MyUntypedActor> f = new MyUntypedActorFormat();
|
||||
byte[] bytes = toBinaryJ(ref, f, false);
|
||||
ActorRef r = fromBinaryJ(bytes, f);
|
||||
assertNotNull(r);
|
||||
r.start();
|
||||
try {
|
||||
Object result = r.sendRequestReply("hello");
|
||||
assertEquals("world 3", result);
|
||||
result = r.sendRequestReply("hello");
|
||||
assertEquals("world 4", result);
|
||||
} catch (ActorTimeoutException ex) {
|
||||
fail("actor should not time out");
|
||||
}
|
||||
ref.stop();
|
||||
r.stop();
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
package akka.serialization;
|
||||
|
||||
import akka.actor.UntypedActor;
|
||||
|
||||
public class SerializationTestActor extends UntypedActor {
|
||||
public void onReceive(Object msg) {
|
||||
getContext().replySafe("got it!");
|
||||
}
|
||||
}
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor;
|
||||
|
||||
/*
|
||||
Compile with:
|
||||
cd ./akka-remote/src/test/protocol
|
||||
protoc ProtobufProtocol.proto --java_out ../java
|
||||
*/
|
||||
|
||||
message ProtobufPOJO {
|
||||
required uint64 id = 1;
|
||||
required string name = 2;
|
||||
required bool status = 3;
|
||||
}
|
||||
|
||||
message Counter {
|
||||
required uint32 count = 1;
|
||||
}
|
||||
|
||||
message DualCounter {
|
||||
required uint32 count1 = 1;
|
||||
required uint32 count2 = 2;
|
||||
}
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
||||
case class User(val usernamePassword: Tuple2[String, String],
|
||||
val email: String,
|
||||
val age: Int) extends java.io.Serializable
|
||||
|
||||
case object RemotePing extends TestMessage
|
||||
case object RemotePong extends TestMessage
|
||||
case object RemoteOneWay extends TestMessage
|
||||
case object RemoteDie extends TestMessage
|
||||
case object RemoteNotifySupervisorExit extends TestMessage
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ConfigSpec extends WordSpec with MustMatchers {
|
||||
|
||||
"The default configuration file (i.e. akka-reference.conf)" should {
|
||||
"contain all configuration properties for akka-remote that are used in code with their correct defaults" in {
|
||||
import Config.config._
|
||||
|
||||
getInt("akka.remote.client.message-frame-size") must equal(Some(1048576))
|
||||
getInt("akka.remote.client.read-timeout") must equal(Some(10))
|
||||
getInt("akka.remote.client.reap-futures-delay") must equal(Some(5))
|
||||
getInt("akka.remote.client.reconnect-delay") must equal(Some(5))
|
||||
getInt("akka.remote.client.reconnection-time-window") must equal(Some(600))
|
||||
|
||||
getString("akka.remote.compression-scheme") must equal(Some("zlib"))
|
||||
getString("akka.remote.secure-cookie") must equal(Some(""))
|
||||
|
||||
getInt("akka.remote.server.backlog") must equal(Some(4096))
|
||||
getInt("akka.remote.server.connection-timeout") must equal(Some(1))
|
||||
getString("akka.remote.server.hostname") must equal(Some("localhost"))
|
||||
getInt("akka.remote.server.message-frame-size") must equal(Some(1048576))
|
||||
getInt("akka.remote.server.port") must equal(Some(2552))
|
||||
getBool("akka.remote.server.require-cookie") must equal(Some(false))
|
||||
getBool("akka.remote.server.untrusted-mode") must equal(Some(false))
|
||||
|
||||
getBool("akka.remote.ssl.debug") must equal(None)
|
||||
getBool("akka.remote.ssl.service") must equal(None)
|
||||
getInt("akka.remote.zlib-compression-level") must equal(Some(6))
|
||||
getInt("akka.remote.server.execution-pool-size") must equal(Some(16))
|
||||
getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60))
|
||||
getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0))
|
||||
getInt("akka.remote.server.max-total-memory-size") must equal(Some(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,151 +0,0 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import akka.remote.netty.NettyRemoteSupport
|
||||
import akka.actor.{ Actor, ActorRegistry }
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import org.scalatest.{ Spec, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
object AkkaRemoteTest {
|
||||
class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor {
|
||||
def receive = {
|
||||
case x: String if x == expect ⇒ latch.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class AkkaRemoteTest extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||
import AkkaRemoteTest._
|
||||
|
||||
val remote = Actor.remote
|
||||
val unit = TimeUnit.SECONDS
|
||||
|
||||
val host = "localhost"
|
||||
val port = 25520
|
||||
|
||||
def OptimizeLocal = false
|
||||
|
||||
var optimizeLocal_? = remote.asInstanceOf[NettyRemoteSupport].optimizeLocalScoped_?
|
||||
|
||||
override def beforeAll {
|
||||
if (!OptimizeLocal)
|
||||
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(false) //Can't run the test if we're eliminating all remote calls
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
if (!OptimizeLocal)
|
||||
remote.asInstanceOf[NettyRemoteSupport].optimizeLocal.set(optimizeLocal_?) //Reset optimizelocal after all tests
|
||||
}
|
||||
|
||||
override def beforeEach() {
|
||||
remote.start(host, port)
|
||||
super.beforeEach
|
||||
}
|
||||
|
||||
override def afterEach() {
|
||||
remote.shutdown()
|
||||
Actor.registry.local.shutdownAll()
|
||||
super.afterEach()
|
||||
}
|
||||
|
||||
/* Utilities */
|
||||
|
||||
def replyHandler(latch: CountDownLatch, expect: String) = Some(Actor.actorOf(new ReplyHandlerActor(latch, expect)).start())
|
||||
}
|
||||
|
||||
trait NetworkFailureTest { self: WordSpec ⇒
|
||||
import akka.actor.Actor._
|
||||
import akka.util.Duration
|
||||
|
||||
// override is subclass if needed
|
||||
val BYTES_PER_SECOND = "60KByte/s"
|
||||
val DELAY_MILLIS = "350ms"
|
||||
val PORT_RANGE = "1024-65535"
|
||||
|
||||
// FIXME add support for TCP FIN by hooking into Netty and do socket.close
|
||||
|
||||
def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = {
|
||||
spawn {
|
||||
try {
|
||||
enableTcpReset()
|
||||
println("===>>> Reply with [TCP RST] for [" + duration + "]")
|
||||
Thread.sleep(duration.toMillis)
|
||||
restoreIP
|
||||
} catch {
|
||||
case e ⇒
|
||||
dead.set(true)
|
||||
e.printStackTrace
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def throttleNetworkFor(duration: Duration, dead: AtomicBoolean) = {
|
||||
spawn {
|
||||
try {
|
||||
enableNetworkThrottling()
|
||||
println("===>>> Throttling network with [" + BYTES_PER_SECOND + ", " + DELAY_MILLIS + "] for [" + duration + "]")
|
||||
Thread.sleep(duration.toMillis)
|
||||
restoreIP
|
||||
} catch {
|
||||
case e ⇒
|
||||
dead.set(true)
|
||||
e.printStackTrace
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def dropNetworkFor(duration: Duration, dead: AtomicBoolean) = {
|
||||
spawn {
|
||||
try {
|
||||
enableNetworkDrop()
|
||||
println("===>>> Blocking network [TCP DENY] for [" + duration + "]")
|
||||
Thread.sleep(duration.toMillis)
|
||||
restoreIP
|
||||
} catch {
|
||||
case e ⇒
|
||||
dead.set(true)
|
||||
e.printStackTrace
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def sleepFor(duration: Duration) = {
|
||||
println("===>>> Sleeping for [" + duration + "]")
|
||||
Thread sleep (duration.toMillis)
|
||||
}
|
||||
|
||||
def enableNetworkThrottling() = {
|
||||
restoreIP()
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
|
||||
}
|
||||
|
||||
def enableNetworkDrop() = {
|
||||
restoreIP()
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
|
||||
}
|
||||
|
||||
def enableTcpReset() = {
|
||||
restoreIP()
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
|
||||
}
|
||||
|
||||
def restoreIP() = {
|
||||
println("===>>> Restoring network")
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "1").start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "2").start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "flush").start.waitFor == 0)
|
||||
assert(new ProcessBuilder("sudo", "ipfw", "pipe", "flush").start.waitFor == 0)
|
||||
}
|
||||
|
||||
def validateSudo() = {
|
||||
println("===>>> Validating sudo")
|
||||
assert(new ProcessBuilder("sudo", "-v").start.waitFor == 0)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import akka.actor.{ Actor }
|
||||
|
||||
object OptimizedLocalScopedSpec {
|
||||
class TestActor extends Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
}
|
||||
|
||||
class OptimizedLocalScopedSpec extends AkkaRemoteTest {
|
||||
import OptimizedLocalScopedSpec._
|
||||
override def OptimizeLocal = true
|
||||
|
||||
"An enabled optimized local scoped remote" should {
|
||||
"Fetch local actor ref when scope is local" in {
|
||||
val fooActor = Actor.actorOf[TestActor].start()
|
||||
remote.register("foo", fooActor)
|
||||
|
||||
remote.actorFor("foo", host, port) must be(fooActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,119 +0,0 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{ ActorRef, Actor }
|
||||
import akka.util.duration._
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
object RemoteErrorHandlingNetworkTest {
|
||||
case class Send(actor: ActorRef)
|
||||
|
||||
class RemoteActorSpecActorUnidirectional extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "Ping" ⇒ self.reply_?("Pong")
|
||||
}
|
||||
}
|
||||
|
||||
class Decrementer extends Actor {
|
||||
def receive = {
|
||||
case "done" ⇒ self.reply_?(false)
|
||||
case i: Int if i > 0 ⇒
|
||||
self.reply_?(i - 1)
|
||||
case i: Int ⇒
|
||||
self.reply_?(0)
|
||||
this become {
|
||||
case "done" ⇒ self.reply_?(true)
|
||||
case _ ⇒ //Do Nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteActorSpecActorBidirectional extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "Hello" ⇒
|
||||
self.reply("World")
|
||||
case "Failure" ⇒
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteActorSpecActorAsyncSender(latch: CountDownLatch) extends Actor {
|
||||
def receive = {
|
||||
case Send(actor: ActorRef) ⇒
|
||||
actor ! "Hello"
|
||||
case "World" ⇒ latch.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureTest {
|
||||
import RemoteErrorHandlingNetworkTest._
|
||||
|
||||
"RemoteModule actors" should {
|
||||
|
||||
"be able to recover from network drop without loosing any messages" in {
|
||||
validateSudo()
|
||||
val latch = new CountDownLatch(10)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
val service = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
remote.register(service.address, service)
|
||||
val actor = remote.actorFor(service.address, 5000L, host, port)
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
val dead = new AtomicBoolean(false)
|
||||
dropNetworkFor(10 seconds, dead) // drops the network - in another thread - so async
|
||||
sleepFor(2 seconds) // wait until network drop is done before sending the other messages
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // queue up messages
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
latch.await(15, TimeUnit.SECONDS) must be(true) // network should be restored and the messages delivered
|
||||
dead.get must be(false)
|
||||
}
|
||||
|
||||
"be able to recover from TCP RESET without loosing any messages" in {
|
||||
validateSudo()
|
||||
val latch = new CountDownLatch(10)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
val service = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
remote.register(service.address, service)
|
||||
val actor = remote.actorFor(service.address, 5000L, host, port)
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
actor ! "Ping"
|
||||
val dead = new AtomicBoolean(false)
|
||||
replyWithTcpResetFor(10 seconds, dead)
|
||||
sleepFor(2 seconds)
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // queue up messages
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
try { actor ! "Ping" } catch { case e ⇒ () } // ...
|
||||
latch.await(15, TimeUnit.SECONDS) must be(true)
|
||||
dead.get must be(false)
|
||||
}
|
||||
/*
|
||||
"sendWithBangAndGetReplyThroughSenderRef" in {
|
||||
remote.register(actorOf[RemoteActorSpecActorBidirectional])
|
||||
implicit val timeout = 500000000L
|
||||
val actor = remote.actorFor(
|
||||
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port)
|
||||
val latch = new CountDownLatch(1)
|
||||
val sender = actorOf( new RemoteActorSpecActorAsyncSender(latch) ).start()
|
||||
sender ! Send(actor)
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,435 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor.remote
|
||||
|
||||
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit, BlockingQueue }
|
||||
import akka.config.Supervision._
|
||||
import akka.OneWay
|
||||
import org.scalatest._
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.actor.{ SupervisorFactory, Supervisor, ActorRef, Actor }
|
||||
import Actor._
|
||||
|
||||
object Log {
|
||||
val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
val oneWayLog = new LinkedBlockingQueue[String]
|
||||
|
||||
def clearMessageLogs {
|
||||
messageLog.clear
|
||||
oneWayLog.clear
|
||||
}
|
||||
}
|
||||
|
||||
class RemotePingPong1Actor extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case "Ping" ⇒
|
||||
Log.messageLog.put("ping")
|
||||
self.reply("pong")
|
||||
|
||||
case OneWay ⇒
|
||||
Log.oneWayLog.put("oneway")
|
||||
|
||||
case "Die" ⇒
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class RemotePingPong2Actor extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case "Ping" ⇒
|
||||
Log.messageLog.put("ping")
|
||||
self.reply("pong")
|
||||
case "Die" ⇒
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class RemotePingPong3Actor extends Actor with scala.Serializable {
|
||||
def receive = {
|
||||
case "Ping" ⇒
|
||||
Log.messageLog.put("ping")
|
||||
self.reply("pong")
|
||||
case "Die" ⇒
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
/*class RemoteSupervisorSpec extends AkkaRemoteTest {
|
||||
|
||||
var pingpong1: ActorRef = _
|
||||
var pingpong2: ActorRef = _
|
||||
var pingpong3: ActorRef = _
|
||||
|
||||
import Log._
|
||||
|
||||
"RemoteModule supervision" should {
|
||||
|
||||
"start server" in {
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
|
||||
(pingpong1 !! "Ping") must equal (Some("pong"))
|
||||
}
|
||||
|
||||
"StartServerForNestedSupervisorHierarchy" in {
|
||||
clearMessageLogs
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup.start
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
"killSingleActorOneForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
|
||||
(pingpong1 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
"callKillCallSingleActorOneForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
|
||||
(pingpong1 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
}
|
||||
|
||||
"KillSingleActorAllForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
|
||||
(pingpong1 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
"CallKillCallSingleActorAllForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
|
||||
(pingpong1 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
}
|
||||
|
||||
"KillMultipleActorsOneForOne1" in {
|
||||
clearMessageLogs
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
|
||||
(pingpong1 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
"KillCallMultipleActorsOneForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
(pingpong2 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
(pingpong3 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
|
||||
(pingpong2 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
(pingpong2 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
(pingpong3 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
}
|
||||
|
||||
"KillMultipleActorsAllForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
|
||||
(pingpong2 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
|
||||
"CallKillCallMultipleActorsAllForOne" in {
|
||||
clearMessageLogs
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
|
||||
pingpong1 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
pingpong2 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
pingpong3 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
|
||||
(pingpong2 !!! ("Die", 5000)).await.exception.isDefined must be (true)
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("Expected exception; to test fault-tolerance")
|
||||
|
||||
pingpong1 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
pingpong2 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
pingpong3 !! ("Ping", 5000) must equal (Some("pong"))
|
||||
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) must equal ("ping")
|
||||
}
|
||||
}
|
||||
|
||||
def getSingleActorAllForOneSupervisor: Supervisor = {
|
||||
|
||||
// Create an abstract SupervisorContainer that works for all implementations
|
||||
// of the different Actors (Services).
|
||||
//
|
||||
// Then create a concrete container in which we mix in support for the specific
|
||||
// implementation of the Actors we want to use.
|
||||
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start()
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
|
||||
factory.newInstance
|
||||
}
|
||||
|
||||
def getSingleActorOneForOneSupervisor: Supervisor = {
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start()
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance
|
||||
}
|
||||
|
||||
def getMultipleActorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start()
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start()
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start()
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance
|
||||
}
|
||||
|
||||
def getMultipleActorsOneForOneConf: Supervisor = {
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start()
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start()
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start()
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance
|
||||
}
|
||||
|
||||
def getNestedSupervisorsAllForOneConf: Supervisor = {
|
||||
pingpong1 = remote.actorOf[RemotePingPong1Actor](host,port).start()
|
||||
pingpong2 = remote.actorOf[RemotePingPong2Actor](host,port).start()
|
||||
pingpong3 = remote.actorOf[RemotePingPong3Actor](host,port).start()
|
||||
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong1,
|
||||
Permanent)
|
||||
::
|
||||
SupervisorConfig(
|
||||
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
pingpong2,
|
||||
Permanent)
|
||||
::
|
||||
Supervise(
|
||||
pingpong3,
|
||||
Permanent)
|
||||
:: Nil)
|
||||
:: Nil))
|
||||
factory.newInstance
|
||||
}
|
||||
|
||||
// Uncomment when the same test passes in SupervisorSpec - pending bug
|
||||
@Test def shouldKillMultipleActorsOneForOne2 = {
|
||||
clearMessageLogs
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 !! ("Die", 5000)
|
||||
}
|
||||
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
|
||||
pingpong1 ! "Die"
|
||||
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
||||
clearMessageLogs
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
|
||||
pingpong1 ! OneWay
|
||||
|
||||
expect("oneway") {
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
pingpong1 ! "Die"
|
||||
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
pingpong1 ! OneWay
|
||||
|
||||
expect("oneway") {
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = {
|
||||
clearMessageLogs
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
|
||||
|
||||
expect("pong") {
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! ("Die", 5000)
|
||||
}
|
||||
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5 , TimeUnit.SECONDS)
|
||||
}
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("Expected exception; to test fault-tolerance") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! ("Ping", 5000)) must equal (Some("pong"))
|
||||
}
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
|
@ -1,55 +0,0 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import akka.actor.Actor
|
||||
import Actor._
|
||||
import akka.event.EventHandler
|
||||
|
||||
/**
|
||||
* ***********************************
|
||||
* Instructions how to run the sample:
|
||||
*
|
||||
* Download Akka distribution.
|
||||
* Unzip and step into the Akka root dir
|
||||
* Set AKKA_HOME. For exampe 'export AKKA_HOME=`pwd`
|
||||
*
|
||||
* Then open up two shells and in each run:
|
||||
* sbt
|
||||
* > project akka-remote
|
||||
* > console
|
||||
*
|
||||
* Then paste in the code below into both shells.
|
||||
*
|
||||
* Then run:
|
||||
* ServerInitiatedRemoteActorServer.run() in one shell
|
||||
* ServerInitiatedRemoteActorClient.run() in the other shell
|
||||
* Have fun.
|
||||
* ***********************************
|
||||
*/
|
||||
|
||||
class HelloWorldActor extends Actor {
|
||||
def receive = {
|
||||
case "Hello" ⇒ self.reply("World")
|
||||
}
|
||||
}
|
||||
|
||||
object ServerInitiatedRemoteActorServer {
|
||||
|
||||
def run() {
|
||||
remote.start("localhost", 2552)
|
||||
remote.register("hello-service", actorOf[HelloWorldActor])
|
||||
}
|
||||
|
||||
def main(args: Array[String]) { run() }
|
||||
}
|
||||
|
||||
object ServerInitiatedRemoteActorClient {
|
||||
|
||||
def run() {
|
||||
val actor = remote.actorFor("hello-service", "localhost", 2552)
|
||||
val result = actor !! "Hello"
|
||||
EventHandler.info("Result from Remote Actor: %s", result)
|
||||
}
|
||||
|
||||
def main(args: Array[String]) { run() }
|
||||
}
|
||||
|
||||
|
|
@ -1,207 +0,0 @@
|
|||
package akka.actor.remote
|
||||
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{ ActorRegistry, ActorRef, Actor }
|
||||
|
||||
object ServerInitiatedRemoteActorSpec {
|
||||
case class Send(actor: ActorRef)
|
||||
|
||||
class RemoteActorSpecActorUnidirectional extends Actor {
|
||||
def receive = {
|
||||
case "Ping" ⇒ self.reply_?("Pong")
|
||||
}
|
||||
}
|
||||
|
||||
class Decrementer extends Actor {
|
||||
def receive = {
|
||||
case "done" ⇒ self.reply_?(false)
|
||||
case i: Int if i > 0 ⇒
|
||||
self.reply_?(i - 1)
|
||||
case i: Int ⇒
|
||||
self.reply_?(0)
|
||||
this become {
|
||||
case "done" ⇒ self.reply_?(true)
|
||||
case _ ⇒ //Do Nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteActorSpecActorBidirectional extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "Hello" ⇒
|
||||
self.reply("World")
|
||||
case "Failure" ⇒
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteActorSpecActorAsyncSender(latch: CountDownLatch) extends Actor {
|
||||
def receive = {
|
||||
case Send(actor: ActorRef) ⇒
|
||||
actor ! "Hello"
|
||||
case "World" ⇒ latch.countDown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest {
|
||||
import ServerInitiatedRemoteActorSpec._
|
||||
"Server-managed remote actors" should {
|
||||
/*
|
||||
"sendWithBang" in {
|
||||
val latch = new CountDownLatch(1)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
remote.register(actorOf[RemoteActorSpecActorUnidirectional])
|
||||
val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional",5000L,host, port)
|
||||
|
||||
actor ! "Ping"
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
|
||||
"sendWithBangBangAndGetReply" in {
|
||||
remote.register(actorOf[RemoteActorSpecActorBidirectional])
|
||||
val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", 5000L,host, port)
|
||||
(actor !! "Hello").as[String].get must equal ("World")
|
||||
}
|
||||
|
||||
"sendWithBangAndGetReplyThroughSenderRef" in {
|
||||
remote.register(actorOf[RemoteActorSpecActorBidirectional])
|
||||
implicit val timeout = 500000000L
|
||||
val actor = remote.actorFor(
|
||||
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout,host, port)
|
||||
val latch = new CountDownLatch(1)
|
||||
val sender = actorOf( new RemoteActorSpecActorAsyncSender(latch) ).start()
|
||||
sender ! Send(actor)
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
|
||||
"sendWithBangBangAndReplyWithException" in {
|
||||
remote.register(actorOf[RemoteActorSpecActorBidirectional])
|
||||
implicit val timeout = 500000000L
|
||||
val actor = remote.actorFor(
|
||||
"akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port)
|
||||
try {
|
||||
actor !! "Failure"
|
||||
fail("Should have thrown an exception")
|
||||
} catch {
|
||||
case e => e.getMessage must equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
"notRecreateRegisteredActor" in {
|
||||
val latch = new CountDownLatch(1)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
remote.register(actorOf[RemoteActorSpecActorUnidirectional])
|
||||
val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port)
|
||||
val numberOfActorsInRegistry = Actor.registry.local.actors.length
|
||||
actor ! "Ping"
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
numberOfActorsInRegistry must equal (Actor.registry.local.actors.length)
|
||||
}
|
||||
|
||||
"UseServiceNameAsIdForRemoteActorRef" in {
|
||||
val latch = new CountDownLatch(3)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
remote.register(actorOf[RemoteActorSpecActorUnidirectional])
|
||||
remote.register("my-service", actorOf[RemoteActorSpecActorUnidirectional])
|
||||
val actor1 = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port)
|
||||
val actor2 = remote.actorFor("my-service", host, port)
|
||||
val actor3 = remote.actorFor("my-service", host, port)
|
||||
|
||||
actor1 ! "Ping"
|
||||
actor2 ! "Ping"
|
||||
actor3 ! "Ping"
|
||||
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
actor1.uuid must not equal actor2.uuid
|
||||
actor1.uuid must not equal actor3.uuid
|
||||
actor1.address must not equal actor2.address
|
||||
actor2.address must equal (actor3.address)
|
||||
}
|
||||
|
||||
"shouldFindActorByUuid" in {
|
||||
val latch = new CountDownLatch(2)
|
||||
implicit val sender = replyHandler(latch, "Pong")
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
remote.register("uuid:" + actor1.uuid, actor1)
|
||||
remote.register("my-service", actor2)
|
||||
|
||||
val ref1 = remote.actorFor("uuid:" + actor1.uuid, host, port)
|
||||
val ref2 = remote.actorFor("my-service", host, port)
|
||||
|
||||
ref1 ! "Ping"
|
||||
ref2 ! "Ping"
|
||||
latch.await(1, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
|
||||
"shouldRegisterAndUnregister" in {
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
|
||||
remote.register("my-service-1", actor1)
|
||||
remote.actors.get("my-service-1") must not be null
|
||||
|
||||
remote.unregister("my-service-1")
|
||||
remote.actors.get("my-service-1") must be (null)
|
||||
}
|
||||
|
||||
"shouldRegisterAndUnregisterByUuid" in {
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
val uuid = "uuid:" + actor1.uuid
|
||||
|
||||
remote.register(uuid, actor1)
|
||||
remote.actorsByUuid.get(actor1.uuid.toString) must not be null
|
||||
|
||||
remote.unregister(uuid)
|
||||
remote.actorsByUuid.get(actor1.uuid) must be (null)
|
||||
}
|
||||
|
||||
"shouldHandleOneWayReplyThroughPassiveRemoteClient" in {
|
||||
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
|
||||
remote.register("foo", actor1)
|
||||
val latch = new CountDownLatch(1)
|
||||
val actor2 = actorOf(new Actor { def receive = { case "Pong" => latch.countDown() } }).start()
|
||||
|
||||
val remoteActor = remote.actorFor("foo", host, port)
|
||||
remoteActor.!("Ping")(Some(actor2))
|
||||
latch.await(3,TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* FIXME rewrite after new registry changes
|
||||
* "should be able to remotely communicate between 2 server-managed actors" in {
|
||||
* val localFoo = actorOf[Decrementer]
|
||||
* val localBar = actorOf[Decrementer]
|
||||
* remote.register("foo", localFoo)
|
||||
* remote.register("bar", localBar)
|
||||
*
|
||||
* val remoteFoo = remote.actorFor("foo", host, port)
|
||||
* val remoteBar = remote.actorFor("bar", host, port)
|
||||
*
|
||||
* //Seed the start
|
||||
* remoteFoo.!(10)(Some(remoteBar))
|
||||
*
|
||||
* val latch = new CountDownLatch(100)
|
||||
*
|
||||
* def testDone() = (remoteFoo !! "done").as[Boolean].getOrElse(false) &&
|
||||
* (remoteBar !! "done").as[Boolean].getOrElse(false)
|
||||
*
|
||||
* while(!testDone()) {
|
||||
* if (latch.await(200, TimeUnit.MILLISECONDS))
|
||||
* sys.error("Test didn't complete within 100 cycles")
|
||||
* else
|
||||
* latch.countDown()
|
||||
* }
|
||||
*
|
||||
* val decrementer = Actor.registry.local.actorFor[Decrementer]
|
||||
* decrementers.find( _ eq localFoo) must equal (Some(localFoo))
|
||||
* decrementers.find( _ eq localBar) must equal (Some(localBar))
|
||||
* }
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,96 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor.remote
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import java.util.concurrent.{ ConcurrentSkipListSet, TimeUnit }
|
||||
import akka.remote.netty.NettyRemoteSupport
|
||||
|
||||
object ServerInitiatedRemoteSessionActorSpec {
|
||||
|
||||
case class Login(user: String)
|
||||
case class GetUser()
|
||||
case class DoSomethingFunny()
|
||||
|
||||
val instantiatedSessionActors = new ConcurrentSkipListSet[ActorRef]()
|
||||
|
||||
class RemoteStatefullSessionActorSpec extends Actor {
|
||||
|
||||
override def preStart() = instantiatedSessionActors.add(self)
|
||||
override def postStop() = instantiatedSessionActors.remove(self)
|
||||
var user: String = "anonymous"
|
||||
|
||||
def receive = {
|
||||
case Login(user) ⇒ this.user = user
|
||||
case GetUser() ⇒ self.reply(this.user)
|
||||
case DoSomethingFunny() ⇒ throw new Exception("Bad boy")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
|
||||
import ServerInitiatedRemoteSessionActorSpec._
|
||||
|
||||
"A remote session Actor" should {
|
||||
/*
|
||||
"create a new session actor per connection" in {
|
||||
remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
|
||||
|
||||
val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
|
||||
|
||||
val default1 = session1 !! GetUser()
|
||||
default1.as[String] must equal (Some("anonymous"))
|
||||
|
||||
session1 ! Login("session[1]")
|
||||
val result1 = session1 !! GetUser()
|
||||
result1.as[String] must equal (Some("session[1]"))
|
||||
|
||||
remote.shutdownClientModule()
|
||||
|
||||
val session2 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
|
||||
|
||||
// since this is a new session, the server should reset the state
|
||||
val default2 = session2 !! GetUser()
|
||||
default2.as[String] must equal (Some("anonymous"))
|
||||
}
|
||||
|
||||
"stop the actor when the client disconnects" in {
|
||||
instantiatedSessionActors.clear
|
||||
remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
|
||||
val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
|
||||
|
||||
val default1 = session1 !! GetUser()
|
||||
default1.as[String] must equal (Some("anonymous"))
|
||||
|
||||
instantiatedSessionActors must have size (1)
|
||||
remote.shutdownClientModule()
|
||||
Thread.sleep(1000)
|
||||
instantiatedSessionActors must have size (0)
|
||||
}
|
||||
|
||||
"stop the actor when there is an error" in {
|
||||
instantiatedSessionActors.clear
|
||||
remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
|
||||
val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
|
||||
|
||||
session1 ! DoSomethingFunny()
|
||||
session1.stop()
|
||||
Thread.sleep(1000)
|
||||
|
||||
instantiatedSessionActors must have size (0)
|
||||
}
|
||||
|
||||
"be able to unregister" in {
|
||||
remote.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec])
|
||||
remote.asInstanceOf[NettyRemoteSupport].actorsFactories.get("my-service-1") must not be (null)
|
||||
remote.unregisterPerSession("my-service-1")
|
||||
remote.asInstanceOf[NettyRemoteSupport].actorsFactories.get("my-service-1") must be (null)
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
package akka.actor.remote
|
||||
import akka.actor.{ ActorRegistry, Actor }
|
||||
|
||||
object UnOptimizedLocalScopedSpec {
|
||||
class TestActor extends Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
}
|
||||
|
||||
class UnOptimizedLocalScopedSpec extends AkkaRemoteTest {
|
||||
import UnOptimizedLocalScopedSpec._
|
||||
override def OptimizeLocal = false
|
||||
|
||||
"An enabled optimized local scoped remote" should {
|
||||
/*
|
||||
"Fetch remote actor ref when scope is local" in {
|
||||
val fooActor = Actor.actorOf[TestActor].start()
|
||||
remote.register("foo", fooActor)
|
||||
|
||||
remote.actorFor("foo", host, port) must not be (fooActor)
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
/*
|
||||
package akka.serialization
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class JavaSerializationTest extends SerializationTest with JUnitSuite
|
||||
*/
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
package akka.actor.serialization
|
||||
|
||||
import akka.actor.{ ProtobufProtocol, Actor }
|
||||
import ProtobufProtocol.ProtobufPOJO
|
||||
import Actor._
|
||||
import akka.actor.remote.AkkaRemoteTest
|
||||
|
||||
/* ---------------------------
|
||||
Uses this Protobuf message:
|
||||
|
||||
message ProtobufPOJO {
|
||||
required uint64 id = 1;
|
||||
required string name = 2;
|
||||
required bool status = 3;
|
||||
}
|
||||
--------------------------- */
|
||||
|
||||
object ProtobufActorMessageSerializationSpec {
|
||||
class RemoteActorSpecActorBidirectional extends Actor {
|
||||
def receive = {
|
||||
case pojo: ProtobufPOJO ⇒
|
||||
val id = pojo.getId
|
||||
self.reply(id + 1)
|
||||
case msg ⇒
|
||||
throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ProtobufActorMessageSerializationSpec extends AkkaRemoteTest {
|
||||
import ProtobufActorMessageSerializationSpec._
|
||||
|
||||
"A ProtobufMessage" should {
|
||||
/*
|
||||
"SendReplyAsync" in {
|
||||
remote.register("RemoteActorSpecActorBidirectional",actorOf[RemoteActorSpecActorBidirectional])
|
||||
val actor = remote.actorFor("RemoteActorSpecActorBidirectional", 5000L, host, port)
|
||||
val result = actor !! ProtobufPOJO.newBuilder.setId(11).setStatus(true).setName("Coltrane").build
|
||||
result.as[Long] must equal (Some(12))
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,59 +0,0 @@
|
|||
package akka.serialization
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.serialization.Serializable.ScalaJSON
|
||||
|
||||
object Serializables {
|
||||
import DefaultProtocol._
|
||||
import JsonSerialization._
|
||||
|
||||
case class Shop(store: String, item: String, price: Int) extends ScalaJSON[Shop] {
|
||||
implicit val ShopFormat: sjson.json.Format[Shop] =
|
||||
asProduct3("store", "item", "price")(Shop)(Shop.unapply(_).get)
|
||||
|
||||
def toJSON: String = JsValue.toJson(tojson(this))
|
||||
def toBytes: Array[Byte] = tobinary(this)
|
||||
def fromBytes(bytes: Array[Byte]) = frombinary[Shop](bytes)
|
||||
def fromJSON(js: String) = fromjson[Shop](Js(js))
|
||||
}
|
||||
|
||||
case class MyMessage(val id: String, val value: Tuple2[String, Int])
|
||||
implicit val MyMessageFormat: sjson.json.Format[MyMessage] =
|
||||
asProduct2("id", "value")(MyMessage)(MyMessage.unapply(_).get)
|
||||
|
||||
case class MyJsonObject(val key: String, val map: Map[String, Int],
|
||||
val standAloneInt: Int) extends ScalaJSON[MyJsonObject] {
|
||||
implicit val MyJsonObjectFormat: sjson.json.Format[MyJsonObject] =
|
||||
asProduct3("key", "map", "standAloneInt")(MyJsonObject)(MyJsonObject.unapply(_).get)
|
||||
|
||||
def toJSON: String = JsValue.toJson(tojson(this))
|
||||
def toBytes: Array[Byte] = tobinary(this)
|
||||
def fromBytes(bytes: Array[Byte]) = frombinary[MyJsonObject](bytes)
|
||||
def fromJSON(js: String) = fromjson[MyJsonObject](Js(js))
|
||||
}
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ScalaJSONSerializableSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
import Serializables._
|
||||
describe("Serialization of case classes") {
|
||||
it("should be able to serialize and de-serialize") {
|
||||
val s = Shop("Target", "cooker", 120)
|
||||
s.fromBytes(s.toBytes) should equal(s)
|
||||
s.fromJSON(s.toJSON) should equal(s)
|
||||
|
||||
val key: String = "myKey"
|
||||
val value: Int = 123
|
||||
val standAloneInt: Int = 35
|
||||
val message = MyJsonObject(key, Map(key -> value), standAloneInt)
|
||||
message.fromBytes(message.toBytes) should equal(message)
|
||||
message.fromJSON(message.toJSON) should equal(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,50 +0,0 @@
|
|||
package akka.serialization
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.serialization.Serializers.ScalaJSON
|
||||
//TODO: FIXME WHY IS THIS COMMENTED OUT?
|
||||
|
||||
object Protocols {
|
||||
import sjson.json.DefaultProtocol._
|
||||
case class Shop(store: String, item: String, price: Int)
|
||||
implicit val ShopFormat: sjson.json.Format[Shop] =
|
||||
asProduct3("store", "item", "price")(Shop)(Shop.unapply(_).get)
|
||||
|
||||
case class MyMessage(val id: String, val value: Tuple2[String, Int])
|
||||
implicit val MyMessageFormat: sjson.json.Format[MyMessage] =
|
||||
asProduct2("id", "value")(MyMessage)(MyMessage.unapply(_).get)
|
||||
|
||||
case class MyJsonObject(val key: String, val map: Map[String, Int],
|
||||
val standAloneInt: Int)
|
||||
implicit val MyJsonObjectFormat: sjson.json.Format[MyJsonObject] =
|
||||
asProduct3("key", "map", "standAloneInt")(MyJsonObject)(MyJsonObject.unapply(_).get)
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ScalaJSONSerializerSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
import Protocols._
|
||||
import ScalaJSON._
|
||||
describe("Serialization of case classes") {
|
||||
it("should be able to serialize and de-serialize") {
|
||||
val s = Shop("Target", "cooker", 120)
|
||||
fromjson[Shop](tojson(s)) should equal(s)
|
||||
frombinary[Shop](tobinary(s)) should equal(s)
|
||||
|
||||
val o = MyMessage("dg", ("akka", 100))
|
||||
fromjson[MyMessage](tojson(o)) should equal(o)
|
||||
frombinary[MyMessage](tobinary(o)) should equal(o)
|
||||
|
||||
val key: String = "myKey"
|
||||
val value: Int = 123
|
||||
val standAloneInt: Int = 35
|
||||
val message = MyJsonObject(key, Map(key -> value), standAloneInt)
|
||||
fromjson[MyJsonObject](tojson(message)) should equal(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,309 +0,0 @@
|
|||
package akka.actor.serialization
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.serialization._
|
||||
// import dispatch.json._
|
||||
import akka.actor._
|
||||
import ActorSerialization._
|
||||
import Actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SerializableTypeClassActorSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
object BinaryFormatMyActor {
|
||||
implicit object MyActorFormat extends Format[MyActor] {
|
||||
def fromBinary(bytes: Array[Byte], act: MyActor) = {
|
||||
val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
|
||||
act.count = p.getCount
|
||||
act
|
||||
}
|
||||
def toBinary(ac: MyActor) =
|
||||
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
|
||||
}
|
||||
}
|
||||
|
||||
object BinaryFormatMyActorWithDualCounter {
|
||||
implicit object MyActorWithDualCounterFormat extends Format[MyActorWithDualCounter] {
|
||||
def fromBinary(bytes: Array[Byte], act: MyActorWithDualCounter) = {
|
||||
val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
|
||||
act.count1 = p.getCount1
|
||||
act.count2 = p.getCount2
|
||||
act
|
||||
}
|
||||
def toBinary(ac: MyActorWithDualCounter) =
|
||||
ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray
|
||||
}
|
||||
}
|
||||
|
||||
object BinaryFormatMyStatelessActor {
|
||||
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
||||
}
|
||||
|
||||
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
|
||||
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
|
||||
}
|
||||
|
||||
object BinaryFormatMyActorWithSerializableMessages {
|
||||
implicit object MyActorWithSerializableMessagesFormat extends StatelessActorFormat[MyActorWithSerializableMessages]
|
||||
}
|
||||
|
||||
object BinaryFormatMyJavaSerializableActor {
|
||||
implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||
val serializer = Serializers.Java
|
||||
}
|
||||
}
|
||||
|
||||
describe("Serializable actor") {
|
||||
/*
|
||||
it("should be able to serialize and de-serialize a stateful actor") {
|
||||
import BinaryFormatMyActor._
|
||||
|
||||
val actor1 = actorOf[MyActor].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize a stateful actor with compound state") {
|
||||
import BinaryFormatMyActorWithDualCounter._
|
||||
|
||||
val actor1 = actorOf[MyActorWithDualCounter].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2 2")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 3 3")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize a stateless actor") {
|
||||
import BinaryFormatMyStatelessActor._
|
||||
|
||||
val actor1 = actorOf[MyStatelessActor].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize a stateful actor with a given serializer") {
|
||||
import BinaryFormatMyJavaSerializableActor._
|
||||
|
||||
val actor1 = actorOf[MyJavaSerializableActor].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||
|
||||
actor2.receiveTimeout should equal (Some(1000))
|
||||
actor1.stop()
|
||||
actor2.stop()
|
||||
}
|
||||
|
||||
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {
|
||||
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
|
||||
|
||||
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start()
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
actor1.mailboxSize should be > (0)
|
||||
val actor2 = fromBinary(toBinary(actor1))
|
||||
Thread.sleep(1000)
|
||||
actor2.mailboxSize should be > (0)
|
||||
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
|
||||
val actor3 = fromBinary(toBinary(actor1, false))
|
||||
Thread.sleep(1000)
|
||||
actor3.mailboxSize should equal(0)
|
||||
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize an Actor hotswapped with 'become'") {
|
||||
import BinaryFormatMyActor._
|
||||
val actor1 = actorOf[MyActor].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||
actor1 ! "swap"
|
||||
(actor1 !! "hello").getOrElse("_") should equal("swapped")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
|
||||
(actor1 !! "hello").getOrElse("_") should equal("swapped")
|
||||
|
||||
actor1 ! RevertHotSwap
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||
}
|
||||
*/
|
||||
/*
|
||||
it("should be able to serialize and de-serialize an hotswapped actor") {
|
||||
import BinaryFormatMyActor._
|
||||
|
||||
val actor1 = actorOf[MyActor].start()
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 1")
|
||||
(actor1 !! "hello").getOrElse("_") should equal("world 2")
|
||||
actor1 ! HotSwap {
|
||||
case "hello" =>
|
||||
self.reply("swapped")
|
||||
}
|
||||
(actor1 !! "hello").getOrElse("_") should equal("swapped")
|
||||
|
||||
val bytes = toBinary(actor1)
|
||||
val actor2 = fromBinary(bytes)
|
||||
actor2.start()
|
||||
|
||||
(actor1 !! "hello").getOrElse("_") should equal("swapped")
|
||||
|
||||
actor1 ! RevertHotSwap
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 3")
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
describe("Custom serializable actors") {
|
||||
it("should serialize and de-serialize") {
|
||||
import BinaryFormatMyActorWithSerializableMessages._
|
||||
|
||||
val actor1 = actorOf[MyActorWithSerializableMessages].start()
|
||||
(actor1 ! MyMessage("hello1", ("akka", 100)))
|
||||
(actor1 ! MyMessage("hello2", ("akka", 200)))
|
||||
(actor1 ! MyMessage("hello3", ("akka", 300)))
|
||||
(actor1 ! MyMessage("hello4", ("akka", 400)))
|
||||
(actor1 ! MyMessage("hello5", ("akka", 500)))
|
||||
actor1.mailboxSize should be > (0)
|
||||
val actor2 = fromBinary(toBinary(actor1))
|
||||
Thread.sleep(1000)
|
||||
actor2.mailboxSize should be > (0)
|
||||
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
|
||||
val actor3 = fromBinary(toBinary(actor1, false))
|
||||
Thread.sleep(1000)
|
||||
actor3.mailboxSize should equal(0)
|
||||
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
}
|
||||
}
|
||||
|
||||
describe("ActorRef serialization") {
|
||||
it("should serialize and deserialize local actor refs ") {
|
||||
val a = actorOf[MyActorWithDualCounter].start
|
||||
val out = RemoteActorSerialization.toRemoteActorRefProtocol(a).toByteArray
|
||||
val in = RemoteActorSerialization.fromBinaryToRemoteActorRef(out)
|
||||
|
||||
in.address should equal(a.address)
|
||||
in.timeout should equal(a.timeout)
|
||||
a.stop
|
||||
}
|
||||
|
||||
it("should serialize and deserialize remote actor refs ") {
|
||||
val a = Actor.remote.actorFor("foo", "localhost", 6666)
|
||||
val out = RemoteActorSerialization.toRemoteActorRefProtocol(a).toByteArray
|
||||
val in = RemoteActorSerialization.fromBinaryToRemoteActorRef(out)
|
||||
|
||||
in.address should equal(a.address)
|
||||
in.timeout should equal(a.timeout)
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
class MyActorWithDualCounter extends Actor {
|
||||
var count1 = 0
|
||||
var count2 = 0
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
count1 = count1 + 1
|
||||
count2 = count2 + 1
|
||||
self.reply("world " + count1 + " " + count2)
|
||||
}
|
||||
}
|
||||
|
||||
class MyActor extends Actor with scala.Serializable {
|
||||
var count = 0
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
count = count + 1
|
||||
self.reply("world " + count)
|
||||
case "swap" ⇒
|
||||
become { case "hello" ⇒ self.reply("swapped") }
|
||||
}
|
||||
}
|
||||
|
||||
class MyStatelessActor extends Actor {
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
self.reply("world")
|
||||
}
|
||||
}
|
||||
|
||||
class MyStatelessActorWithMessagesInMailbox extends Actor {
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" ⇒ self.reply("world")
|
||||
}
|
||||
}
|
||||
|
||||
class MyJavaSerializableActor extends Actor with scala.Serializable {
|
||||
var count = 0
|
||||
self.receiveTimeout = Some(1000)
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
count = count + 1
|
||||
self.reply("world " + count)
|
||||
}
|
||||
}
|
||||
|
||||
class MyActorWithSerializableMessages extends Actor {
|
||||
def receive = {
|
||||
case MyMessage(s, t) ⇒
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" ⇒ self.reply("world")
|
||||
}
|
||||
}
|
||||
|
||||
case class MyMessage(val id: String, val value: Tuple2[String, Int])
|
||||
extends Serializable.ScalaJSON[MyMessage] {
|
||||
|
||||
def this() = this(null, null)
|
||||
|
||||
import DefaultProtocol._
|
||||
import JsonSerialization._
|
||||
|
||||
implicit val MyMessageFormat: sjson.json.Format[MyMessage] =
|
||||
asProduct2("id", "value")(MyMessage)(MyMessage.unapply(_).get)
|
||||
|
||||
def toJSON: String = JsValue.toJson(tojson(this))
|
||||
def toBytes: Array[Byte] = tobinary(this)
|
||||
def fromBytes(bytes: Array[Byte]) = frombinary[MyMessage](bytes)
|
||||
def fromJSON(js: String) = fromjson[MyMessage](Js(js))
|
||||
}
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
package akka.serialization
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import scala.reflect.BeanInfo
|
||||
|
||||
@BeanInfo
|
||||
case class Foo(foo: String) {
|
||||
def this() = this(null)
|
||||
}
|
||||
|
||||
@BeanInfo
|
||||
case class MyMessage(val id: String, val value: Tuple2[String, Int]) {
|
||||
private def this() = this(null, null)
|
||||
}
|
||||
|
||||
class SerializerSpec extends JUnitSuite {
|
||||
@Test
|
||||
def shouldSerializeString = {
|
||||
val f = Foo("debasish")
|
||||
val json = Serializers.ScalaJSON.toBinary(f)
|
||||
assert(new String(json) == """{"foo":"debasish"}""")
|
||||
val fo = Serializers.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo]
|
||||
assert(fo == f)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldSerializeTuple2 = {
|
||||
val message = MyMessage("id", ("hello", 34))
|
||||
val json = Serializers.ScalaJSON.toBinary(message)
|
||||
assert(new String(json) == """{"id":"id","value":{"hello":34}}""")
|
||||
val f = Serializers.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage]
|
||||
assert(f == message)
|
||||
val g = Serializers.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage]
|
||||
assert(f == message)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,123 +0,0 @@
|
|||
package akka.actor.serialization
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.serialization._
|
||||
import akka.actor._
|
||||
import ActorSerialization._
|
||||
import Actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class Ticket435Spec extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
object BinaryFormatMyStatefulActor {
|
||||
implicit object MyStatefulActorFormat extends Format[MyStatefulActor] {
|
||||
def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = {
|
||||
val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
|
||||
act.count = p.getCount
|
||||
act
|
||||
}
|
||||
def toBinary(ac: MyStatefulActor) =
|
||||
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
|
||||
}
|
||||
}
|
||||
|
||||
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
|
||||
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
|
||||
}
|
||||
|
||||
describe("Serializable actor") {
|
||||
/*
|
||||
it("should be able to serialize and deserialize a stateless actor with messages in mailbox") {
|
||||
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
|
||||
|
||||
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start()
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
actor1.mailboxSize should be > (0)
|
||||
val actor2 = fromBinary(toBinary(actor1))
|
||||
Thread.sleep(1000)
|
||||
actor2.mailboxSize should be > (0)
|
||||
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
|
||||
val actor3 = fromBinary(toBinary(actor1, false))
|
||||
Thread.sleep(1000)
|
||||
actor3.mailboxSize should equal(0)
|
||||
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
}
|
||||
|
||||
it("should serialize the mailbox optionally") {
|
||||
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
|
||||
|
||||
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start()
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
(actor1 ! "hello")
|
||||
actor1.mailboxSize should be > (0)
|
||||
|
||||
val actor2 = fromBinary(toBinary(actor1, false))
|
||||
Thread.sleep(1000)
|
||||
actor2.mailboxSize should equal(0)
|
||||
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
|
||||
}
|
||||
|
||||
it("should be able to serialize and deserialize a stateful actor with messages in mailbox") {
|
||||
import BinaryFormatMyStatefulActor._
|
||||
|
||||
val actor1 = actorOf[MyStatefulActor].start()
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
(actor1 ! "hi")
|
||||
actor1.mailboxSize should be > (0)
|
||||
val actor2 = fromBinary(toBinary(actor1))
|
||||
Thread.sleep(1000)
|
||||
actor2.mailboxSize should be > (0)
|
||||
(actor2 !! "hello").getOrElse("_") should equal("world 1")
|
||||
|
||||
val actor3 = fromBinary(toBinary(actor1, false))
|
||||
Thread.sleep(1000)
|
||||
actor3.mailboxSize should equal(0)
|
||||
(actor3 !! "hello").getOrElse("_") should equal("world 1")
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
class MyStatefulActor extends Actor {
|
||||
var count = 0
|
||||
|
||||
def receive = {
|
||||
case "hi" ⇒
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello" ⇒
|
||||
count = count + 1
|
||||
self.reply("world " + count)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,121 +0,0 @@
|
|||
package akka.actor.serialization
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.serialization._
|
||||
import akka.actor._
|
||||
import ActorSerialization._
|
||||
import Actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class UntypedActorSerializationSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
class MyUntypedActorFormat extends Format[MyUntypedActor] {
|
||||
def fromBinary(bytes: Array[Byte], act: MyUntypedActor) = {
|
||||
val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
|
||||
act.count = p.getCount
|
||||
act
|
||||
}
|
||||
def toBinary(ac: MyUntypedActor) =
|
||||
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
|
||||
}
|
||||
|
||||
class MyUntypedActorWithDualCounterFormat extends Format[MyUntypedActorWithDualCounter] {
|
||||
def fromBinary(bytes: Array[Byte], act: MyUntypedActorWithDualCounter) = {
|
||||
val p = Serializers.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
|
||||
act.count1 = p.getCount1
|
||||
act.count2 = p.getCount2
|
||||
act
|
||||
}
|
||||
def toBinary(ac: MyUntypedActorWithDualCounter) =
|
||||
ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray
|
||||
}
|
||||
|
||||
object MyUntypedStatelessActorFormat extends StatelessActorFormat[MyUntypedStatelessActor]
|
||||
|
||||
describe("Serializable untyped actor") {
|
||||
/*
|
||||
it("should be able to serialize and de-serialize a stateful untyped actor") {
|
||||
val actor1 = Actors.actorOf(classOf[MyUntypedActor]).start()
|
||||
actor1.sendRequestReply("hello") should equal("world 1")
|
||||
actor1.sendRequestReply("debasish") should equal("hello debasish 2")
|
||||
|
||||
val f = new MyUntypedActorFormat
|
||||
val bytes = toBinaryJ(actor1, f)
|
||||
val actor2 = fromBinaryJ(bytes, f)
|
||||
actor2.start()
|
||||
actor2.sendRequestReply("hello") should equal("world 3")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize a stateful actor with compound state") {
|
||||
val actor1 = actorOf[MyUntypedActorWithDualCounter].start()
|
||||
actor1.sendRequestReply("hello") should equal("world 1 1")
|
||||
actor1.sendRequestReply("hello") should equal("world 2 2")
|
||||
|
||||
val f = new MyUntypedActorWithDualCounterFormat
|
||||
val bytes = toBinaryJ(actor1, f)
|
||||
val actor2 = fromBinaryJ(bytes, f)
|
||||
actor2.start()
|
||||
actor2.sendRequestReply("hello") should equal("world 3 3")
|
||||
}
|
||||
|
||||
it("should be able to serialize and de-serialize a stateless actor") {
|
||||
val actor1 = actorOf[MyUntypedStatelessActor].start()
|
||||
actor1.sendRequestReply("hello") should equal("world")
|
||||
actor1.sendRequestReply("hello") should equal("world")
|
||||
|
||||
val bytes = toBinaryJ(actor1, MyUntypedStatelessActorFormat)
|
||||
val actor2 = fromBinaryJ(bytes, MyUntypedStatelessActorFormat)
|
||||
actor2.start()
|
||||
actor2.sendRequestReply("hello") should equal("world")
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
class MyUntypedActor extends UntypedActor {
|
||||
var count = 0
|
||||
def onReceive(message: Any): Unit = message match {
|
||||
case m: String if m == "hello" ⇒
|
||||
count = count + 1
|
||||
getContext.replyUnsafe("world " + count)
|
||||
case m: String ⇒
|
||||
count = count + 1
|
||||
getContext.replyUnsafe("hello " + m + " " + count)
|
||||
case _ ⇒
|
||||
throw new Exception("invalid message type")
|
||||
}
|
||||
}
|
||||
|
||||
class MyUntypedActorWithDualCounter extends UntypedActor {
|
||||
var count1 = 0
|
||||
var count2 = 0
|
||||
|
||||
def onReceive(message: Any): Unit = message match {
|
||||
case m: String if m == "hello" ⇒
|
||||
count1 = count1 + 1
|
||||
count2 = count2 + 1
|
||||
getContext.replyUnsafe("world " + count1 + " " + count2)
|
||||
case m: String ⇒
|
||||
count1 = count1 + 1
|
||||
count2 = count2 + 1
|
||||
getContext.replyUnsafe("hello " + m + " " + count1 + " " + count2)
|
||||
case _ ⇒
|
||||
throw new Exception("invalid message type")
|
||||
}
|
||||
}
|
||||
|
||||
class MyUntypedStatelessActor extends UntypedActor {
|
||||
def onReceive(message: Any): Unit = message match {
|
||||
case m: String if m == "hello" ⇒
|
||||
getContext.replyUnsafe("world")
|
||||
case m: String ⇒
|
||||
getContext.replyUnsafe("hello " + m)
|
||||
case _ ⇒
|
||||
throw new Exception("invalid message type")
|
||||
}
|
||||
}
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
package akka.actor.ticket
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
class Ticket001Spec extends WordSpec with MustMatchers {
|
||||
|
||||
"An XXX" must {
|
||||
"do YYY" in {
|
||||
1 must be(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package akka.actor.ticket
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{ Uuid, newUuid, uuidFrom }
|
||||
import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional
|
||||
import akka.remote.protocol.RemoteProtocol._
|
||||
import akka.actor.remote.AkkaRemoteTest
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
class Ticket434Spec extends AkkaRemoteTest {
|
||||
"A server managed remote actor" should {
|
||||
/*
|
||||
"can use a custom service name containing ':'" in {
|
||||
val latch = new CountDownLatch(1)
|
||||
implicit val sender = replyHandler(latch,"Pong")
|
||||
remote.register("my:service", actorOf[RemoteActorSpecActorUnidirectional])
|
||||
|
||||
val actor = remote.actorFor("my:service", 5000L, host, port)
|
||||
|
||||
actor ! "Ping"
|
||||
|
||||
latch.await(1, unit) must be (true)
|
||||
}
|
||||
|
||||
"should be possible to set the actor id and uuid" in {
|
||||
val uuid = newUuid
|
||||
val actorInfo = ActorInfoProtocol.newBuilder
|
||||
.setUuid(UuidProtocol.newBuilder.setHigh(uuid.getTime).setLow(uuid.getClockSeqAndNode).build)
|
||||
.setAddress("some-id")
|
||||
.setTimeout(5000L)
|
||||
.setActorType(ActorType.SCALA_ACTOR).build
|
||||
|
||||
uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow) must equal (uuid)
|
||||
actorInfo.getAddress must equal ("some-id")
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
package ticket
|
||||
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import akka.serialization.RemoteActorSerialization
|
||||
import akka.actor.Actor.actorOf
|
||||
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
import akka.actor.remote.AkkaRemoteTest
|
||||
|
||||
case class RecvActorRef(bytes: Array[Byte])
|
||||
|
||||
class ActorRefService(latch: CountDownLatch) extends Actor {
|
||||
import self._
|
||||
|
||||
def receive: Receive = {
|
||||
case RecvActorRef(bytes) ⇒
|
||||
val ref = RemoteActorSerialization.fromBinaryToRemoteActorRef(bytes)
|
||||
ref ! "hello"
|
||||
case "hello" ⇒ latch.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
class Ticket506Spec extends AkkaRemoteTest {
|
||||
"a RemoteActorRef serialized" should {
|
||||
/*
|
||||
"should be remotely usable" in {
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
val a1 = actorOf( new ActorRefService(null))
|
||||
val a2 = actorOf( new ActorRefService(latch))
|
||||
|
||||
remote.register("service1", a1)
|
||||
remote.register("service2", a2)
|
||||
|
||||
// connect to the first server/service
|
||||
val c1 = remote.actorFor("service1", host, port)
|
||||
|
||||
val bytes = RemoteActorSerialization.toRemoteActorRefProtocol(a2).toByteArray
|
||||
c1 ! RecvActorRef(bytes)
|
||||
|
||||
latch.await(1, unit) must be(true)
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
|
@ -433,10 +433,10 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
|||
new AkkaSampleAntsProject(_), akka_stm)
|
||||
lazy val akka_sample_fsm = project("akka-sample-fsm", "akka-sample-fsm",
|
||||
new AkkaSampleFSMProject(_), akka_actor)
|
||||
lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote",
|
||||
new AkkaSampleRemoteProject(_), akka_remote)
|
||||
lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
|
||||
new AkkaSampleChatProject(_), akka_remote)
|
||||
// lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote",
|
||||
// new AkkaSampleRemoteProject(_), akka_remote)
|
||||
// lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
|
||||
// new AkkaSampleChatProject(_), akka_remote)
|
||||
lazy val akka_sample_osgi = project("akka-sample-osgi", "akka-sample-osgi",
|
||||
new AkkaSampleOsgiProject(_), akka_actor)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue