Merge with master

commit c14484c977

20 changed files with 672 additions and 56 deletions

@@ -90,7 +90,7 @@ object Actor extends Logging {
   */
  def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() {
    start
-   def receive = body
+   def receive: PartialFunction[Any, Unit] = body
  }

  /**

@@ -108,8 +108,8 @@ object Actor extends Logging {
   * </pre>
   *
   */
- def actor[A](body: => Unit) = {
-   def handler[A](body: => Unit) = new {
+ def actor(body: => Unit) = {
+   def handler(body: => Unit) = new {
     def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
       start
       body

@@ -141,8 +141,7 @@ object Actor extends Logging {
     start
     send(Spawn)
     def receive = {
-      case Spawn => body
-      case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.")
+      case Spawn => body; stop
     }
   }
 }

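The two actor factories touched above create and start anonymous actors on the fly. A minimal usage sketch, not part of the commit (the handler value and messages are made up for illustration):

  import se.scalablesolutions.akka.actor.Actor

  // PartialFunction variant: the actor is started inside the factory and uses 'body' as its receive
  val handler: PartialFunction[Any, Unit] = {
    case msg: String => println("got: " + msg)
  }
  val echo = Actor.actor(handler)
  echo ! "hello"

  // By-name variant: the body runs once via the internal Spawn message and, after this change, the actor stops itself
  Actor.actor { println("fire-and-forget work") }
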
@@ -204,8 +203,6 @@ trait Actor extends TransactionManagement {
   // Only mutable for RemoteServer in order to maintain identity across nodes
   private[akka] var _uuid = UUID.newUuid.toString

-  def uuid = _uuid
-
   // ====================================
   // private fields
   // ====================================

@@ -260,7 +257,7 @@ trait Actor extends TransactionManagement {
   * use a custom name to be able to retrieve the "correct" persisted state
   * upon restart, remote restart etc.
   */
-  protected[akka] var id: String = this.getClass.getName
+  protected var id: String = this.getClass.getName

  /**
   * User overridable callback/setting.

@@ -270,8 +267,6 @@ trait Actor extends TransactionManagement {
   */
  @volatile var timeout: Long = Actor.TIMEOUT

-  ActorRegistry.register(this)
-
  /**
   * User overridable callback/setting.
   * <p/>

@@ -419,6 +414,7 @@ trait Actor extends TransactionManagement {
       init
     }
     Actor.log.debug("[%s] has started", toString)
+    ActorRegistry.register(this)
     this
   }

@@ -537,18 +533,13 @@ trait Actor extends TransactionManagement {
   */
  def !: Option[T] = !


- /*
- def !!!(message: Any)(implicit sender: Option[Actor] = None): FutureResult = {
+ def !!!(message: Any): FutureResult = {
    if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
    if (_isRunning) {
-     val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
-                else None
-     postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from)
+     postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
    } else throw new IllegalStateException(
      "Actor has not been started, you need to invoke 'actor.start' before using it")
  }
- */

  /**
   * This method is evil and has been removed. Use '!!' with a timeout instead.

@@ -766,6 +757,16 @@ trait Actor extends TransactionManagement {
     actor
   }

+  /**
+   * Returns the id for the actor.
+   */
+  def getId = id
+
+  /**
+   * Returns the uuid for the actor.
+   */
+  def uuid = _uuid
+
   // =========================================
   // ==== INTERNAL IMPLEMENTATION DETAILS ====
   // =========================================

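The getId/uuid accessors added above are what ActorRegistry, Supervisor and RemoteServer key on in the hunks that follow. A small illustration of the difference between the two (sketch only; MyActor is hypothetical and not part of the commit):

  import se.scalablesolutions.akka.actor.Actor

  class MyActor extends Actor {
    def receive = { case _ => () }
  }

  val a = new MyActor
  a.getId // the user-settable 'id', defaulting to the class name; used as the registry/persistence key
  a.uuid  // the generated UUID string, unique per instance and kept stable across remote nodes
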
@@ -65,7 +65,7 @@ object ActorRegistry extends Logging {
       case Some(instances) => actorsByClassName + (className -> (actor :: instances))
       case None => actorsByClassName + (className -> (actor :: Nil))
     }
-    val id = actor.id
+    val id = actor.getId
     if (id eq null) throw new IllegalStateException("Actor.id is null " + actor)
     actorsById.get(id) match {
       case Some(instances) => actorsById + (id -> (actor :: instances))

@@ -117,7 +117,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
         actors.put(actor.getClass.getName, actor)
         actor.lifeCycle = Some(lifeCycle)
         startLink(actor)
-        remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor))
+        remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.getId, actor))

       case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
         val supervisor = factory.newInstanceFor(supervisorConfig).start

@@ -8,10 +8,52 @@
 package se.scalablesolutions.akka.dispatch

 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
+
+import java.util.concurrent.{SynchronousQueue, TimeUnit}

 class FutureTimeoutException(message: String) extends RuntimeException(message)

+object Futures {
+  def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await)
+
+  def awaitOne(futures: List[FutureResult]): FutureResult = {
+    var future: Option[FutureResult] = None
+    do {
+      future = futures.find(_.isCompleted)
+    } while (future.isEmpty)
+    future.get
+  }
+
+  /*
+  def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = {
+    import Actor.Sender.Self
+    import Actor.{spawn, actor}
+
+    case class Result(res: Option[Any])
+    val handOff = new SynchronousQueue[Option[Any]]
+    spawn {
+      try {
+        println("f1 await")
+        f1.await
+        println("f1 offer")
+        handOff.offer(f1.result)
+      } catch {case _ => {}}
+    }
+    spawn {
+      try {
+        println("f2 await")
+        f2.await
+        println("f2 offer")
+        println("f2 offer: " + f2.result)
+        handOff.offer(f2.result)
+      } catch {case _ => {}}
+    }
+    Thread.sleep(100)
+    handOff.take
+  }
+  */
+}
+
 sealed trait FutureResult {
   def await
   def awaitBlocking

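The new Futures object above pairs with the '!!!' send that returns a FutureResult. A minimal usage sketch, not part of the commit itself (it assumes 'a' and 'b' are started actors like the TestActor in FutureTest.scala further down):

  import se.scalablesolutions.akka.dispatch.Futures

  val f1 = a !!! "Hello"
  val f2 = b !!! "Hello"

  Futures.awaitAll(List(f1, f2))              // blocks until every future has completed
  val first = Futures.awaitOne(List(f1, f2))  // spins until some future reports isCompleted
  first.result.foreach(println)

Note that awaitOne, as written in the hunk, polls isCompleted in a tight loop, trading CPU for simplicity; awaitAll simply awaits each future in turn.
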
@@ -46,7 +88,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
       var start = currentTimeInNanos
       try {
         wait = _signal.awaitNanos(wait)
-        if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds")
+        if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
       } catch {
         case e: InterruptedException =>
           wait = wait - (currentTimeInNanos - start)

@@ -196,8 +196,8 @@ class RemoteServer extends Logging {
   * Register Remote Actor by the Actor's 'id' field.
   */
  def register(actor: Actor) = if (isRunning) {
-   log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id)
-   RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
+   log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId)
+   RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor)
  }

  /**

akka-core/src/test/scala/FutureTest.scala (new file, 111 lines)
@@ -0,0 +1,111 @@
package se.scalablesolutions.akka.actor

import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Futures

class FutureTest extends JUnitSuite {
  class TestActor extends Actor {
    def receive = {
      case "Hello" =>
        reply("World")
      case "NoReply" => {}
      case "Failure" =>
        throw new RuntimeException("expected")
    }
  }

  @Test def shouldActorReplyResultThroughExplicitFuture = {
    val actor = new TestActor
    actor.start
    val future = actor !!! "Hello"
    future.await
    assert(future.result.isDefined)
    assert("World" === future.result.get)
    actor.stop
  }

  @Test def shouldActorReplyExceptionThroughExplicitFuture = {
    val actor = new TestActor
    actor.start
    val future = actor !!! "Failure"
    future.await
    assert(future.exception.isDefined)
    assert("expected" === future.exception.get._2.getMessage)
    actor.stop
  }

  /*
  @Test def shouldFutureAwaitEitherLeft = {
    val actor1 = new TestActor
    actor1.start
    val actor2 = new TestActor
    actor2.start
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "NoReply"
    val result = Futures.awaitEither(future1, future2)
    assert(result.isDefined)
    assert("World" === result.get)
    actor1.stop
    actor2.stop
  }

  @Test def shouldFutureAwaitEitherRight = {
    val actor1 = new TestActor
    actor1.start
    val actor2 = new TestActor
    actor2.start
    val future1 = actor1 !!! "NoReply"
    val future2 = actor2 !!! "Hello"
    val result = Futures.awaitEither(future1, future2)
    assert(result.isDefined)
    assert("World" === result.get)
    actor1.stop
    actor2.stop
  }
  */
  @Test def shouldFutureAwaitOneLeft = {
    val actor1 = new TestActor
    actor1.start
    val actor2 = new TestActor
    actor2.start
    val future1 = actor1 !!! "NoReply"
    val future2 = actor2 !!! "Hello"
    val result = Futures.awaitOne(List(future1, future2))
    assert(result.result.isDefined)
    assert("World" === result.result.get)
    actor1.stop
    actor2.stop
  }

  @Test def shouldFutureAwaitOneRight = {
    val actor1 = new TestActor
    actor1.start
    val actor2 = new TestActor
    actor2.start
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "NoReply"
    val result = Futures.awaitOne(List(future1, future2))
    assert(result.result.isDefined)
    assert("World" === result.result.get)
    actor1.stop
    actor2.stop
  }

  @Test def shouldFutureAwaitAll = {
    val actor1 = new TestActor
    actor1.start
    val actor2 = new TestActor
    actor2.start
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "Hello"
    Futures.awaitAll(List(future1, future2))
    assert(future1.result.isDefined)
    assert("World" === future1.result.get)
    assert(future2.result.isDefined)
    assert("World" === future2.result.get)
    actor1.stop
    actor2.stop
  }

}

@@ -28,11 +28,6 @@ object Patterns {
     val seq = actors
   }

-  //FIXME 2.8, use default params with CyclicIterator
-  /*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor(
-    new CyclicIterator(actors())
-  ) */
-
   def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
     override def transform(msg : Any) = msgTransformer(msg)
     def routes = routing

@@ -82,20 +77,3 @@ class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
     nc.head
   }
 }
-
-//Agent
-/*
-val a = agent(startValue)
-a.set(_ + 5)
-a.get
-a.foreach println(_)
-*/
-object Agent {
-  sealed trait AgentMessage
-  case class FunMessage[T](f : (T) => T) extends AgentMessage
-  case class ProcMessage[T](f : (T) => Unit) extends AgentMessage
-  case class ValMessage[T](t : T) extends AgentMessage
-}
-sealed private[akka] class Agent[T] {
-
-}

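The dispatcherActor factory kept as context in the hunk above builds a routing actor from a partial function that picks a target actor per message, after first passing the message through msgTransformer. A hypothetical call-site sketch (workerA and workerB are assumed started actors in scope; they are not defined in this commit):

  // route messages starting with "a" to workerA, everything else to workerB,
  // upper-casing each message before it is forwarded
  val router = Patterns.dispatcherActor(
    {
      case msg: String if msg.startsWith("a") => workerA
      case _ => workerB
    },
    (msg: Any) => msg.toString.toUpperCase)
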
@@ -18,6 +18,12 @@
       <artifactId>akka-persistence-common</artifactId>
       <groupId>${project.groupId}</groupId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.google-collections</groupId>
+          <artifactId>google-collect</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>

     <!-- For Cassandra -->

@@ -26,6 +32,50 @@
       <artifactId>cassandra</artifactId>
       <version>0.5.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>high-scale-lib</artifactId>
+      <version>0.5.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>clhm-production</artifactId>
+      <version>0.5.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.collections</groupId>
+      <artifactId>google-collections</artifactId>
+      <version>1.0-rc1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>3.2.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.5.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.5.8</version>
+      <scope>test</scope>
+    </dependency>
+
+
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>

@@ -103,6 +103,42 @@ trait CassandraSession extends Closeable with Flushable {
   // ====================================
   // ====== Java-style API names
   // ====================================
+
+  def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel)
+
+  def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel)
+
+  def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate) = / (key, columnParent, slicePredicate)
+
+  def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate, consistencyLevel: Int) = / (key, columnParent, slicePredicate, consistencyLevel)
+
+
+  def get(key: String, colPath: ColumnPath) = |(key, colPath)
+
+  def get(key: String, colPath: ColumnPath, consistencyLevel: Int) = |(key, colPath, consistencyLevel)
+
+  def getCount(key: String, columnParent: ColumnParent) = |#(key, columnParent)
+
+  def getCount(key: String, columnParent: ColumnParent, consistencyLevel: Int) = |#(key, columnParent, consistencyLevel)
+
+
+  def insert(key: String, colPath: ColumnPath, value: Array[Byte]): Unit = ++|(key, colPath, value)
+
+  def insert(key: String, colPath: ColumnPath, value: Array[Byte], consistencyLevel: Int): Unit = ++|(key, colPath, value, consistencyLevel)
+
+  def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit = ++|(key, colPath, value, timestamp)
+
+  def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) = ++|(key, colPath, value, timestamp, consistencyLevel)
+
+
+  def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]]): Unit = ++|(key, batch)
+
+  def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]], consistencyLevel: Int): Unit = ++|(key, batch, consistencyLevel)
+
+  def remove(key: String, columnPath: ColumnPath, timestamp: Long): Unit = --(key, columnPath, timestamp)
+
+  def remove(key: String, columnPath: ColumnPath, timestamp: Long, consistencyLevel: Int): Unit = --(key, columnPath, timestamp, consistencyLevel)
+
 }

 class CassandraSessionPool[T <: TTransport](

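The Java-style names added above are plain aliases for the symbolic operators ('/', '|', '|#', '++|', '--') that CassandraSession already exposes. A sketch of a call site (the 'session', 'columnPath' and 'columnParent' values are assumed to come from an existing CassandraSession setup; they are not defined in this commit):

  val value = "hello".getBytes("UTF-8")
  session.insert("some-key", columnPath, value)                      // delegates to ++|
  val column = session.get("some-key", columnPath)                   // delegates to |
  val count  = session.getCount("some-key", columnParent)            // delegates to |#
  session.remove("some-key", columnPath, System.currentTimeMillis)   // delegates to --
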
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


log4j.rootLogger=DEBUG,R

# rolling log file ("system.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
log4j.appender.R.File=target/logs/system.log

@@ -0,0 +1,337 @@
<!--
 ~ Licensed to the Apache Software Foundation (ASF) under one
 ~ or more contributor license agreements. See the NOTICE file
 ~ distributed with this work for additional information
 ~ regarding copyright ownership. The ASF licenses this file
 ~ to you under the Apache License, Version 2.0 (the
 ~ "License"); you may not use this file except in compliance
 ~ with the License. You may obtain a copy of the License at
 ~
 ~ http://www.apache.org/licenses/LICENSE-2.0
 ~
 ~ Unless required by applicable law or agreed to in writing,
 ~ software distributed under the License is distributed on an
 ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 ~ KIND, either express or implied. See the License for the
 ~ specific language governing permissions and limitations
 ~ under the License.
-->
<Storage>
  <!--======================================================================-->
  <!-- Basic Configuration -->
  <!--======================================================================-->

  <!--
   ~ The name of this cluster. This is mainly used to prevent machines in
   ~ one logical cluster from joining another.
  -->
  <ClusterName>akka</ClusterName>

  <!--
   ~ Turn on to make new [non-seed] nodes automatically migrate the right data
   ~ to themselves. (If no InitialToken is specified, they will pick one
   ~ such that they will get half the range of the most-loaded node.)
   ~ If a node starts up without bootstrapping, it will mark itself bootstrapped
   ~ so that you can't subsequently accidently bootstrap a node with
   ~ data on it. (You can reset this by wiping your data and commitlog
   ~ directories.)
   ~
   ~ Off by default so that new clusters and upgraders from 0.4 don't
   ~ bootstrap immediately. You should turn this on when you start adding
   ~ new nodes to a cluster that already has data on it. (If you are upgrading
   ~ from 0.4, start your cluster with it off once before changing it to true.
   ~ Otherwise, no data will be lost but you will incur a lot of unnecessary
   ~ I/O before your cluster starts up.)
  -->
  <AutoBootstrap>false</AutoBootstrap>

  <!--
   ~ Keyspaces and ColumnFamilies:
   ~ A ColumnFamily is the Cassandra concept closest to a relational
   ~ table. Keyspaces are separate groups of ColumnFamilies. Except in
   ~ very unusual circumstances you will have one Keyspace per application.

   ~ There is an implicit keyspace named 'system' for Cassandra internals.
  -->
  <Keyspaces>
    <Keyspace Name="akka">
      <!-- The fraction of keys per sstable whose locations we
           keep in memory in "mostly LRU" order. (JUST the key
           locations, NOT any column values.)

           The amount of memory used by the default setting of
           0.01 is comparable to the amount used by the internal
           per-sstable key index. Consider increasing this is
           fine if you have fewer, wider rows. Set to 0 to
           disable entirely.
      -->
      <KeysCachedFraction>0.01</KeysCachedFraction>
      <!--
           The CompareWith attribute tells Cassandra how to sort the columns
           for slicing operations. For backwards compatibility, the default
           is to use AsciiType, which is probably NOT what you want.
           Other options are BytesType, UTF8Type, LexicalUUIDType, TimeUUIDType, and LongType.
           You can also specify the fully-qualified class name to a class
           of your choice implementing org.apache.cassandra.db.marshal.IType.

           SuperColumns have a similar CompareSubcolumnsWith attribute.

           ByteType: simple sort by byte value. No validation is performed.
           AsciiType: like BytesType, but validates that the input can be parsed as US-ASCII.
           UTF8Type: A string encoded as UTF8
           LongType: A 64bit long
           LexicalUUIDType: a 128bit UUID, compared lexically (by byte value)
           TimeUUIDType: a 128bit version 1 UUID, compared by timestamp

           (To get the closest approximation to 0.3-style supercolumns,
           you would use CompareWith=UTF8Type CompareSubcolumnsWith=LongType.)

           if FlushPeriodInMinutes is configured and positive, it will be
           flushed to disk with that period whether it is dirty or not.
           This is intended for lightly-used columnfamilies so that they
           do not prevent commitlog segments from being purged.
      -->
      <ColumnFamily CompareWith="UTF8Type" Name="map"/>
      <!-- FIXME: change vector to a super column -->
      <ColumnFamily CompareWith="UTF8Type" Name="vector"/>
      <ColumnFamily CompareWith="UTF8Type" Name="ref"/>

      <!--ColumnFamily CompareWith="UTF8Type" Name="Standard1" FlushPeriodInMinutes="60"/>
      <ColumnFamily CompareWith="TimeUUIDType" Name="StandardByUUID1"/>
      <ColumnFamily ColumnType="Super" CompareWith="UTF8Type" CompareSubcolumnsWith="UTF8Type" Name="Super1"/-->
    </Keyspace>
  </Keyspaces>

  <!--
   ~ Partitioner: any IPartitioner may be used, including your own as long
   ~ as it is on the classpath. Out of the box, Cassandra provides
   ~ org.apache.cassandra.dht.RandomPartitioner,
   ~ org.apache.cassandra.dht.OrderPreservingPartitioner, and
   ~ org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
   ~ (CollatingOPP colates according to EN,US rules, not naive byte
   ~ ordering. Use this as an example if you need locale-aware collation.)
   ~ Range queries require using an order-preserving partitioner.
   ~
   ~ Achtung! Changing this parameter requires wiping your data
   ~ directories, since the partitioner can modify the sstable on-disk
   ~ format.
  -->
  <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>

  <!--
   ~ If you are using an order-preserving partitioner and you know your key
   ~ distribution, you can specify the token for this node to use. (Keys
   ~ are sent to the node with the "closest" token, so distributing your
   ~ tokens equally along the key distribution space will spread keys
   ~ evenly across your cluster.) This setting is only checked the first
   ~ time a node is started.

   ~ This can also be useful with RandomPartitioner to force equal spacing
   ~ of tokens around the hash space, especially for clusters with a small
   ~ number of nodes.
  -->
  <InitialToken></InitialToken>

  <!--
   ~ EndPointSnitch: Setting this to the class that implements
   ~ IEndPointSnitch which will see if two endpoints are in the same data
   ~ center or on the same rack. Out of the box, Cassandra provides
   ~ org.apache.cassandra.locator.EndPointSnitch
  -->
  <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>

  <!--
   ~ Strategy: Setting this to the class that implements
   ~ IReplicaPlacementStrategy will change the way the node picker works.
   ~ Out of the box, Cassandra provides
   ~ org.apache.cassandra.locator.RackUnawareStrategy and
   ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
   ~ a different datacenter, and the others on different racks in the same
   ~ one.)
  -->
  <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>

  <!-- Number of replicas of the data -->
  <ReplicationFactor>1</ReplicationFactor>

  <!--
   ~ Directories: Specify where Cassandra should store different data on
   ~ disk. Keep the data disks and the CommitLog disks separate for best
   ~ performance
  -->
  <CommitLogDirectory>target/cassandra/commitlog</CommitLogDirectory>
  <DataFileDirectories>
    <DataFileDirectory>target/cassandra/data</DataFileDirectory>
  </DataFileDirectories>
  <CalloutLocation>target/cassandra/callouts</CalloutLocation>
  <StagingFileDirectory>target/cassandra/staging</StagingFileDirectory>


  <!--
   ~ Addresses of hosts that are deemed contact points. Cassandra nodes
   ~ use this list of hosts to find each other and learn the topology of
   ~ the ring. You must change this if you are running multiple nodes!
  -->
  <Seeds>
    <Seed>127.0.0.1</Seed>
  </Seeds>


  <!-- Miscellaneous -->

  <!-- Time to wait for a reply from other nodes before failing the command -->
  <RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
  <!-- Size to allow commitlog to grow to before creating a new segment -->
  <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>


  <!-- Local hosts and ports -->

  <!--
   ~ Address to bind to and tell other nodes to connect to. You _must_
   ~ change this if you want multiple nodes to be able to communicate!
   ~
   ~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
   ~ will always do the Right Thing *if* the node is properly configured
   ~ (hostname, name resolution, etc), and the Right Thing is to use the
   ~ address associated with the hostname (it might not be).
  -->
  <ListenAddress>localhost</ListenAddress>
  <!-- TCP port, for commands and data -->
  <StoragePort>7000</StoragePort>
  <!-- UDP port, for membership communications (gossip) -->
  <ControlPort>7001</ControlPort>

  <!--
   ~ The address to bind the Thrift RPC service to. Unlike ListenAddress
   ~ above, you *can* specify 0.0.0.0 here if you want Thrift to listen on
   ~ all interfaces.
   ~
   ~ Leaving this blank has the same effect it does for ListenAddress,
   ~ (i.e. it will be based on the configured hostname of the node).
  -->
  <ThriftAddress>localhost</ThriftAddress>
  <!-- Thrift RPC port (the port clients connect to). -->
  <ThriftPort>9160</ThriftPort>
  <!--
   ~ Whether or not to use a framed transport for Thrift. If this option
   ~ is set to true then you must also use a framed transport on the
   ~ client-side, (framed and non-framed transports are not compatible).
  -->
  <ThriftFramedTransport>false</ThriftFramedTransport>


  <!--======================================================================-->
  <!-- Memory, Disk, and Performance -->
  <!--======================================================================-->

  <!--
   ~ Buffer size to use when performing contiguous column slices. Increase
   ~ this to the size of the column slices you typically perform.
   ~ (Name-based queries are performed with a buffer size of
   ~ ColumnIndexSizeInKB.)
  -->
  <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>

  <!--
   ~ Buffer size to use when flushing memtables to disk. (Only one
   ~ memtable is ever flushed at a time.) Increase (decrease) the index
   ~ buffer size relative to the data buffer if you have few (many)
   ~ columns per key. Bigger is only better _if_ your memtables get large
   ~ enough to use the space. (Check in your data directory after your
   ~ app has been running long enough.) -->
  <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
  <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>

  <!--
   ~ Add column indexes to a row after its contents reach this size.
   ~ Increase if your column values are large, or if you have a very large
   ~ number of columns. The competing causes are, Cassandra has to
   ~ deserialize this much of the row to read a single column, so you want
   ~ it to be small - at least if you do many partial-row reads - but all
   ~ the index data is read for each access, so you don't want to generate
   ~ that wastefully either.
  -->
  <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>

  <!--
   ~ The maximum amount of data to store in memory per ColumnFamily before
   ~ flushing to disk. Note: There is one memtable per column family, and
   ~ this threshold is based solely on the amount of data stored, not
   ~ actual heap memory usage (there is some overhead in indexing the
   ~ columns).
  -->
  <MemtableSizeInMB>64</MemtableSizeInMB>
  <!--
   ~ The maximum number of columns in millions to store in memory per
   ~ ColumnFamily before flushing to disk. This is also a per-memtable
   ~ setting. Use with MemtableSizeInMB to tune memory usage.
  -->
  <MemtableObjectCountInMillions>0.1</MemtableObjectCountInMillions>
  <!--
   ~ The maximum time to leave a dirty memtable unflushed.
   ~ (While any affected columnfamilies have unflushed data from a
   ~ commit log segment, that segment cannot be deleted.)
   ~ This needs to be large enough that it won't cause a flush storm
   ~ of all your memtables flushing at once because none has hit
   ~ the size or count thresholds yet. For production, a larger
   ~ value such as 1440 is recommended.
  -->
  <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>

  <!--
   ~ Unlike most systems, in Cassandra writes are faster than reads, so
   ~ you can afford more of those in parallel. A good rule of thumb is 2
   ~ concurrent reads per processor core. Increase ConcurrentWrites to
   ~ the number of clients writing at once if you enable CommitLogSync +
   ~ CommitLogSyncDelay. -->
  <ConcurrentReads>8</ConcurrentReads>
  <ConcurrentWrites>32</ConcurrentWrites>

  <!--
   ~ CommitLogSync may be either "periodic" or "batch." When in batch
   ~ mode, Cassandra won't ack writes until the commit log has been
   ~ fsynced to disk. It will wait up to CommitLogSyncBatchWindowInMS
   ~ milliseconds for other writes, before performing the sync.

   ~ This is less necessary in Cassandra than in traditional databases
   ~ since replication reduces the odds of losing data from a failure
   ~ after writing the log entry but before it actually reaches the disk.
   ~ So the other option is "timed," where writes may be acked immediately
   ~ and the CommitLog is simply synced every CommitLogSyncPeriodInMS
   ~ milliseconds.
  -->
  <CommitLogSync>periodic</CommitLogSync>
  <!--
   ~ Interval at which to perform syncs of the CommitLog in periodic mode.
   ~ Usually the default of 10000ms is fine; increase it if your i/o
   ~ load is such that syncs are taking excessively long times.
  -->
  <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
  <!--
   ~ Delay (in milliseconds) during which additional commit log entries
   ~ may be written before fsync in batch mode. This will increase
   ~ latency slightly, but can vastly improve throughput where there are
   ~ many writers. Set to zero to disable (each entry will be synced
   ~ individually). Reasonable values range from a minimal 0.1 to 10 or
   ~ even more if throughput matters more than latency.
  -->
  <!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->

  <!--
   ~ Time to wait before garbage-collection deletion markers. Set this to
   ~ a large enough value that you are confident that the deletion marker
   ~ will be propagated to all replicas by the time this many seconds has
   ~ elapsed, even in the face of hardware failures. The default value is
   ~ ten days.
  -->
  <GCGraceSeconds>864000</GCGraceSeconds>

  <!--
   ~ The threshold size in megabytes the binary memtable must grow to,
   ~ before it's submitted for flushing to disk.
  -->
  <BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>

</Storage>

@@ -6,6 +6,10 @@ import junit.framework.TestCase

 import org.junit.Test
 import org.junit.Assert._
+import org.apache.cassandra.service.CassandraDaemon
+import org.junit.BeforeClass
+import org.junit.Before
+import org.scalatest.junit.JUnitSuite

 case class GetMapState(key: String)
 case object GetVectorState

@@ -70,7 +74,10 @@ class CassandraPersistentActor extends Actor {
   }
 }

-class CassandraPersistentActorSpec extends TestCase {
+class CassandraPersistentActorSpec extends JUnitSuite {

+  @Before
+  def startCassandra = EmbeddedCassandraService.start
+
   @Test
   def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {

@@ -144,4 +151,28 @@ class CassandraPersistentActorSpec extends TestCase {
     val result: Array[Byte] = (stateful !! GetRefState).get
     assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
   }
 }
+
+import org.apache.cassandra.service.CassandraDaemon
+object EmbeddedCassandraService {
+
+  System.setProperty("storage-config", "src/test/resources");
+
+  val cassandra = new Runnable {
+
+    val cassandraDaemon = new CassandraDaemon
+    cassandraDaemon.init(null)
+
+    def run = cassandraDaemon.start
+  }
+
+  // spawn cassandra in a new thread
+  val t = new Thread(cassandra)
+  t.setDaemon(true)
+  t.start
+
+  def start: Unit = {}
+}

@@ -10,9 +10,8 @@ import se.scalablesolutions.akka.remote.RemoteServer
 import se.scalablesolutions.akka.util.Logging
 import se.scalablesolutions.akka.config.ScalaConfig._
 import se.scalablesolutions.akka.config.OneForOneStrategy
-import se.scalablesolutions.akka.state.RedisStorage

 import scala.collection.mutable.HashMap
+import se.scalablesolutions.akka.state.{PersistentVector, RedisStorage}

 /******************************************************************************
 To run the sample:

@@ -77,7 +76,11 @@ trait ChatStorage extends Actor
 class RedisChatStorage extends ChatStorage {
   lifeCycle = Some(LifeCycle(Permanent))

-  private var chatLog = RedisStorage.getVector("akka.chat.log")
+  private var chatLog: PersistentVector[Array[Byte]] = _
+
+  override def initTransactionalState = chatLog = RedisStorage.getVector("akka.chat.log")
+
+  chatLog = RedisStorage.getVector("akka.chat.log")

   log.info("Redis-based chat storage is starting up...")

@@ -20,7 +20,7 @@ class SimpleService extends Transactor {
   case object Tick
   private val KEY = "COUNTER"
   private var hasStartedTicking = false
-  private val storage = TransactionalState.newMap[String, Integer]
+  private lazy val storage = TransactionalState.newMap[String, Integer]

   @GET
   @Produces(Array("text/html"))

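This hunk, and the matching ones further down, switch the transactional map from an eager val to a lazy val. My reading, offered as an inference rather than something stated in the commit: a Transactor handles each message inside a transaction, so deferring TransactionalState.newMap to first access moves its creation from construction time into a transactional context. A compressed sketch of the before/after:

  // before: the map is created while the service object is constructed
  private val storage = TransactionalState.newMap[String, Integer]

  // after: creation is deferred until the first transactional message handler touches it
  private lazy val storage = TransactionalState.newMap[String, Integer]

The Java resource classes below apply the same idea with an explicit null check inside count(), since Java has no lazy fields.
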
@@ -52,7 +52,7 @@ class PersistentSimpleService extends Transactor {
   case object Tick
   private val KEY = "COUNTER"
   private var hasStartedTicking = false
-  private val storage = CassandraStorage.newMap
+  private lazy val storage = CassandraStorage.newMap

   @GET
   @Produces(Array("text/html"))

@@ -29,11 +29,12 @@ public class PersistentSimpleService {
   private String KEY = "COUNTER";

   private boolean hasStartedTicking = false;
-  private PersistentMap<byte[], byte[]> storage = CassandraStorage.newMap();
+  private PersistentMap<byte[], byte[]> storage;

   @GET
   @Produces({"application/html"})
   public String count() {
+    if (storage == null) storage = CassandraStorage.newMap();
     if (!hasStartedTicking) {
       storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array());
       hasStartedTicking = true;

@@ -27,11 +27,12 @@ public class SimpleService {
   private String KEY = "COUNTER";

   private boolean hasStartedTicking = false;
-  private TransactionalMap storage = TransactionalState.newMap();
+  private TransactionalMap<String, Integer> storage;

   @GET
   @Produces({"application/json"})
   public String count() {
+    if (storage == null) storage = TransactionalState.newMap();
     if (!hasStartedTicking) {
       storage.put(KEY, 0);
       hasStartedTicking = true;

@@ -53,7 +53,7 @@ class SimpleService extends Transactor {
   case object Tick
   private val KEY = "COUNTER"
   private var hasStartedTicking = false
-  private val storage = TransactionalState.newMap[String, Integer]
+  private lazy val storage = TransactionalState.newMap[String, Integer]

   @GET
   @Produces(Array("text/html"))

@@ -105,7 +105,7 @@ class PersistentSimpleService extends Transactor {
   case object Tick
   private val KEY = "COUNTER"
   private var hasStartedTicking = false
-  private val storage = CassandraStorage.newMap
+  private lazy val storage = CassandraStorage.newMap

   @GET
   @Produces(Array("text/html"))

@@ -96,7 +96,7 @@ class SecureTickActor extends Actor with Logging {
   case object Tick
   private val KEY = "COUNTER"
   private var hasStartedTicking = false
-  private val storage = TransactionalState.newMap[String, Integer]
+  private lazy val storage = TransactionalState.newMap[String, Integer]

   /**
    * allow access for any user to "/secureticker/public"

Binary file not shown.