Its a wrap!
This commit is contained in:
parent
551f25aba8
commit
dfa637b090
6 changed files with 23 additions and 21 deletions
|
|
@ -16,5 +16,7 @@ package object actor {
|
||||||
type Uuid = com.eaio.uuid.UUID
|
type Uuid = com.eaio.uuid.UUID
|
||||||
def newUuid(): Uuid = new Uuid()
|
def newUuid(): Uuid = new Uuid()
|
||||||
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
|
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
|
||||||
def uuidFrom(uuid: String) = new Uuid(uuid)
|
def uuidFrom(uuid: String): Uuid = {
|
||||||
|
new Uuid(uuid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,7 @@ class Logger(val logger: SLFLogger) {
|
||||||
warning(message(fmt,arg,argN:_*))
|
warning(message(fmt,arg,argN:_*))
|
||||||
}
|
}
|
||||||
|
|
||||||
def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN)
|
def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN:_*)
|
||||||
|
|
||||||
def warning(msg: => String) {
|
def warning(msg: => String) {
|
||||||
if (warning_?) logger warn msg
|
if (warning_?) logger warn msg
|
||||||
|
|
|
||||||
|
|
@ -33,18 +33,15 @@ import scala.reflect.BeanProperty
|
||||||
*/
|
*/
|
||||||
class ActorComponent extends DefaultComponent {
|
class ActorComponent extends DefaultComponent {
|
||||||
def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
|
def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
|
||||||
val idAndUuid = idAndUuidPair(remaining)
|
val (id,uuid) = idAndUuidPair(remaining)
|
||||||
new ActorEndpoint(uri, this, idAndUuid._1, idAndUuid._2)
|
new ActorEndpoint(uri, this, id, uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def idAndUuidPair(remaining: String): Tuple2[Option[String], Option[Uuid]] = {
|
private def idAndUuidPair(remaining: String): Tuple2[Option[String],Option[Uuid]] = remaining match {
|
||||||
remaining split ":" toList match {
|
case null | "" => throw new IllegalArgumentException("invalid path format: [%s] - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
|
||||||
case id :: Nil => (Some(id), None)
|
case id if id startsWith "id:" => (Some(id substring 3),None)
|
||||||
case "id" :: id :: Nil => (Some(id), None)
|
case uuid if uuid startsWith "uuid:" => (None,Some(uuidFrom(uuid substring 5)))
|
||||||
case "uuid" :: uuid :: Nil => (None, Some(uuidFrom(uuid)))
|
case id => (Some(id),None)
|
||||||
case _ => throw new IllegalArgumentException(
|
|
||||||
"invalid path format: %s - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,13 @@ import org.apache.camel.{Endpoint, AsyncProcessor}
|
||||||
import org.apache.camel.impl.DefaultCamelContext
|
import org.apache.camel.impl.DefaultCamelContext
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
import se.scalablesolutions.akka.actor.uuidFrom
|
||||||
|
|
||||||
class ActorComponentTest extends JUnitSuite {
|
class ActorComponentTest extends JUnitSuite {
|
||||||
val component: ActorComponent = ActorComponentTest.actorComponent
|
val component: ActorComponent = ActorComponentTest.actorComponent
|
||||||
|
|
||||||
|
def testUUID = uuidFrom("93da8c80-c3fd-11df-abed-60334b120057")
|
||||||
|
|
||||||
@Test def shouldCreateEndpointWithIdDefined = {
|
@Test def shouldCreateEndpointWithIdDefined = {
|
||||||
val ep1: ActorEndpoint = component.createEndpoint("actor:abc").asInstanceOf[ActorEndpoint]
|
val ep1: ActorEndpoint = component.createEndpoint("actor:abc").asInstanceOf[ActorEndpoint]
|
||||||
val ep2: ActorEndpoint = component.createEndpoint("actor:id:abc").asInstanceOf[ActorEndpoint]
|
val ep2: ActorEndpoint = component.createEndpoint("actor:id:abc").asInstanceOf[ActorEndpoint]
|
||||||
|
|
@ -20,15 +23,15 @@ class ActorComponentTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def shouldCreateEndpointWithUuidDefined = {
|
@Test def shouldCreateEndpointWithUuidDefined = {
|
||||||
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:abc").asInstanceOf[ActorEndpoint]
|
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:" + testUUID).asInstanceOf[ActorEndpoint]
|
||||||
assert(ep.uuid === Some("abc"))
|
assert(ep.uuid === Some(testUUID))
|
||||||
assert(ep.id === None)
|
assert(ep.id === None)
|
||||||
assert(!ep.blocking)
|
assert(!ep.blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def shouldCreateEndpointWithBlockingSet = {
|
@Test def shouldCreateEndpointWithBlockingSet = {
|
||||||
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:abc?blocking=true").asInstanceOf[ActorEndpoint]
|
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:"+testUUID+"?blocking=true").asInstanceOf[ActorEndpoint]
|
||||||
assert(ep.uuid === Some("abc"))
|
assert(ep.uuid === Some(testUUID))
|
||||||
assert(ep.id === None)
|
assert(ep.id === None)
|
||||||
assert(ep.blocking)
|
assert(ep.blocking)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -123,8 +123,8 @@ object RemoteServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RemoteActorSet {
|
private class RemoteActorSet {
|
||||||
private[RemoteServer] val actors = new ConcurrentHashMap[Object, ActorRef]
|
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[Object, AnyRef]
|
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||||
}
|
}
|
||||||
|
|
||||||
private val guard = new ReadWriteGuard
|
private val guard = new ReadWriteGuard
|
||||||
|
|
@ -132,11 +132,11 @@ object RemoteServer {
|
||||||
private val remoteServers = Map[Address, RemoteServer]()
|
private val remoteServers = Map[Address, RemoteServer]()
|
||||||
|
|
||||||
private[akka] def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef) = guard.withWriteGuard {
|
private[akka] def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef) = guard.withWriteGuard {
|
||||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid.toString, actor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: Uuid, typedActor: AnyRef) = guard.withWriteGuard {
|
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: Uuid, typedActor: AnyRef) = guard.withWriteGuard {
|
||||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
|
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid.toString, typedActor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
||||||
|
|
|
||||||
|
|
@ -231,7 +231,7 @@ object RemoteActorSerialization {
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteActorRefProtocol.newBuilder
|
RemoteActorRefProtocol.newBuilder
|
||||||
.setClassOrServiceName(id)
|
.setClassOrServiceName(uuid.toString)
|
||||||
.setActorClassname(actorClass.getName)
|
.setActorClassname(actorClass.getName)
|
||||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||||
.setTimeout(timeout)
|
.setTimeout(timeout)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue