fixed performance problem in dispatcher

This commit is contained in:
jboner 2009-07-28 10:45:41 +02:00
parent bf50299629
commit 5b56bd0b79
11 changed files with 424 additions and 403 deletions

View file

@ -1246,17 +1246,6 @@
<root url="jar://$MAVEN_REPOSITORY$/com/facebook/fb303/1.0/fb303-1.0-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: org.apache.commons:commons-collections:3.2.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-collections/3.2.1/commons-collections-3.2.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-collections/3.2.1/commons-collections-3.2.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-collections/3.2.1/commons-collections-3.2.1-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: high-scale-lib:high-scale-lib:1.0">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/high-scale-lib/high-scale-lib/1.0/high-scale-lib-1.0.jar!/" />
@ -1268,17 +1257,6 @@
<root url="jar://$MAVEN_REPOSITORY$/high-scale-lib/high-scale-lib/1.0/high-scale-lib-1.0-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: org.apache.commons:commons-lang:2.4">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-lang/2.4/commons-lang-2.4.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-lang/2.4/commons-lang-2.4-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-lang/2.4/commons-lang-2.4-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: org.scala-lang:scala-compiler:2.7.4">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/scala-lang/scala-compiler/2.7.4/scala-compiler-2.7.4.jar!/" />
@ -1456,6 +1434,39 @@
<root url="jar://$MAVEN_REPOSITORY$/com/twitter/scala-json/1.0/scala-json-1.0-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: commons-collections:commons-collections:3.2.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: commons-lang:commons-lang:2.4">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/commons-lang/commons-lang/2.4/commons-lang-2.4.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/commons-lang/commons-lang/2.4/commons-lang-2.4-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/commons-lang/commons-lang/2.4/commons-lang-2.4-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: commons-logging:commons-logging:1.0.4">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/commons-logging/commons-logging/1.0.4/commons-logging-1.0.4.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/commons-logging/commons-logging/1.0.4/commons-logging-1.0.4-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/commons-logging/commons-logging/1.0.4/commons-logging-1.0.4-sources.jar!/" />
</SOURCES>
</library>
</component>
<UsedPathMacros>
<macro name="MAVEN_REPOSITORY" description="Maven Local Repostiry" />

355
akka.iws
View file

@ -6,16 +6,21 @@
</component>
<component name="ChangeListManager">
<list default="true" id="212ccd86-01aa-4780-a2f0-0d130be5abd2" name="Test" comment="Test">
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteClient.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteClient.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/test/scala/RemoteSupervisorSpec.scala" afterPath="$PROJECT_DIR$/kernel/src/test/scala/RemoteSupervisorSpec.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/akka-kernel.iml" afterPath="$PROJECT_DIR$/kernel/akka-kernel.iml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/fun-test-java/akka-fun-test-java.iml" afterPath="$PROJECT_DIR$/fun-test-java/akka-fun-test-java.iml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/samples-scala/akka-samples-scala.iml" afterPath="$PROJECT_DIR$/samples-scala/akka-samples-scala.iml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala" afterPath="$PROJECT_DIR$/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/samples-java/akka-samples-java.iml" afterPath="$PROJECT_DIR$/samples-java/akka-samples-java.iml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/akka.ipr" afterPath="$PROJECT_DIR$/akka.ipr" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/akka.iws" afterPath="$PROJECT_DIR$/akka.iws" />
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteServer.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteServer.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/test/scala/Messages.scala" afterPath="$PROJECT_DIR$/kernel/src/test/scala/Messages.scala" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/src/main/scala/reactor/Reactor.scala" afterPath="$PROJECT_DIR$/kernel/src/main/scala/reactor/Reactor.scala" />
</list>
<list readonly="true" id="6e842704-fac6-40e9-8a67-d02385f87db9" name="Default" comment="&#10;# Brief commit desciption here&#10;&#10;# Full commit description here (comment lines starting with '#' will not be included)&#10;&#10;" />
<ignored path=".idea/workspace.xml" />
<ignored path="akka.iws" />
<ignored path=".idea/workspace.xml" />
</component>
<component name="ChangesViewManager" flattened_view="true" show_ignored="false" />
<component name="Commander">
@ -32,7 +37,33 @@
</component>
<component name="DebuggerManager">
<line_breakpoints>
<breakpoint url="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala" line="10" class="Class at Binary.scala:10" package="">
<breakpoint url="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala" line="6" class="Class at Binary.scala:6" package="">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
<option name="SUSPEND_POLICY" value="SuspendAll" />
<option name="COUNT_FILTER_ENABLED" value="false" />
<option name="COUNT_FILTER" value="0" />
<option name="CONDITION_ENABLED" value="false" />
<option name="CLASS_FILTERS_ENABLED" value="false" />
<option name="INSTANCE_FILTERS_ENABLED" value="false" />
<option name="CONDITION" value="" />
<option name="LOG_MESSAGE" value="" />
</breakpoint>
<breakpoint url="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala" line="24" class="Class at MessageDispatcherBase.scala:24" package="">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
<option name="SUSPEND_POLICY" value="SuspendAll" />
<option name="COUNT_FILTER_ENABLED" value="false" />
<option name="COUNT_FILTER" value="0" />
<option name="CONDITION_ENABLED" value="false" />
<option name="CLASS_FILTERS_ENABLED" value="false" />
<option name="INSTANCE_FILTERS_ENABLED" value="false" />
<option name="CONDITION" value="" />
<option name="LOG_MESSAGE" value="" />
</breakpoint>
<breakpoint url="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala" line="121" class="Class at EventBasedThreadPoolDispatcher.scala:121" package="">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
@ -108,55 +139,73 @@
<component name="FileColors" enabled="false" enabledForTabs="false" />
<component name="FileEditorManager">
<leaf>
<file leaf-file-name="RemoteProtocolBuilder.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteProtocolBuilder.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="31" column="76" selection-start="1647" selection-end="1647" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="EventBasedThreadPoolDispatcher.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="132" column="2" selection-start="4660" selection-end="4660" vertical-scroll-proportion="-14.153846">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="EventBasedThreadPoolDispatcherTest.scala" pinned="false" current="true" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="161" column="168" selection-start="6003" selection-end="6003" vertical-scroll-proportion="0.9074253">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="Reactor.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/Reactor.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="34" column="6" selection-start="695" selection-end="695" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="EventBasedSingleThreadDispatcher.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="28" column="48" selection-start="1154" selection-end="1154" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="ThreadBasedDispatcher.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="14" column="6" selection-start="401" selection-end="401" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="MessageDispatcherBase.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="49" column="41" selection-start="1388" selection-end="1388" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="Actor.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/actor/Actor.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="414" column="33" selection-start="14988" selection-end="14988" vertical-scroll-proportion="-24.615385">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="RemoteProtocolBuilder.scala" pinned="false" current="true" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteProtocolBuilder.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="14" column="0" selection-start="410" selection-end="410" vertical-scroll-proportion="0.18181819">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="RemoteSupervisorSpec.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/RemoteSupervisorSpec.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="638" column="28" selection-start="16462" selection-end="16462" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="Messages.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/Messages.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="19" column="33" selection-start="564" selection-end="564" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="RemoteActorSpec.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/RemoteActorSpec.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="27" column="6" selection-start="652" selection-end="652" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="Serializable.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Serializable.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="12" column="1" selection-start="340" selection-end="340" vertical-scroll-proportion="0.0">
<state line="101" column="15" selection-start="4038" selection-end="4038" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
@ -165,7 +214,7 @@
<file leaf-file-name="Binary.scala" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="12" column="63" selection-start="429" selection-end="429" vertical-scroll-proportion="0.0">
<state line="12" column="63" selection-start="424" selection-end="424" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
@ -193,11 +242,6 @@
<component name="IdeDocumentHistory">
<option name="changedFiles">
<list>
<option value="$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteProtocolBuilder.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/serialization/Serializable.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/serialization/Serializer.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/state/CassandraStorage.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/Kernel.scala" />
<option value="$PROJECT_DIR$/config/akka-reference.conf" />
<option value="$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java" />
<option value="$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java" />
@ -209,6 +253,11 @@
<option value="$PROJECT_DIR$/kernel/src/test/scala/RemoteSupervisorSpec.scala" />
<option value="$PROJECT_DIR$/kernel/src/test/scala/Messages.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/reactor/Reactor.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala" />
<option value="$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala" />
<option value="$PROJECT_DIR$/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala" />
</list>
</option>
</component>
@ -490,54 +539,6 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.LibraryGroupNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="akka" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="akka-kernel" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="Libraries" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.LibraryGroupNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="Maven: sbinary:sbinary:0.3" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.NamedLibraryElementNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="sbinary-0.3.jar" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="sbinary" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="akka" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="akka-kernel" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="Libraries" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.LibraryGroupNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="Maven: sbinary:sbinary:0.3" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.NamedLibraryElementNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="sbinary-0.3.jar" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="akka" />
@ -630,7 +631,7 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="serialization" />
<option name="myItemId" value="reactor" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@ -660,7 +661,7 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="nio" />
<option name="myItemId" value="actor" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@ -777,7 +778,7 @@
<recent name="stm" />
</key>
</component>
<component name="RunManager" selected="JUnit.RemoteSupervisorSpec">
<component name="RunManager" selected="JUnit.EventBasedThreadPoolDispatcherTest">
<configuration default="false" name="InMemoryStateTest.testRefShouldRollbackStateForStatefulServerInCaseOfFailure" type="JUnit" factoryName="JUnit" temporary="true" enabled="false" merge="false" sample_coverage="true" runner="emma">
<pattern>
<option name="PATTERN" value="se.scalablesolutions.akka.api.*" />
@ -1944,17 +1945,18 @@
</component>
<component name="ToolWindowManager">
<frame x="4" y="22" width="1916" height="1178" extended-state="6" />
<editor active="false" />
<editor active="true" />
<layout>
<window_info id="Maven Projects" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3294881" sideWeight="0.5212177" order="5" side_tool="false" />
<window_info id="Dependency Viewer" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="15" side_tool="false" />
<window_info id="Palette" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="6" side_tool="false" />
<window_info id="Ant Build" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.24906267" sideWeight="0.5212177" order="1" side_tool="false" />
<window_info id="Changes" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32749078" sideWeight="0.5" order="9" side_tool="false" />
<window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.43542436" sideWeight="0.0" order="2" side_tool="false" x="4" y="22" width="1436" height="878" />
<window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.601476" sideWeight="0.0" order="2" side_tool="false" x="4" y="22" width="1436" height="878" />
<window_info id="TODO" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32749078" sideWeight="0.5" order="6" side_tool="false" />
<window_info id="Project" active="true" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.1751473" sideWeight="0.59594095" order="1" side_tool="false" />
<window_info id="Debug" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3976015" sideWeight="0.5" order="3" side_tool="false" />
<window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.1751473" sideWeight="0.39206642" order="1" side_tool="false" />
<window_info id="Debug" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.39668366" sideWeight="0.5" order="3" side_tool="false" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3201107" sideWeight="0.5" order="1" side_tool="false" />
<window_info id="Structure" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.16818425" sideWeight="0.24354243" order="0" side_tool="true" x="1609" y="144" width="252" height="862" />
<window_info id="Version Control" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3265306" sideWeight="0.5" order="13" side_tool="false" />
<window_info id="Web Preview" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32822478" sideWeight="0.5" order="16" side_tool="false" />
@ -1966,9 +1968,8 @@
<window_info id="Clojure REPL" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32749078" sideWeight="0.5" order="8" side_tool="false" />
<window_info id="Data Sources" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3294881" sideWeight="0.6875" order="3" side_tool="false" />
<window_info id="Web" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.26479077" sideWeight="0.24617347" order="2" side_tool="true" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32142857" sideWeight="0.5" order="1" side_tool="false" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="5" side_tool="false" />
<window_info id="Messages" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.3265306" sideWeight="0.0" order="14" side_tool="false" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="5" side_tool="false" />
<window_info id="FindBugs-IDEA" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32976654" sideWeight="0.5" order="12" side_tool="false" />
<window_info id="Cvs" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="4" side_tool="false" />
<window_info id="Hierarchy" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.24965987" sideWeight="0.5" order="2" side_tool="false" />
@ -2010,62 +2011,6 @@
<option name="FILTER_TARGETS" value="false" />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java">
<provider selected="true" editor-type-id="text-editor">
<state line="39" column="14" selection-start="1538" selection-end="1538" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java">
<provider selected="true" editor-type-id="text-editor">
<state line="55" column="64" selection-start="1440" selection-end="1456" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/actor/ActiveObject.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="290" column="36" selection-start="13641" selection-end="13641" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/JerseySpec.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="24" column="7" selection-start="690" selection-end="690" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteClient.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="75" column="75" selection-start="2766" selection-end="2766" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/actor/Actor.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="414" column="33" selection-start="14988" selection-end="14988" vertical-scroll-proportion="-24.615385">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/RemoteActorSpec.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="27" column="6" selection-start="652" selection-end="652" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/com/google/protobuf/protobuf-java/2.1.0/protobuf-java-2.1.0-sources.jar!/com/google/protobuf/ByteString.java">
<provider selected="true" editor-type-id="text-editor">
<state line="153" column="0" selection-start="4874" selection-end="4874" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/sbinary/sbinary/0.3/sbinary-0.3.jar!/sbinary/DefaultProtocol.class">
<provider selected="true" editor-type-id="text-editor">
<state line="3" column="0" selection-start="147" selection-end="147" vertical-scroll-proportion="0.0">
@ -2089,7 +2034,14 @@
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/Messages.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="19" column="33" selection-start="564" selection-end="564" vertical-scroll-proportion="0.0">
<state line="19" column="33" selection-start="570" selection-end="570" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/RemoteActorSpec.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="27" column="6" selection-start="652" selection-end="652" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
@ -2101,6 +2053,13 @@
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="12" column="63" selection-start="424" selection-end="424" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteServer.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="117" column="45" selection-start="4575" selection-end="4575" vertical-scroll-proportion="0.0">
@ -2108,16 +2067,58 @@
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/serialization/Binary.scala">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteProtocolBuilder.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="12" column="63" selection-start="429" selection-end="429" vertical-scroll-proportion="0.0">
<state line="31" column="76" selection-start="1647" selection-end="1647" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/nio/RemoteProtocolBuilder.scala">
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/actor/Actor.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="14" column="0" selection-start="410" selection-end="410" vertical-scroll-proportion="0.18181819">
<state line="101" column="15" selection-start="4038" selection-end="4038" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/MessageDispatcherBase.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="49" column="41" selection-start="1388" selection-end="1388" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="14" column="6" selection-start="401" selection-end="401" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="28" column="48" selection-start="1154" selection-end="1154" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/Reactor.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="34" column="6" selection-start="695" selection-end="695" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="132" column="2" selection-start="4660" selection-end="4660" vertical-scroll-proportion="-14.153846">
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala">
<provider selected="true" editor-type-id="text-editor">
<state line="161" column="168" selection-start="6003" selection-end="6003" vertical-scroll-proportion="0.9074253">
<folding />
</state>
</provider>

View file

@ -46,9 +46,9 @@
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: high-scale-lib:high-scale-lib:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-lang:commons-lang:2.4" level="project" />
<orderEntry type="library" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.9.9" level="project" />
<orderEntry type="library" name="Maven: com.sun.grizzly:grizzly-http:1.9.9" level="project" />
<orderEntry type="library" name="Maven: com.sun.grizzly:grizzly-framework:1.9.9" level="project" />
@ -67,6 +67,7 @@
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-logging:commons-logging:1.0.4" level="project" />
<orderEntry type="library" name="Maven: com.sun.jersey:jersey-client:1.1.0-ea" level="project" />
<orderEntry type="library" name="Maven: com.sun.jersey:jersey-atom:1.0.3" level="project" />
<orderEntry type="library" name="Maven: rome:rome:0.9" level="project" />

View file

@ -62,9 +62,9 @@
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: high-scale-lib:high-scale-lib:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-lang:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-framework:1.9.9" level="project" />
@ -83,6 +83,7 @@
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-logging:commons-logging:1.0.4" level="project" />
<orderEntry type="library" name="Maven: org.scala-tools.testing:scalatest:0.9.5" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-client:1.1.0-ea" level="project" />
</component>

View file

@ -10,6 +10,8 @@
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.{LinkedList, Queue, List}
class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
def start = if (!active) {
active = true
@ -20,11 +22,12 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedQueue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until selectedQueue.size) {
val handle = selectedQueue.remove
val handler = messageHandlers.get(handle.sender)
if (handler != null) handler.invoke(handle)
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator
while (selectedInvocations.hasNext) {
val invocation = selectedInvocations.next
val invoker = messageHandlers.get(invocation.sender)
if (invoker != null) invoker.invoke(invocation)
selectedInvocations.remove
}
}
}
@ -34,15 +37,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
}
class EventBasedSingleThreadDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
import java.util.{LinkedList, Queue}
private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]
def select = messageQueue.read(selectedQueue)
def acquireSelectedQueue: Queue[MessageInvocation] = selectedQueue
def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue
def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")
def releaseSelectedInvocations = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")
def wakeUp = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't be woken up")
}

View file

@ -8,7 +8,7 @@ import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.{Collection, HashSet, LinkedList, Queue}
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
@ -66,7 +66,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private var threadPoolBuilder: ThreadPoolExecutor = _
private val threadFactory = new MonitorableThreadFactory("akka")
private var boundedExecutorBound = -1
private val busyHandlers = new HashSet[AnyRef]
private val busyInvokers = new HashSet[AnyRef]
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
@ -82,27 +82,27 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override def run = {
while (active) {
try {
try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
//try {
// guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select
} catch {case e: InterruptedException => active = false}
val selectedQueue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until selectedQueue.size) {
val message = selectedQueue.peek
val messageHandler = getIfNotBusy(message.sender)
if (messageHandler.isDefined) {
executor.execute(new Runnable {
override def run = {
messageHandler.get.invoke(message)
free(message.sender)
messageDemultiplexer.wakeUp
}
})
selectedQueue.remove
}
//} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val reservedInvocations = reserve(selectedInvocations)
val it = reservedInvocations.entrySet.iterator
while (it.hasNext) {
val entry = it.next
val invocation = entry.getKey
val invoker = entry.getValue
threadPoolBuilder.execute(new Runnable() {
def run = {
invoker.invoke(invocation)
free(invocation.sender)
messageDemultiplexer.wakeUp
}
})
}
} finally {
messageDemultiplexer.releaseSelectedQueue
messageDemultiplexer.releaseSelectedInvocations
}
}
}
@ -112,19 +112,25 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow
private def getIfNotBusy(key: AnyRef): Option[MessageInvoker] = guard.synchronized {
if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key)
Some(messageHandlers.get(key))
} else None
private def reserve(invocations: List[MessageInvocation]): HashMap[MessageInvocation, MessageInvoker] = synchronized {
// if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
val result = new HashMap[MessageInvocation, MessageInvoker]
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
if (!busyInvokers.contains(invocation.sender)) {
result.put(invocation, messageHandlers.get(invocation.sender))
busyInvokers.add(invocation.sender)
iterator.remove
}
}
result
}
private def free(key: AnyRef) = guard.synchronized {
if (!CONCURRENT_MODE) busyHandlers.remove(key)
private def free(invoker: AnyRef) = synchronized {
if (!CONCURRENT_MODE) busyInvokers.remove(invoker)
}
// ============ Code for configuration of thread pool =============
def buildThreadPool = synchronized {
@ -241,25 +247,22 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
}
class EventBasedThreadPoolDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedQueueLock = new ReentrantLock
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedInvocationsLock = new ReentrantLock
def select = try {
selectedQueueLock.lock
messageQueue.read(selectedQueue)
selectedInvocationsLock.lock
messageQueue.read(selectedInvocations)
} finally {
selectedQueueLock.unlock
selectedInvocationsLock.unlock
}
def acquireSelectedQueue: Queue[MessageInvocation] = {
selectedQueueLock.lock
selectedQueue
def acquireSelectedInvocations: List[MessageInvocation] = {
selectedInvocationsLock.lock
selectedInvocations
}
def releaseSelectedQueue = {
//selectedQueue.clear
selectedQueueLock.unlock
}
def releaseSelectedInvocations = selectedInvocationsLock.unlock
def wakeUp = messageQueue.interrupt
}

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor
import java.util.{LinkedList, Queue}
import java.util.{LinkedList, Queue, List}
import java.util.concurrent.TimeUnit
import java.util.HashMap
@ -21,11 +21,11 @@ trait MessageDispatcherBase extends MessageDispatcher {
def messageQueue = queue
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
def registerHandler(key: AnyRef, handler: MessageInvoker) = synchronized {
messageHandlers.put(key, handler)
}
def unregisterHandler(key: AnyRef) = guard.synchronized {
def unregisterHandler(key: AnyRef) = synchronized {
messageHandlers.remove(key)
}
@ -55,9 +55,9 @@ class ReactiveMessageQueue extends MessageQueue {
queue.notifyAll
}
def read(destination: Queue[MessageInvocation]) = queue.synchronized {
def read(destination: List[MessageInvocation]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
else interrupted = false
}

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor
import java.util.Queue
import java.util.List
import kernel.stm.Transaction
import kernel.util.HashCode
@ -27,8 +27,8 @@ trait MessageDispatcher {
trait MessageDemultiplexer {
def select
def acquireSelectedQueue: Queue[MessageInvocation]
def releaseSelectedQueue
def acquireSelectedInvocations: List[MessageInvocation]
def releaseSelectedInvocations
def wakeUp
}

View file

@ -11,151 +11,151 @@ import org.junit.Assert._
import junit.framework.TestCase
class EventBasedThreadPoolDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null
private var threadingIssueDetected: AtomicBoolean = null
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
return
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val guardLock1 = new ReentrantLock
val guardLock2 = new ReentrantLock
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock1.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock2.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val guardLock1 = new ReentrantLock
val guardLock2 = new ReentrantLock
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock1.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {
if (guardLock2.tryLock) {
handlersBarrier.await(1, TimeUnit.SECONDS)
} else {
threadingIssueDetected.set(true);
}
}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1.1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2.2", None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
}

View file

@ -46,9 +46,9 @@
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: high-scale-lib:high-scale-lib:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-lang:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-framework:1.9.9" level="project" />
@ -67,6 +67,7 @@
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-logging:commons-logging:1.0.4" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-client:1.1.0-ea" level="project" />
</component>
</module>

View file

@ -51,9 +51,9 @@
<orderEntry type="library" exported="" name="Maven: org.apache.cassandra:cassandra:0.4.0-dev" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:thrift:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: com.facebook:fb303:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" exported="" name="Maven: high-scale-lib:high-scale-lib:1.0" level="project" />
<orderEntry type="library" exported="" name="Maven: org.apache.commons:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-lang:commons-lang:2.4" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-servlet-webserver:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-http:1.9.9" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.grizzly:grizzly-framework:1.9.9" level="project" />
@ -72,6 +72,7 @@
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-log4j12:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: org.slf4j:slf4j-api:1.4.3" level="project" />
<orderEntry type="library" exported="" name="Maven: log4j:log4j:1.2.13" level="project" />
<orderEntry type="library" exported="" name="Maven: commons-logging:commons-logging:1.0.4" level="project" />
<orderEntry type="library" exported="" name="Maven: com.sun.jersey:jersey-client:1.1.0-ea" level="project" />
</component>
</module>