Cleaned up formatting in tutorials

This commit is contained in:
Jonas Bonér 2011-04-19 13:29:22 +02:00
commit 902fe7be0c
15 changed files with 922 additions and 804 deletions

1
.gitignore vendored
View file

@ -46,5 +46,6 @@ multiverse.log
.eprj
.*.swp
akka-docs/_build/
*.pyc
akka-tutorials/akka-tutorial-first/project/boot/
akka-tutorials/akka-tutorial-first/project/plugins/project/

View file

@ -1,115 +1,136 @@
package akka.actor.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testing._
import akka.testing.Testing.{sleepFor, testMillis}
import akka.util.duration._
import akka.actor.Actor
import akka.actor.Actor._
import org.scalatest.Suite
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.routing._
@RunWith(classOf[JUnitRunner])
class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers {
import java.util.concurrent.atomic.AtomicInteger
class RoutingSpec extends WordSpec with MustMatchers {
import Routing._
@Test def testDispatcher = {
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
val targetOk = new AtomicInteger(0)
val t1 = actorOf( new Actor() {
"Routing" must {
"dispatch" in {
val Test1 = "test1"
val Test2 = "test2"
val Test3 = "test3"
val t1 = actorOf(new Actor {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
case Test1 => self.reply(3)
case Test2 => self.reply(7)
}
} ).start()
}).start()
val t2 = actorOf( new Actor() {
def receive = {
case `testMsg3` => self.reply(11)
case Test3 => self.reply(11)
}
}).start()
val d = dispatcherActor {
case `testMsg1`|`testMsg2` => t1
case `testMsg3` => t2
case Test1 | Test2 => t1
case Test3 => t2
}.start()
val result = for {
a <- (d !! (testMsg1, 5000)).as[Int]
b <- (d !! (testMsg2, 5000)).as[Int]
c <- (d !! (testMsg3, 5000)).as[Int]
a <- (d !! (Test1, testMillis(5 seconds))).as[Int]
b <- (d !! (Test2, testMillis(5 seconds))).as[Int]
c <- (d !! (Test3, testMillis(5 seconds))).as[Int]
} yield a + b + c
result.isDefined must be (true)
result.get must be(21)
result.get must be (21)
for(a <- List(t1,t2,d)) a.stop()
for(a <- List(t1, t2, d)) a.stop()
}
@Test def testLogger = {
"have messages logged" in {
val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any]
val latch = new CountDownLatch(2)
val t1 = actorOf(new Actor { def receive = { case _ => } }).start()
val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown() }).start()
val foo : Any = "foo"
val bar : Any = "bar"
l ! foo
l ! bar
val done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
val latch = TestLatch(2)
val actor = actorOf(new Actor {
def receive = { case _ => }
}).start()
val logger = loggerActor(actor, x => { msgs.add(x); latch.countDown() }).start()
val foo: Any = "foo"
val bar: Any = "bar"
logger ! foo
logger ! bar
latch.await
msgs must ( have size (2) and contain (foo) and contain (bar) )
t1.stop()
l.stop()
actor.stop()
logger.stop()
}
@Test def testSmallestMailboxFirstDispatcher = {
val t1ProcessedCount = new AtomicInteger(0)
val latch = new CountDownLatch(500)
"dispatch to smallest mailbox" in {
val t1Count = new AtomicInteger(0)
val t2Count = new AtomicInteger(0)
val latch = TestLatch(500)
val t1 = actorOf(new Actor {
def receive = {
case x =>
Thread.sleep(50) // slow actor
t1ProcessedCount.incrementAndGet
sleepFor(50 millis) // slow actor
t1Count.incrementAndGet
latch.countDown()
}
}).start()
val t2ProcessedCount = new AtomicInteger(0)
val t2 = actorOf(new Actor {
def receive = {
case x => t2ProcessedCount.incrementAndGet
case x =>
t2Count.incrementAndGet
latch.countDown()
}
}).start()
val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
for (i <- 1 to 500) d ! i
val done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
t1ProcessedCount.get must be < (t2ProcessedCount.get) // because t1 is much slower and thus has a bigger mailbox all the time
for(a <- List(t1,t2,d)) a.stop()
latch.await(10 seconds)
// because t1 is much slower and thus has a bigger mailbox all the time
t1Count.get must be < (t2Count.get)
for(a <- List(t1, t2, d)) a.stop()
}
@Test def testListener = {
val latch = new CountDownLatch(2)
val foreachListener = new CountDownLatch(2)
val num = new AtomicInteger(0)
val i = actorOf(new Actor with Listeners {
"listen" in {
val fooLatch = TestLatch(2)
val barLatch = TestLatch(2)
val barCount = new AtomicInteger(0)
val broadcast = actorOf(new Actor with Listeners {
def receive = listenerManagement orElse {
case "foo" => gossip("bar")
}
})
i.start()
}).start()
def newListener = actorOf(new Actor {
def receive = {
case "bar" =>
num.incrementAndGet
latch.countDown()
case "foo" => foreachListener.countDown()
barCount.incrementAndGet
barLatch.countDown()
case "foo" =>
fooLatch.countDown()
}
}).start()
@ -117,124 +138,113 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
val a2 = newListener
val a3 = newListener
i ! Listen(a1)
i ! Listen(a2)
i ! Listen(a3)
i ! Deafen(a3)
i ! WithListeners(_ ! "foo")
i ! "foo"
broadcast ! Listen(a1)
broadcast ! Listen(a2)
broadcast ! Listen(a3)
val done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
num.get must be (2)
val withListeners = foreachListener.await(5,TimeUnit.SECONDS)
withListeners must be (true)
for(a <- List(i,a1,a2,a3)) a.stop()
broadcast ! Deafen(a3)
broadcast ! WithListeners(_ ! "foo")
broadcast ! "foo"
barLatch.await
barCount.get must be (2)
fooLatch.await
for(a <- List(broadcast, a1 ,a2 ,a3)) a.stop()
}
@Test def testIsDefinedAt = {
"be defined at" in {
import akka.actor.ActorRef
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
val Yes = "yes"
val No = "no"
val t1 = actorOf( new Actor() {
def testActor() = actorOf( new Actor() {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
case Yes => "yes"
}
} ).start()
}).start()
val t2 = actorOf( new Actor() {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start()
val t3 = actorOf( new Actor() {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start()
val t4 = actorOf( new Actor() {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start()
val t1 = testActor()
val t2 = testActor()
val t3 = testActor()
val t4 = testActor()
val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil))
val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil))
t1.isDefinedAt(testMsg1) must be (true)
t1.isDefinedAt(testMsg3) must be (false)
t2.isDefinedAt(testMsg1) must be (true)
t2.isDefinedAt(testMsg3) must be (false)
d1.isDefinedAt(testMsg1) must be (true)
d1.isDefinedAt(testMsg3) must be (false)
d2.isDefinedAt(testMsg1) must be (true)
d2.isDefinedAt(testMsg3) must be (false)
t1.isDefinedAt(Yes) must be (true)
t1.isDefinedAt(No) must be (false)
t2.isDefinedAt(Yes) must be (true)
t2.isDefinedAt(No) must be (false)
d1.isDefinedAt(Yes) must be (true)
d1.isDefinedAt(No) must be (false)
d2.isDefinedAt(Yes) must be (true)
d2.isDefinedAt(No) must be (false)
for(a <- List(t1,t2,d1,d2)) a.stop()
for(a <- List(t1, t2, d1, d2)) a.stop()
}
}
// Actor Pool Capacity Tests
"Actor Pool" must {
//
// make sure the pool is of the fixed, expected capacity
//
@Test def testFixedCapacityActorPool = {
val latch = new CountDownLatch(2)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
"have expected capacity" in {
val latch = TestLatch(2)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool
with FixedCapacityStrategy
with SmallestMailboxSelector
{
def factory = actorOf(new Actor {
def receive = {
case _ =>
counter.incrementAndGet
count.incrementAndGet
latch.countDown()
self reply_? "success"
}
})
}).start()
def limit = 2
def selectionCount = 1
def partialFill = true
def instance = factory
def receive = _route
}
}).start()
val successes = new CountDownLatch(2)
implicit val successCounterActor = Some(actorOf(new Actor {
val successes = TestLatch(2)
val successCounter = Some(actorOf(new Actor {
def receive = {
case "success" => successes.countDown()
}
}).start())
val pool = actorOf(new TestPool).start()
implicit val replyTo = successCounter
pool ! "a"
pool ! "b"
latch.await(1,TimeUnit.SECONDS) must be (true)
successes.await(1,TimeUnit.SECONDS) must be (true)
counter.get must be (2)
latch.await
successes.await
count.get must be (2)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
pool stop
pool.stop()
}
@Test def testTicket705 = {
val actorPool = actorOf(new Actor with DefaultActorPool
"pass ticket #705" in {
val pool = actorOf(
new Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with BasicFilter {
//with BasicNoBackoffFilter {
with BasicFilter
{
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
@ -248,7 +258,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def factory = actorOf(new Actor {
def receive = {
case req: String => {
Thread.sleep(10L)
sleepFor(10 millis)
self.reply_?("Response")
}
}
@ -256,23 +266,23 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
}).start()
try {
(for(count <- 1 to 500) yield actorPool.!!![String]("Test", 20000)) foreach {
(for (count <- 1 to 500) yield pool.!!![String]("Test", 20000)) foreach {
_.await.resultOrException.get must be ("Response")
}
} finally {
actorPool.stop()
pool.stop()
}
}
//
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
//
@Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = {
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool
with BoundedCapacityStrategy
with ActiveFuturesPressureCapacitor
with SmallestMailboxSelector
@ -280,9 +290,9 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
case n: Int =>
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
@ -294,58 +304,55 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def selectionCount = 1
def instance = factory
def receive = _route
}
}).start()
//
// first message should create the minimum number of delgates
//
val pool = actorOf(new TestPool).start()
pool ! 1
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m <- 0 until loops) {
pool !!! t
Thread.sleep(50)
sleepFor(50 millis)
}
}
//
// 2 more should go thru w/out triggering more
//
// 2 more should go thru without triggering more
loops = 2
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
latch.await
count.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
//
// a whole bunch should max it out
//
loops = 10
loop(500)
latch.await
count.get must be (loops)
done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4)
pool stop
pool.stop()
}
//
"grow as needed under mailbox pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
//
@Test def testBoundedCapacityActorPoolWithMailboxPressure = {
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
var latch = TestLatch(3)
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
@ -353,9 +360,9 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
case n: Int =>
sleepFor(n millis)
count.incrementAndGet
latch.countDown()
}
})
@ -368,51 +375,43 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def selectionCount = 1
def instance = factory
def receive = _route
}
val pool = actorOf(new TestPool).start()
}).start()
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
def loop(t: Int) = {
latch = TestLatch(loops)
count.set(0)
for (m <- 0 until loops) {
pool ! t
}
}
//
// send a few messages and observe pool at its lower bound
//
loops = 3
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
latch.await
count.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
//
// send a bunch over the theshold and observe an increment
//
loops = 15
loop(500)
done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
latch.await(10 seconds)
count.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3)
pool stop
pool.stop()
}
// Actor Pool Selector Tests
@Test def testRoundRobinSelector = {
var latch = new CountDownLatch(2)
"round robin" in {
val latch1 = TestLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
class TestPool1 extends Actor with DefaultActorPool
val pool1 = actorOf(
new Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
@ -421,7 +420,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown()
latch1.countDown()
}
})
@ -431,17 +430,21 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def partialFill = true
def instance = factory
def receive = _route
}
}).start()
val pool1 = actorOf(new TestPool1).start()
pool1 ! "a"
pool1 ! "b"
var done = latch.await(1,TimeUnit.SECONDS)
done must be (true)
delegates.size must be (1)
pool1 stop
class TestPool2 extends Actor with DefaultActorPool
latch1.await
delegates.size must be (1)
pool1.stop()
val latch2 = TestLatch(2)
delegates.clear()
val pool2 = actorOf(
new Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
@ -450,7 +453,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown()
latch2.countDown()
}
})
@ -460,29 +463,22 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def partialFill = false
def instance = factory
def receive = _route
}
}).start()
latch = new CountDownLatch(2)
delegates clear
val pool2 = actorOf(new TestPool2).start()
pool2 ! "a"
pool2 ! "b"
done = latch.await(1, TimeUnit.SECONDS)
done must be (true)
latch2.await
delegates.size must be (2)
pool2 stop
pool2.stop()
}
// Actor Pool Filter Tests
"backoff" in {
val latch = TestLatch(10)
//
// reuse previous test to max pool then observe filter reducing capacity over time
//
@Test def testBoundedCapacityActorPoolWithBackoffFilter = {
var latch = new CountDownLatch(10)
class TestPool extends Actor with DefaultActorPool
val pool = actorOf(
new Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
@ -492,8 +488,8 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
case n: Int =>
sleepFor(n millis)
latch.countDown()
}
})
@ -508,30 +504,29 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
def backoffThreshold = 0.50
def instance = factory
def receive = _route
}
}).start()
//
// put some pressure on the pool
//
val pool = actorOf(new TestPool).start()
for (m <- 0 to 10) pool ! 250
Thread.sleep(5)
sleepFor(5 millis)
val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size
z must be >= (2)
var done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
// let it cool down
//
//
//
for (m <- 0 to 3) {
pool ! 1
Thread.sleep(500)
sleepFor(500 millis)
}
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z)
pool stop
pool.stop()
}
}
}

View file

@ -56,8 +56,9 @@ trait ListenerManagement {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isShutdown) iterator.remove()
else try {
// Uncomment if those exceptions are so frequent as to bottleneck
// if (listener.isShutdown) iterator.remove() else
try {
listener ! msg
} catch {
case e : ActorInitializationException =>

View file

@ -6,16 +6,24 @@ SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
EASYINSTALL = easy_install
LOCALPACKAGES = $(shell pwd)/$(BUILDDIR)/site-packages
PYGMENTSDIR = pygments
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html singlehtml latex pdf
# Set python path to include local packages for pygments styles.
PYTHONPATH += $(LOCALPACKAGES)
export PYTHONPATH
.PHONY: help clean pygments html singlehtml latex pdf
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " pygments to locally install the custom pygments styles"
@echo " html to make standalone HTML files"
@echo " singlehtml to make a single large HTML file"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@ -24,7 +32,15 @@ help:
clean:
-rm -rf $(BUILDDIR)/*
html:
pygments:
mkdir -p $(LOCALPACKAGES)
$(EASYINSTALL) --install-dir $(LOCALPACKAGES) $(PYGMENTSDIR)
-rm -rf $(PYGMENTSDIR)/*.egg-info $(PYGMENTSDIR)/build $(PYGMENTSDIR)/temp
@echo
@echo "Custom pygments styles have been installed."
@echo
html: pygments
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
@ -41,9 +57,8 @@ latex:
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
pdf:
pdf: pygments
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
make -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."

View file

@ -7,7 +7,8 @@ import sys, os
# -- General configuration -----------------------------------------------------
extensions = ['sphinx.ext.todo']
sys.path.append(os.path.abspath('exts'))
extensions = ['sphinx.ext.todo', 'includecode']
templates_path = ['_templates']
source_suffix = '.rst'
@ -19,7 +20,7 @@ copyright = u'2009-2011, Scalable Solutions AB'
version = '1.1'
release = '1.1'
pygments_style = 'akka'
pygments_style = 'simple'
highlight_language = 'scala'
add_function_parentheses = False
show_authors = True

View file

@ -0,0 +1,138 @@
import os
import codecs
from os import path
from docutils import nodes
from docutils.parsers.rst import Directive, directives
class IncludeCode(Directive):
"""
Include a code example from a file with sections delimited with special comments.
"""
has_content = False
required_arguments = 1
optional_arguments = 0
final_argument_whitespace = False
option_spec = {
'section': directives.unchanged_required,
'comment': directives.unchanged_required,
'marker': directives.unchanged_required,
'include': directives.unchanged_required,
'exclude': directives.unchanged_required,
'hideexcludes': directives.flag,
'linenos': directives.flag,
'language': directives.unchanged_required,
'encoding': directives.encoding,
'prepend': directives.unchanged_required,
'append': directives.unchanged_required,
}
def run(self):
document = self.state.document
arg0 = self.arguments[0]
(filename, sep, section) = arg0.partition('#')
if not document.settings.file_insertion_enabled:
return [document.reporter.warning('File insertion disabled',
line=self.lineno)]
env = document.settings.env
if filename.startswith('/') or filename.startswith(os.sep):
rel_fn = filename[1:]
else:
docdir = path.dirname(env.doc2path(env.docname, base=None))
rel_fn = path.join(docdir, filename)
try:
fn = path.join(env.srcdir, rel_fn)
except UnicodeDecodeError:
# the source directory is a bytestring with non-ASCII characters;
# let's try to encode the rel_fn in the file system encoding
rel_fn = rel_fn.encode(sys.getfilesystemencoding())
fn = path.join(env.srcdir, rel_fn)
encoding = self.options.get('encoding', env.config.source_encoding)
codec_info = codecs.lookup(encoding)
try:
f = codecs.StreamReaderWriter(open(fn, 'U'),
codec_info[2], codec_info[3], 'strict')
lines = f.readlines()
f.close()
except (IOError, OSError):
return [document.reporter.warning(
'Include file %r not found or reading it failed' % filename,
line=self.lineno)]
except UnicodeError:
return [document.reporter.warning(
'Encoding %r used for reading included file %r seems to '
'be wrong, try giving an :encoding: option' %
(encoding, filename))]
comment = self.options.get('comment', '//')
marker = self.options.get('marker', comment + '#')
lenm = len(marker)
if not section:
section = self.options.get('section')
include_sections = self.options.get('include', '')
exclude_sections = self.options.get('exclude', '')
include = set(include_sections.split(',')) if include_sections else set()
exclude = set(exclude_sections.split(',')) if exclude_sections else set()
hideexcludes = 'hideexcludes' in self.options
if section:
include |= set([section])
within = set()
res = []
excluding = False
for line in lines:
index = line.find(marker)
if index >= 0:
section_name = line[index+lenm:].strip()
if section_name in within:
within ^= set([section_name])
if excluding and not (exclude & within):
excluding = False
else:
within |= set([section_name])
if not excluding and (exclude & within):
excluding = True
if not hideexcludes:
res.append(' ' * index + comment + ' ' + section_name.replace('-', ' ') + ' ...\n')
elif not (exclude & within) and (not include or (include & within)):
res.append(line)
lines = res
def countwhile(predicate, iterable):
count = 0
for x in iterable:
if predicate(x):
count += 1
else:
return count
nonempty = filter(lambda l: l.strip(), lines)
tabcounts = map(lambda l: countwhile(lambda c: c == ' ', l), nonempty)
tabshift = min(tabcounts) if tabcounts else 0
if tabshift > 0:
lines = map(lambda l: l[tabshift:] if len(l) > tabshift else l, lines)
prepend = self.options.get('prepend')
append = self.options.get('append')
if prepend:
lines.insert(0, prepend + '\n')
if append:
lines.append(append + '\n')
text = ''.join(lines)
retnode = nodes.literal_block(text, text, source=fn)
retnode.line = 1
retnode.attributes['line_number'] = self.lineno
if self.options.get('language', ''):
retnode['language'] = self.options['language']
if 'linenos' in self.options:
retnode['linenos'] = True
document.settings.env.note_dependency(rel_fn)
return [retnode]
def setup(app):
app.require_sphinx('1.0')
app.add_directive('includecode', IncludeCode)

View file

@ -6,74 +6,75 @@ Contents
manual/getting-started-first-scala
manual/getting-started-first-java
pending/actor-registry-java
pending/actor-registry-scala
pending/actors-scala
pending/agents-scala
pending/articles
pending/benchmarks
pending/building-akka
pending/buildr
pending/cluster-membership
pending/companies-using-akka
pending/configuration
pending/dataflow-java
pending/dataflow-scala
pending/deployment-scenarios
pending/developer-guidelines
pending/dispatchers-java
pending/dispatchers-scala
pending/event-handler
pending/external-sample-projects
pending/fault-tolerance-java
pending/fault-tolerance-scala
pending/Feature Stability Matrix
manual/fsm-scala
pending/futures-scala
pending/getting-started
pending/guice-integration
pending/Home
pending/http
pending/issue-tracking
pending/language-bindings
pending/licenses
pending/logging
pending/Migration-1.0-1.1
pending/migration-guide-0.10.x-1.0.x
pending/migration-guide-0.7.x-0.8.x
pending/migration-guide-0.8.x-0.9.x
pending/migration-guide-0.9.x-0.10.x
pending/migration-guides
pending/Recipes
pending/release-notes
pending/remote-actors-java
pending/remote-actors-scala
pending/routing-java
pending/routing-scala
pending/scheduler
pending/security
pending/serialization-java
pending/serialization-scala
pending/servlet
pending/slf4j
pending/sponsors
pending/stm
pending/stm-java
pending/stm-scala
pending/team
pending/test
pending/testkit
pending/testkit-example
pending/third-party-integrations
pending/transactors-java
pending/transactors-scala
pending/tutorial-chat-server-java
pending/tutorial-chat-server-scala
pending/typed-actors-java
pending/typed-actors-scala
pending/untyped-actors-java
pending/use-cases
pending/web
.. pending/actor-registry-java
.. pending/actor-registry-scala
.. pending/actors-scala
.. pending/agents-scala
.. pending/articles
.. pending/benchmarks
.. pending/building-akka
.. pending/buildr
.. pending/cluster-membership
.. pending/companies-using-akka
.. pending/configuration
.. pending/dataflow-java
.. pending/dataflow-scala
.. pending/deployment-scenarios
.. pending/developer-guidelines
.. pending/dispatchers-java
.. pending/dispatchers-scala
.. pending/event-handler
.. pending/external-sample-projects
.. pending/fault-tolerance-java
.. pending/fault-tolerance-scala
.. pending/Feature Stability Matrix
.. pending/futures-scala
.. pending/getting-started
.. pending/guice-integration
.. pending/Home
.. pending/http
.. pending/issue-tracking
.. pending/language-bindings
.. pending/licenses
.. pending/logging
.. pending/Migration-1.0-1.1
.. pending/migration-guide-0.10.x-1.0.x
.. pending/migration-guide-0.7.x-0.8.x
.. pending/migration-guide-0.8.x-0.9.x
.. pending/migration-guide-0.9.x-0.10.x
.. pending/migration-guides
.. pending/Recipes
.. pending/release-notes
.. pending/remote-actors-java
.. pending/remote-actors-scala
.. pending/routing-java
.. pending/routing-scala
.. pending/scheduler
.. pending/security
.. pending/serialization-java
.. pending/serialization-scala
.. pending/servlet
.. pending/slf4j
.. pending/sponsors
.. pending/stm
.. pending/stm-java
.. pending/stm-scala
.. pending/team
.. pending/test
.. pending/testkit
.. pending/testkit-example
.. pending/third-party-integrations
.. pending/transactors-java
.. pending/transactors-scala
.. pending/tutorial-chat-server-java
.. pending/tutorial-chat-server-scala
.. pending/typed-actors-java
.. pending/typed-actors-scala
.. pending/untyped-actors-java
.. pending/use-cases
.. pending/web
Links
=====

View file

@ -0,0 +1,129 @@
//#imports
package akka.tutorial.scala.first
import akka.actor.{Actor, PoisonPill}
import Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import System.{currentTimeMillis => now}
import java.util.concurrent.CountDownLatch
//#imports
//#app
object Pi extends App {
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
//#actors-and-messages
// ====================
// ===== Messages =====
// ====================
//#messages
sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
//#messages
// ==================
// ===== Worker =====
// ==================
//#worker
class Worker extends Actor {
//#calculate-pi
def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i <- start until (start + nrOfElements))
acc += 4 * math.pow(-1, i) / (2 * i + 1)
acc
}
//#calculate-pi
def receive = {
case Work(start, nrOfElements) =>
self reply Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
//#worker
// ==================
// ===== Master =====
// ==================
//#master
class Master(
nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor {
var pi: Double = _
var nrOfResults: Int = _
var start: Long = _
//#create-workers
// create the workers
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
//#create-workers
//#master-receive
// message handler
def receive = {
//#message-handling
case Calculate =>
// schedule work
for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
// send a PoisonPill to all workers telling them to shut down themselves
router ! Broadcast(PoisonPill)
// send a PoisonPill to the router, telling him to shut himself down
router ! PoisonPill
case Result(value) =>
// handle result from the worker
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) self.stop()
//#message-handling
}
//#master-receive
override def preStart {
start = now
}
override def postStop {
// tell the world that the calculation is complete
println(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
.format(pi, (now - start)))
latch.countDown()
}
}
//#master
//#actors-and-messages
// ==================
// ===== Run it =====
// ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)
// create the master
val master = actorOf(
new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start()
// start the calculation
master ! Calculate
// wait for master to shut down
latch.await()
}
}
//#app

View file

@ -104,9 +104,9 @@ Downloading and installing Maven
Maven is an excellent build system that can be used to build both Java and Scala projects. If you want to use Maven for this tutorial then follow the following instructions, if not you can skip this section and the next.
First browse to the `Maven download page <http://maven.apache.org/download.html>`_ and download the ``3.0.3`` distribution.
First browse to `http://maven.apache.org/download.html <http://maven.apache.org/download.html>`_ and download the ``3.0.3`` distribution.
To install Maven it is easiest to follow the instructions on `this page <http://maven.apache.org/download.html#Installation>`_.
To install Maven it is easiest to follow the instructions on `http://maven.apache.org/download.html#Installation <http://maven.apache.org/download.html#Installation>`_.
Creating an Akka Maven project
------------------------------
@ -378,7 +378,8 @@ Here is the master actor::
}
}
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
public Master(
int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
@ -470,7 +471,7 @@ Now the only thing that is left to implement is the runner that should bootstrap
pi.calculate(4, 10000, 10000);
}
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
public void calculate(int nrOfWorkers, int nrOfElements, int nrOfMessages)
throws Exception {
// this latch is only plumbing to know when the calculation is completed
@ -601,7 +602,9 @@ Before we package it up and run it, let's take a look at the full code now, with
}
}
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
public Master(
int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
@ -664,7 +667,7 @@ Before we package it up and run it, let's take a look at the full code now, with
// ==================
// ===== Run it =====
// ==================
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
public void calculate(int nrOfWorkers, int nrOfElements, int nrOfMessages)
throws Exception {
// this latch is only plumbing to know when the calculation is completed
@ -702,9 +705,11 @@ First we need to compile the source file. That is done with Java's compiler ``ja
When we have compiled the source file we are ready to run the application. This is done with ``java`` but yet again we need to add the ``akka-actor-1.1.jar`` and the ``scala-library.jar`` JAR files to the classpath as well as the classes we compiled ourselves::
$ java -cp dist/akka-actor-1.1.jar:scala-library.jar:tutorial akka.tutorial.java.first.Pi
AKKA_HOME is defined as [/Users/jboner/src/akka-stuff/akka-core], loading config from \
[/Users/jboner/src/akka-stuff/akka-core/config/akka.conf].
$ java \
-cp dist/akka-actor-1.1.jar:scala-library.jar:tutorial \
akka.tutorial.java.first.Pi
AKKA_HOME is defined as [/Users/jboner/src/akka-stuff/akka-core]
loading config from [/Users/jboner/src/akka-stuff/akka-core/config/akka.conf].
Pi estimate: 3.1435501812459323
Calculation time: 822 millis

View file

@ -124,9 +124,9 @@ Downloading and installing SBT
SBT, short for 'Simple Build Tool' is an excellent build system written in Scala. It uses Scala to write the build scripts which gives you a lot of power. It has a plugin architecture with many plugins available, something that we will take advantage of soon. SBT is the preferred way of building software in Scala and is probably the easiest way of getting through this tutorial. If you want to use SBT for this tutorial then follow the following instructions, if not you can skip this section and the next.
First browse to the `SBT download page<http://code.google.com/p/simple-build-tool/downloads/list>`_ and download the ``0.7.6.RC0`` distribution.
First browse to `http://code.google.com/p/simple-build-tool/downloads/list <http://code.google.com/p/simple-build-tool/downloads/list>`_ and download the ``0.7.6.RC0`` distribution.
To install SBT and create a project for this tutorial it is easiest to follow the instructions on `this page <http://code.google.com/p/simple-build-tool/wiki/Setup>`_.
To install SBT and create a project for this tutorial it is easiest to follow the instructions on `http://code.google.com/p/simple-build-tool/wiki/Setup <http://code.google.com/p/simple-build-tool/wiki/Setup>`_.
Now we need to create our first Akka project. You could add the dependencies manually to the build script, but the easiest way is to use Akka's SBT Plugin, covered in the next section.
@ -184,7 +184,7 @@ Now it's about time to start hacking.
We start by creating a ``Pi.scala`` file and adding these import statements at the top of the file::
package akka.tutorial.scala.first
package akka.tutorial.first.scala
import akka.actor.{Actor, PoisonPill}
import Actor._
@ -275,7 +275,8 @@ Now we have a router that is representing all our workers in a single abstractio
Here is the master actor::
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
class Master(
nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor {
var pi: Double = _
@ -291,12 +292,14 @@ Here is the master actor::
def receive = { ... }
override def preStart {
start = now
start = System.currentTimeMillis
}
override def postStop {
// tell the world that the calculation is complete
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
println(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
.format(pi, (System.currentTimeMillis - start)))
latch.countDown()
}
}
@ -371,7 +374,7 @@ That's it. Now we are done.
But before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all::
package akka.tutorial.scala.first
package akka.tutorial.first.scala
import akka.actor.{Actor, PoisonPill}
import Actor._
@ -449,14 +452,14 @@ But before we package it up and run it, let's take a look at the full code now,
}
override def preStart {
start = now
start = System.currentTimeMillis
}
override def postStop {
// tell the world that the calculation is complete
println(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
.format(pi, (now - start)))
.format(pi, (System.currentTimeMillis - start)))
latch.countDown()
}
}
@ -492,9 +495,11 @@ First we need to compile the source file. That is done with Scala's compiler ``s
When we have compiled the source file we are ready to run the application. This is done with ``java`` but yet again we need to add the ``akka-actor-1.1.jar`` JAR file to the classpath, and this time we also need to add the Scala runtime library ``scala-library.jar`` and the classes we compiled ourselves::
$ java -cp dist/akka-actor-1.1.jar:scala-library.jar:tutorial akka.tutorial.scala.first.Pi
AKKA_HOME is defined as [/Users/jboner/src/akka-stuff/akka-core], loading config from \
[/Users/jboner/src/akka-stuff/akka-core/config/akka.conf].
$ java \
-cp dist/akka-actor-1.1.jar:scala-library.jar:tutorial \
akka.tutorial.first.scala.Pi
AKKA_HOME is defined as [/Users/jboner/src/akka-stuff/akka-core]
loading config from [/Users/jboner/src/akka-stuff/akka-core/config/akka.conf].
Pi estimate: 3.1435501812459323
Calculation time: 858 millis

View file

@ -112,7 +112,7 @@ Downloading and installing SBT
SBT, short for 'Simple Build Tool' is an excellent build system written in Scala. It uses Scala to write the build scripts which gives you a lot of power. It has a plugin architecture with many plugins available, something that we will take advantage of soon. SBT is the preferred way of building software in Scala and is probably the easiest way of getting through this tutorial. If you want to use SBT for this tutorial then follow the following instructions, if not you can skip this section and the next.
First browse to the `SBT download page<http://code.google.com/p/simple-build-tool/downloads/list>`_ and download the ``0.7.6.RC0`` distribution.
First browse to the `SBT download page <http://code.google.com/p/simple-build-tool/downloads/list>`_ and download the ``0.7.6.RC0`` distribution.
To install SBT and create a project for this tutorial it is easiest to follow the instructions on `this page <http://code.google.com/p/simple-build-tool/wiki/Setup>`_.
@ -172,17 +172,9 @@ Start writing the code
Now it's about time to start hacking.
We start by creating a ``Pi.scala`` file and adding these import statements at the top of the file::
We start by creating a ``Pi.scala`` file and adding these import statements at the top of the file:
package akka.tutorial.scala.first
import akka.actor.{Actor, PoisonPill}
import Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import akka.dispatch.Dispatchers
import java.util.concurrent.CountDownLatch
.. includecode:: examples/Pi.scala#imports
If you are using SBT in this tutorial then create the file in the ``src/main/scala`` directory.
@ -199,49 +191,30 @@ With this in mind, let's now create the messages that we want to have flowing in
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation
Messages sent to actors should always be immutable to avoid sharing mutable state. In scala we have 'case classes' which make excellent messages. So let's start by creating three messages as case classes. We also create a common base trait for our messages (that we define as being ``sealed`` in order to prevent creating messages outside our control)::
Messages sent to actors should always be immutable to avoid sharing mutable state. In scala we have 'case classes' which make excellent messages. So let's start by creating three messages as case classes. We also create a common base trait for our messages (that we define as being ``sealed`` in order to prevent creating messages outside our control):
sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
.. includecode:: examples/Pi.scala#messages
Creating the worker
-------------------
Now we can create the worker actor. This is done by mixing in the ``Actor`` trait and defining the ``receive`` method. The ``receive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message::
Now we can create the worker actor. This is done by mixing in the ``Actor`` trait and defining the ``receive`` method. The ``receive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message:
class Worker extends Actor {
def receive = {
case Work(start, nrOfElements) =>
self reply Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
.. includecode:: examples/Pi.scala#worker
:exclude: calculate-pi
As you can see we have now created an ``Actor`` with a ``receive`` method as a handler for the ``Work`` message. In this handler we invoke the ``calculatePiFor(..)`` method, wrap the result in a ``Result`` message and send it back to the original sender using ``self.reply``. In Akka the sender reference is implicitly passed along with the message so that the receiver can always reply or store away the sender reference for future use.
The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method. While there are many ways we can implement this algorithm in Scala, in this introductory tutorial we have chosen an imperative style using a for comprehension and an accumulator::
The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method. While there are many ways we can implement this algorithm in Scala, in this introductory tutorial we have chosen an imperative style using a for comprehension and an accumulator:
def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i <- start until (start + nrOfElements))
acc += 4 * math.pow(-1, i) / (2 * i + 1)
acc
}
.. includecode:: examples/Pi.scala#calculate-pi
Creating the master
-------------------
The master actor is a little bit more involved. In its constructor we need to create the workers (the ``Worker`` actors) and start them. We will also wrap them in a load-balancing router to make it easier to spread out the work evenly between the workers. Let's do that first::
The master actor is a little bit more involved. In its constructor we need to create the workers (the ``Worker`` actors) and start them. We will also wrap them in a load-balancing router to make it easier to spread out the work evenly between the workers. Let's do that first:
// create the workers
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
.. includecode:: examples/Pi.scala#create-workers
As you can see we are using the ``actorOf`` factory method to create actors, this method returns as an ``ActorRef`` which is a reference to our newly created actor. This method is available in the ``Actor`` object but is usually imported::
@ -253,33 +226,10 @@ Now we have a router that is representing all our workers in a single abstractio
- ``nrOfMessages`` -- defining how many number chunks to send out to the workers
- ``nrOfElements`` -- defining how big the number chunks sent to each worker should be
Here is the master actor::
Here is the master actor:
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor {
var pi: Double = _
var nrOfResults: Int = _
var start: Long = _
// create the workers
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
def receive = { ... }
override def preStart {
start = now
}
override def postStop {
// tell the world that the calculation is complete
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
latch.countDown()
}
}
.. includecode:: examples/Pi.scala#master
:exclude: message-handling
A couple of things are worth explaining further.
@ -296,170 +246,27 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` actors and a
The ``Result`` handler is simpler, here we get the value from the ``Result`` message and aggregate it to our ``pi`` member variable. We also keep track of how many results we have received back, and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and shuts down.
Let's capture this in code::
Let's capture this in code:
// message handler
def receive = {
case Calculate =>
// schedule work
for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
// send a PoisonPill to all workers telling them to shut down themselves
router ! Broadcast(PoisonPill)
// send a PoisonPill to the router, telling him to shut himself down
router ! PoisonPill
case Result(value) =>
// handle result from the worker
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) self.stop()
}
.. includecode:: examples/Pi.scala#master-receive
Bootstrap the calculation
-------------------------
Now the only thing that is left to implement is the runner that should bootstrap and run the calculation for us. We do that by creating an object that we call ``Pi``, here we can extend the ``App`` trait in Scala, which means that we will be able to run this as an application directly from the command line.
The ``Pi`` object is a perfect container module for our actors and messages, so let's put them all there. We also create a method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish::
The ``Pi`` object is a perfect container module for our actors and messages, so let's put them all there. We also create a method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish:
object Pi extends App {
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
... // actors and messages
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)
// create the master
val master = actorOf(
new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start()
// start the calculation
master ! Calculate
// wait for master to shut down
latch.await()
}
}
.. includecode:: examples/Pi.scala#app
:exclude: actors-and-messages
That's it. Now we are done.
But before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all::
But before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all:
package akka.tutorial.scala.first
.. includecode:: examples/Pi.scala
import akka.actor.{Actor, PoisonPill}
import Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import java.util.concurrent.CountDownLatch
object Pi extends App {
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// ====================
// ===== Messages =====
// ====================
sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
// ==================
// ===== Worker =====
// ==================
class Worker extends Actor {
// define the work
def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i <- start until (start + nrOfElements))
acc += 4 * math.pow(-1, i) / (2 * i + 1)
acc
}
def receive = {
case Work(start, nrOfElements) =>
self reply Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
// ==================
// ===== Master =====
// ==================
class Master(
nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor {
var pi: Double = _
var nrOfResults: Int = _
var start: Long = _
// create the workers
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
// message handler
def receive = {
case Calculate =>
// schedule work
//for (arg <- 0 until nrOfMessages) router ! Work(arg, nrOfElements)
for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
// send a PoisonPill to all workers telling them to shut down themselves
router ! Broadcast(PoisonPill)
// send a PoisonPill to the router, telling him to shut himself down
router ! PoisonPill
case Result(value) =>
// handle result from the worker
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) self.stop()
}
override def preStart {
start = now
}
override def postStop {
// tell the world that the calculation is complete
println(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
.format(pi, (now - start)))
latch.countDown()
}
}
// ==================
// ===== Run it =====
// ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)
// create the master
val master = actorOf(
new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start()
// start the calculation
master ! Calculate
// wait for master to shut down
latch.await()
}
}
Run it as a command line application
------------------------------------

View file

@ -0,0 +1,19 @@
"""
Akka syntax styles for Pygments.
"""
from setuptools import setup
entry_points = """
[pygments.styles]
simple = styles.simple:SimpleStyle
"""
setup(
name = 'akkastyles',
version = '0.1',
description = __doc__,
author = "Akka",
packages = ['styles'],
entry_points = entry_points
)

View file

View file

@ -3,7 +3,7 @@
pygments.styles.akka
~~~~~~~~~~~~~~~~~~~~~~~~
Akka style for Scala highlighting.
Simple style for Scala highlighting.
"""
from pygments.style import Style
@ -11,9 +11,9 @@ from pygments.token import Keyword, Name, Comment, String, Error, \
Number, Operator, Generic, Whitespace
class AkkaStyle(Style):
class SimpleStyle(Style):
"""
Akka style for Scala highlighting.
Simple style for Scala highlighting.
"""
background_color = "#f0f0f0"

View file

@ -9,6 +9,7 @@ import Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import System.{currentTimeMillis => now}
import java.util.concurrent.CountDownLatch
/**