diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 04a22c2089..04e4570310 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -259,7 +259,7 @@ object AMQP { case object Stop extends AMQPMessage - private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage + case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java new file mode 100644 index 0000000000..d1848c49ee --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/Pojo.java @@ -0,0 +1,14 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class Pojo { + + public String foo(String s) { + return String.format("foo: %s", s); + } + +} \ No newline at end of file diff --git a/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java new file mode 100644 index 0000000000..57b0999b8f --- /dev/null +++ b/akka-camel/src/test/java/se/scalablesolutions/akka/camel/PojoRemote.java @@ -0,0 +1,15 @@ +package se.scalablesolutions.akka.camel; + +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class PojoRemote { + + @consume("direct:remote-active-object") + public String foo(String s) { + return String.format("remote active object: %s", s); + } + +} diff --git a/akka-camel/src/test/scala/RemoteConsumerTest.scala b/akka-camel/src/test/scala/RemoteConsumerTest.scala new file mode 100644 index 0000000000..e1a7842e0d --- /dev/null +++ b/akka-camel/src/test/scala/RemoteConsumerTest.scala @@ -0,0 +1,89 @@ +package se.scalablesolutions.akka.camel + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.scalatest.{GivenWhenThen, BeforeAndAfterAll, FeatureSpec} + +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.actor.{ActiveObject, ActorRegistry, RemoteActor} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} + +/** + * @author Martin Krasser + */ +class RemoteConsumerTest extends FeatureSpec with BeforeAndAfterAll with GivenWhenThen { + import RemoteConsumerTest._ + + var service: CamelService = _ + var server: RemoteServer = _ + + override protected def beforeAll = { + ActorRegistry.shutdownAll + + service = CamelService.newInstance + service.load + + server = new RemoteServer() + server.start(host, port) + + Thread.sleep(1000) + } + + override protected def afterAll = { + server.shutdown + service.unload + + RemoteClient.shutdownAll + ActorRegistry.shutdownAll + + Thread.sleep(1000) + } + + feature("Client-initiated remote consumer actor") { + scenario("access published remote consumer actor") { + given("a client-initiated remote consumer actor") + val consumer = actorOf[RemoteConsumer].start + + when("remote consumer publication is triggered") + val latch = service.consumerPublisher.!).get + consumer !! "init" + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + then("the published actor is accessible via its endpoint URI") + val response = CamelContextManager.template.requestBody("direct:remote-actor", "test") + assert(response === "remote actor: test") + } + } + + /* TODO: enable once issues with remote active objects are resolved + feature("Client-initiated remote consumer active object") { + scenario("access published remote consumer method") { + given("a client-initiated remote consumer active object") + val consumer = ActiveObject.newRemoteInstance(classOf[PojoRemote], host, port) + + when("remote consumer publication is triggered") + val latch = service.consumerPublisher.!).get + consumer.foo("init") + assert(latch.await(5000, TimeUnit.MILLISECONDS)) + + then("the published method is accessible via its endpoint URI") + val response = CamelContextManager.template.requestBody("direct:remote-active-object", "test") + assert(response === "remote active object: test") + } + } + */ +} + +object RemoteConsumerTest { + val host = "localhost" + val port = 7774 + + class RemoteConsumer extends RemoteActor(host, port) with Consumer { + def endpointUri = "direct:remote-actor" + + protected def receive = { + case "init" => self.reply("done") + case m: Message => self.reply("remote actor: %s" format m.body) + } + } +} \ No newline at end of file diff --git a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala index 1decc24eef..d80eedfd7a 100644 --- a/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala +++ b/akka-camel/src/test/scala/component/ActiveObjectComponentFeatureTest.scala @@ -2,20 +2,30 @@ package se.scalablesolutions.akka.camel.component import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FeatureSpec} +import org.apache.camel.builder.RouteBuilder import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.camel._ import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject} -import org.apache.camel.{ExchangePattern, Exchange, Processor} +import se.scalablesolutions.akka.camel._ +import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry} +import org.apache.camel.{ResolveEndpointFailedException, ExchangePattern, Exchange, Processor} /** * @author Martin Krasser */ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAll with BeforeAndAfterEach { + import ActiveObjectComponentFeatureTest._ + import CamelContextManager.template + override protected def beforeAll = { + val activePojo = ActiveObject.newInstance(classOf[Pojo]) // not a consumer val activePojoBase = ActiveObject.newInstance(classOf[PojoBase]) val activePojoIntf = ActiveObject.newInstance(classOf[PojoIntf], new PojoImpl) - CamelContextManager.init + val registry = new SimpleRegistry + registry.put("pojo", activePojo) + + CamelContextManager.init(new DefaultCamelContext(registry)) + CamelContextManager.context.addRoutes(new CustomRouteBuilder) CamelContextManager.start CamelContextManager.activeObjectRegistry.put("base", activePojoBase) @@ -29,7 +39,6 @@ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAl feature("Communicate with an active object from a Camel application using active object endpoint URIs") { import ActiveObjectComponent.InternalSchema - import CamelContextManager.template import ExchangePattern._ scenario("in-out exchange with proxy created from interface and method returning String") { @@ -71,4 +80,26 @@ class ActiveObjectComponentFeatureTest extends FeatureSpec with BeforeAndAfterAl assert(result.getOut.getBody === null) } } + + feature("Communicate with an active object from a Camel application from a custom Camel route") { + + scenario("in-out exchange with externally registered active object") { + val result = template.requestBody("direct:test", "test") + assert(result === "foo: test") + } + + scenario("in-out exchange with internally registered active object not possible") { + intercept[ResolveEndpointFailedException] { + template.requestBodyAndHeader("active-object:intf?method=m2", "x", "test", "y") + } + } + } +} + +object ActiveObjectComponentFeatureTest { + class CustomRouteBuilder extends RouteBuilder { + def configure = { + from("direct:test").to("active-object:pojo?method=foo") + } + } } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 59ab5c06c2..5f08c7b900 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement { *
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'. * - * This field is used for logging, AspectRegistry.actorsFor, identifier for remote + * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote * actor in RemoteServer etc.But also as the identifier for persistence, which means * that you can use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. @@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement { protected[akka] var _sender: Option[ActorRef] = None protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s} - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf} + protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s } + protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf } /** * The reference sender Actor of the last received message. @@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement { */ def uuid = _uuid + /** + * Tests if the actor is able to handle the message passed in as arguments. + */ + def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message) + /** * Only for internal use. UUID is effectively final. */ @@ -891,7 +896,6 @@ sealed class LocalActorRef private[akka]( } protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -924,7 +928,6 @@ sealed class LocalActorRef private[akka]( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -974,9 +977,9 @@ sealed class LocalActorRef private[akka]( Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) return } + sender = messageHandle.sender + senderFuture = messageHandle.senderFuture try { - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) } catch { @@ -990,9 +993,7 @@ sealed class LocalActorRef private[akka]( val message = messageHandle.message //serializeMessage(messageHandle.message) setTransactionSet(messageHandle.transactionSet) try { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - "No handler matching message [" + message + "] in " + toString) + actor.base(message) } catch { case e => _isBeingRestarted = true @@ -1021,20 +1022,16 @@ sealed class LocalActorRef private[akka]( } setTransactionSet(txSet) - def proceed = { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - try { if (isTransactor) { atomic { - proceed + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit } - } else proceed + } else { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } } catch { case e: IllegalStateException => {} case e => diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index ba1e8cb056..68842ad1e3 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message) * @author Jonas Bonér */ object Config extends Logging { - val VERSION = "0.10-SNAPSHOT" + val VERSION = "0.10" // Set Multiverse options for max speed System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index cfb8a9a5ea..da0f9be72b 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory { def nextId: Long = id.getAndIncrement + nodeId } +/** + * Life-cycle events for RemoteClient. + */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent +case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent @@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) throw exception } @@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String, futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String, client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause)) + l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String, } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/test/scala/Bench.scala b/akka-core/src/test/scala/Bench.scala new file mode 100644 index 0000000000..454fc2b7e9 --- /dev/null +++ b/akka-core/src/test/scala/Bench.scala @@ -0,0 +1,119 @@ +/* The Computer Language Benchmarks Game + http://shootout.alioth.debian.org/ + contributed by Julien Gaugaz + inspired by the version contributed by Yura Taras and modified by Isaac Gouy +*/ +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.actor.Actor._ + +object Chameneos { + + sealed trait ChameneosEvent + case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent + case class Change(colour: Colour) extends ChameneosEvent + case class MeetingCount(count: Int) extends ChameneosEvent + case object Exit extends ChameneosEvent + + abstract class Colour + case object RED extends Colour + case object YELLOW extends Colour + case object BLUE extends Colour + case object FADED extends Colour + + val colours = Array[Colour](BLUE, RED, YELLOW) + + var start = 0L + var end = 0L + + class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor { + var meetings = 0 + self.start + mall ! Meet(self, colour) + + def receive = { + case Meet(from, otherColour) => + colour = complement(otherColour) + meetings = meetings +1 + from ! Change(colour) + mall ! Meet(self, colour) + + case Change(newColour) => + colour = newColour + meetings = meetings +1 + mall ! Meet(self, colour) + + case Exit => + colour = FADED + self.sender.get ! MeetingCount(meetings) + } + + def complement(otherColour: Colour): Colour = colour match { + case RED => otherColour match { + case RED => RED + case YELLOW => BLUE + case BLUE => YELLOW + case FADED => FADED + } + case YELLOW => otherColour match { + case RED => BLUE + case YELLOW => YELLOW + case BLUE => RED + case FADED => FADED + } + case BLUE => otherColour match { + case RED => YELLOW + case YELLOW => RED + case BLUE => BLUE + case FADED => FADED + } + case FADED => FADED + } + + override def toString = cid + "(" + colour + ")" + } + + class Mall(var n: Int, numChameneos: Int) extends Actor { + var waitingChameneo: Option[ActorRef] = None + var sumMeetings = 0 + var numFaded = 0 + + override def init = { + for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i)) + } + + def receive = { + case MeetingCount(i) => + numFaded += 1 + sumMeetings += i + if (numFaded == numChameneos) { + Chameneos.end = System.currentTimeMillis + self.stop + } + + case msg @ Meet(a, c) => + if (n > 0) { + waitingChameneo match { + case Some(chameneo) => + n -= 1 + chameneo ! msg + waitingChameneo = None + case None => waitingChameneo = self.sender + } + } else { + waitingChameneo.foreach(_ ! Exit) + self.sender.get ! Exit + } + } + } + + def run { +// System.setProperty("akka.config", "akka.conf") + Chameneos.start = System.currentTimeMillis + actorOf(new Mall(1000000, 4)).start + Thread.sleep(10000) + println("Elapsed: " + (end - start)) + } + + def main(args : Array[String]): Unit = run +} diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index a5226eb1a4..df74040b68 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -33,14 +33,6 @@ trait VectorStorageBackend[T] extends StorageBackend { trait RefStorageBackend[T] extends StorageBackend { def insertRefStorageFor(name: String, element: T) def getRefStorageFor(name: String): Option[T] - def incrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def incrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis } // for Queue diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index b1973c3c7b..ad758f9999 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -11,20 +11,45 @@ import se.scalablesolutions.akka.config.Config.config import com.redis._ -trait Encoder { +trait Base64Encoder { def encode(bytes: Array[Byte]): Array[Byte] def decode(bytes: Array[Byte]): Array[Byte] } -trait CommonsCodecBase64 { - import org.apache.commons.codec.binary.Base64._ - - def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) - def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) +trait Base64StringEncoder { + def byteArrayToString(bytes: Array[Byte]): String + def stringToByteArray(str: String): Array[Byte] } -object Base64Encoder extends Encoder with CommonsCodecBase64 -import Base64Encoder._ +trait NullBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = bytes + def decode(bytes: Array[Byte]): Array[Byte] = bytes +} + +object CommonsCodec { + import org.apache.commons.codec.binary.Base64 + import org.apache.commons.codec.binary.Base64._ + + val b64 = new Base64(true) + + trait CommonsCodecBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) + def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) + } + + object Base64Encoder extends Base64Encoder with CommonsCodecBase64 + + trait CommonsCodecBase64StringEncoder { + def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) + def stringToByteArray(str: String) = b64.decode(str) + } + + object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder +} + +import CommonsCodec._ +import CommonsCodec.Base64Encoder._ +import CommonsCodec.Base64StringEncoder._ /** * A module for supporting Redis based persistence. @@ -95,7 +120,7 @@ private [akka] object RedisStorageBackend extends def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling { mset(entries.map(e => - (makeRedisKey(name, e._1), new String(e._2)))) + (makeRedisKey(name, e._1), byteArrayToString(e._2)))) } /** @@ -138,7 +163,7 @@ private [akka] object RedisStorageBackend extends db.get(makeRedisKey(name, key)) match { case None => throw new NoSuchElementException(new String(key) + " not present") - case Some(s) => Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -155,7 +180,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList + keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList } } @@ -207,7 +232,7 @@ private [akka] object RedisStorageBackend extends } def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.lpush(new String(encode(name.getBytes)), new String(element)) + db.lpush(new String(encode(name.getBytes)), byteArrayToString(element)) } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { @@ -215,14 +240,15 @@ private [akka] object RedisStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { - db.lset(new String(encode(name.getBytes)), index, new String(elem)) + db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem)) } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { db.lindex(new String(encode(name.getBytes)), index) match { case None => throw new NoSuchElementException(name + " does not have element at " + index) - case Some(e) => e.getBytes + case Some(e) => + stringToByteArray(e) } } @@ -246,75 +272,46 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " does not have elements in the range specified") case Some(l) => - l map (_.get.getBytes) + l map ( e => stringToByteArray(e.get)) } } - def getVectorStorageSizeFor(name: String): Int = { + def getVectorStorageSizeFor(name: String): Int = withErrorHandling { db.llen(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(l) => l + case Some(l) => + l } } def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), new String(element)) + db.set(new String(encode(name.getBytes)), byteArrayToString(element)) + } + + def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling { + db.set(new String(encode(name.getBytes)), element) } def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { db.get(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => Some(s.getBytes) - } - } - - override def incrementAtomically(name: String): Option[Int] = withErrorHandling { - db.incr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incr") - } - } - - override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.incrby(new String(encode(name.getBytes)), by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incrby") - } - } - - override def decrementAtomically(name: String): Option[Int] = withErrorHandling { - db.decr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decr") - } - } - - override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.decrby(new String(encode(name.getBytes)), by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decrby") + case Some(s) => Some(stringToByteArray(s)) } } // add to the end of the queue def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.rpush(new String(encode(name.getBytes)), new String(item)) + db.rpush(new String(encode(name.getBytes)), byteArrayToString(item)) } - // pop from the front of the queue def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { db.lpop(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => - Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -336,7 +333,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException("No element at " + start) case Some(s) => - List(s.getBytes) + List(stringToByteArray(s)) } case n => db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { @@ -344,7 +341,7 @@ private [akka] object RedisStorageBackend extends throw new NoSuchElementException( "No element found between " + start + " and " + (start + count - 1)) case Some(es) => - es.map(_.get.getBytes) + es.map(e => stringToByteArray(e.get)) } } } @@ -359,7 +356,7 @@ private [akka] object RedisStorageBackend extends // add item to sorted set identified by name def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match { + db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -367,7 +364,7 @@ private [akka] object RedisStorageBackend extends // remove item from sorted set identified by name def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zrem(new String(encode(name.getBytes)), new String(item)) match { + db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -383,7 +380,7 @@ private [akka] object RedisStorageBackend extends } def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { - db.zscore(new String(encode(name.getBytes)), new String(item)) match { + db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(s) => Some(s.toFloat) case None => None } @@ -394,7 +391,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(s) => - s.map(_.get.getBytes) + s.map(e => stringToByteArray(e.get)) } } @@ -404,7 +401,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(l) => - l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) } + l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) } } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala new file mode 100644 index 0000000000..22d294c735 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala @@ -0,0 +1,74 @@ +package se.scalablesolutions.akka.persistence.redis + +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.persistence.common.PersistentVector +import se.scalablesolutions.akka.stm.Transaction.Global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import java.util.{Calendar, Date} + +object Serial { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = new Date(read[Long](in)) + def writes(out: Output, value: Date) = write[Long](out, value.getTime) + } + case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date]) + implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get) +} + +case class GETFOO(s: String) +case class SETFOO(s: String) + +object SampleStorage { + class RedisSampleStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val EVENT_MAP = "akka.sample.map" + + private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) } + + import sbinary._ + import DefaultProtocol._ + import Operations._ + import Serial._ + import java.util.Calendar + + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + def receive = { + case SETFOO(str) => + atomic { + eventMap += (str.getBytes, toByteArray[Name](n)) + } + self.reply(str) + + case GETFOO(str) => + val ev = atomic { + eventMap.keySet.size + } + println("************* " + ev) + self.reply(ev) + } + } +} + +import Serial._ +import SampleStorage._ + +object Runner { + def run { + val proc = actorOf[RedisSampleStorage] + proc.start + val i: Option[String] = proc !! SETFOO("debasish") + println("i = " + i) + val ev: Option[Int] = proc !! GETFOO("debasish") + println(ev) + } +} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index d8d79d7f2a..8a8021b3c5 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -9,6 +9,11 @@ import org.junit.runner.RunWith import se.scalablesolutions.akka.serialization.Serializable import se.scalablesolutions.akka.serialization.Serializer._ +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ +import java.util.{Calendar, Date} + import RedisStorageBackend._ @RunWith(classOf[JUnitRunner]) @@ -39,15 +44,6 @@ class RedisStorageBackendSpec extends "T-1", "debasish.language".getBytes).get) should equal("java") } - /** - it("should enter a custom object for transaction T-1") { - val n = Name(100, "debasish", "kolkata") - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, Java.out(n)) - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes) - getMapStorageSizeFor("T-1") should equal(5) - } - **/ - it("should enter key/values for another transaction T-2") { insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes) insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes) @@ -61,6 +57,21 @@ class RedisStorageBackendSpec extends } } + describe("Store and query long value in maps") { + it("should enter 4 entries in redis for transaction T-1") { + val d = Calendar.getInstance.getTime.getTime + insertMapStorageEntryFor("T-11", "debasish".getBytes, + toByteArray[Long](d)) + + getMapStorageSizeFor("T-11") should equal(1) + fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) should equal(d) + } + + it("should remove map storage for T-1 and T2") { + removeMapStorageFor("T-11") + } + } + describe("Range query in maps") { it("should enter 7 entries in redis for transaction T-5") { insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes) @@ -93,73 +104,61 @@ class RedisStorageBackendSpec extends } } + describe("Store and query objects in maps") { + import NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n)) + getMapStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).get) should equal(n) + } + it("should remove map storage for T31") { + removeMapStorageFor("T-31") + } + } + describe("Store and query in vectors") { it("should write 4 entries in a vector for transaction T-3") { insertVectorStorageEntryFor("T-3", "debasish".getBytes) insertVectorStorageEntryFor("T-3", "maulindu".getBytes) - val n = Name(100, "debasish", "kolkata") - // insertVectorStorageEntryFor("T-3", Java.out(n)) - // insertVectorStorageEntryFor("T-3", n.toBytes) insertVectorStorageEntryFor("T-3", "1200".getBytes) - getVectorStorageSizeFor("T-3") should equal(3) + + val dt = Calendar.getInstance.getTime.getTime + insertVectorStorageEntryFor("T-3", toByteArray[Long](dt)) + getVectorStorageSizeFor("T-3") should equal(4) + fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) should equal(dt) + getVectorStorageSizeFor("T-3") should equal(4) + } + } + + describe("Store and query objects in vectors") { + import NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertVectorStorageEntryFor("T-31", toByteArray[Name](n)) + getVectorStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) should equal(n) } } describe("Store and query in ref") { + import NameSerialization._ it("should write 4 entries in 4 refs for transaction T-4") { insertRefStorageFor("T-4", "debasish".getBytes) insertRefStorageFor("T-4", "maulindu".getBytes) insertRefStorageFor("T-4", "1200".getBytes) new String(getRefStorageFor("T-4").get) should equal("1200") - - // val n = Name(100, "debasish", "kolkata") - // insertRefStorageFor("T-4", Java.out(n)) - // insertRefStorageFor("T-4", n.toBytes) - // Java.in(getRefStorageFor("T-4").get, Some(classOf[Name])).asInstanceOf[Name] should equal(n) - // n.fromBytes(getRefStorageFor("T-4").get) should equal(n) } - } - - describe("atomic increment in ref") { - it("should increment an existing key value by 1") { - insertRefStorageFor("T-4-1", "1200".getBytes) - new String(getRefStorageFor("T-4-1").get) should equal("1200") - incrementAtomically("T-4-1").get should equal(1201) - } - it("should create and increment a non-existing key value by 1") { - incrementAtomically("T-4-2").get should equal(1) - new String(getRefStorageFor("T-4-2").get) should equal("1") - } - it("should increment an existing key value by the amount specified") { - insertRefStorageFor("T-4-3", "1200".getBytes) - new String(getRefStorageFor("T-4-3").get) should equal("1200") - incrementByAtomically("T-4-3", 50).get should equal(1250) - } - it("should create and increment a non-existing key value by the amount specified") { - incrementByAtomically("T-4-4", 20).get should equal(20) - new String(getRefStorageFor("T-4-4").get) should equal("20") - } - } - - describe("atomic decrement in ref") { - it("should decrement an existing key value by 1") { - insertRefStorageFor("T-4-5", "1200".getBytes) - new String(getRefStorageFor("T-4-5").get) should equal("1200") - decrementAtomically("T-4-5").get should equal(1199) - } - it("should create and decrement a non-existing key value by 1") { - decrementAtomically("T-4-6").get should equal(-1) - new String(getRefStorageFor("T-4-6").get) should equal("-1") - } - it("should decrement an existing key value by the amount specified") { - insertRefStorageFor("T-4-7", "1200".getBytes) - new String(getRefStorageFor("T-4-7").get) should equal("1200") - decrementByAtomically("T-4-7", 50).get should equal(1150) - } - it("should create and decrement a non-existing key value by the amount specified") { - decrementByAtomically("T-4-8", 20).get should equal(-20) - new String(getRefStorageFor("T-4-8").get) should equal("-20") + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + insertRefStorageFor("T-4", toByteArray[Name](n)) + fromByteArray[Name](getRefStorageFor("T-4").get) should equal(n) } } @@ -185,6 +184,14 @@ class RedisStorageBackendSpec extends new String(l(1)) should equal("yukihiro matsumoto") new String(l(2)) should equal("claude shannon") } + it("should write a Name object and fetch it properly") { + import NameSerialization._ + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + enqueue("T-5-1", toByteArray[Name](n)) + fromByteArray[Name](peek("T-5-1", 0, 1).head) should equal(n) + fromByteArray[Name](dequeue("T-5-1").get) should equal(n) + } } describe("store and query in sorted set") { @@ -221,27 +228,18 @@ class RedisStorageBackendSpec extends } } -case class Name(id: Int, name: String, address: String) - extends Serializable.SBinary[Name] { - import sbinary._ - import sbinary.Operations._ - import sbinary.DefaultProtocol._ +object NameSerialization { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = + new Date(read[Long](in)) - def this() = this(0, null, null) - - implicit object NameFormat extends Format[Name] { - def reads(in : Input) = Name( - read[Int](in), - read[String](in), - read[String](in)) - def writes(out: Output, value: Name) = { - write[Int](out, value.id) - write[String](out, value.name) - write[String](out, value.address) - } + def writes(out: Output, value: Date) = + write[Long](out, value.getTime) } - def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes) + case class Name(id: Int, name: String, + address: String, dateOfBirth: Date, dateDied: Option[Date]) - def toBytes: Array[Byte] = toByteArray(this) + implicit val NameFormat: Format[Name] = + asProduct5(Name)(Name.unapply(_).get) } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java new file mode 100644 index 0000000000..10437e7624 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanImpl.java @@ -0,0 +1,12 @@ +package sample.camel; + +/** + * @author Martin Krasser + */ +public class BeanImpl implements BeanIntf { + + public String foo(String s) { + return "hello " + s; + } + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java new file mode 100644 index 0000000000..a7b2e6e6a4 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/BeanIntf.java @@ -0,0 +1,10 @@ +package sample.camel; + +/** + * @author Martin Krasser + */ +public interface BeanIntf { + + public String foo(String s); + +} diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java similarity index 79% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java index f08c486dac..ed29ac30e6 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/Consumer10.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo1.java @@ -8,15 +8,15 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin Krasser */ -public class Consumer10 { +public class ConsumerPojo1 { - @consume("file:data/input2") + @consume("file:data/input/pojo") public void foo(String body) { System.out.println("Received message:"); System.out.println(body); } - @consume("jetty:http://0.0.0.0:8877/camel/active") + @consume("jetty:http://0.0.0.0:8877/camel/pojo") public String bar(@Body String body, @Header("name") String header) { return String.format("body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java new file mode 100644 index 0000000000..429e6043ad --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/ConsumerPojo2.java @@ -0,0 +1,17 @@ +package sample.camel; + +import org.apache.camel.Body; +import org.apache.camel.Header; +import se.scalablesolutions.akka.actor.annotation.consume; + +/** + * @author Martin Krasser + */ +public class ConsumerPojo2 { + + @consume("direct:default") + public String foo(String body) { + return String.format("default: %s", body); + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java similarity index 75% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java index 695aa148f4..ab7e878b0d 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject1.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo1.java @@ -8,9 +8,9 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin Krasser */ -public class RemoteActiveObject1 { +public class RemoteConsumerPojo1 { - @consume("jetty:http://localhost:6644/remote-active-object-1") + @consume("jetty:http://localhost:6644/camel/remote-active-object-1") public String foo(@Body String body, @Header("name") String header) { return String.format("remote1: body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java similarity index 75% rename from akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java rename to akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java index 210a72d2f8..e982fe5025 100644 --- a/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteActiveObject2.java +++ b/akka-samples/akka-sample-camel/src/main/java/sample/camel/RemoteConsumerPojo2.java @@ -7,9 +7,9 @@ import se.scalablesolutions.akka.actor.annotation.consume; /** * @author Martin Krasser */ -public class RemoteActiveObject2 { +public class RemoteConsumerPojo2 { - @consume("jetty:http://localhost:6644/remote-active-object-2") + @consume("jetty:http://localhost:6644/camel/remote-active-object-2") public String foo(@Body String body, @Header("name") String header) { return String.format("remote2: body=%s header=%s", body, header); } diff --git a/akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml b/akka-samples/akka-sample-camel/src/main/resources/context-boot.xml similarity index 100% rename from akka-samples/akka-sample-camel/src/main/resources/sample-camel-context.xml rename to akka-samples/akka-sample-camel/src/main/resources/context-boot.xml diff --git a/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml new file mode 100644 index 0000000000..a493678817 --- /dev/null +++ b/akka-samples/akka-sample-camel/src/main/resources/context-standalone.xml @@ -0,0 +1,12 @@ +