From 6ed887fb7b29fc84f03cd3ef7964a8dbeb10fe78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Fri, 9 Apr 2010 11:16:38 +0200 Subject: [PATCH] fixed bug in Agent.scala, fixed bug in RemoteClient.scala, fixed problem with tests --- akka-core/src/main/scala/actor/Agent.scala | 8 +- ...sedEventDrivenWorkStealingDispatcher.scala | 2 +- .../src/main/scala/remote/RemoteClient.scala | 16 +-- akka-core/src/test/scala/AgentSpec.scala | 134 ++++++++---------- ...ventDrivenWorkStealingDispatcherSpec.scala | 11 +- project/build/AkkaProject.scala | 2 +- 6 files changed, 80 insertions(+), 93 deletions(-) diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index a0ca0c90eb..b6a65423a3 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -90,7 +90,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes * * IMPORTANT: * You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach', -* 'map and 'flatMap' within an enclosing transaction since that would block +* 'map' and 'flatMap' within an enclosing transaction since that would block * the transaction indefinitely. But all other operations are fine. The system * will raise an error (e.g. *not* deadlock) if you try to do so, so as long as * you test your application thoroughly you should be fine. @@ -99,11 +99,13 @@ class AgentException private[akka](message: String) extends RuntimeException(mes * @author Jonas Bonér */ sealed class Agent[T] private (initialValue: T) extends Transactor { + start import Agent._ + log.debug("Starting up Agent [%s]", _uuid) + private lazy val value = Ref[T]() - start - this !! Value(initialValue) + this ! Value(initialValue) /** * Periodically handles incoming messages. diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index a769f92e55..a96f5c5e76 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -168,7 +168,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { - thief.forward(donated.message)(Some(donated.receiver)) + thief ! donated.message return Some(donated) } else return None } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 72a7f37229..81d5591fbb 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -232,7 +232,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception)) throw exception } @@ -325,12 +325,12 @@ class RemoteClientHandler(val name: String, futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception)) throw exception } } catch { case e: Exception => - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String, // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly if (!client.connection.isSuccess) { - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String, } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/test/scala/AgentSpec.scala b/akka-core/src/test/scala/AgentSpec.scala index 20f8e0d8f4..d38f6a4265 100644 --- a/akka-core/src/test/scala/AgentSpec.scala +++ b/akka-core/src/test/scala/AgentSpec.scala @@ -1,6 +1,5 @@ package se.scalablesolutions.akka.actor -import _root_.java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.Actor.transactor import se.scalablesolutions.akka.stm.Transaction.Global.atomic import se.scalablesolutions.akka.util.Logging @@ -10,51 +9,40 @@ import org.scalatest.junit.JUnitRunner import org.scalatest.matchers.MustMatchers import org.junit.runner.RunWith -import org.junit.{Test} +import org.junit.Test -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{TimeUnit, CountDownLatch} @RunWith(classOf[JUnitRunner]) -class AgentSpec extends junit.framework.TestCase -with Suite with MustMatchers -with ActorTestUtil with Logging { +class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers { - @Test def testSendFun = verify(new TestActor { - def test = { - val agent = Agent(5) - handle(agent) { - agent send (_ + 1) - agent send (_ * 2) - val result = agent() - result must be(12) - } - } - }) + @Test def testSendFun = { + val agent = Agent(5) + agent send (_ + 1) + agent send (_ * 2) + val result = agent() + result must be(12) + agent.stop + } - @Test def testSendValue = verify(new TestActor { - def test = { - val agent = Agent(5) - handle(agent) { - agent send 6 - val result = agent() - result must be(6) - } - } - }) + @Test def testSendValue = { + val agent = Agent(5) + agent send 6 + val result = agent() + result must be(6) + agent.stop + } - @Test def testSendProc = verify(new TestActor { - def test = { - val agent = Agent(5) - var result = 0 - val latch = new CountDownLatch(2) - handle(agent) { - agent sendProc { e => result += e; latch.countDown } - agent sendProc { e => result += e; latch.countDown } - assert(latch.await(1, TimeUnit.SECONDS)) - result must be(10) - } - } - }) + @Test def testSendProc = { + val agent = Agent(5) + var result = 0 + val latch = new CountDownLatch(2) + agent sendProc { e => result += e; latch.countDown } + agent sendProc { e => result += e; latch.countDown } + assert(latch.await(5, TimeUnit.SECONDS)) + result must be(10) + agent.stop + } @Test def testOneAgentsendWithinEnlosingTransactionSuccess = { case object Go @@ -64,7 +52,7 @@ with ActorTestUtil with Logging { case Go => agent send { e => latch.countDown; e + 1 } } tx ! Go - assert(latch.await(1, TimeUnit.SECONDS)) + assert(latch.await(5, TimeUnit.SECONDS)) val result = agent() result must be(6) agent.close @@ -84,46 +72,40 @@ with ActorTestUtil with Logging { } } tx ! Go - assert(latch.await(1, TimeUnit.SECONDS)) + assert(latch.await(5, TimeUnit.SECONDS)) agent.close tx.stop assert(true) } - @Test def testAgentForeach = verify(new TestActor { - def test = { - val agent1 = Agent(3) - var result = 0 - for (first <- agent1) { - result = first + 1 - } - result must be(4) - agent1.close + @Test def testAgentForeach = { + val agent1 = Agent(3) + var result = 0 + for (first <- agent1) { + result = first + 1 } - }) + result must be(4) + agent1.close + } + + @Test def testAgentMap = { + val agent1 = Agent(3) + val result = for (first <- agent1) yield first + 1 + result() must be(4) + result.close + agent1.close + } - @Test def testAgentMap = verify(new TestActor { - def test = { - val agent1 = Agent(3) - val result = for (first <- agent1) yield first + 1 - result() must be(4) - result.close - agent1.close - } - }) - - @Test def testAgentFlatMap = verify(new TestActor { - def test = { - val agent1 = Agent(3) - val agent2 = Agent(5) - val result = for { - first <- agent1 - second <- agent2 - } yield second + first - result() must be(8) - result.close - agent1.close - agent2.close - } - }) + @Test def testAgentFlatMap = { + val agent1 = Agent(3) + val agent2 = Agent(5) + val result = for { + first <- agent1 + second <- agent2 + } yield second + first + result() must be(8) + result.close + agent1.close + agent2.close + } } diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 231e4f4d98..2a194cc454 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -1,11 +1,14 @@ package se.scalablesolutions.akka.actor -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import java.util.concurrent.CountDownLatch import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite + +import org.junit.Test + import se.scalablesolutions.akka.dispatch.Dispatchers +import java.util.concurrent.{TimeUnit, CountDownLatch} + /** * @author Jan Van Besien */ @@ -51,7 +54,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with slow ! i } - finishedCounter.await + finishedCounter.await(5, TimeUnit.SECONDS) fast.invocationCount must be > (slow.invocationCount) } } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index b7c02135fc..b5c776fb80 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -59,7 +59,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { Path.fromFile(home) } val encodingUtf8 = List("-encoding", "UTF-8") - override def parallelExecution = true + override def parallelExecution = false lazy val deployPath = akkaHome / "deploy" lazy val distPath = akkaHome / "dist"