Merge branch 'master' into wip-config-patriknw

This commit is contained in:
Patrik Nordwall 2011-12-02 08:49:34 +01:00
commit c5a367ad54
46 changed files with 789 additions and 489 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
*.vim
*~
*#
src_managed

View file

@ -0,0 +1,61 @@
package akka.actor
import akka.testkit.AkkaSpec
import akka.dispatch.UnboundedMailbox
import akka.util.duration._
object ConsistencySpec {
class CacheMisaligned(var value: Long, var padding1: Long, var padding2: Long, var padding3: Int) //Vars, no final fences
class ConsistencyCheckingActor extends Actor {
var left = new CacheMisaligned(42, 0, 0, 0) //var
var right = new CacheMisaligned(0, 0, 0, 0) //var
var lastStep = -1L
def receive = {
case step: Long
if (lastStep != (step - 1))
sender.tell("Test failed: Last step %s, this step %s".format(lastStep, step))
var shouldBeFortyTwo = left.value + right.value
if (shouldBeFortyTwo != 42)
sender ! "Test failed: 42 failed"
else {
left.value += 1
right.value -= 1
}
lastStep = step
case "done" sender ! "done"; self.stop()
}
}
}
class ConsistencySpec extends AkkaSpec {
import ConsistencySpec._
"The Akka actor model implementation" must {
"provide memory consistency" in {
val noOfActors = 7
val dispatcher = system
.dispatcherFactory
.newDispatcher("consistency-dispatcher", 1, UnboundedMailbox())
.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(noOfActors, true)
.setCorePoolSize(10)
.setMaxPoolSize(10)
.setKeepAliveTimeInMillis(1)
.setAllowCoreThreadTimeout(true)
.build
val props = Props[ConsistencyCheckingActor].withDispatcher(dispatcher)
val actors = Vector.fill(noOfActors)(system.actorOf(props))
for (i 0L until 600000L) {
actors.foreach(_.tell(i, testActor))
}
for (a actors) { a.tell("done", testActor) }
for (a actors) expectMsg(5 minutes, "done")
}
}
}

View file

@ -11,6 +11,10 @@ import java.util.concurrent.atomic._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
def startWatching(target: ActorRef) = actorOf(Props(new Actor {
watch(target)
def receive = { case x testActor forward x }
}))
"The Death Watch" must {
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {
@ -19,8 +23,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"notify with one Terminated message when an Actor is stopped" in {
val terminal = actorOf(Props(context { case _ }))
testActor startsWatching terminal
startWatching(terminal)
testActor ! "ping"
expectMsg("ping")
@ -32,11 +35,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"notify with all monitors with one Terminated message when an Actor is stopped" in {
val terminal = actorOf(Props(context { case _ }))
val monitor1, monitor2, monitor3 =
actorOf(Props(new Actor {
watch(terminal)
def receive = { case t: Terminated testActor ! t }
}))
val monitor1, monitor2, monitor3 = startWatching(terminal)
terminal ! PoisonPill
@ -51,11 +50,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
val terminal = actorOf(Props(context { case _ }))
val monitor1, monitor3 =
actorOf(Props(new Actor {
watch(terminal)
def receive = { case t: Terminated testActor ! t }
}))
val monitor1, monitor3 = startWatching(terminal)
val monitor2 = actorOf(Props(new Actor {
watch(terminal)
unwatch(terminal)
@ -85,10 +80,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val terminalProps = Props(context { case x context.sender ! x })
val terminal = (supervisor ? terminalProps).as[ActorRef].get
val monitor = actorOf(Props(new Actor {
watch(terminal)
def receive = { case t: Terminated testActor ! t }
}))
val monitor = startWatching(terminal)
terminal ! Kill
terminal ! Kill
@ -113,9 +105,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
}
}))
val failed, brother = (supervisor ? Props.empty).as[ActorRef].get
brother startsWatching failed
testActor startsWatching brother
val failed = (supervisor ? Props.empty).as[ActorRef].get
val brother = (supervisor ? Props(new Actor {
watch(failed)
def receive = Actor.emptyBehavior
})).as[ActorRef].get
startWatching(brother)
failed ! Kill
val result = receiveWhile(3 seconds, messages = 3) {

View file

@ -58,7 +58,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
val forward = actorOf(new Forwarder(testActor))
val fsm = actorOf(new MyFSM(testActor))
val sup = actorOf(Props(new Actor {
self startsWatching fsm
watch(fsm)
def receive = { case _ }
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))

View file

@ -206,7 +206,7 @@ class RestartStrategySpec extends AkkaSpec {
val boss = actorOf(Props(new Actor {
def receive = {
case p: Props sender ! context.actorOf(p)
case p: Props sender ! watch(context.actorOf(p))
case t: Terminated maxNoOfRestartsLatch.open
}
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))
@ -228,8 +228,6 @@ class RestartStrategySpec extends AkkaSpec {
})
val slave = (boss ? slaveProps).as[ActorRef].get
boss startsWatching slave
slave ! Ping
slave ! Crash
slave ! Ping

View file

@ -52,8 +52,7 @@ class SupervisorHierarchySpec extends AkkaSpec {
val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1)
val boss = actorOf(Props(new Actor {
val crasher = context.actorOf(Props(new CountDownActor(countDownMessages)))
self startsWatching crasher
val crasher = watch(context.actorOf(Props(new CountDownActor(countDownMessages))))
protected def receive = {
case "killCrasher" crasher ! Kill

View file

@ -341,9 +341,11 @@ abstract class ActorModelSpec extends AkkaSpec {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
val stopLatch = new CountDownLatch(num)
val waitTime = (30 seconds).dilated.toMillis
val boss = actorOf(Props(context {
case "run" for (_ 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage
case Terminated(child) stopLatch.countDown()
val boss = actorOf(Props(new Actor {
def receive = {
case "run" for (_ 1 to num) (watch(context.actorOf(props))) ! cachedMessage
case Terminated(child) stopLatch.countDown()
}
}).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss")))
boss ! "run"
try {
@ -492,3 +494,39 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FJDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher =
(new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput,
system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType,
new ForkJoinPoolConfig(), system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor).asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "FJDispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher)
a ! Meet(aStart, aStop)
assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
b ! CountDown(bParallel)
assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")
aStop.countDown()
a.stop
b.stop
while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
}

View file

@ -12,8 +12,8 @@ final class AbstractMailbox {
static {
try {
mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_status"));
systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueue"));
mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_statusDoNotCallMeDirectly"));
systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueueDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -363,24 +363,24 @@ trait Actor {
* Puts the behavior on top of the hotswap stack.
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) }
final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) }
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome() { context.unbecome() }
final def unbecome() { context.unbecome() }
/**
* Registers this actor as a Monitor for the provided ActorRef
* @return the provided ActorRef
*/
def watch(subject: ActorRef): ActorRef = self startsWatching subject
final def watch(subject: ActorRef): ActorRef = context startsWatching subject
/**
* Unregisters this actor as Monitor for the provided ActorRef
* @return the provided ActorRef
*/
def unwatch(subject: ActorRef): ActorRef = self stopsWatching subject
final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
@ -395,6 +395,6 @@ trait Actor {
}
}
private val processingBehavior = receive //ProcessingBehavior is the original behavior
private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior
}

View file

@ -47,6 +47,10 @@ trait ActorContext extends ActorRefFactory {
def system: ActorSystem
def parent: ActorRef
def startsWatching(subject: ActorRef): ActorRef
def stopsWatching(subject: ActorRef): ActorRef
}
private[akka] object ActorCell {
@ -136,13 +140,13 @@ private[akka] class ActorCell(
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
final def startsWatching(subject: ActorRef): ActorRef = {
override final def startsWatching(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Link(subject))
subject
}
final def stopsWatching(subject: ActorRef): ActorRef = {
override final def stopsWatching(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Unlink(subject))
subject

View file

@ -46,7 +46,7 @@ import akka.event.DeathWatch
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
scalaRef: ScalaActorRef with RefInternals
// Only mutable for RemoteServer in order to maintain identity across nodes
/**
@ -108,16 +108,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
/**
* Suspends the actor. It will not process messages while suspended.
*/
def suspend(): Unit //TODO FIXME REMOVE THIS
/**
* Resumes a suspended actor.
*/
def resume(): Unit //TODO FIXME REMOVE THIS
/**
* Shuts down the actor its dispatcher and message queue.
*/
@ -128,24 +118,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/
def isTerminated: Boolean
/**
* Registers this actor to be a death monitor of the provided ActorRef
* This means that this actor will get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
/**
* Deregisters this actor from being a death monitor of the provided ActorRef
* This means that this actor will not get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
override def equals(that: Any): Boolean = {
@ -169,7 +141,7 @@ class LocalActorRef private[akka] (
val systemService: Boolean = false,
_receiveTimeout: Option[Long] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends ActorRef with ScalaActorRef {
extends ActorRef with ScalaActorRef with RefInternals {
def name = path.name
@ -215,24 +187,6 @@ class LocalActorRef private[akka] (
*/
def stop(): Unit = actorCell.stop()
/**
* Registers this actor to be a death monitor of the provided ActorRef
* This means that this actor will get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def startsWatching(subject: ActorRef): ActorRef = actorCell.startsWatching(subject)
/**
* Deregisters this actor from being a death monitor of the provided ActorRef
* This means that this actor will not get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def stopsWatching(subject: ActorRef): ActorRef = actorCell.stopsWatching(subject)
// ========= AKKA PROTECTED FUNCTIONS =========
protected[akka] def underlying: ActorCell = actorCell
@ -296,7 +250,11 @@ trait ScalaActorRef { ref: ActorRef ⇒
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
private[akka] trait RefInternals {
def resume(): Unit
def suspend(): Unit
protected[akka] def restart(cause: Throwable): Unit
}
@ -325,14 +283,11 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
/**
* Trait for ActorRef implementations where all methods contain default stubs.
*/
trait MinimalActorRef extends ActorRef with ScalaActorRef {
trait MinimalActorRef extends ActorRef with ScalaActorRef with RefInternals {
private[akka] val uuid: Uuid = newUuid()
def name: String = uuid.toString
def startsWatching(actorRef: ActorRef): ActorRef = actorRef
def stopsWatching(actorRef: ActorRef): ActorRef = actorRef
def suspend(): Unit = ()
def resume(): Unit = ()

View file

@ -409,7 +409,12 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
}
}
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) extends Scheduler {
/**
* Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher.
* Note that dispatcher is by-name parameter, because dispatcher might not be initialized
* when the scheduler is created.
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay))
@ -429,8 +434,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem)
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
// FIXME: consider executing runnable inside main dispatcher to prevent blocking of scheduler
runnable.run()
dispatcher.dispatchTask(() runnable.run())
}
}
@ -444,7 +448,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem)
private def createSingleTask(f: () Unit): TimerTask =
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
f()
dispatcher.dispatchTask(f)
}
}
@ -456,7 +460,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem)
receiver ! message
timeout.getTimer.newTimeout(this, delay)
} else {
system.eventStream.publish(Warning(this.getClass.getSimpleName, "Could not reschedule message to be sent because receiving actor has been terminated."))
log.warning("Could not reschedule message to be sent because receiving actor has been terminated.")
}
}
}
@ -465,7 +469,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem)
private def createContinuousTask(f: () Unit, delay: Duration): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
f()
dispatcher.dispatchTask(f)
timeout.getTimer.newTimeout(this, delay)
}
}

View file

@ -255,7 +255,7 @@ abstract class ActorSystem extends ActorRefFactory {
* Register a block of code to run after all actors in this actor system have
* been stopped.
*/
def registerOnTermination(code: Unit)
def registerOnTermination[T](code: T)
/**
* Register a block of code to run after all actors in this actor system have
@ -312,7 +312,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel), this)
val scheduler = createScheduler()
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
@ -346,6 +346,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
// TODO why implicit val dispatcher?
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
//FIXME Set this to a Failure when things bubble to the top
@ -374,16 +375,38 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def start() = _start
def registerOnTermination(code: Unit) { terminationFuture onComplete (_ code) }
def registerOnTermination[T](code: T) { terminationFuture onComplete (_ code) }
def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) }
// TODO shutdown all that other stuff, whatever that may be
def stop() {
guardian.stop()
terminationFuture onComplete (_ scheduler.stop())
terminationFuture onComplete (_ stopScheduler())
terminationFuture onComplete (_ dispatcher.shutdown())
}
protected def createScheduler(): Scheduler = {
val threadFactory = new MonitorableThreadFactory("DefaultScheduler")
val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)
// note that dispatcher is by-name parameter in DefaultScheduler constructor,
// because dispatcher is not initialized when the scheduler is created
def safeDispatcher = {
if (dispatcher eq null) {
val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized")
log.error(exc, exc.getMessage)
throw exc
} else {
dispatcher
}
}
new DefaultScheduler(hwt, log, safeDispatcher)
}
protected def stopScheduler(): Unit = scheduler match {
case x: DefaultScheduler x.stop()
case _
}
private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef]
/**

View file

@ -122,12 +122,12 @@ abstract class FaultHandlingStrategy {
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty)
children.foreach(_.suspend())
children.foreach(_.asInstanceOf[RefInternals].suspend())
}
def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty)
children.foreach(_.restart(cause))
children.foreach(_.asInstanceOf[RefInternals].restart(cause))
}
/**
@ -136,7 +136,7 @@ abstract class FaultHandlingStrategy {
def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match {
case Resume child.resume(); true
case Resume child.asInstanceOf[RefInternals].resume(); true
case Restart processFailure(true, child, cause, stats, children); true
case Stop processFailure(false, child, cause, stats, children); true
case Escalate false
@ -194,7 +194,7 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(_.child.restart(cause))
children.foreach(_.child.asInstanceOf[RefInternals].restart(cause))
else
children.foreach(_.child.stop())
}
@ -247,7 +247,7 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow))
child.restart(cause)
child.asInstanceOf[RefInternals].restart(cause)
else
child.stop() //TODO optimization to drop child here already?
}

View file

@ -6,12 +6,9 @@ package akka.dispatch
import akka.event.Logging.Warning
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
import akka.actor.{ ActorCell, ActorKilledException }
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import akka.actor.ActorCell
import akka.util.Duration
import java.util.concurrent._
/**
* Default settings are:
@ -156,4 +153,4 @@ abstract class PriorityGenerator extends java.util.Comparator[Envelope] {
final def compare(thisMessage: Envelope, thatMessage: Envelope): Int =
gen(thisMessage.message) - gen(thatMessage.message)
}
}

View file

@ -40,13 +40,13 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
import Mailbox._
@volatile
protected var _status: Status = _ //0 by default
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@volatile
protected var _systemQueue: SystemMessage = _ //null by default
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default
@inline
final def status: Mailbox.Status = _status
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def shouldProcessMessage: Boolean = (status & 3) == Open
@ -65,7 +65,8 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
@inline
protected final def setStatus(newStatus: Status): Unit = _status = newStatus
protected final def setStatus(newStatus: Status): Unit =
Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)
/**
* set new primary status Open. Caller does not need to worry about whether
@ -130,7 +131,8 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
/*
* AtomicReferenceFieldUpdater for system queue
*/
protected final def systemQueueGet: SystemMessage = _systemQueue
protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)

View file

@ -11,6 +11,9 @@ import akka.event.Logging.{ Warning, Error }
import akka.actor.ActorSystem
import java.util.concurrent._
import akka.event.EventStream
import concurrent.forkjoin.ForkJoinPool._
import concurrent.forkjoin.{ ForkJoinTask, ForkJoinWorkerThread, ForkJoinPool }
import concurrent.forkjoin.ForkJoinTask._
object ThreadPoolConfig {
type Bounds = Int
@ -184,6 +187,52 @@ class MonitorableThread(runnable: Runnable, name: String)
}
}
case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider {
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
new ForkJoinPool(targetParallelism) with ExecutorService {
setAsyncMode(true)
setMaintainsParallelism(true)
override final def execute(r: Runnable) {
r match {
case fjmbox: FJMailbox
//fjmbox.fjTask.reinitialize()
Thread.currentThread match {
case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this
fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected
case _ super.execute[Unit](fjmbox.fjTask)
}
case _
super.execute(r)
}
}
import java.util.{ Collection JCollection }
def invokeAny[T](callables: JCollection[_ <: Callable[T]]) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
throw new UnsupportedOperationException("invokeAny. NOT!")
}
}
}
}
trait FJMailbox { self: Mailbox
final val fjTask = new ForkJoinTask[Unit] with Runnable {
private[this] var result: Unit = ()
final def getRawResult() = result
final def setRawResult(v: Unit) { result = v }
final def exec() = { self.run(); true }
final def run() { invoke() }
}
}
/**
* As the name says
*/

View file

@ -11,6 +11,7 @@ import scala.annotation.tailrec
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import collection.JavaConverters
/**
* An Iterable that also contains a version.
@ -85,6 +86,10 @@ trait ConnectionManager {
*/
class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager {
def this(iterable: java.lang.Iterable[ActorRef]) {
this(JavaConverters.iterableAsScalaIterableConverter(iterable).asScala)
}
case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] {
def iterable = connections
}

View file

@ -9,11 +9,11 @@ import akka.actor._
import akka.config.ConfigurationException
import akka.dispatch.{ Future, MessageDispatcher }
import akka.util.{ ReflectiveAccess, Duration }
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import scala.annotation.tailrec
import akka.japi.Creator
sealed trait RouterType
@ -76,6 +76,12 @@ case class RoutedProps private[akka] (
connectionManager: ConnectionManager,
timeout: Timeout = RoutedProps.defaultTimeout,
localOnly: Boolean = RoutedProps.defaultLocalOnly) {
// Java API
def this(creator: Creator[Router], connectionManager: ConnectionManager, timeout: Timeout, localOnly: Boolean) {
this(() creator.create(), connectionManager, timeout, localOnly)
}
}
object RoutedProps {

View file

@ -293,9 +293,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
callback.done(false)
}
def startsWatching(actorRef: ActorRef): ActorRef = unsupported
def stopsWatching(actorRef: ActorRef): ActorRef = unsupported
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))
def restart(reason: Throwable): Unit = unsupported

View file

@ -155,6 +155,15 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator:
public class Main {
// A simple Actor that just prints the messages it processes
public static class MyActor extends UntypedActor {
public MyActor() {
self.tell("lowpriority");
getSelf().tell("lowpriority");
getSelf().tell("highpriority");
getSelf().tell("pigdog");
getSelf().tell("pigdog2");
getSelf().tell("pigdog3");
getSelf().tell("highpriority");
}
public void onReceive(Object message) throws Exception {
System.out.println(message);
}
@ -170,19 +179,9 @@ Creating a Dispatcher with a priority mailbox using PriorityGenerator:
}
};
// We create an instance of the actor that will print out the messages it processes
ActorRef ref = Actors.actorOf(MyActor.class);
// We create a new Priority dispatcher and seed it with the priority generator
ref.setDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen)));
// We create a new Priority dispatcher and seed it with the priority generator
ActorRef ref = Actors.actorOf((new Props()).withCreator(MyActor.class).withDispatcher(new Dispatcher("foo", 5, new UnboundedPriorityMailbox(gen))));
ref.getDispatcher().suspend(ref); // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
ref.tell("lowpriority");
ref.tell("lowpriority");
ref.tell("highpriority");
ref.tell("pigdog");
ref.tell("pigdog2");
ref.tell("pigdog3");
ref.tell("highpriority");
ref.getDispatcher().resume(ref); // Resuming the actor so it will start treating its messages
}
}

View file

@ -178,10 +178,7 @@ The messages that it prevents are all that extends 'LifeCycleMessage':
* case object ReceiveTimeout
It also prevents the client from invoking any life-cycle and side-effecting methods, such as:
* start
* stop
* startsWatching
* stopsWatching
* etc.
Using secure cookie for remote client authentication

View file

@ -155,23 +155,18 @@ Creating a Dispatcher using PriorityGenerator:
val a = Actor.actorOf( // We create a new Actor that just prints out what it processes
Props(new Actor {
self ! 'lowpriority
self ! 'lowpriority
self ! 'highpriority
self ! 'pigdog
self ! 'pigdog2
self ! 'pigdog3
self ! 'highpriority
def receive = {
case x => println(x)
}
}).withDispatcher(new Dispatcher("foo", 5, UnboundedPriorityMailbox(gen)))) // We create a new Priority dispatcher and seed it with the priority generator
a.dispatcher.suspend(a) // Suspending the actor so it doesn't start to treat the messages before we have enqueued all of them :-)
a ! 'lowpriority
a ! 'lowpriority
a ! 'highpriority
a ! 'pigdog
a ! 'pigdog2
a ! 'pigdog3
a ! 'highpriority
a.dispatcher.resume(a) // Resuming the actor so it will start treating its messages
Prints:
'highpriority

View file

@ -180,10 +180,7 @@ The messages that it prevents are all that extends 'LifeCycleMessage':
* class ReceiveTimeout..)
It also prevents the client from invoking any life-cycle and side-effecting methods, such as:
* start
* stop
* startsWatching
* stopsWatching
* etc.
Using secure cookie for remote client authentication

View file

@ -285,7 +285,7 @@ private[akka] case class RemoteActorRef private[akka] (
remoteAddress: RemoteAddress,
path: ActorPath,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
extends ActorRef with ScalaActorRef with RefInternals {
@volatile
private var running: Boolean = true
@ -296,7 +296,7 @@ private[akka] case class RemoteActorRef private[akka] (
def isTerminated: Boolean = !running
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
@ -318,11 +318,5 @@ private[akka] case class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = provider.serialize(this)
def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
def stopsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
protected[akka] def restart(cause: Throwable): Unit = ()
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}

View file

@ -59,35 +59,34 @@ abstract class RemoteClient private[akka] (
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit =
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
/**
* Sends the message across the wire
*/
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
log.debug("Sending message: " + new RemoteMessage(request, remoteSupport))
def send(request: RemoteMessageProtocol): Unit = {
log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport))
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
}
@ -132,8 +131,7 @@ class ActiveRemoteClient private[akka] (
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -180,10 +178,9 @@ class ActiveRemoteClient private[akka] (
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, timer, this))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -219,8 +216,6 @@ class ActiveRemoteClient private[akka] (
log.debug("Shutting down remote client [{}]", name)
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
timer.stop()
timer = null
openChannels.close.awaitUninterruptibly
openChannels = null
bootstrap.releaseExternalResources()
@ -253,18 +248,17 @@ class ActiveRemoteClientPipelineFactory(
name: String,
bootstrap: ClientBootstrap,
remoteAddress: RemoteAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
import client.remoteSupport.clientSettings._
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit)
val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, timer, client)
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client)
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
}
@ -361,6 +355,10 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
val serverSettings = RemoteExtension(system).serverSettings
val clientSettings = RemoteExtension(system).clientSettings
val timer: HashedWheelTimer = new HashedWheelTimer
_system.registerOnTermination(timer.stop()) //Shut this guy down at the end
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -519,6 +517,10 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setHostname(address.hostname)
.setPort(address.port)
.build)
if (SecureCookie.nonEmpty)
b.setCookie(SecureCookie.get)
b.build
@ -648,7 +650,7 @@ class RemoteServerHandler(
val inbound = RemoteAddress(origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here
case CommandType.SHUTDOWN //No need to do anything here
case _ //Unknown command
}
case _ //ignore

View file

@ -20,7 +20,7 @@ trait NetworkFailureSpec { self: AkkaSpec ⇒
val BytesPerSecond = "60KByte/s"
val DelayMillis = "350ms"
val PortRang = "1024-65535"
val PortRange = "1024-65535"
def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = {
Future {
@ -82,12 +82,12 @@ trait NetworkFailureSpec { self: AkkaSpec ⇒
def enableNetworkDrop() = {
restoreIP()
assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PortRang).start.waitFor == 0)
assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PortRange).start.waitFor == 0)
}
def enableTcpReset() = {
restoreIP()
assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PortRang).start.waitFor == 0)
assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PortRange).start.waitFor == 0)
}
def restoreIP() = {

View file

@ -0,0 +1,28 @@
FSM
===
Requirements
------------
To build and run FSM you need [Simple Build Tool][sbt] (sbt).
Running
-------
First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'.
Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run':
> cd $AKKA_HOME
> % sbt
> > project akka-sample-fsm
> > run
> > Choose 1 or 2 depending on what sample you wish to run
Notice
------
[akka]: http://akka.io
[sbt]: http://code.google.com/p/simple-build-tool/

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>.
*/
package sample.fsm.buncher
import akka.actor.ActorRefFactory
@ -6,15 +9,15 @@ import akka.util.Duration
import akka.actor.{ FSM, Actor, ActorRef }
/*
* generic typed object buncher.
*
* To instantiate it, use the factory method like so:
* Buncher(100, 500)(x : List[AnyRef] => x foreach println)
* which will yield a fully functional ActorRef.
* The type of messages allowed is strongly typed to match the
* supplied processing method; other messages are discarded (and
* possibly logged).
*/
* generic typed object buncher.
*
* To instantiate it, use the factory method like so:
* Buncher(100, 500)(x : List[AnyRef] => x foreach println)
* which will yield a fully functional ActorRef.
* The type of messages allowed is strongly typed to match the
* supplied processing method; other messages are discarded (and
* possibly logged).
*/
object GenericBuncher {
trait State
case object Idle extends State

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>.
*/
package sample.fsm.dining.become
//Akka adaptation of
@ -7,8 +10,8 @@ import akka.actor.{ ActorRef, Actor, ActorSystem }
import akka.util.duration._
/*
* First we define our messages, they basically speak for themselves
*/
* First we define our messages, they basically speak for themselves
*/
sealed trait DiningHakkerMessage
case class Busy(chopstick: ActorRef) extends DiningHakkerMessage
case class Put(hakker: ActorRef) extends DiningHakkerMessage
@ -18,9 +21,9 @@ object Eat extends DiningHakkerMessage
object Think extends DiningHakkerMessage
/*
* A Chopstick is an actor, it can be taken, and put back
*/
class Chopstick(name: String) extends Actor {
* A Chopstick is an actor, it can be taken, and put back
*/
class Chopstick extends Actor {
//When a Chopstick is taken by a hakker
//It will refuse to be taken by other hakkers
@ -44,8 +47,8 @@ class Chopstick(name: String) extends Actor {
}
/*
* A hakker is an awesome dude or dudett who either thinks about hacking or has to eat ;-)
*/
* A hakker is an awesome dude or dudett who either thinks about hacking or has to eat ;-)
*/
class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
//When a hakker is thinking it can become hungry
@ -75,7 +78,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
//back to think about how he should obtain his chopsticks :-)
def waiting_for(chopstickToWaitFor: ActorRef, otherChopstick: ActorRef): Receive = {
case Taken(`chopstickToWaitFor`)
println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address)
println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name))
become(eating)
system.scheduler.scheduleOnce(self, Think, 5 seconds)
@ -105,27 +108,33 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
become(thinking)
left ! Put(self)
right ! Put(self)
println("%s puts down his chopsticks and starts to think", name)
println("%s puts down his chopsticks and starts to think".format(name))
system.scheduler.scheduleOnce(self, Eat, 5 seconds)
}
//All hakkers start in a non-eating state
def receive = {
case Think
println("%s starts to think", name)
println("%s starts to think".format(name))
become(thinking)
system.scheduler.scheduleOnce(self, Eat, 5 seconds)
}
}
/*
* Alright, here's our test-harness
*/
* Alright, here's our test-harness
*/
object DiningHakkers {
val system = ActorSystem()
def main(args: Array[String]): Unit = {
run
}
def run {
//Create 5 chopsticks
val chopsticks = for (i 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i))
val chopsticks = for (i 1 to 5) yield system.actorOf[Chopstick]("Chopstick " + i)
//Create 5 awesome hakkers and assign them their left and right chopstick
val hakkers = for {
(name, i) List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2010 Typesafe Inc. <http://www.typesafe.com>.
*/
package sample.fsm.dining.fsm
import akka.actor.{ ActorRef, Actor, FSM, ActorSystem }
@ -6,8 +9,8 @@ import akka.util.Duration
import akka.util.duration._
/*
* Some messages for the chopstick
*/
* Some messages for the chopstick
*/
sealed trait ChopstickMessage
object Take extends ChopstickMessage
object Put extends ChopstickMessage
@ -27,9 +30,9 @@ case object Taken extends ChopstickState
case class TakenBy(hakker: ActorRef)
/*
* A chopstick is an actor, it can be taken, and put back
*/
class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] {
* A chopstick is an actor, it can be taken, and put back
*/
class Chopstick extends Actor with FSM[ChopstickState, TakenBy] {
// A chopstick begins its existence as available and taken by no one
startWith(Available, TakenBy(system.deadLetters))
@ -77,8 +80,8 @@ case object Eating extends FSMHakkerState
case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef])
/*
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
*/
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
*/
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] {
//All hakkers start waiting
@ -86,7 +89,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
when(Waiting) {
case Event(Think, _)
println("%s starts to think", name)
println("%s starts to think".format(name))
startThinking(5 seconds)
}
@ -125,7 +128,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
}
private def startEating(left: ActorRef, right: ActorRef): State = {
println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address)
println("%s has picked up %s and %s and starts to eat".format(name, left.name, right.name))
goto(Eating) using TakenChopsticks(Some(left), Some(right)) forMax (5 seconds)
}
@ -144,7 +147,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
// then he puts down his chopsticks and starts to think
when(Eating) {
case Event(StateTimeout, _)
println("%s puts down his chopsticks and starts to think", name)
println("%s puts down his chopsticks and starts to think".format(name))
left ! Put
right ! Put
startThinking(5 seconds)
@ -159,15 +162,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
}
/*
* Alright, here's our test-harness
*/
* Alright, here's our test-harness
*/
object DiningHakkersOnFsm {
val system = ActorSystem()
def main(args: Array[String]): Unit = {
run
}
def run = {
// Create 5 chopsticks
val chopsticks = for (i 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i))
val chopsticks = for (i 1 to 5) yield system.actorOf[Chopstick]("Chopstick " + i)
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
val hakkers = for {
(name, i) List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex

View file

@ -0,0 +1,26 @@
HELLO
=====
Requirements
------------
To build and run FSM you need [Simple Build Tool][sbt] (sbt).
Running
-------
First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'.
Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run':
> cd $AKKA_HOME
> % sbt
> > project akka-sample-hello
> > run
Notice
------
[akka]: http://akka.io
[sbt]: http://code.google.com/p/simple-build-tool/

View file

@ -0,0 +1,32 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.hello
import akka.actor.{ ActorSystem, Actor }
case object Start
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem()
system.actorOf[HelloActor] ! Start
}
}
class HelloActor extends Actor {
val worldActor = system.actorOf[WorldActor]
def receive = {
case Start worldActor ! "Hello"
case s: String
println("Received message: %s".format(s))
system.stop()
}
}
class WorldActor extends Actor {
def receive = {
case s: String sender ! s.toUpperCase + " world!"
}
}

View file

@ -42,6 +42,24 @@ class TestActorRef[T <: Actor](
*/
def underlyingActor: T = underlyingActorInstance.asInstanceOf[T]
/**
* Registers this actor to be a death monitor of the provided ActorRef
* This means that this actor will get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def startsWatching(subject: ActorRef): ActorRef = underlying.startsWatching(subject)
/**
* Deregisters this actor from being a death monitor of the provided ActorRef
* This means that this actor will not get a Terminated()-message when the provided actor
* is permanently terminated.
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def stopsWatching(subject: ActorRef): ActorRef = underlying.stopsWatching(subject)
override def toString = "TestActor[" + address + "]"
override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address

View file

@ -155,7 +155,10 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
"stop when sent a poison pill" in {
EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor])
testActor startsWatching a
val forwarder = actorOf(Props(new Actor {
watch(a)
def receive = { case x testActor forward x }
}))
a.!(PoisonPill)(testActor)
expectMsgPF(5 seconds) {
case Terminated(`a`) true

View file

@ -13,7 +13,7 @@
<dependencies>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>

View file

@ -0,0 +1,22 @@
import sbt._
import Keys._
object TutorialBuild extends Build {
lazy val buildSettings = Seq(
organization := "com.typesafe.akka",
version := "2.0-SNAPSHOT",
scalaVersion := "2.9.1"
)
lazy val akka = Project(
id = "akka-tutorial-first",
base = file("."),
settings = Defaults.defaultSettings ++ Seq(
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-actor" % "2.0-SNAPSHOT",
"junit" % "junit" % "4.5" % "test",
"org.scalatest" % "scalatest_2.9.0" % "1.6.1" % "test",
"com.typesafe.akka" % "akka-testkit" % "2.0-SNAPSHOT" % "test")
)
)
}

View file

@ -1,5 +1 @@
project.organization=se.scalablesolutions.akka
project.name=akka-tutorial-first
project.version=2.0-SNAPSHOT
build.scala.versions=2.9.0
sbt.version=0.7.7
sbt.version=0.11.0

View file

@ -1,3 +0,0 @@
import sbt._
class TutorialOneProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject

View file

@ -1,6 +0,0 @@
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val akkaRepo = "Akka Repo" at "http://akka.io/repository"
val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT"
}

View file

@ -1,182 +1,182 @@
// *
// * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.tutorial.first.java;
// package akka.tutorial.first.java;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.japi.Creator;
import akka.routing.*;
// import static akka.actor.Actors.poisonPill;
// import static java.util.Arrays.asList;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
// import akka.actor.ActorRef;
// import akka.actor.Actors;
// import akka.actor.ActorSystem;
// import akka.actor.UntypedActor;
// import akka.actor.UntypedActorFactory;
// import akka.routing.RoutedProps;
// import akka.routing.RouterType;
// import akka.routing.LocalConnectionManager;
// import akka.routing.Routing;
// import akka.routing.Routing.Broadcast;
// import scala.collection.JavaConversions;
public class Pi {
// import java.util.LinkedList;
// import java.util.concurrent.CountDownLatch;
public static void main(String[] args) throws Exception {
Pi pi = new Pi();
pi.calculate(4, 10000, 10000);
}
// public class Pi {
// ====================
// ===== Messages =====
// ====================
static class Calculate {
}
// private static final ActorSystem system = new ActorSystem();
static class Work {
private final int start;
private final int nrOfElements;
// public static void main(String[] args) throws Exception {
// Pi pi = new Pi();
// pi.calculate(4, 10000, 10000);
// }
public Work(int start, int nrOfElements) {
this.start = start;
this.nrOfElements = nrOfElements;
}
// // ====================
// // ===== Messages =====
// // ====================
// static class Calculate {}
public int getStart() {
return start;
}
// static class Work {
// private final int start;
// private final int nrOfElements;
public int getNrOfElements() {
return nrOfElements;
}
}
// public Work(int start, int nrOfElements) {
// this.start = start;
// this.nrOfElements = nrOfElements;
// }
static class Result {
private final double value;
// public int getStart() { return start; }
// public int getNrOfElements() { return nrOfElements; }
// }
public Result(double value) {
this.value = value;
}
// static class Result {
// private final double value;
public double getValue() {
return value;
}
}
// public Result(double value) {
// this.value = value;
// }
// ==================
// ===== Worker =====
// ==================
public static class Worker extends UntypedActor {
// public double getValue() { return value; }
// }
// define the work
private double calculatePiFor(int start, int nrOfElements) {
double acc = 0.0;
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
}
return acc;
}
// // ==================
// // ===== Worker =====
// // ==================
// static class Worker extends UntypedActor {
// message handler
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
// // define the work
// private double calculatePiFor(int start, int nrOfElements) {
// double acc = 0.0;
// for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
// }
// return acc;
// }
// perform the work
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
// // message handler
// public void onReceive(Object message) {
// if (message instanceof Work) {
// Work work = (Work) message;
// reply with the result
getSender().tell(new Result(result));
// // perform the work
// double result = calculatePiFor(work.getStart(), work.getNrOfElements());
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
}
// // reply with the result
// getSender().tell(new Result(result));
// ==================
// ===== Master =====
// ==================
public static class Master extends UntypedActor {
private final int nrOfMessages;
private final int nrOfElements;
private final CountDownLatch latch;
// } else throw new IllegalArgumentException("Unknown message [" + message + "]");
// }
// }
private double pi;
private int nrOfResults;
private long start;
// // ==================
// // ===== Master =====
// // ==================
// static class Master extends UntypedActor {
// private final int nrOfMessages;
// private final int nrOfElements;
// private final CountDownLatch latch;
private ActorRef router;
// private double pi;
// private int nrOfResults;
// private long start;
public Master(final int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
Creator<Router> routerCreator = new Creator<Router>() {
public Router create() {
return new RoundRobinRouter(dispatcher(), new akka.actor.Timeout(-1));
}
};
LinkedList<ActorRef> actors = new LinkedList<ActorRef>() {
{
for (int i = 0; i < nrOfWorkers; i++) add(context().actorOf(Worker.class));
}
};
RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true);
router = new RoutedActorRef(system(), props, getSelf(), "pi");
}
// private ActorRef router;
// message handler
public void onReceive(Object message) {
// public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
// this.nrOfMessages = nrOfMessages;
// this.nrOfElements = nrOfElements;
// this.latch = latch;
if (message instanceof Calculate) {
// schedule work
for (int start = 0; start < nrOfMessages; start++) {
router.tell(new Work(start, nrOfElements), getSelf());
}
// LinkedList<ActorRef> workers = new LinkedList<ActorRef>();
// for (int i = 0; i < nrOfWorkers; i++) {
// ActorRef worker = system.actorOf(Worker.class);
// workers.add(worker);
// }
} else if (message instanceof Result) {
// router = system.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi");
// }
// handle result from the worker
Result result = (Result) message;
pi += result.getValue();
nrOfResults += 1;
if (nrOfResults == nrOfMessages) getSelf().stop();
// // message handler
// public void onReceive(Object message) {
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
// if (message instanceof Calculate) {
// // schedule work
// for (int start = 0; start < nrOfMessages; start++) {
// router.tell(new Work(start, nrOfElements), getSelf());
// }
@Override
public void preStart() {
start = System.currentTimeMillis();
}
// // send a PoisonPill to all workers telling them to shut down themselves
// router.tell(new Broadcast(poisonPill()));
@Override
public void postStop() {
// tell the world that the calculation is complete
System.out.println(String.format(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
pi, (System.currentTimeMillis() - start)));
latch.countDown();
}
}
// // send a PoisonPill to the router, telling him to shut himself down
// router.tell(poisonPill());
// ==================
// ===== Run it =====
// ==================
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
throws Exception {
final ActorSystem system = ActorSystem.create();
// } else if (message instanceof Result) {
// this latch is only plumbing to know when the calculation is completed
final CountDownLatch latch = new CountDownLatch(1);
// // handle result from the worker
// Result result = (Result) message;
// pi += result.getValue();
// nrOfResults += 1;
// if (nrOfResults == nrOfMessages) getSelf().stop();
// create the master
ActorRef master = system.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
}
});
// } else throw new IllegalArgumentException("Unknown message [" + message + "]");
// }
// start the calculation
master.tell(new Calculate());
// @Override
// public void preStart() {
// start = System.currentTimeMillis();
// }
// wait for master to shut down
latch.await();
// @Override
// public void postStop() {
// // tell the world that the calculation is complete
// System.out.println(String.format(
// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
// pi, (System.currentTimeMillis() - start)));
// latch.countDown();
// }
// }
// // ==================
// // ===== Run it =====
// // ==================
// public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
// throws Exception {
// // this latch is only plumbing to know when the calculation is completed
// final CountDownLatch latch = new CountDownLatch(1);
// // create the master
// ActorRef master = system.actorOf(new UntypedActorFactory() {
// public UntypedActor create() {
// return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
// }
// });
// // start the calculation
// master.tell(new Calculate());
// // wait for master to shut down
// latch.await();
// }
// }
// Shut down the system
system.stop();
}
}

View file

@ -1,113 +1,109 @@
// /**
// * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
// */
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.tutorial.first.scala
// package akka.tutorial.first.scala
import java.util.concurrent.CountDownLatch
import akka.routing.{ RoutedActorRef, LocalConnectionManager, RoundRobinRouter, RoutedProps }
import akka.actor.{ ActorSystemImpl, Actor, ActorSystem }
// import akka.actor.{ Actor, PoisonPill, ActorSystem }
// import Actor._
// import java.util.concurrent.CountDownLatch
// import akka.routing.Routing.Broadcast
// import akka.routing.{ RoutedProps, Routing }
object Pi extends App {
// object Pi extends App {
// Initiate the calculation
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// val system = ActorSystem()
// ====================
// ===== Messages =====
// ====================
sealed trait PiMessage
// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
case object Calculate extends PiMessage
// // ====================
// // ===== Messages =====
// // ====================
// sealed trait PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
// case object Calculate extends PiMessage
case class Result(value: Double) extends PiMessage
// case class Work(start: Int, nrOfElements: Int) extends PiMessage
// ==================
// ===== Worker =====
// ==================
class Worker extends Actor {
// case class Result(value: Double) extends PiMessage
// define the work
def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i start until (start + nrOfElements))
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
acc
}
// // ==================
// // ===== Worker =====
// // ==================
// class Worker extends Actor {
def receive = {
case Work(start, nrOfElements) sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
// // define the work
// def calculatePiFor(start: Int, nrOfElements: Int): Double = {
// var acc = 0.0
// for (i start until (start + nrOfElements))
// acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
// acc
// }
// ==================
// ===== Master =====
// ==================
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor {
// def receive = {
// case Work(start, nrOfElements) sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
// }
// }
var pi: Double = _
var nrOfResults: Int = _
var start: Long = _
// // ==================
// // ===== Master =====
// // ==================
// class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
// extends Actor {
// create the workers
val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker])
// var pi: Double = _
// var nrOfResults: Int = _
// var start: Long = _
// wrap them with a load-balancing router
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers))
val router = new RoutedActorRef(system, props, self, "pi")
// // create the workers
// val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker])
// message handler
def receive = {
case Calculate
// schedule work
for (i 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
case Result(value)
// handle result from the worker
pi += value
nrOfResults += 1
// // wrap them with a load-balancing router
// val router = system.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
// Stop this actor and all its supervised children
if (nrOfResults == nrOfMessages) self.stop()
}
// // message handler
// def receive = {
// case Calculate
// // schedule work
// for (i 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
override def preStart() {
start = System.currentTimeMillis
}
// // send a PoisonPill to all workers telling them to shut down themselves
// router ! Broadcast(PoisonPill)
override def postStop() {
// tell the world that the calculation is complete
println(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
.format(pi, (System.currentTimeMillis - start)))
latch.countDown()
}
}
// // send a PoisonPill to the router, telling him to shut himself down
// router ! PoisonPill
// ==================
// ===== Run it =====
// ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
val system = ActorSystem()
// case Result(value)
// // handle result from the worker
// pi += value
// nrOfResults += 1
// if (nrOfResults == nrOfMessages) self.stop()
// }
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)
// override def preStart() {
// start = System.currentTimeMillis
// }
// create the master
val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
// override def postStop() {
// // tell the world that the calculation is complete
// println(
// "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
// .format(pi, (System.currentTimeMillis - start)))
// latch.countDown()
// }
// }
// start the calculation
master ! Calculate
// // ==================
// // ===== Run it =====
// // ==================
// def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// wait for master to shut down
latch.await()
// // this latch is only plumbing to know when the calculation is completed
// val latch = new CountDownLatch(1)
// // create the master
// val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
// // start the calculation
// master ! Calculate
// // wait for master to shut down
// latch.await()
// }
// }
// Shut down the system
system.stop()
}
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.tutorial.first.scala
import org.junit.runner.RunWith
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.testkit.TestActorRef
import akka.tutorial.first.scala.Pi.Worker
import akka.actor.ActorSystem
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class WorkerSpec extends WordSpec with MustMatchers {
implicit def system = ActorSystem()
"Worker" must {
"calculate pi correctly" in {
val testActor = TestActorRef[Worker]
val actor = testActor.underlyingActor
actor.calculatePiFor(0, 0) must equal(0.0)
actor.calculatePiFor(1, 1) must be(-1.3333333333333333 plusOrMinus 0.0000000001)
}
}
}

7
build.sbt Normal file
View file

@ -0,0 +1,7 @@
seq(lsSettings:_*)
(LsKeys.tags in LsKeys.lsync) := Seq("actors", "stm", "concurrency", "distributed", "fault-tolerance", "scala", "java", "futures", "dataflow", "remoting")
(externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/")
(description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM."

View file

@ -215,7 +215,7 @@ object AkkaBuild extends Build {
id = "akka-samples",
base = file("akka-samples"),
settings = parentSettings,
aggregate = Seq(fsmSample)
aggregate = Seq(fsmSample, helloSample)
)
lazy val fsmSample = Project(
@ -224,27 +224,36 @@ object AkkaBuild extends Build {
dependencies = Seq(actor),
settings = defaultSettings
)
lazy val helloSample = Project(
id = "akka-sample-hello",
base = file("akka-samples/akka-sample-hello"),
dependencies = Seq(actor),
settings = defaultSettings
)
lazy val tutorials = Project(
id = "akka-tutorials",
base = file("akka-tutorials"),
settings = parentSettings,
aggregate = Seq(firstTutorial, secondTutorial)
aggregate = Seq(firstTutorial)
)
lazy val firstTutorial = Project(
id = "akka-tutorial-first",
base = file("akka-tutorials/akka-tutorial-first"),
dependencies = Seq(actor),
settings = defaultSettings
dependencies = Seq(actor, testkit),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.tutorials
)
)
lazy val secondTutorial = Project(
id = "akka-tutorial-second",
base = file("akka-tutorials/akka-tutorial-second"),
dependencies = Seq(actor),
settings = defaultSettings
)
// lazy val secondTutorial = Project(
// id = "akka-tutorial-second",
// base = file("akka-tutorials/akka-tutorial-second"),
// dependencies = Seq(actor),
// settings = defaultSettings
// )
lazy val docs = Project(
id = "akka-docs",
@ -406,6 +415,8 @@ object Dependencies {
// val sampleCamel = Seq(camelCore, camelSpring, commonsCodec, Runtime.camelJms, Runtime.activemq, Runtime.springJms,
// Test.junit, Test.scalatest, Test.logback)
val tutorials = Seq(Test.scalatest, Test.junit)
val docs = Seq(Test.scalatest, Test.junit)
}

View file

@ -4,3 +4,9 @@ resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.7")
addSbtPlugin("com.typesafe.sbtscalariform" % "sbt-scalariform" % "0.1.4")
resolvers ++= Seq(
"less is" at "http://repo.lessis.me",
"coda" at "http://repo.codahale.com")
addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.0")