From ecc26c7380f87caa90729d55e2d301ca068ef713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 8 Jun 2010 20:46:12 +0200 Subject: [PATCH 01/12] Fixed bug in setting sender ref + changed version to 0.10 --- akka-core/src/main/scala/actor/ActorRef.scala | 26 +++++++------------ akka-core/src/main/scala/config/Config.scala | 2 +- config/akka-reference.conf | 2 +- project/build.properties | 2 +- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 59ab5c06c2..32442d2b09 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -891,7 +891,6 @@ sealed class LocalActorRef private[akka]( } protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -924,7 +923,6 @@ sealed class LocalActorRef private[akka]( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -974,9 +972,9 @@ sealed class LocalActorRef private[akka]( Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) return } + sender = messageHandle.sender + senderFuture = messageHandle.senderFuture try { - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) } catch { @@ -990,9 +988,7 @@ sealed class LocalActorRef private[akka]( val message = messageHandle.message //serializeMessage(messageHandle.message) setTransactionSet(messageHandle.transactionSet) try { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - "No handler matching message [" + message + "] in " + toString) + actor.base(message) } catch { case e => _isBeingRestarted = true @@ -1021,20 +1017,16 @@ sealed class LocalActorRef private[akka]( } setTransactionSet(txSet) - def proceed = { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - try { if (isTransactor) { atomic { - proceed + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit } - } else proceed + } else { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } } catch { case e: IllegalStateException => {} case e => diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index ba1e8cb056..68842ad1e3 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message) * @author Jonas Bonér */ object Config extends Logging { - val VERSION = "0.10-SNAPSHOT" + val VERSION = "0.10" // Set Multiverse options for max speed 
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 04ba26798d..2ac6bf42f1 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -15,7 +15,7 @@ - version = "0.10-SNAPSHOT" + version = "0.10" # FQN (Fully Qualified Name) to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor diff --git a/project/build.properties b/project/build.properties index 060f9af2b5..cc8e376f1b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,6 +1,6 @@ project.organization=se.scalablesolutions.akka project.name=akka -project.version=0.10-SNAPSHOT +project.version=0.10 scala.version=2.8.0.RC3 sbt.version=0.7.4 def.scala.version=2.7.7 From bc8ff879b7d1498c0015bdd9e61b6d4c30659329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 8 Jun 2010 20:46:52 +0200 Subject: [PATCH 02/12] Added bench from akka-bench for convenience --- akka-core/src/test/scala/Bench.scala | 119 +++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 akka-core/src/test/scala/Bench.scala diff --git a/akka-core/src/test/scala/Bench.scala b/akka-core/src/test/scala/Bench.scala new file mode 100644 index 0000000000..454fc2b7e9 --- /dev/null +++ b/akka-core/src/test/scala/Bench.scala @@ -0,0 +1,119 @@ +/* The Computer Language Benchmarks Game + http://shootout.alioth.debian.org/ + contributed by Julien Gaugaz + inspired by the version contributed by Yura Taras and modified by Isaac Gouy +*/ +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.actor.Actor._ + +object Chameneos { + + sealed trait ChameneosEvent + case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent + case class Change(colour: Colour) extends ChameneosEvent + case class MeetingCount(count: Int) extends ChameneosEvent + case object Exit extends ChameneosEvent + + abstract class Colour + case object RED extends Colour + case object YELLOW extends Colour + case object BLUE extends Colour + case object FADED extends Colour + + val colours = Array[Colour](BLUE, RED, YELLOW) + + var start = 0L + var end = 0L + + class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor { + var meetings = 0 + self.start + mall ! Meet(self, colour) + + def receive = { + case Meet(from, otherColour) => + colour = complement(otherColour) + meetings = meetings +1 + from ! Change(colour) + mall ! Meet(self, colour) + + case Change(newColour) => + colour = newColour + meetings = meetings +1 + mall ! Meet(self, colour) + + case Exit => + colour = FADED + self.sender.get ! 
MeetingCount(meetings) + } + + def complement(otherColour: Colour): Colour = colour match { + case RED => otherColour match { + case RED => RED + case YELLOW => BLUE + case BLUE => YELLOW + case FADED => FADED + } + case YELLOW => otherColour match { + case RED => BLUE + case YELLOW => YELLOW + case BLUE => RED + case FADED => FADED + } + case BLUE => otherColour match { + case RED => YELLOW + case YELLOW => RED + case BLUE => BLUE + case FADED => FADED + } + case FADED => FADED + } + + override def toString = cid + "(" + colour + ")" + } + + class Mall(var n: Int, numChameneos: Int) extends Actor { + var waitingChameneo: Option[ActorRef] = None + var sumMeetings = 0 + var numFaded = 0 + + override def init = { + for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i)) + } + + def receive = { + case MeetingCount(i) => + numFaded += 1 + sumMeetings += i + if (numFaded == numChameneos) { + Chameneos.end = System.currentTimeMillis + self.stop + } + + case msg @ Meet(a, c) => + if (n > 0) { + waitingChameneo match { + case Some(chameneo) => + n -= 1 + chameneo ! msg + waitingChameneo = None + case None => waitingChameneo = self.sender + } + } else { + waitingChameneo.foreach(_ ! Exit) + self.sender.get ! Exit + } + } + } + + def run { +// System.setProperty("akka.config", "akka.conf") + Chameneos.start = System.currentTimeMillis + actorOf(new Mall(1000000, 4)).start + Thread.sleep(10000) + println("Elapsed: " + (end - start)) + } + + def main(args : Array[String]): Unit = run +} From 14822e95c4e4841e5a5a55e9f58161f15e1645bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 8 Jun 2010 20:47:12 +0200 Subject: [PATCH 03/12] added redis test from debasish --- .../scala/RedisInconsistentSizeBugTest.scala | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala new file mode 100644 index 0000000000..22d294c735 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala @@ -0,0 +1,74 @@ +package se.scalablesolutions.akka.persistence.redis + +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.persistence.common.PersistentVector +import se.scalablesolutions.akka.stm.Transaction.Global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import java.util.{Calendar, Date} + +object Serial { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = new Date(read[Long](in)) + def writes(out: Output, value: Date) = write[Long](out, value.getTime) + } + case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date]) + implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get) +} + +case class GETFOO(s: String) +case class SETFOO(s: String) + +object SampleStorage { + class RedisSampleStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val EVENT_MAP = "akka.sample.map" + + private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) } + + import sbinary._ + import 
DefaultProtocol._ + import Operations._ + import Serial._ + import java.util.Calendar + + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + def receive = { + case SETFOO(str) => + atomic { + eventMap += (str.getBytes, toByteArray[Name](n)) + } + self.reply(str) + + case GETFOO(str) => + val ev = atomic { + eventMap.keySet.size + } + println("************* " + ev) + self.reply(ev) + } + } +} + +import Serial._ +import SampleStorage._ + +object Runner { + def run { + val proc = actorOf[RedisSampleStorage] + proc.start + val i: Option[String] = proc !! SETFOO("debasish") + println("i = " + i) + val ev: Option[Int] = proc !! GETFOO("debasish") + println(ev) + } +} From 06296b2a4fa1b4da8a481e8b8f32d9388f48db23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 9 Jun 2010 15:20:22 +0200 Subject: [PATCH 04/12] Improved RemoteClient listener info --- akka-core/src/main/scala/actor/ActorRef.scala | 6 +++--- .../src/main/scala/remote/RemoteClient.scala | 17 ++++++++++------- project/build/AkkaProject.scala | 6 +++--- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 32442d2b09..4d0deb1e6c 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement { *
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'. *
- * This field is used for logging, AspectRegistry.actorsFor, identifier for remote + * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote * actor in RemoteServer etc.But also as the identifier for persistence, which means * that you can use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. @@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement { protected[akka] var _sender: Option[ActorRef] = None protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s} - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf} + protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s } + protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf } /** * The reference sender Actor of the last received message. diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index cfb8a9a5ea..da0f9be72b 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory { def nextId: Long = id.getAndIncrement + nodeId } +/** + * Life-cycle events for RemoteClient. + */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent +case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent @@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) throw exception } @@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String, futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! 
RemoteClientError(e, client.hostname, client.port)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String, client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause)) + l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String, } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 1f4b0cd669..a9db966298 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -46,7 +46,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { // must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. // Therefore, if repositories are defined, this must happen as def, not as val. // ------------------------------------------------------------------------------------------------------------------- - val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here! + val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo) @@ -365,8 +365,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { def removeDupEntries(paths: PathFinder) = Path.lazyPathFinder { val mapped = paths.get map { p => (p.relativePath, p) } - (Map() ++ mapped).values.toList - } + (Map() ++ mapped).values.toList + } def allArtifacts = { Path.fromFile(buildScalaInstance.libraryJar) +++ From b6228ae0e5bda3b215f02cd3433e077da9a3eae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 10 Jun 2010 09:42:41 +0200 Subject: [PATCH 05/12] Added a isDefinedAt method on the ActorRef --- akka-core/src/main/scala/actor/ActorRef.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 4d0deb1e6c..5f08c7b900 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement { */ def uuid = _uuid + /** + * Tests if the actor is able to handle the message passed in as arguments. + */ + def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message) + /** * Only for internal use. UUID is effectively final. 
*/ From 9275e0be3b3a9bcae8938dc8d77821ee1c2137c4 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 10 Jun 2010 17:05:09 +0200 Subject: [PATCH 06/12] tests for accessing active objects from Camel routes (ticket #266) --- .../ActiveObjectComponentFeatureTest.scala | 39 +++++++++++++++++-- .../{Consumer10.java => ConsumerPojo1.java} | 6 +-- ...eObject1.java => RemoteConsumerPojo1.java} | 2 +- ...eObject2.java => RemoteConsumerPojo2.java} | 2 +- ...ple-camel-context.xml => context-boot.xml} | 0 ...ication1.scala => ClientApplication.scala} | 10 ++--- ...ication2.scala => ServerApplication.scala} | 2 +- 7 files changed, 46 insertions(+), 15 deletions(-) rename akka-samples/akka-sample-camel/src/main/java/sample/camel/{Consumer10.java => ConsumerPojo1.java} (79%) rename akka-samples/akka-sample-camel/src/main/java/sample/camel/{RemoteActiveObject1.java => RemoteConsumerPojo1.java} (92%) rename akka-samples/akka-sample-camel/src/main/java/sample/camel/{RemoteActiveObject2.java => RemoteConsumerPojo2.java} (92%) rename akka-samples/akka-sample-camel/src/main/resources/{sample-camel-context.xml => context-boot.xml} (100%) rename akka-samples/akka-sample-camel/src/main/scala/{Application1.scala => ClientApplication.scala} (70%) rename akka-samples/akka-sample-camel/src/main/scala/{Application2.scala => ServerApplication.scala} (94%) diff --git a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala index 1decc24eef..d80eedfd7a 100644 --- a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala @@ -2,20 +2,30 @@ package se.scalablesolutions.akka.camel.component import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} +import org.apache.camel.builder.RouteBuilder import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.camel._ import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject} -import org.apache.camel.{ExchangePattern, Exchange, Processor} +import se.scalablesolutions.akka.camel._ +import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry} +import org.apache.camel.{ResolveEndpointFailedException, ExchangePattern, Exchange, Processor} /** * @author Martin Krasser */ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { + import ActiveObjectComponentFeatureTest._ + import CamelContextManager.template + override protected def beforeAll = { + val activePojo = ActiveObject.newInstance(classOf[Pojo]) // not a consumer val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) - CamelContextManager.init + val registry = new SimpleRegistry + registry.put("pojo", activePojo) + + CamelContextManager.init(new DefaultCamelContext(registry)) + CamelContextManager.context.addRoutes(new CustomRouteBuilder) CamelContextManager.start CamelContextManager.activeObjectRegistry.put("base", activePojoBase) @@ -29,7 +39,6 @@ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAl feature("Communicate with an active object from a Camel application using active object endpoint URIs") { import ActiveObjectComponent.InternalSchema - import CamelContextManager.template import ExchangePattern._ scenario("in-out exchange with proxy created from interface and method returning String") { @@ -71,4 +80,26 
@@ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAl assert(result.getOut.getBody === null) } } + + feature("Communicate with an active object from a Camel application from a custom Camel route") { + + scenario("in-out exchange with externally registered active object") { + val result = template.requestBody("direct:test", "test") + assert(result === "foo: test") + } + + scenario("in-out exchange with internally registered active object not possible") { + intercept[ResolveEndpointFailedException] { + template.requestBodyAndHeader("active-object:intf?method=m2", "x", "test", "y") + } + } + } +} + +object ActiveObjectComponentFeatureTest { + class CustomRouteBuilder extends RouteBuilder { + def configure = { + from("direct:test").to("active-object:pojo?method=foo") + } + } } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java similarity index 79% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java index f08c486dac..ed29ac30e6 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java @@ -8,15 +8,15 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin Krasser */ -public class Consumer10 { +public class ConsumerPojo1 { - @consume("file:data/input2") + @consume("file:data/input/pojo") public void foo(String body) { System.out.println("Received message:"); System.out.println(body); } - @consume("jetty:http://0.0.0.0:8877/camel/active") + @consume("jetty:http://0.0.0.0:8877/camel/pojo") public String bar(@Body String body, @Header("name") String header) { return String.format("body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java similarity index 92% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java index 695aa148f4..0cf22a3e62 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin Krasser */ -public class RemoteActiveObject1 { +public class RemoteConsumerPojo1 { @consume("jetty:http://localhost:6644/remote-active-object-1") public String foo(@Body String body, @Header("name") String header) { diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java similarity index 92% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java index 210a72d2f8..fc2f391233 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java @@ -7,7 +7,7 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin 
Krasser */ -public class RemoteActiveObject2 { +public class RemoteConsumerPojo2 { @consume("jetty:http://localhost:6644/remote-active-object-2") public String foo(@Body String body, @Header("name") String header) { diff --git a/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml b/akka-samples/akka-sample-camel/src/main/resources/context-boot.xml similarity index 100% rename from akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml rename to akka-samples/akka-sample-camel/src/main/resources/context-boot.xml diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala similarity index 70% rename from akka-samples/akka-sample-camel/src/main/scala/Application1.scala rename to akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala index 2b3c7b5db5..cf38267ab3 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala @@ -2,13 +2,13 @@ package sample.camel import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.camel.Message -import se.scalablesolutions.akka.remote.RemoteClient import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteClient} /** * @author Martin Krasser */ -object Application1 { +object ClientApplication { // // TODO: completion of example @@ -16,10 +16,10 @@ object Application1 { def main(args: Array[String]) { val actor1 = actorOf[RemoteActor1] - val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) + val actor2 = ClientApplication.actorFor("remote2", "localhost", 7777) - val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteActiveObject1], "localhost", 7777) - //val actobj2 = TODO: create reference to server-managed active object (RemoteActiveObject2) + val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteConsumerPojo1], "localhost", 7777) + //val actobj2 = TODO: create reference to server-managed active object (RemoteConsumerPojo2) actor1.start diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala similarity index 94% rename from akka-samples/akka-sample-camel/src/main/scala/Application2.scala rename to akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala index 411f7b96b4..d7267bc90d 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala @@ -7,7 +7,7 @@ import se.scalablesolutions.akka.actor.Actor._ /** * @author Martin Krasser */ -object Application2 { +object ServerApplication { // // TODO: completion of example From f2c0a0d0e69a66497096e1dbedd146788263e897 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 10 Jun 2010 17:09:23 +0200 Subject: [PATCH 07/12] restructured akka-sample-camel --- .../src/main/java/sample/camel/BeanImpl.java | 12 ++++ .../src/main/java/sample/camel/BeanIntf.java | 10 ++++ .../main/java/sample/camel/ConsumerPojo2.java | 17 ++++++ .../sample/camel/RemoteConsumerPojo1.java | 2 +- .../sample/camel/RemoteConsumerPojo2.java | 2 +- .../src/main/resources/context-standalone.xml | 12 ++++ .../src/main/scala/Actors.scala | 8 +-- .../src/main/scala/Boot.scala | 35 ++++++++--- .../src/main/scala/ClientApplication.scala | 6 +- .../src/main/scala/ServerApplication.scala | 2 +- 
.../main/scala/StandaloneApplication.scala | 60 +++++++++++++++++++ 11 files changed, 147 insertions(+), 19 deletions(-) create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java create mode 100644 akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java create mode 100644 akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml create mode 100644 akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java new file mode 100644 index 0000000000..10437e7624 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java @@ -0,0 +1,12 @@ +package sample.camel; + +/** + * @author Martin Krasser + */ +public class BeanImpl implements BeanIntf { + + public String foo(String s) { + return "hello " + s; + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java new file mode 100644 index 0000000000..a7b2e6e6a4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java @@ -0,0 +1,10 @@ +package sample.camel; + +/** + * @author Martin Krasser + */ +public interface BeanIntf { + + public String foo(String s); + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java new file mode 100644 index 0000000000..429e6043ad --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java @@ -0,0 +1,17 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class ConsumerPojo2 { + + @consume("direct:default") + public String foo(String body) { + return String.format("default: %s", body); + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java index 0cf22a3e62..ab7e878b0d 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.annotation.consume; */ public class RemoteConsumerPojo1 { - @consume("jetty:http://localhost:6644/remote-active-object-1") + @consume("jetty:http://localhost:6644/camel/remote-active-object-1") public String foo(@Body String body, @Header("name") String header) { return String.format("remote1: body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java index fc2f391233..e982fe5025 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java @@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.annotation.consume; */ public class RemoteConsumerPojo2 { - 
@consume("jetty:http://localhost:6644/remote-active-object-2") + @consume("jetty:http://localhost:6644/camel/remote-active-object-2") public String foo(@Body String body, @Header("name") String header) { return String.format("remote2: body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml new file mode 100644 index 0000000000..a493678817 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml @@ -0,0 +1,12 @@ + + + + + diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index 25f7986730..f9feaa7f4d 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -9,7 +9,7 @@ import se.scalablesolutions.akka.util.Logging * Client-initiated remote actor. */ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { - def endpointUri = "jetty:http://localhost:6644/remote-actor-1" + def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-1" protected def receive = { case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote1"))) @@ -20,7 +20,7 @@ class RemoteActor1 extends RemoteActor("localhost", 7777) with Consumer { * Server-initiated remote actor. */ class RemoteActor2 extends Actor with Consumer { - def endpointUri = "jetty:http://localhost:6644/remote-actor-2" + def endpointUri = "jetty:http://localhost:6644/camel/remote-actor-2" protected def receive = { case msg: Message => self.reply(Message("hello %s" format msg.bodyAs[String], Map("sender" -> "remote2"))) @@ -37,14 +37,14 @@ class Producer1 extends Actor with Producer { } class Consumer1 extends Actor with Consumer with Logging { - def endpointUri = "file:data/input1" + def endpointUri = "file:data/input/actor" def receive = { case msg: Message => log.info("received %s" format msg.bodyAs[String]) } } -@consume("jetty:http://0.0.0.0:8877/camel/test1") +@consume("jetty:http://0.0.0.0:8877/camel/default") class Consumer2 extends Actor { def receive = { case msg: Message => self.reply("Hello %s" format msg.bodyAs[String]) diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 24e0a41605..f5335e5ecd 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -7,7 +7,7 @@ import org.apache.camel.spring.spi.ApplicationContextRegistry import org.springframework.context.support.ClassPathXmlApplicationContext import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.actor.{ActiveObject, SupervisorFactory} +import se.scalablesolutions.akka.actor.{ActiveObject, Supervisor} import se.scalablesolutions.akka.camel.CamelContextManager import se.scalablesolutions.akka.config.ScalaConfig._ @@ -16,23 +16,29 @@ import se.scalablesolutions.akka.config.ScalaConfig._ */ class Boot { + // ----------------------------------------------------------------------- // Create CamelContext with Spring-based registry and custom route builder + // ----------------------------------------------------------------------- - val context = new ClassPathXmlApplicationContext("/sample-camel-context.xml", getClass) + val context = new ClassPathXmlApplicationContext("/context-boot.xml", getClass) val 
registry = new ApplicationContextRegistry(context) + CamelContextManager.init(new DefaultCamelContext(registry)) CamelContextManager.context.addRoutes(new CustomRouteBuilder) - // Basic example + // ----------------------------------------------------------------------- + // Basic example (using a supervisor for consumer actors) + // ----------------------------------------------------------------------- - val factory = SupervisorFactory( + val supervisor = Supervisor( SupervisorConfig( RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), Supervise(actorOf[Consumer1], LifeCycle(Permanent)) :: Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil)) - factory.newInstance.start + // ----------------------------------------------------------------------- // Routing example + // ----------------------------------------------------------------------- val producer = actorOf[Producer1] val mediator = actorOf(new Transformer(producer)) @@ -42,7 +48,9 @@ class Boot { mediator.start consumer.start - // Publish subscribe example + // ----------------------------------------------------------------------- + // Publish subscribe examples + // ----------------------------------------------------------------------- // // Cometd example commented out because camel-cometd is broken in Camel 2.3 @@ -60,18 +68,27 @@ class Boot { //val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start + // ----------------------------------------------------------------------- + // Actor un-publishing and re-publishing example + // ----------------------------------------------------------------------- + actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor actorOf[Consumer5].start // POSTing any msg to http://0.0.0.0:8877/camel/start starts and published Consumer4 again. 
+ // ----------------------------------------------------------------------- // Active object example - - ActiveObject.newInstance(classOf[Consumer10]) + // ----------------------------------------------------------------------- + + ActiveObject.newInstance(classOf[ConsumerPojo1]) } +/** + * @author Martin Krasser + */ class CustomRouteBuilder extends RouteBuilder { def configure { val actorUri = "actor:%s" format classOf[Consumer2].getName - from("jetty:http://0.0.0.0:8877/camel/test2").to(actorUri) + from("jetty:http://0.0.0.0:8877/camel/custom").to(actorUri) from("direct:welcome").process(new Processor() { def process(exchange: Exchange) { exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody) diff --git a/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala index cf38267ab3..467d715360 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/ClientApplication.scala @@ -1,9 +1,9 @@ package sample.camel import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.camel.Message import se.scalablesolutions.akka.actor.{ActiveObject, Actor, ActorRef} -import se.scalablesolutions.akka.remote.{RemoteClient, RemoteClient} +import se.scalablesolutions.akka.camel.Message +import se.scalablesolutions.akka.remote.RemoteClient /** * @author Martin Krasser @@ -16,7 +16,7 @@ object ClientApplication { def main(args: Array[String]) { val actor1 = actorOf[RemoteActor1] - val actor2 = ClientApplication.actorFor("remote2", "localhost", 7777) + val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) val actobj1 = ActiveObject.newRemoteInstance(classOf[RemoteConsumerPojo1], "localhost", 7777) //val actobj2 = TODO: create reference to server-managed active object (RemoteConsumerPojo2) diff --git a/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala index d7267bc90d..8f53bbb866 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/ServerApplication.scala @@ -1,8 +1,8 @@ package sample.camel +import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.camel.CamelService import se.scalablesolutions.akka.remote.RemoteNode -import se.scalablesolutions.akka.actor.Actor._ /** * @author Martin Krasser diff --git a/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala new file mode 100644 index 0000000000..ebfabe9ce2 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/scala/StandaloneApplication.scala @@ -0,0 +1,60 @@ +package sample.camel + +import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry} +import org.apache.camel.builder.RouteBuilder + +import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager} +import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject} + +/** + * @author Martin Krasser + */ +object PlainApplication { + def main(args: Array[String]) { + import CamelContextManager.context + + // 'externally' register active objects + val registry = new SimpleRegistry + registry.put("pojo1", ActiveObject.newInstance(classOf[BeanIntf], new BeanImpl)) + registry.put("pojo2", ActiveObject.newInstance(classOf[BeanImpl])) + + // customize CamelContext + CamelContextManager.init(new 
DefaultCamelContext(registry)) + CamelContextManager.context.addRoutes(new PlainApplicationRoute) + + // start CamelService + val camelService = CamelService.newInstance + camelService.load + + // 'internally' register active object (requires CamelService) + ActiveObject.newInstance(classOf[ConsumerPojo2]) + + // access 'externally' registered active objects with active-object component + assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test1", "msg1")) + assert("hello msg2" == context.createProducerTemplate.requestBody("direct:test2", "msg2")) + + // internal registration is done in background. Wait a bit ... + Thread.sleep(1000) + + // access 'internally' (automatically) registered active-objects + // (see @consume annotation value at ConsumerPojo2.foo method) + assert("default: msg3" == context.createProducerTemplate.requestBody("direct:default", "msg3")) + + // shutdown CamelService + camelService.unload + + // shutdown all (internally) created actors + ActorRegistry.shutdownAll + } +} + +class PlainApplicationRoute extends RouteBuilder { + def configure = { + from("direct:test1").to("active-object:pojo1?method=foo") + from("direct:test2").to("active-object:pojo2?method=foo") + } +} + +object SpringApplication { + // TODO +} From 5d3400240632570e5670abc3df5b6f3d060c11e0 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Thu, 10 Jun 2010 17:11:20 +0200 Subject: [PATCH 08/12] remote consumer tests --- .../se/scalablesolutions/akka/camel/Pojo.java | 14 +++ .../akka/camel/PojoRemote.java | 15 ++++ .../src/test/scala/RemoteConsumerTest.scala | 89 +++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java create mode 100644 akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java create mode 100644 akka-camel/src/test/scala/RemoteConsumerTest.scala diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java new file mode 100644 index 0000000000..d1848c49ee --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java @@ -0,0 +1,14 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class Pojo { + + public String foo(String s) { + return String.format("foo: %s", s); + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java new file mode 100644 index 0000000000..57b0999b8f --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java @@ -0,0 +1,15 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoRemote { + + @consume("direct:remote-active-object") + public String foo(String s) { + return String.format("remote active object: %s", s); + } + +} diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala new file mode 100644 index 0000000000..e1a7842e0d --- /dev/null +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -0,0 +1,89 @@ +package se.scalablesolutions.akka.camel + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} + +import 
se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActiveObject, ActorRegistry, RemoteActor} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} + +/** + * @author Martin Krasser + */ +class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { + import RemoteConsumerTest._ + + var service: CamelService = _ + var server: RemoteServer = _ + + override protected def beforeAll = { + ActorRegistry.shutdownAll + + service = CamelService.newInstance + service.load + + server = new RemoteServer() + server.start(host, port) + + Thread.sleep(1000) + } + + override protected def afterAll = { + server.shutdown + service.unload + + RemoteClient.shutdownAll + ActorRegistry.shutdownAll + + Thread.sleep(1000) + } + + feature("Client-initiated remote consumer actor") { + scenario("access published remote consumer actor") { + given("a client-initiated remote consumer actor") + val consumer = actorOf[RemoteConsumer].start + + when("remote consumer publication is triggered") + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get + consumer !! "init" + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + then("the published actor is accessible via its endpoint URI") + val response = CamelContextManager.template.requestBody("direct:remote-actor", "test") + assert(response === "remote actor: test") + } + } + + /* TODO: enable once issues with remote active objects are resolved + feature("Client-initiated remote consumer active object") { + scenario("access published remote consumer method") { + given("a client-initiated remote consumer active object") + val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port) + + when("remote consumer publication is triggered") + val latch = service.consumerPublisher.!![CountDownLatch](SetExpectedMessageCount(1)).get + consumer.foo("init") + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + then("the published method is accessible via its endpoint URI") + val response = CamelContextManager.template.requestBody("direct:remote-active-object", "test") + assert(response === "remote active object: test") + } + } + */ +} + +object RemoteConsumerTest { + val host = "localhost" + val port = 7774 + + class RemoteConsumer extends RemoteActor(host, port) with Consumer { + def endpointUri = "direct:remote-actor" + + protected def receive = { + case "init" => self.reply("done") + case m: Message => self.reply("remote actor: %s" format m.body) + } + } +} \ No newline at end of file From ccf9e09ed42d6eea2cb0d5f8d80e5c3708a76c39 Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Fri, 11 Jun 2010 01:27:02 +0530 Subject: [PATCH 09/12] Redis persistence now handles serialized classes.Removed apis for increment / decrement atomically from Ref. 
Issue #267 fixed --- .../src/main/scala/StorageBackend.scala | 8 - .../src/main/scala/RedisStorageBackend.scala | 121 +++++++------- .../test/scala/RedisStorageBackendSpec.scala | 156 +++++++++--------- project/build/AkkaProject.scala | 1 + 4 files changed, 137 insertions(+), 149 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index a5226eb1a4..df74040b68 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -33,14 +33,6 @@ trait VectorStorageBackend[T] extends StorageBackend { trait RefStorageBackend[T] extends StorageBackend { def insertRefStorageFor(name: String, element: T) def getRefStorageFor(name: String): Option[T] - def incrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def incrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis } // for Queue diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index b1973c3c7b..ad758f9999 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -11,20 +11,45 @@ import se.scalablesolutions.akka.config.Config.config import com.redis._ -trait Encoder { +trait Base64Encoder { def encode(bytes: Array[Byte]): Array[Byte] def decode(bytes: Array[Byte]): Array[Byte] } -trait CommonsCodecBase64 { - import org.apache.commons.codec.binary.Base64._ - - def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) - def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) +trait Base64StringEncoder { + def byteArrayToString(bytes: Array[Byte]): String + def stringToByteArray(str: String): Array[Byte] } -object Base64Encoder extends Encoder with CommonsCodecBase64 -import Base64Encoder._ +trait NullBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = bytes + def decode(bytes: Array[Byte]): Array[Byte] = bytes +} + +object CommonsCodec { + import org.apache.commons.codec.binary.Base64 + import org.apache.commons.codec.binary.Base64._ + + val b64 = new Base64(true) + + trait CommonsCodecBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) + def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) + } + + object Base64Encoder extends Base64Encoder with CommonsCodecBase64 + + trait CommonsCodecBase64StringEncoder { + def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) + def stringToByteArray(str: String) = b64.decode(str) + } + + object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder +} + +import CommonsCodec._ +import CommonsCodec.Base64Encoder._ +import CommonsCodec.Base64StringEncoder._ /** * A module for supporting Redis based persistence. 
@@ -95,7 +120,7 @@ private [akka] object RedisStorageBackend extends def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling { mset(entries.map(e => - (makeRedisKey(name, e._1), new String(e._2)))) + (makeRedisKey(name, e._1), byteArrayToString(e._2)))) } /** @@ -138,7 +163,7 @@ private [akka] object RedisStorageBackend extends db.get(makeRedisKey(name, key)) match { case None => throw new NoSuchElementException(new String(key) + " not present") - case Some(s) => Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -155,7 +180,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList + keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList } } @@ -207,7 +232,7 @@ private [akka] object RedisStorageBackend extends } def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.lpush(new String(encode(name.getBytes)), new String(element)) + db.lpush(new String(encode(name.getBytes)), byteArrayToString(element)) } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { @@ -215,14 +240,15 @@ private [akka] object RedisStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { - db.lset(new String(encode(name.getBytes)), index, new String(elem)) + db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem)) } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { db.lindex(new String(encode(name.getBytes)), index) match { case None => throw new NoSuchElementException(name + " does not have element at " + index) - case Some(e) => e.getBytes + case Some(e) => + stringToByteArray(e) } } @@ -246,75 +272,46 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " does not have elements in the range specified") case Some(l) => - l map (_.get.getBytes) + l map ( e => stringToByteArray(e.get)) } } - def getVectorStorageSizeFor(name: String): Int = { + def getVectorStorageSizeFor(name: String): Int = withErrorHandling { db.llen(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(l) => l + case Some(l) => + l } } def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), new String(element)) + db.set(new String(encode(name.getBytes)), byteArrayToString(element)) + } + + def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling { + db.set(new String(encode(name.getBytes)), element) } def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { db.get(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => Some(s.getBytes) - } - } - - override def incrementAtomically(name: String): Option[Int] = withErrorHandling { - db.incr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incr") - } - } - - override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.incrby(new String(encode(name.getBytes)), 
by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incrby") - } - } - - override def decrementAtomically(name: String): Option[Int] = withErrorHandling { - db.decr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decr") - } - } - - override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.decrby(new String(encode(name.getBytes)), by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decrby") + case Some(s) => Some(stringToByteArray(s)) } } // add to the end of the queue def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.rpush(new String(encode(name.getBytes)), new String(item)) + db.rpush(new String(encode(name.getBytes)), byteArrayToString(item)) } - // pop from the front of the queue def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { db.lpop(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => - Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -336,7 +333,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException("No element at " + start) case Some(s) => - List(s.getBytes) + List(stringToByteArray(s)) } case n => db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { @@ -344,7 +341,7 @@ private [akka] object RedisStorageBackend extends throw new NoSuchElementException( "No element found between " + start + " and " + (start + count - 1)) case Some(es) => - es.map(_.get.getBytes) + es.map(e => stringToByteArray(e.get)) } } } @@ -359,7 +356,7 @@ private [akka] object RedisStorageBackend extends // add item to sorted set identified by name def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match { + db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -367,7 +364,7 @@ private [akka] object RedisStorageBackend extends // remove item from sorted set identified by name def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zrem(new String(encode(name.getBytes)), new String(item)) match { + db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -383,7 +380,7 @@ private [akka] object RedisStorageBackend extends } def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { - db.zscore(new String(encode(name.getBytes)), new String(item)) match { + db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(s) => Some(s.toFloat) case None => None } @@ -394,7 +391,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(s) => - s.map(_.get.getBytes) + s.map(e => stringToByteArray(e.get)) } } @@ -404,7 +401,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(l) => - l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) } + l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) } } } diff --git 
a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index d8d79d7f2a..8a8021b3c5 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -9,6 +9,11 @@ import org.junit.runner.RunWith import se.scalablesolutions.akka.serialization.Serializable import se.scalablesolutions.akka.serialization.Serializer._ +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ +import java.util.{Calendar, Date} + import RedisStorageBackend._ @RunWith(classOf[JUnitRunner]) @@ -39,15 +44,6 @@ class RedisStorageBackendSpec extends "T-1", "debasish.language".getBytes).get) should equal("java") } - /** - it("should enter a custom object for transaction T-1") { - val n = Name(100, "debasish", "kolkata") - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, Java.out(n)) - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes) - getMapStorageSizeFor("T-1") should equal(5) - } - **/ - it("should enter key/values for another transaction T-2") { insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes) insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes) @@ -61,6 +57,21 @@ class RedisStorageBackendSpec extends } } + describe("Store and query long value in maps") { + it("should enter 4 entries in redis for transaction T-1") { + val d = Calendar.getInstance.getTime.getTime + insertMapStorageEntryFor("T-11", "debasish".getBytes, + toByteArray[Long](d)) + + getMapStorageSizeFor("T-11") should equal(1) + fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) should equal(d) + } + + it("should remove map storage for T-1 and T2") { + removeMapStorageFor("T-11") + } + } + describe("Range query in maps") { it("should enter 7 entries in redis for transaction T-5") { insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes) @@ -93,73 +104,61 @@ class RedisStorageBackendSpec extends } } + describe("Store and query objects in maps") { + import NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n)) + getMapStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).get) should equal(n) + } + it("should remove map storage for T31") { + removeMapStorageFor("T-31") + } + } + describe("Store and query in vectors") { it("should write 4 entries in a vector for transaction T-3") { insertVectorStorageEntryFor("T-3", "debasish".getBytes) insertVectorStorageEntryFor("T-3", "maulindu".getBytes) - val n = Name(100, "debasish", "kolkata") - // insertVectorStorageEntryFor("T-3", Java.out(n)) - // insertVectorStorageEntryFor("T-3", n.toBytes) insertVectorStorageEntryFor("T-3", "1200".getBytes) - getVectorStorageSizeFor("T-3") should equal(3) + + val dt = Calendar.getInstance.getTime.getTime + insertVectorStorageEntryFor("T-3", toByteArray[Long](dt)) + getVectorStorageSizeFor("T-3") should equal(4) + fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) should equal(dt) + getVectorStorageSizeFor("T-3") should equal(4) + } + } + + describe("Store and query objects in vectors") { + import 
NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertVectorStorageEntryFor("T-31", toByteArray[Name](n)) + getVectorStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) should equal(n) } } describe("Store and query in ref") { + import NameSerialization._ it("should write 4 entries in 4 refs for transaction T-4") { insertRefStorageFor("T-4", "debasish".getBytes) insertRefStorageFor("T-4", "maulindu".getBytes) insertRefStorageFor("T-4", "1200".getBytes) new String(getRefStorageFor("T-4").get) should equal("1200") - - // val n = Name(100, "debasish", "kolkata") - // insertRefStorageFor("T-4", Java.out(n)) - // insertRefStorageFor("T-4", n.toBytes) - // Java.in(getRefStorageFor("T-4").get, Some(classOf[Name])).asInstanceOf[Name] should equal(n) - // n.fromBytes(getRefStorageFor("T-4").get) should equal(n) } - } - - describe("atomic increment in ref") { - it("should increment an existing key value by 1") { - insertRefStorageFor("T-4-1", "1200".getBytes) - new String(getRefStorageFor("T-4-1").get) should equal("1200") - incrementAtomically("T-4-1").get should equal(1201) - } - it("should create and increment a non-existing key value by 1") { - incrementAtomically("T-4-2").get should equal(1) - new String(getRefStorageFor("T-4-2").get) should equal("1") - } - it("should increment an existing key value by the amount specified") { - insertRefStorageFor("T-4-3", "1200".getBytes) - new String(getRefStorageFor("T-4-3").get) should equal("1200") - incrementByAtomically("T-4-3", 50).get should equal(1250) - } - it("should create and increment a non-existing key value by the amount specified") { - incrementByAtomically("T-4-4", 20).get should equal(20) - new String(getRefStorageFor("T-4-4").get) should equal("20") - } - } - - describe("atomic decrement in ref") { - it("should decrement an existing key value by 1") { - insertRefStorageFor("T-4-5", "1200".getBytes) - new String(getRefStorageFor("T-4-5").get) should equal("1200") - decrementAtomically("T-4-5").get should equal(1199) - } - it("should create and decrement a non-existing key value by 1") { - decrementAtomically("T-4-6").get should equal(-1) - new String(getRefStorageFor("T-4-6").get) should equal("-1") - } - it("should decrement an existing key value by the amount specified") { - insertRefStorageFor("T-4-7", "1200".getBytes) - new String(getRefStorageFor("T-4-7").get) should equal("1200") - decrementByAtomically("T-4-7", 50).get should equal(1150) - } - it("should create and decrement a non-existing key value by the amount specified") { - decrementByAtomically("T-4-8", 20).get should equal(-20) - new String(getRefStorageFor("T-4-8").get) should equal("-20") + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + insertRefStorageFor("T-4", toByteArray[Name](n)) + fromByteArray[Name](getRefStorageFor("T-4").get) should equal(n) } } @@ -185,6 +184,14 @@ class RedisStorageBackendSpec extends new String(l(1)) should equal("yukihiro matsumoto") new String(l(2)) should equal("claude shannon") } + it("should write a Name object and fetch it properly") { + import NameSerialization._ + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + enqueue("T-5-1", toByteArray[Name](n)) + 
fromByteArray[Name](peek("T-5-1", 0, 1).head) should equal(n) + fromByteArray[Name](dequeue("T-5-1").get) should equal(n) + } } describe("store and query in sorted set") { @@ -221,27 +228,18 @@ class RedisStorageBackendSpec extends } } -case class Name(id: Int, name: String, address: String) - extends Serializable.SBinary[Name] { - import sbinary._ - import sbinary.Operations._ - import sbinary.DefaultProtocol._ +object NameSerialization { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = + new Date(read[Long](in)) - def this() = this(0, null, null) - - implicit object NameFormat extends Format[Name] { - def reads(in : Input) = Name( - read[Int](in), - read[String](in), - read[String](in)) - def writes(out: Output, value: Name) = { - write[Int](out, value.id) - write[String](out, value.name) - write[String](out, value.address) - } + def writes(out: Output, value: Date) = + write[Long](out, value.getTime) } - def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes) + case class Name(id: Int, name: String, + address: String, dateOfBirth: Date, dateDied: Option[Date]) - def toBytes: Array[Byte] = toByteArray(this) + implicit val NameFormat: Format[Name] = + asProduct5(Name)(Name.unapply(_).get) } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a9db966298..9c7af71867 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -241,6 +241,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile" + val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } From 2ec2a3665f2bf5cf69c01dc3d1cc105cd81aad51 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 11 Jun 2010 18:29:41 +1200 Subject: [PATCH 10/12] Marked Multiverse dependency as intransitive --- project/build/AkkaProject.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 9c7af71867..850311c5f4 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -190,7 +190,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" - val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" + val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive() val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile" // testing From 8fe51dd37ad81fb97e80abd8d679c6767c5cf1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 15 Jun 2010 14:36:43 +0200 Subject: [PATCH 11/12] fixed problem with cassandra map storage in rest example --- .../src/main/scala/SimpleService.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index f31e10861d..657a6ba217 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ 
b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -67,11 +67,12 @@ class SimpleServiceActor extends Transactor { def receive = { case "Tick" => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) + val bytes = storage.get(KEY.getBytes).get + val counter = Integer.parseInt(new String(bytes, "UTF8")) + storage.put(KEY.getBytes, (counter + 1).toString.getBytes ) self.reply(Tick:{counter + 1}) } else { - storage.put(KEY, new Integer(0)) + storage.put(KEY.getBytes, "0".getBytes) hasStartedTicking = true self.reply(Tick: 0) } From ee193659c110e37d00c3edea864f775907e027ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 15 Jun 2010 15:36:41 +0200 Subject: [PATCH 12/12] Made AMQP UnregisterMessageConsumerListener public --- akka-amqp/src/main/scala/AMQP.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 04a22c2089..04e4570310 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -259,7 +259,7 @@ object AMQP { case object Stop extends AMQPMessage - private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage + case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage
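
Note on the RedisStorageBackend changes above: the diff replaces the lossy new String(bytes) / s.getBytes round trip with byteArrayToString / stringToByteArray helpers, but the helper bodies are not part of these hunks. The sketch below is only one plausible way to implement such a pair, not the code actually added to the backend; it uses ISO-8859-1 because that charset maps every byte value to a distinct character, so arbitrary binary values survive the conversion, unlike the platform-default charset used before.

    object ByteStringConversions {
      // ISO-8859-1 is a bijection between byte values 0-255 and the first 256
      // Unicode code points, so binary data survives the String round trip.
      def byteArrayToString(bytes: Array[Byte]): String =
        new String(bytes, "ISO-8859-1")

      def stringToByteArray(s: String): Array[Byte] =
        s.getBytes("ISO-8859-1")

      // Round-trip check over every possible byte value.
      def main(args: Array[String]) {
        val data = Array.tabulate[Byte](256)(i => i.toByte)
        assert(stringToByteArray(byteArrayToString(data)).sameElements(data))
        println("byte array round trip OK")
      }
    }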
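
The new specs rely on sbinary's toByteArray / fromByteArray together with an implicit Format, exactly as in the NameSerialization object above. Below is a self-contained sketch of the same pattern, with an illustrative Point case class standing in for Name (Point and its Format are not part of the patch):

    import sbinary._
    import sbinary.Operations._
    import sbinary.DefaultProtocol._

    object SBinaryRoundTrip {
      // Derive a Format from the constructor/extractor pair, as the spec
      // does for Name with asProduct5.
      case class Point(x: Int, y: Int)
      implicit val PointFormat: Format[Point] =
        asProduct2(Point)(Point.unapply(_).get)

      def main(args: Array[String]) {
        val now: Long = System.currentTimeMillis
        assert(fromByteArray[Long](toByteArray[Long](now)) == now)

        val p = Point(3, 4)
        assert(fromByteArray[Point](toByteArray[Point](p)) == p)
        println("sbinary round trips OK")
      }
    }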
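
The SimpleService fix stores the tick counter in the Cassandra-backed map as UTF-8 text bytes instead of a boxed Integer. A minimal illustration of that encode/parse round trip, suitable for pasting into a Scala REPL (no Akka storage involved):

    // Write the counter the way the fixed actor does, then read it back
    // the way the actor does on the next "Tick".
    val counter = 41
    val stored: Array[Byte] = (counter + 1).toString.getBytes("UTF8")
    val read = Integer.parseInt(new String(stored, "UTF8"))
    assert(read == 42)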