From 807579ee87ae0ee891af241cb1b8a38b23bd8999 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Mon, 18 Apr 2011 23:01:08 +0200 Subject: [PATCH 01/11] optimize performance optimization (away) as suggested by Viktor --- akka-actor/src/main/scala/akka/util/ListenerManagement.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala index 349d51255d..efeb482377 100644 --- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala @@ -56,8 +56,9 @@ trait ListenerManagement { val iterator = listeners.iterator while (iterator.hasNext) { val listener = iterator.next - if (listener.isShutdown) iterator.remove() - else try { + // Uncomment if those exceptions are so frequent as to bottleneck + // if (listener.isShutdown) iterator.remove() else + try { listener ! msg } catch { case e : ActorInitializationException => From 0b4fe353f884b24f451a07476c93c720efd4f1d8 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 10:31:54 +1200 Subject: [PATCH 02/11] Fix broken compile - revert import in pi tutorial --- akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index a1844cdc45..c31f8ee2f6 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -9,6 +9,7 @@ import Actor._ import akka.routing.{Routing, CyclicIterator} import Routing._ +import System.{currentTimeMillis => now} import java.util.concurrent.CountDownLatch /** From 4cd2693340104d6d0bf5675e620924ab59a8afa9 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 12:43:50 +1200 Subject: [PATCH 03/11] Automatically install pygments style --- akka-docs/Makefile | 17 +++++++++++++---- akka-docs/conf.py | 2 +- akka-docs/pygments/setup.py | 19 +++++++++++++++++++ akka-docs/pygments/styles/__init__.py | 0 .../akka.py => pygments/styles/simple.py} | 6 +++--- 5 files changed, 36 insertions(+), 8 deletions(-) create mode 100644 akka-docs/pygments/setup.py create mode 100644 akka-docs/pygments/styles/__init__.py rename akka-docs/{themes/akka/pygments/akka.py => pygments/styles/simple.py} (93%) diff --git a/akka-docs/Makefile b/akka-docs/Makefile index fedddbee17..d56a2fdf1e 100644 --- a/akka-docs/Makefile +++ b/akka-docs/Makefile @@ -6,16 +6,19 @@ SPHINXOPTS = SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build +EASYINSTALL = easy_install +PYGMENTSDIR = pygments # Internal variables. PAPEROPT_a4 = -D latex_paper_size=a4 PAPEROPT_letter = -D latex_paper_size=letter ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . -.PHONY: help clean html singlehtml latex pdf +.PHONY: help clean pygments html singlehtml latex pdf help: @echo "Please use \`make ' where is one of" + @echo " pygments to locally install the custom pygments styles" @echo " html to make standalone HTML files" @echo " singlehtml to make a single large HTML file" @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" @@ -24,7 +27,14 @@ help: clean: -rm -rf $(BUILDDIR)/* -html: +pygments: + $(EASYINSTALL) --user $(PYGMENTSDIR) + -rm -rf $(PYGMENTSDIR)/*.egg-info $(PYGMENTSDIR)/build $(PYGMENTSDIR)/temp + @echo + @echo "Custom pygments styles have been installed." + @echo + +html: pygments $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html @echo @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." @@ -41,9 +51,8 @@ latex: @echo "Run \`make' in that directory to run these through (pdf)latex" \ "(use \`make latexpdf' here to do that automatically)." -pdf: +pdf: pygments $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex @echo "Running LaTeX files through pdflatex..." make -C $(BUILDDIR)/latex all-pdf @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." - diff --git a/akka-docs/conf.py b/akka-docs/conf.py index 91c7bab0ba..00145ca0fb 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -19,7 +19,7 @@ copyright = u'2009-2011, Scalable Solutions AB' version = '1.1' release = '1.1' -pygments_style = 'akka' +pygments_style = 'simple' highlight_language = 'scala' add_function_parentheses = False show_authors = True diff --git a/akka-docs/pygments/setup.py b/akka-docs/pygments/setup.py new file mode 100644 index 0000000000..7c86a6a681 --- /dev/null +++ b/akka-docs/pygments/setup.py @@ -0,0 +1,19 @@ +""" +Akka syntax styles for Pygments. +""" + +from setuptools import setup + +entry_points = """ +[pygments.styles] +simple = styles.simple:SimpleStyle +""" + +setup( + name = 'akkastyles', + version = '0.1', + description = __doc__, + author = "Akka", + packages = ['styles'], + entry_points = entry_points +) diff --git a/akka-docs/pygments/styles/__init__.py b/akka-docs/pygments/styles/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-docs/themes/akka/pygments/akka.py b/akka-docs/pygments/styles/simple.py similarity index 93% rename from akka-docs/themes/akka/pygments/akka.py rename to akka-docs/pygments/styles/simple.py index af9fe61bf9..bdf3c7878e 100644 --- a/akka-docs/themes/akka/pygments/akka.py +++ b/akka-docs/pygments/styles/simple.py @@ -3,7 +3,7 @@ pygments.styles.akka ~~~~~~~~~~~~~~~~~~~~~~~~ - Akka style for Scala highlighting. + Simple style for Scala highlighting. """ from pygments.style import Style @@ -11,9 +11,9 @@ from pygments.token import Keyword, Name, Comment, String, Error, \ Number, Operator, Generic, Whitespace -class AkkaStyle(Style): +class SimpleStyle(Style): """ - Akka style for Scala highlighting. + Simple style for Scala highlighting. """ background_color = "#f0f0f0" From 0f6a2d1884d24f1615f3f34eace901c85c0f0cfc Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 12:46:35 +1200 Subject: [PATCH 04/11] Comment out pending toc entries --- akka-docs/index.rst | 135 ++++++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 67 deletions(-) diff --git a/akka-docs/index.rst b/akka-docs/index.rst index 9771b0044c..6e09dc2450 100644 --- a/akka-docs/index.rst +++ b/akka-docs/index.rst @@ -5,74 +5,75 @@ Contents :maxdepth: 2 manual/getting-started-first - pending/actor-registry-java - pending/actor-registry-scala - pending/actors-scala - pending/agents-scala - pending/articles - pending/benchmarks - pending/building-akka - pending/buildr - pending/cluster-membership - pending/companies-using-akka - pending/configuration - pending/dataflow-java - pending/dataflow-scala - pending/deployment-scenarios - pending/developer-guidelines - pending/dispatchers-java - pending/dispatchers-scala - pending/event-handler - pending/external-sample-projects - pending/fault-tolerance-java - pending/fault-tolerance-scala - pending/Feature Stability Matrix manual/fsm-scala - pending/futures-scala - pending/getting-started - pending/guice-integration - pending/Home - pending/http - pending/issue-tracking - pending/language-bindings - pending/licenses - pending/logging - pending/Migration-1.0-1.1 - pending/migration-guide-0.10.x-1.0.x - pending/migration-guide-0.7.x-0.8.x - pending/migration-guide-0.8.x-0.9.x - pending/migration-guide-0.9.x-0.10.x - pending/migration-guides - pending/Recipes - pending/release-notes - pending/remote-actors-java - pending/remote-actors-scala - pending/routing-java - pending/routing-scala - pending/scheduler - pending/security - pending/serialization-java - pending/serialization-scala - pending/servlet - pending/slf4j - pending/sponsors - pending/stm - pending/stm-java - pending/stm-scala - pending/team - pending/test - pending/testkit - pending/testkit-example - pending/third-party-integrations - pending/transactors-java - pending/transactors-scala - pending/tutorial-chat-server-java - pending/tutorial-chat-server-scala - pending/typed-actors-java - pending/typed-actors-scala - pending/untyped-actors-java - pending/use-cases - pending/web + +.. pending/actor-registry-java +.. pending/actor-registry-scala +.. pending/actors-scala +.. pending/agents-scala +.. pending/articles +.. pending/benchmarks +.. pending/building-akka +.. pending/buildr +.. pending/cluster-membership +.. pending/companies-using-akka +.. pending/configuration +.. pending/dataflow-java +.. pending/dataflow-scala +.. pending/deployment-scenarios +.. pending/developer-guidelines +.. pending/dispatchers-java +.. pending/dispatchers-scala +.. pending/event-handler +.. pending/external-sample-projects +.. pending/fault-tolerance-java +.. pending/fault-tolerance-scala +.. pending/Feature Stability Matrix +.. pending/futures-scala +.. pending/getting-started +.. pending/guice-integration +.. pending/Home +.. pending/http +.. pending/issue-tracking +.. pending/language-bindings +.. pending/licenses +.. pending/logging +.. pending/Migration-1.0-1.1 +.. pending/migration-guide-0.10.x-1.0.x +.. pending/migration-guide-0.7.x-0.8.x +.. pending/migration-guide-0.8.x-0.9.x +.. pending/migration-guide-0.9.x-0.10.x +.. pending/migration-guides +.. pending/Recipes +.. pending/release-notes +.. pending/remote-actors-java +.. pending/remote-actors-scala +.. pending/routing-java +.. pending/routing-scala +.. pending/scheduler +.. pending/security +.. pending/serialization-java +.. pending/serialization-scala +.. pending/servlet +.. pending/slf4j +.. pending/sponsors +.. pending/stm +.. pending/stm-java +.. pending/stm-scala +.. pending/team +.. pending/test +.. pending/testkit +.. pending/testkit-example +.. pending/third-party-integrations +.. pending/transactors-java +.. pending/transactors-scala +.. pending/tutorial-chat-server-java +.. pending/tutorial-chat-server-scala +.. pending/typed-actors-java +.. pending/typed-actors-scala +.. pending/untyped-actors-java +.. pending/use-cases +.. pending/web Links ===== From 14334a65afe9fea77468c64c585234cc0f7fddeb Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 12:48:41 +1200 Subject: [PATCH 05/11] Fix sbt download link --- akka-docs/manual/getting-started-first.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/manual/getting-started-first.rst b/akka-docs/manual/getting-started-first.rst index 0da7909187..9e9562b605 100644 --- a/akka-docs/manual/getting-started-first.rst +++ b/akka-docs/manual/getting-started-first.rst @@ -112,7 +112,7 @@ Downloading and installing SBT SBT, short for 'Simple Build Tool' is an excellent build system written in Scala. It uses Scala to write the build scripts which gives you a lot of power. It has a plugin architecture with many plugins available, something that we will take advantage of soon. SBT is the preferred way of building software in Scala and is probably the easiest way of getting through this tutorial. If you want to use SBT for this tutorial then follow the following instructions, if not you can skip this section and the next. -First browse to the `SBT download page`_ and download the ``0.7.6.RC0`` distribution. +First browse to the `SBT download page `_ and download the ``0.7.6.RC0`` distribution. To install SBT and create a project for this tutorial it is easiest to follow the instructions on `this page `_. From c179cbf623466d11a9c8826c5d18191a6f75b5e4 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 13:03:15 +1200 Subject: [PATCH 06/11] Add includecode directive for akka-docs --- .gitignore | 1 + akka-docs/conf.py | 3 +- akka-docs/exts/includecode.py | 135 ++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 akka-docs/exts/includecode.py diff --git a/.gitignore b/.gitignore index 28bd0c884d..91c3a65819 100755 --- a/.gitignore +++ b/.gitignore @@ -46,5 +46,6 @@ multiverse.log .eprj .*.swp akka-docs/_build/ +*.pyc akka-tutorials/akka-tutorial-first/project/boot/ akka-tutorials/akka-tutorial-first/project/plugins/project/ \ No newline at end of file diff --git a/akka-docs/conf.py b/akka-docs/conf.py index 00145ca0fb..f0dd997167 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -7,7 +7,8 @@ import sys, os # -- General configuration ----------------------------------------------------- -extensions = ['sphinx.ext.todo'] +sys.path.append(os.path.abspath('exts')) +extensions = ['sphinx.ext.todo', 'includecode'] templates_path = ['_templates'] source_suffix = '.rst' diff --git a/akka-docs/exts/includecode.py b/akka-docs/exts/includecode.py new file mode 100644 index 0000000000..728af5f82f --- /dev/null +++ b/akka-docs/exts/includecode.py @@ -0,0 +1,135 @@ +import os +import codecs +from os import path + +from docutils import nodes +from docutils.parsers.rst import Directive, directives + +class IncludeCode(Directive): + """ + Include a code example from a file with sections delimited with special comments. + """ + + has_content = False + required_arguments = 1 + optional_arguments = 0 + final_argument_whitespace = False + option_spec = { + 'section': directives.unchanged_required, + 'comment': directives.unchanged_required, + 'marker': directives.unchanged_required, + 'include': directives.unchanged_required, + 'exclude': directives.unchanged_required, + 'linenos': directives.flag, + 'language': directives.unchanged_required, + 'encoding': directives.encoding, + 'prepend': directives.unchanged_required, + 'append': directives.unchanged_required, + } + + def run(self): + document = self.state.document + arg0 = self.arguments[0] + (filename, sep, section) = arg0.partition('#') + + if not document.settings.file_insertion_enabled: + return [document.reporter.warning('File insertion disabled', + line=self.lineno)] + env = document.settings.env + if filename.startswith('/') or filename.startswith(os.sep): + rel_fn = filename[1:] + else: + docdir = path.dirname(env.doc2path(env.docname, base=None)) + rel_fn = path.join(docdir, filename) + try: + fn = path.join(env.srcdir, rel_fn) + except UnicodeDecodeError: + # the source directory is a bytestring with non-ASCII characters; + # let's try to encode the rel_fn in the file system encoding + rel_fn = rel_fn.encode(sys.getfilesystemencoding()) + fn = path.join(env.srcdir, rel_fn) + + encoding = self.options.get('encoding', env.config.source_encoding) + codec_info = codecs.lookup(encoding) + try: + f = codecs.StreamReaderWriter(open(fn, 'U'), + codec_info[2], codec_info[3], 'strict') + lines = f.readlines() + f.close() + except (IOError, OSError): + return [document.reporter.warning( + 'Include file %r not found or reading it failed' % filename, + line=self.lineno)] + except UnicodeError: + return [document.reporter.warning( + 'Encoding %r used for reading included file %r seems to ' + 'be wrong, try giving an :encoding: option' % + (encoding, filename))] + + comment = self.options.get('comment', '//') + marker = self.options.get('marker', comment + '#') + lenm = len(marker) + if not section: + section = self.options.get('section') + include_sections = self.options.get('include', '') + exclude_sections = self.options.get('exclude', '') + include = set(include_sections.split(',')) if include_sections else set() + exclude = set(exclude_sections.split(',')) if exclude_sections else set() + if section: + include |= {section} + within = set() + res = [] + excluding = False + for line in lines: + index = line.find(marker) + if index >= 0: + section_name = line[index+lenm:].strip() + if section_name in within: + within ^= {section_name} + if excluding and not (exclude & within): + excluding = False + else: + within |= {section_name} + if not excluding and (exclude & within): + excluding = True + res.append(' ' * index + comment + ' ' + section_name.replace('-', ' ') + ' ...\n') + elif not (exclude & within) and (not include or (include & within)): + res.append(line) + lines = res + + def countwhile(predicate, iterable): + count = 0 + for x in iterable: + if predicate(x): + count += 1 + else: + return count + + nonempty = filter(lambda l: l.strip(), lines) + tabcounts = map(lambda l: countwhile(lambda c: c == ' ', l), nonempty) + tabshift = min(tabcounts) + + if tabshift > 0: + lines = map(lambda l: l[tabshift:] if len(l) > tabshift else l, lines) + + prepend = self.options.get('prepend') + append = self.options.get('append') + if prepend: + lines.insert(0, prepend + '\n') + if append: + lines.append(append + '\n') + + text = ''.join(lines) + retnode = nodes.literal_block(text, text, source=fn) + retnode.line = 1 + retnode.attributes['line_number'] = self.lineno + if self.options.get('language', ''): + retnode['language'] = self.options['language'] + if 'linenos' in self.options: + retnode['linenos'] = True + document.settings.env.note_dependency(rel_fn) + return [retnode] + +def setup(app): + app.require_sphinx('1.0') + app.add_directive('includecode', IncludeCode) From 6151d17413f687b03d6dcec5f06248a4b6cefb3f Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 13:40:13 +1200 Subject: [PATCH 07/11] Use new includecode directive for getting started docs --- akka-docs/exts/includecode.py | 27 +-- akka-docs/manual/examples/Pi.scala | 129 +++++++++++ akka-docs/manual/getting-started-first.rst | 235 ++------------------- 3 files changed, 165 insertions(+), 226 deletions(-) create mode 100644 akka-docs/manual/examples/Pi.scala diff --git a/akka-docs/exts/includecode.py b/akka-docs/exts/includecode.py index 728af5f82f..816126c697 100644 --- a/akka-docs/exts/includecode.py +++ b/akka-docs/exts/includecode.py @@ -15,16 +15,17 @@ class IncludeCode(Directive): optional_arguments = 0 final_argument_whitespace = False option_spec = { - 'section': directives.unchanged_required, - 'comment': directives.unchanged_required, - 'marker': directives.unchanged_required, - 'include': directives.unchanged_required, - 'exclude': directives.unchanged_required, - 'linenos': directives.flag, - 'language': directives.unchanged_required, - 'encoding': directives.encoding, - 'prepend': directives.unchanged_required, - 'append': directives.unchanged_required, + 'section': directives.unchanged_required, + 'comment': directives.unchanged_required, + 'marker': directives.unchanged_required, + 'include': directives.unchanged_required, + 'exclude': directives.unchanged_required, + 'hideexcludes': directives.flag, + 'linenos': directives.flag, + 'language': directives.unchanged_required, + 'encoding': directives.encoding, + 'prepend': directives.unchanged_required, + 'append': directives.unchanged_required, } def run(self): @@ -75,6 +76,7 @@ class IncludeCode(Directive): exclude_sections = self.options.get('exclude', '') include = set(include_sections.split(',')) if include_sections else set() exclude = set(exclude_sections.split(',')) if exclude_sections else set() + hideexcludes = 'hideexcludes' in self.options if section: include |= {section} within = set() @@ -92,7 +94,8 @@ class IncludeCode(Directive): within |= {section_name} if not excluding and (exclude & within): excluding = True - res.append(' ' * index + comment + ' ' + section_name.replace('-', ' ') + ' ...\n') + if not hideexcludes: + res.append(' ' * index + comment + ' ' + section_name.replace('-', ' ') + ' ...\n') elif not (exclude & within) and (not include or (include & within)): res.append(line) lines = res @@ -107,7 +110,7 @@ class IncludeCode(Directive): nonempty = filter(lambda l: l.strip(), lines) tabcounts = map(lambda l: countwhile(lambda c: c == ' ', l), nonempty) - tabshift = min(tabcounts) + tabshift = min(tabcounts) if tabcounts else 0 if tabshift > 0: lines = map(lambda l: l[tabshift:] if len(l) > tabshift else l, lines) diff --git a/akka-docs/manual/examples/Pi.scala b/akka-docs/manual/examples/Pi.scala new file mode 100644 index 0000000000..6bf1dea903 --- /dev/null +++ b/akka-docs/manual/examples/Pi.scala @@ -0,0 +1,129 @@ +//#imports +package akka.tutorial.scala.first + +import akka.actor.{Actor, PoisonPill} +import Actor._ +import akka.routing.{Routing, CyclicIterator} +import Routing._ + +import System.{currentTimeMillis => now} +import java.util.concurrent.CountDownLatch +//#imports + +//#app +object Pi extends App { + + calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) + + //#actors-and-messages + // ==================== + // ===== Messages ===== + // ==================== + //#messages + sealed trait PiMessage + case object Calculate extends PiMessage + case class Work(start: Int, nrOfElements: Int) extends PiMessage + case class Result(value: Double) extends PiMessage + //#messages + + // ================== + // ===== Worker ===== + // ================== + //#worker + class Worker extends Actor { + + //#calculate-pi + def calculatePiFor(start: Int, nrOfElements: Int): Double = { + var acc = 0.0 + for (i <- start until (start + nrOfElements)) + acc += 4 * math.pow(-1, i) / (2 * i + 1) + acc + } + //#calculate-pi + + def receive = { + case Work(start, nrOfElements) => + self reply Result(calculatePiFor(start, nrOfElements)) // perform the work + } + } + //#worker + + // ================== + // ===== Master ===== + // ================== + //#master + class Master( + nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) + extends Actor { + + var pi: Double = _ + var nrOfResults: Int = _ + var start: Long = _ + + //#create-workers + // create the workers + val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) + + // wrap them with a load-balancing router + val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() + //#create-workers + + //#master-receive + // message handler + def receive = { + //#message-handling + case Calculate => + // schedule work + for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) + + // send a PoisonPill to all workers telling them to shut down themselves + router ! Broadcast(PoisonPill) + + // send a PoisonPill to the router, telling him to shut himself down + router ! PoisonPill + + case Result(value) => + // handle result from the worker + pi += value + nrOfResults += 1 + if (nrOfResults == nrOfMessages) self.stop() + //#message-handling + } + //#master-receive + + override def preStart { + start = now + } + + override def postStop { + // tell the world that the calculation is complete + println( + "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" + .format(pi, (now - start))) + latch.countDown() + } + } + //#master + //#actors-and-messages + + // ================== + // ===== Run it ===== + // ================== + def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { + + // this latch is only plumbing to know when the calculation is completed + val latch = new CountDownLatch(1) + + // create the master + val master = actorOf( + new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start() + + // start the calculation + master ! Calculate + + // wait for master to shut down + latch.await() + } +} +//#app + diff --git a/akka-docs/manual/getting-started-first.rst b/akka-docs/manual/getting-started-first.rst index 9e9562b605..aaa466c1dc 100644 --- a/akka-docs/manual/getting-started-first.rst +++ b/akka-docs/manual/getting-started-first.rst @@ -172,17 +172,9 @@ Start writing the code Now it's about time to start hacking. -We start by creating a ``Pi.scala`` file and adding these import statements at the top of the file:: +We start by creating a ``Pi.scala`` file and adding these import statements at the top of the file: - package akka.tutorial.scala.first - - import akka.actor.{Actor, PoisonPill} - import Actor._ - import akka.routing.{Routing, CyclicIterator} - import Routing._ - import akka.dispatch.Dispatchers - - import java.util.concurrent.CountDownLatch +.. includecode:: examples/Pi.scala#imports If you are using SBT in this tutorial then create the file in the ``src/main/scala`` directory. @@ -199,49 +191,30 @@ With this in mind, let's now create the messages that we want to have flowing in - ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing the work assignment - ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation -Messages sent to actors should always be immutable to avoid sharing mutable state. In scala we have 'case classes' which make excellent messages. So let's start by creating three messages as case classes. We also create a common base trait for our messages (that we define as being ``sealed`` in order to prevent creating messages outside our control):: +Messages sent to actors should always be immutable to avoid sharing mutable state. In scala we have 'case classes' which make excellent messages. So let's start by creating three messages as case classes. We also create a common base trait for our messages (that we define as being ``sealed`` in order to prevent creating messages outside our control): - sealed trait PiMessage - - case object Calculate extends PiMessage - - case class Work(start: Int, nrOfElements: Int) extends PiMessage - - case class Result(value: Double) extends PiMessage +.. includecode:: examples/Pi.scala#messages Creating the worker ------------------- -Now we can create the worker actor. This is done by mixing in the ``Actor`` trait and defining the ``receive`` method. The ``receive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message:: +Now we can create the worker actor. This is done by mixing in the ``Actor`` trait and defining the ``receive`` method. The ``receive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message: - class Worker extends Actor { - def receive = { - case Work(start, nrOfElements) => - self reply Result(calculatePiFor(start, nrOfElements)) // perform the work - } - } +.. includecode:: examples/Pi.scala#worker + :exclude: calculate-pi As you can see we have now created an ``Actor`` with a ``receive`` method as a handler for the ``Work`` message. In this handler we invoke the ``calculatePiFor(..)`` method, wrap the result in a ``Result`` message and send it back to the original sender using ``self.reply``. In Akka the sender reference is implicitly passed along with the message so that the receiver can always reply or store away the sender reference for future use. -The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method. While there are many ways we can implement this algorithm in Scala, in this introductory tutorial we have chosen an imperative style using a for comprehension and an accumulator:: +The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method. While there are many ways we can implement this algorithm in Scala, in this introductory tutorial we have chosen an imperative style using a for comprehension and an accumulator: - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i <- start until (start + nrOfElements)) - acc += 4 * math.pow(-1, i) / (2 * i + 1) - acc - } +.. includecode:: examples/Pi.scala#calculate-pi Creating the master ------------------- -The master actor is a little bit more involved. In its constructor we need to create the workers (the ``Worker`` actors) and start them. We will also wrap them in a load-balancing router to make it easier to spread out the work evenly between the workers. Let's do that first:: +The master actor is a little bit more involved. In its constructor we need to create the workers (the ``Worker`` actors) and start them. We will also wrap them in a load-balancing router to make it easier to spread out the work evenly between the workers. Let's do that first: - // create the workers - val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) - - // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() +.. includecode:: examples/Pi.scala#create-workers As you can see we are using the ``actorOf`` factory method to create actors, this method returns as an ``ActorRef`` which is a reference to our newly created actor. This method is available in the ``Actor`` object but is usually imported:: @@ -253,33 +226,10 @@ Now we have a router that is representing all our workers in a single abstractio - ``nrOfMessages`` -- defining how many number chunks to send out to the workers - ``nrOfElements`` -- defining how big the number chunks sent to each worker should be -Here is the master actor:: +Here is the master actor: - class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) - extends Actor { - - var pi: Double = _ - var nrOfResults: Int = _ - var start: Long = _ - - // create the workers - val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) - - // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() - - def receive = { ... } - - override def preStart { - start = now - } - - override def postStop { - // tell the world that the calculation is complete - println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) - latch.countDown() - } - } +.. includecode:: examples/Pi.scala#master + :exclude: message-handling A couple of things are worth explaining further. @@ -296,170 +246,27 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` actors and a The ``Result`` handler is simpler, here we get the value from the ``Result`` message and aggregate it to our ``pi`` member variable. We also keep track of how many results we have received back, and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and shuts down. -Let's capture this in code:: +Let's capture this in code: - // message handler - def receive = { - case Calculate => - // schedule work - for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) - - // send a PoisonPill to all workers telling them to shut down themselves - router ! Broadcast(PoisonPill) - - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill - - case Result(value) => - // handle result from the worker - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) self.stop() - } +.. includecode:: examples/Pi.scala#master-receive Bootstrap the calculation ------------------------- Now the only thing that is left to implement is the runner that should bootstrap and run the calculation for us. We do that by creating an object that we call ``Pi``, here we can extend the ``App`` trait in Scala, which means that we will be able to run this as an application directly from the command line. -The ``Pi`` object is a perfect container module for our actors and messages, so let's put them all there. We also create a method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish:: +The ``Pi`` object is a perfect container module for our actors and messages, so let's put them all there. We also create a method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish: - object Pi extends App { - - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - - ... // actors and messages - - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) - - // create the master - val master = actorOf( - new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start() - - // start the calculation - master ! Calculate - - // wait for master to shut down - latch.await() - } - } +.. includecode:: examples/Pi.scala#app + :exclude: actors-and-messages That's it. Now we are done. -But before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all:: +But before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all: - package akka.tutorial.scala.first +.. includecode:: examples/Pi.scala - import akka.actor.{Actor, PoisonPill} - import Actor._ - import akka.routing.{Routing, CyclicIterator} - import Routing._ - import java.util.concurrent.CountDownLatch - - object Pi extends App { - - calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - - // ==================== - // ===== Messages ===== - // ==================== - sealed trait PiMessage - case object Calculate extends PiMessage - case class Work(start: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage - - // ================== - // ===== Worker ===== - // ================== - class Worker extends Actor { - - // define the work - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i <- start until (start + nrOfElements)) - acc += 4 * math.pow(-1, i) / (2 * i + 1) - acc - } - - def receive = { - case Work(start, nrOfElements) => - self reply Result(calculatePiFor(start, nrOfElements)) // perform the work - } - } - - // ================== - // ===== Master ===== - // ================== - class Master( - nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) - extends Actor { - - var pi: Double = _ - var nrOfResults: Int = _ - var start: Long = _ - - // create the workers - val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) - - // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() - - // message handler - def receive = { - case Calculate => - // schedule work - //for (arg <- 0 until nrOfMessages) router ! Work(arg, nrOfElements) - for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) - - // send a PoisonPill to all workers telling them to shut down themselves - router ! Broadcast(PoisonPill) - - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill - - case Result(value) => - // handle result from the worker - pi += value - nrOfResults += 1 - if (nrOfResults == nrOfMessages) self.stop() - } - - override def preStart { - start = now - } - - override def postStop { - // tell the world that the calculation is complete - println( - "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" - .format(pi, (now - start))) - latch.countDown() - } - } - - // ================== - // ===== Run it ===== - // ================== - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) - - // create the master - val master = actorOf( - new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start() - - // start the calculation - master ! Calculate - - // wait for master to shut down - latch.await() - } - } Run it as a command line application ------------------------------------ From f4fd3ed814c561a4a6b02a427ecdaa5de5b72baf Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 14:27:48 +1200 Subject: [PATCH 08/11] Rework local python packages for akka-docs Hopefully this is more cross-platform friendly. --- akka-docs/Makefile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/akka-docs/Makefile b/akka-docs/Makefile index d56a2fdf1e..d2be158d10 100644 --- a/akka-docs/Makefile +++ b/akka-docs/Makefile @@ -7,6 +7,7 @@ SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build EASYINSTALL = easy_install +LOCALPACKAGES = $(shell pwd)/$(BUILDDIR)/site-packages PYGMENTSDIR = pygments # Internal variables. @@ -14,6 +15,10 @@ PAPEROPT_a4 = -D latex_paper_size=a4 PAPEROPT_letter = -D latex_paper_size=letter ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# Set python path to include local packages for pygments styles. +PYTHONPATH += $(LOCALPACKAGES) +export PYTHONPATH + .PHONY: help clean pygments html singlehtml latex pdf help: @@ -28,7 +33,8 @@ clean: -rm -rf $(BUILDDIR)/* pygments: - $(EASYINSTALL) --user $(PYGMENTSDIR) + mkdir $(LOCALPACKAGES) + $(EASYINSTALL) --install-dir $(LOCALPACKAGES) $(PYGMENTSDIR) -rm -rf $(PYGMENTSDIR)/*.egg-info $(PYGMENTSDIR)/build $(PYGMENTSDIR)/temp @echo @echo "Custom pygments styles have been installed." From 4f79a2a3de82a06321c2856418ee99cc45fed92d Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 14:34:47 +1200 Subject: [PATCH 09/11] mkdir -p --- akka-docs/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/Makefile b/akka-docs/Makefile index d2be158d10..7b803258cb 100644 --- a/akka-docs/Makefile +++ b/akka-docs/Makefile @@ -33,7 +33,7 @@ clean: -rm -rf $(BUILDDIR)/* pygments: - mkdir $(LOCALPACKAGES) + mkdir -p $(LOCALPACKAGES) $(EASYINSTALL) --install-dir $(LOCALPACKAGES) $(PYGMENTSDIR) -rm -rf $(PYGMENTSDIR)/*.egg-info $(PYGMENTSDIR)/build $(PYGMENTSDIR)/temp @echo From 897504f973b1dec508463304b6d63682d90bbf79 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 14:40:04 +1200 Subject: [PATCH 10/11] Make includecode directive python 2.6 friendly --- akka-docs/exts/includecode.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-docs/exts/includecode.py b/akka-docs/exts/includecode.py index 816126c697..c12ddfa7f4 100644 --- a/akka-docs/exts/includecode.py +++ b/akka-docs/exts/includecode.py @@ -78,7 +78,7 @@ class IncludeCode(Directive): exclude = set(exclude_sections.split(',')) if exclude_sections else set() hideexcludes = 'hideexcludes' in self.options if section: - include |= {section} + include |= set([section]) within = set() res = [] excluding = False @@ -87,11 +87,11 @@ class IncludeCode(Directive): if index >= 0: section_name = line[index+lenm:].strip() if section_name in within: - within ^= {section_name} + within ^= set([section_name]) if excluding and not (exclude & within): excluding = False else: - within |= {section_name} + within |= set([section_name]) if not excluding and (exclude & within): excluding = True if not hideexcludes: From 427a6cf63e2db05f0c9ce8483ab468fd9c36a572 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 19 Apr 2011 17:45:01 +1200 Subject: [PATCH 11/11] Rework routing spec - failing under jenkins --- .../test/scala/akka/routing/RoutingSpec.scala | 975 +++++++++--------- 1 file changed, 485 insertions(+), 490 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index d79bd0651e..96f93fc160 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,537 +1,532 @@ package akka.actor.routing +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ +import akka.testing.Testing.{sleepFor, testMillis} +import akka.util.duration._ + import akka.actor.Actor import akka.actor.Actor._ - -import org.scalatest.Suite -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.matchers.MustMatchers -import org.junit.Test - -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{CountDownLatch, TimeUnit} import akka.routing._ -@RunWith(classOf[JUnitRunner]) -class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers { +import java.util.concurrent.atomic.AtomicInteger + + +class RoutingSpec extends WordSpec with MustMatchers { import Routing._ - @Test def testDispatcher = { - val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") - val targetOk = new AtomicInteger(0) - val t1 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() + "Routing" must { - val t2 = actorOf( new Actor() { - def receive = { - case `testMsg3` => self.reply(11) - } - }).start() + "dispatch" in { + val Test1 = "test1" + val Test2 = "test2" + val Test3 = "test3" - val d = dispatcherActor { - case `testMsg1`|`testMsg2` => t1 - case `testMsg3` => t2 - }.start() - - val result = for { - a <- (d !! (testMsg1, 5000)).as[Int] - b <- (d !! (testMsg2, 5000)).as[Int] - c <- (d !! (testMsg3, 5000)).as[Int] - } yield a + b + c - - result.isDefined must be (true) - result.get must be(21) - - for(a <- List(t1,t2,d)) a.stop() - } - - @Test def testLogger = { - val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] - val latch = new CountDownLatch(2) - val t1 = actorOf(new Actor { def receive = { case _ => } }).start() - val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown() }).start() - val foo : Any = "foo" - val bar : Any = "bar" - l ! foo - l ! bar - val done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - msgs must ( have size (2) and contain (foo) and contain (bar) ) - t1.stop() - l.stop() - } - - @Test def testSmallestMailboxFirstDispatcher = { - val t1ProcessedCount = new AtomicInteger(0) - val latch = new CountDownLatch(500) - val t1 = actorOf(new Actor { - def receive = { - case x => - Thread.sleep(50) // slow actor - t1ProcessedCount.incrementAndGet - latch.countDown() - } - }).start() - - val t2ProcessedCount = new AtomicInteger(0) - val t2 = actorOf(new Actor { - def receive = { - case x => t2ProcessedCount.incrementAndGet - latch.countDown() - } - }).start() - val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - for (i <- 1 to 500) d ! i - val done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - t1ProcessedCount.get must be < (t2ProcessedCount.get) // because t1 is much slower and thus has a bigger mailbox all the time - for(a <- List(t1,t2,d)) a.stop() - } - - @Test def testListener = { - val latch = new CountDownLatch(2) - val foreachListener = new CountDownLatch(2) - val num = new AtomicInteger(0) - val i = actorOf(new Actor with Listeners { - def receive = listenerManagement orElse { - case "foo" => gossip("bar") - } - }) - i.start() - - def newListener = actorOf(new Actor { - def receive = { - case "bar" => - num.incrementAndGet - latch.countDown() - case "foo" => foreachListener.countDown() - } - }).start() - - val a1 = newListener - val a2 = newListener - val a3 = newListener - - i ! Listen(a1) - i ! Listen(a2) - i ! Listen(a3) - i ! Deafen(a3) - i ! WithListeners(_ ! "foo") - i ! "foo" - - val done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - num.get must be (2) - val withListeners = foreachListener.await(5,TimeUnit.SECONDS) - withListeners must be (true) - for(a <- List(i,a1,a2,a3)) a.stop() - } - - @Test def testIsDefinedAt = { - import akka.actor.ActorRef - - val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") - - val t1 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t2 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t3 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t4 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) - - t1.isDefinedAt(testMsg1) must be (true) - t1.isDefinedAt(testMsg3) must be (false) - t2.isDefinedAt(testMsg1) must be (true) - t2.isDefinedAt(testMsg3) must be (false) - d1.isDefinedAt(testMsg1) must be (true) - d1.isDefinedAt(testMsg3) must be (false) - d2.isDefinedAt(testMsg1) must be (true) - d2.isDefinedAt(testMsg3) must be (false) - - for(a <- List(t1,t2,d1,d2)) a.stop() - } - - // Actor Pool Capacity Tests - - // - // make sure the pool is of the fixed, expected capacity - // - @Test def testFixedCapacityActorPool = { - val latch = new CountDownLatch(2) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with FixedCapacityStrategy - with SmallestMailboxSelector - { - def factory = actorOf(new Actor { + val t1 = actorOf(new Actor { def receive = { - case _ => - counter.incrementAndGet - latch.countDown() - self reply_? "success" + case Test1 => self.reply(3) + case Test2 => self.reply(7) } - }) + }).start() - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route - } - - val successes = new CountDownLatch(2) - implicit val successCounterActor = Some(actorOf(new Actor { - def receive = { - case "success" => successes.countDown() - } - }).start()) - - val pool = actorOf(new TestPool).start() - pool ! "a" - pool ! "b" - - latch.await(1,TimeUnit.SECONDS) must be (true) - successes.await(1,TimeUnit.SECONDS) must be (true) - counter.get must be (2) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - pool stop - } - - @Test def testTicket705 = { - - val actorPool = actorOf(new Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with BasicFilter { - //with BasicNoBackoffFilter { - def lowerBound = 2 - def upperBound = 20 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - def receive = { - case req: String => { - Thread.sleep(10L) - self.reply_?("Response") - } - } - }) - }).start() - - try { - (for(count <- 1 to 500) yield actorPool.!!![String]("Test", 20000)) foreach { - _.await.resultOrException.get must be ("Response") - } - } finally { - actorPool.stop() - } - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - // - @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { - - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with ActiveFuturesPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { + val t2 = actorOf( new Actor() { def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet + case Test3 => self.reply(11) + } + }).start() + + val d = dispatcherActor { + case Test1 | Test2 => t1 + case Test3 => t2 + }.start() + + val result = for { + a <- (d !! (Test1, testMillis(5 seconds))).as[Int] + b <- (d !! (Test2, testMillis(5 seconds))).as[Int] + c <- (d !! (Test3, testMillis(5 seconds))).as[Int] + } yield a + b + c + + result.isDefined must be (true) + result.get must be (21) + + for(a <- List(t1, t2, d)) a.stop() + } + + "have messages logged" in { + val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] + val latch = TestLatch(2) + + val actor = actorOf(new Actor { + def receive = { case _ => } + }).start() + + val logger = loggerActor(actor, x => { msgs.add(x); latch.countDown() }).start() + + val foo: Any = "foo" + val bar: Any = "bar" + + logger ! foo + logger ! bar + + latch.await + + msgs must ( have size (2) and contain (foo) and contain (bar) ) + + actor.stop() + logger.stop() + } + + "dispatch to smallest mailbox" in { + val t1Count = new AtomicInteger(0) + val t2Count = new AtomicInteger(0) + val latch = TestLatch(500) + + val t1 = actorOf(new Actor { + def receive = { + case x => + sleepFor(50 millis) // slow actor + t1Count.incrementAndGet latch.countDown() } - }) + }).start() - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } - - // - // first message should create the minimum number of delgates - // - val pool = actorOf(new TestPool).start() - pool ! 1 - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool !!! t - Thread.sleep(50) - } - } - - // - // 2 more should go thru w/out triggering more - // - loops = 2 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - // - // a whole bunch should max it out - // - loops = 10 - loop(500) - - done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - - pool stop - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - // - @Test def testBoundedCapacityActorPoolWithMailboxPressure = { - - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { + val t2 = actorOf(new Actor { def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet + case x => + t2Count.incrementAndGet latch.countDown() } - }) + }).start() - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route + val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) + + for (i <- 1 to 500) d ! i + + latch.await(10 seconds) + + // because t1 is much slower and thus has a bigger mailbox all the time + t1Count.get must be < (t2Count.get) + + for(a <- List(t1, t2, d)) a.stop() } - val pool = actorOf(new TestPool).start() - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool ! t + "listen" in { + val fooLatch = TestLatch(2) + val barLatch = TestLatch(2) + val barCount = new AtomicInteger(0) + + val broadcast = actorOf(new Actor with Listeners { + def receive = listenerManagement orElse { + case "foo" => gossip("bar") } - } + }).start() + + def newListener = actorOf(new Actor { + def receive = { + case "bar" => + barCount.incrementAndGet + barLatch.countDown() + case "foo" => + fooLatch.countDown() + } + }).start() + + val a1 = newListener + val a2 = newListener + val a3 = newListener + + broadcast ! Listen(a1) + broadcast ! Listen(a2) + broadcast ! Listen(a3) + + broadcast ! Deafen(a3) + + broadcast ! WithListeners(_ ! "foo") + broadcast ! "foo" + + barLatch.await + barCount.get must be (2) + + fooLatch.await + + for(a <- List(broadcast, a1 ,a2 ,a3)) a.stop() + } + + "be defined at" in { + import akka.actor.ActorRef + + val Yes = "yes" + val No = "no" + + def testActor() = actorOf( new Actor() { + def receive = { + case Yes => "yes" + } + }).start() + + val t1 = testActor() + val t2 = testActor() + val t3 = testActor() + val t4 = testActor() + + val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) + val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) + + t1.isDefinedAt(Yes) must be (true) + t1.isDefinedAt(No) must be (false) + t2.isDefinedAt(Yes) must be (true) + t2.isDefinedAt(No) must be (false) + d1.isDefinedAt(Yes) must be (true) + d1.isDefinedAt(No) must be (false) + d2.isDefinedAt(Yes) must be (true) + d2.isDefinedAt(No) must be (false) + + for(a <- List(t1, t2, d1, d2)) a.stop() + } + } + + "Actor Pool" must { + + "have expected capacity" in { + val latch = TestLatch(2) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with SmallestMailboxSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + count.incrementAndGet + latch.countDown() + self reply_? "success" + } + }).start() + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + val successes = TestLatch(2) + val successCounter = Some(actorOf(new Actor { + def receive = { + case "success" => successes.countDown() + } + }).start()) + + implicit val replyTo = successCounter + pool ! "a" + pool ! "b" + + latch.await + successes.await + + count.get must be (2) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + pool.stop() + } + + + "pass ticket #705" in { + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicFilter + { + def lowerBound = 2 + def upperBound = 20 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + def receive = { + case req: String => { + sleepFor(10 millis) + self.reply_?("Response") + } + } + }) + }).start() + + try { + (for (count <- 1 to 500) yield pool.!!![String]("Test", 20000)) foreach { + _.await.resultOrException.get must be ("Response") + } + } finally { + pool.stop() + } + } + + "grow as needed under pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with ActiveFuturesPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + // first message should create the minimum number of delgates + + pool ! 1 + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m <- 0 until loops) { + pool !!! t + sleepFor(50 millis) + } + } + + // 2 more should go thru without triggering more + + loops = 2 + + loop(500) + latch.await + count.get must be (loops) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + // a whole bunch should max it out + + loops = 10 + loop(500) + latch.await + count.get must be (loops) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) + + pool.stop() + } + + "grow as needed under mailbox pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m <- 0 until loops) { + pool ! t + } + } + + // send a few messages and observe pool at its lower bound + loops = 3 + loop(500) + latch.await + count.get must be (loops) - // - // send a few messages and observe pool at its lower bound - // - loops = 3 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // // send a bunch over the theshold and observe an increment - // loops = 15 loop(500) - done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) + latch.await(10 seconds) + count.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) - pool stop - } - - // Actor Pool Selector Tests - - @Test def testRoundRobinSelector = { - - var latch = new CountDownLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - class TestPool1 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown() - } - }) - - def limit = 1 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route + pool.stop() } - val pool1 = actorOf(new TestPool1).start() - pool1 ! "a" - pool1 ! "b" - var done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (1) - pool1 stop + "round robin" in { + val latch1 = TestLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - class TestPool2 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown() - } - }) + val pool1 = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch1.countDown() + } + }) - def limit = 2 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route + def limit = 1 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + pool1 ! "a" + pool1 ! "b" + + latch1.await + delegates.size must be (1) + + pool1.stop() + + val latch2 = TestLatch(2) + delegates.clear() + + val pool2 = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch2.countDown() + } + }) + + def limit = 2 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + }).start() + + pool2 ! "a" + pool2 ! "b" + + latch2.await + delegates.size must be (2) + + pool2.stop() } - latch = new CountDownLatch(2) - delegates clear + "backoff" in { + val latch = TestLatch(10) - val pool2 = actorOf(new TestPool2).start() - pool2 ! "a" - pool2 ! "b" - done = latch.await(1, TimeUnit.SECONDS) - done must be (true) - delegates.size must be (2) - pool2 stop - } - - // Actor Pool Filter Tests + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with Filter + with RunningMeanBackoff + with BasicRampup + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + latch.countDown() + } + }) - // - // reuse previous test to max pool then observe filter reducing capacity over time - // - @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + }).start() - var latch = new CountDownLatch(10) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with Filter - with RunningMeanBackoff - with BasicRampup - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - latch.countDown() - } - }) + // put some pressure on the pool - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route + for (m <- 0 to 10) pool ! 250 + + sleepFor(5 millis) + + val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size + + z must be >= (2) + + // let it cool down + + for (m <- 0 to 3) { + pool ! 1 + sleepFor(500 millis) + } + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) + + pool.stop() } - - - // - // put some pressure on the pool - // - val pool = actorOf(new TestPool).start() - for (m <- 0 to 10) pool ! 250 - Thread.sleep(5) - val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size - z must be >= (2) - var done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - - - // - // - // - for (m <- 0 to 3) { - pool ! 1 - Thread.sleep(500) - } - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) - - pool stop } } +