Refactorings

This commit is contained in:
Martin Krasser 2010-10-11 11:02:32 +02:00
parent d01fb42c3c
commit 2d456c19a4
2 changed files with 31 additions and 30 deletions

View file

@ -5,12 +5,10 @@
package se.scalablesolutions.akka.camel.component
import java.net.InetSocketAddress
import java.util.{Map => JavaMap}
import java.util.{Map => JMap}
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import jsr166x.Deque
import org.apache.camel._
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
@ -31,16 +29,16 @@ import scala.reflect.BeanProperty
* @author Martin Krasser
*/
class ActorComponent extends DefaultComponent {
def createEndpoint(uri: String, remaining: String, parameters: JavaMap[String, Object]): ActorEndpoint = {
val (id,uuid) = idAndUuidPair(remaining)
new ActorEndpoint(uri, this, id, uuid)
def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint = {
val (idType, idValue) = parseIdentifier(remaining)
new ActorEndpoint(uri, this, idType, idValue)
}
private def idAndUuidPair(remaining: String): Tuple2[Option[String],Option[Uuid]] = remaining match {
case null | "" => throw new IllegalArgumentException("invalid path format: [%s] - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
case id if id startsWith "id:" => (Some(id substring 3),None)
case uuid if uuid startsWith "uuid:" => (None,Some(uuidFrom(uuid substring 5)))
case id => (Some(id),None)
private def parseIdentifier(remaining: String): Tuple2[String, String] = remaining match {
case null | "" => throw new IllegalArgumentException("invalid path: [%s] - should be <actorid> or id:<actorid> or uuid:<actoruuid>" format remaining)
case id if id startsWith "id:" => ("id", id substring 3)
case uuid if uuid startsWith "uuid:" => ("uuid", uuid substring 5)
case id => ("id", id)
}
}
@ -60,8 +58,8 @@ class ActorComponent extends DefaultComponent {
*/
class ActorEndpoint(uri: String,
comp: ActorComponent,
val id: Option[String],
val uuid: Option[Uuid]) extends DefaultEndpoint(uri, comp) {
val idType: String,
val idValue: String) extends DefaultEndpoint(uri, comp) {
/**
* Whether to block caller thread during two-way message exchanges with (untyped) actors. This is
@ -109,6 +107,8 @@ class ActorEndpoint(uri: String,
class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with AsyncProcessor {
import ActorProducer._
private lazy val uuid = uuidFrom(ep.idValue);
def process(exchange: Exchange) =
if (exchange.getPattern.isOutCapable) sendSync(exchange) else sendAsync(exchange)
@ -149,16 +149,17 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
private def target =
targetOption getOrElse (throw new ActorNotRegisteredException(ep.getEndpointUri))
private def targetOption: Option[ActorRef] =
if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get)
private def targetOption: Option[ActorRef] = ep.idType match {
case "id" => targetById
case "uuid" => targetByUuid
}
private def targetById(id: String) = ActorRegistry.actorsFor(id) match {
private def targetById = ActorRegistry.actorsFor(ep.idValue) match {
case actors if actors.length == 0 => None
case actors => Some(actors(0))
}
private def targetByUuid(uuid: Uuid) = ActorRegistry.actorFor(uuid)
private def targetByUuid = ActorRegistry.actorFor(uuid)
}
/**
@ -257,7 +258,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def linkedActors: JavaMap[Uuid, ActorRef] = unsupported
protected[akka] def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
protected[akka] def remoteAddress_=(addr: Option[InetSocketAddress]): Unit = unsupported

View file

@ -10,30 +10,30 @@ import se.scalablesolutions.akka.actor.uuidFrom
class ActorComponentTest extends JUnitSuite {
val component: ActorComponent = ActorComponentTest.actorComponent
def testUUID = uuidFrom("93da8c80-c3fd-11df-abed-60334b120057")
def testUUID = "93da8c80-c3fd-11df-abed-60334b120057"
@Test def shouldCreateEndpointWithIdDefined = {
val ep1: ActorEndpoint = component.createEndpoint("actor:abc").asInstanceOf[ActorEndpoint]
val ep2: ActorEndpoint = component.createEndpoint("actor:id:abc").asInstanceOf[ActorEndpoint]
assert(ep1.id === Some("abc"))
assert(ep2.id === Some("abc"))
assert(ep1.uuid === None)
assert(ep2.uuid === None)
assert(ep1.idValue === "abc")
assert(ep2.idValue === "abc")
assert(ep1.idType === "id")
assert(ep2.idType === "id")
assert(!ep1.blocking)
assert(!ep2.blocking)
}
@Test def shouldCreateEndpointWithUuidDefined = {
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:" + testUUID).asInstanceOf[ActorEndpoint]
assert(ep.uuid === Some(testUUID))
assert(ep.id === None)
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:%s" format testUUID).asInstanceOf[ActorEndpoint]
assert(ep.idValue === testUUID)
assert(ep.idType === "uuid")
assert(!ep.blocking)
}
@Test def shouldCreateEndpointWithBlockingSet = {
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:"+testUUID+"?blocking=true").asInstanceOf[ActorEndpoint]
assert(ep.uuid === Some(testUUID))
assert(ep.id === None)
val ep: ActorEndpoint = component.createEndpoint("actor:uuid:%s?blocking=true" format testUUID).asInstanceOf[ActorEndpoint]
assert(ep.idValue === testUUID)
assert(ep.idType === "uuid")
assert(ep.blocking)
}
}