diff --git a/.history b/.history index 209db6b195..7bbf31e478 100644 --- a/.history +++ b/.history @@ -1,2 +1,4 @@ update reload +projects +exit diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 5bf3fcf9d7..d1201f259c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -217,13 +217,13 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true app.mainbus.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[" + app.address + "/sys/testActor") ⇒ true + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[" + app.defaultAddress + "/sys/testActor") ⇒ true } expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[" + app.address + "/sys/testActor") ⇒ true + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[" + app.defaultAddress + "/sys/testActor") ⇒ true } expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) expectNoMsg(1 second) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 7ab547cf1f..65dfe26de2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -1,24 +1,21 @@ package akka.actor import org.scalatest.BeforeAndAfterEach -import akka.testkit.TestEvent._ -import akka.testkit.EventFilter import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.{ ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit } +import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { - private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]() + private val cancellables = new ConcurrentLinkedQueue[Cancellable]() - def collectFuture(f: ⇒ ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = { - val future = f - futures.add(future) - future + def collectCancellable(c: Cancellable): Cancellable = { + cancellables.add(c) + c } override def afterEach { - while (futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) } + while (cancellables.peek() ne null) { Option(cancellables.poll()).foreach(_.cancel()) } } "A Scheduler" must { @@ -30,14 +27,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectFuture(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - 
collectFuture(app.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -49,9 +46,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { val tickActor = actorOf(new Actor { def receive = { case Tick ⇒ countDownLatch.countDown() } }) + // run every 50 millisec - collectFuture(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectFuture(app.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -87,9 +85,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val future = collectFuture(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) - future.cancel(true) + val timeout = collectCancellable(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + timeout.cancel() } + assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made } @@ -114,9 +113,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectFuture(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) // appx 2 pings before crash - collectFuture(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) // should be enough time for the ping countdown to recover and reach 6 pings diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java new file mode 100644 index 0000000000..328d2dc39f --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java @@ -0,0 +1,479 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.jboss.netty.akka.util; + +import akka.event.LoggingAdapter; +import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; +import org.jboss.netty.akka.util.internal.ReusableIterator; + +import java.util.*; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A {@link Timer} optimized for approximated I/O timeout scheduling. + * + *

+ * <h3>Tick Duration</h3>

+ * + * As described with 'approximated', this timer does not execute the scheduled + * {@link TimerTask} on time. {@link org.jboss.netty.akka.util.HashedWheelTimer}, on every tick, will + * check if there are any {@link TimerTask}s behind the schedule and execute + * them. + *
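A minimal usage sketch of the API this file implements, assuming an already-constructed HashedWheelTimer named `timer` (the Timeout and TimerTask interfaces are the ones added later in this diff; this snippet is illustrative, not part of the patch):

    Timeout handle = timer.newTimeout(new TimerTask() {
        public void run(Timeout timeout) throws Exception {
            // Runs on the worker thread roughly 5 s later, quantized to a tick.
            System.out.println("timed out");
        }
    }, 5, java.util.concurrent.TimeUnit.SECONDS);
    handle.cancel(); // no effect if the task already ran or was cancelled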
+ * <p>
+ * You can increase or decrease the accuracy of the execution timing by + * specifying smaller or larger tick duration in the constructor. In most + * network applications, I/O timeout does not need to be accurate. Therefore, + * the default tick duration is 100 milliseconds and you will not need to try + * different configurations in most cases. + * + *
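To make the trade-off concrete, here is a hedged construction sketch using the constructor defined further down in this file; `log` stands in for an akka.event.LoggingAdapter obtained from the application:

    Timer timer = new HashedWheelTimer(
        log,                                                   // assumed LoggingAdapter
        java.util.concurrent.Executors.defaultThreadFactory(), // creates the worker thread
        100, java.util.concurrent.TimeUnit.MILLISECONDS,       // tick duration: scheduling resolution
        512);                                                  // ticks per wheel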

+ * <h3>Ticks per Wheel (Wheel Size)</h3>

+ * + * {@link org.jboss.netty.akka.util.HashedWheelTimer} maintains a data structure called 'wheel'. + * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash + * function is 'dead line of the task'. The default number of ticks per wheel + * (i.e. the size of the wheel) is 512. You could specify a larger value + * if you are going to schedule a lot of timeouts. + * + *
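The requested wheel size is rounded up to the next power of two so that bucket selection can use a bit mask rather than a modulo; a small sketch of the normalization performed by normalizeTicksPerWheel below:

    int requested = 1000;                 // e.g. a caller asks for 1000 buckets
    int normalized = 1;
    while (normalized < requested) {
        normalized <<= 1;                 // 1, 2, 4, ..., 512, 1024
    }
    int mask = normalized - 1;            // 1023; buckets are picked as (cursor + index) & mask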

+ * <h3>Do not create many instances.</h3>

+ * + * {@link org.jboss.netty.akka.util.HashedWheelTimer} creates a new thread whenever it is instantiated and + * started. Therefore, you should make sure to create only one instance and + * share it across your application. One of the common mistakes, that makes + * your application unresponsive, is to create a new instance in + * {@link ChannelPipelineFactory}, which results in the creation of a new thread + * for every connection. + * + *
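One way to follow this advice is a single shared instance behind an application-scoped holder; a sketch under stated assumptions (the Timers holder and its init hook are hypothetical, not part of this patch):

    public final class Timers {
        private static volatile Timer shared;

        // Call once at application startup with the app's LoggingAdapter.
        public static void init(akka.event.LoggingAdapter log) {
            shared = new HashedWheelTimer(log,
                java.util.concurrent.Executors.defaultThreadFactory(),
                100, java.util.concurrent.TimeUnit.MILLISECONDS, 512);
        }

        public static Timer shared() { return shared; }

        private Timers() {}
    }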

+ * <h3>Implementation Details</h3>

+ * + * {@link org.jboss.netty.akka.util.HashedWheelTimer} is based on + * George Varghese and + * Tony Lauck's paper, + * 'Hashed + * and Hierarchical Timing Wheels: data structures to efficiently implement a + * timer facility'. More comprehensive slides are located + * here. + * + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2297 $, $Date: 2010-06-07 10:50:02 +0900 (Mon, 07 Jun 2010) $ + */ +public class HashedWheelTimer implements Timer { + private static final AtomicInteger id = new AtomicInteger(); + + private final Worker worker = new Worker(); + final Thread workerThread; + final AtomicBoolean shutdown = new AtomicBoolean(); + + private final long roundDuration; + final long tickDuration; + final Set[] wheel; + final ReusableIterator[] iterators; + final int mask; + final ReadWriteLock lock = new ReentrantReadWriteLock(); + volatile int wheelCursor; + private LoggingAdapter logger; + + /** + * Creates a new timer. + * + * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @param ticksPerWheel the size of the wheel + */ + public HashedWheelTimer( + LoggingAdapter logger, + ThreadFactory threadFactory, + long tickDuration, TimeUnit unit, int ticksPerWheel) { + + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (tickDuration <= 0) { + throw new IllegalArgumentException( + "tickDuration must be greater than 0: " + tickDuration); + } + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + + this.logger = logger; + + // Normalize ticksPerWheel to power of two and initialize the wheel. + wheel = createWheel(ticksPerWheel); + iterators = createIterators(wheel); + mask = wheel.length - 1; + + // Convert tickDuration to milliseconds. + this.tickDuration = tickDuration = unit.toMillis(tickDuration); + + // Prevent overflow. 
+ if (tickDuration == Long.MAX_VALUE || + tickDuration >= Long.MAX_VALUE / wheel.length) { + throw new IllegalArgumentException( + "tickDuration is too long: " + + tickDuration + ' ' + unit); + } + + roundDuration = tickDuration * wheel.length; + workerThread = threadFactory.newThread(worker); + } + + @SuppressWarnings("unchecked") + private static Set[] createWheel(int ticksPerWheel) { + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + if (ticksPerWheel > 1073741824) { + throw new IllegalArgumentException( + "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); + } + + ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); + Set[] wheel = new Set[ticksPerWheel]; + for (int i = 0; i < wheel.length; i ++) { + wheel[i] = new MapBackedSet( + new ConcurrentIdentityHashMap(16, 0.95f, 4)); + } + return wheel; + } + + @SuppressWarnings("unchecked") + private static ReusableIterator[] createIterators(Set[] wheel) { + ReusableIterator[] iterators = new ReusableIterator[wheel.length]; + for (int i = 0; i < wheel.length; i ++) { + iterators[i] = (ReusableIterator) wheel[i].iterator(); + } + return iterators; + } + + private static int normalizeTicksPerWheel(int ticksPerWheel) { + int normalizedTicksPerWheel = 1; + while (normalizedTicksPerWheel < ticksPerWheel) { + normalizedTicksPerWheel <<= 1; + } + return normalizedTicksPerWheel; + } + + /** + * Starts the background thread explicitly. The background thread will + * start automatically on demand even if you did not call this method. + * + * @throws IllegalStateException if this timer has been + * {@linkplain #stop() stopped} already + */ + public synchronized void start() { + if (shutdown.get()) { + throw new IllegalStateException("cannot be started once stopped"); + } + + if (!workerThread.isAlive()) { + workerThread.start(); + } + } + + public synchronized Set stop() { + if (Thread.currentThread() == workerThread) { + throw new IllegalStateException( + HashedWheelTimer.class.getSimpleName() + + ".stop() cannot be called from " + + TimerTask.class.getSimpleName()); + } + + if (!shutdown.compareAndSet(false, true)) { + return Collections.emptySet(); + } + + boolean interrupted = false; + while (workerThread.isAlive()) { + workerThread.interrupt(); + try { + workerThread.join(100); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + + Set unprocessedTimeouts = new HashSet(); + for (Set bucket: wheel) { + unprocessedTimeouts.addAll(bucket); + bucket.clear(); + } + + return Collections.unmodifiableSet(unprocessedTimeouts); + } + + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + final long currentTime = System.currentTimeMillis(); + + if (task == null) { + throw new NullPointerException("task"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (!workerThread.isAlive()) { + start(); + } + + delay = unit.toMillis(delay); + HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); + scheduleTimeout(timeout, delay); + return timeout; + } + + void scheduleTimeout(HashedWheelTimeout timeout, long delay) { + // delay must be equal to or greater than tickDuration so that the + // worker thread never misses the timeout. + if (delay < tickDuration) { + delay = tickDuration; + } + + // Prepare the required parameters to schedule the timeout object. 
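+        // Worked example (illustrative note, not part of the patch): with the
+        // default 100 ms tick and a 512-slot wheel, roundDuration is 51,200 ms.
+        // A delay of 180 ms gives lastRoundDelay = 180 and lastTickDelay = 80
+        // below, so relativeIndex = 180 / 100 + 1 = 2 and remainingRounds = 0:
+        // the timeout is filed two buckets ahead of the cursor and fires in
+        // the current round.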
+ final long lastRoundDelay = delay % roundDuration; + final long lastTickDelay = delay % tickDuration; + final long relativeIndex = + lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); + + final long remainingRounds = + delay / roundDuration - (delay % roundDuration == 0? 1 : 0); + + // Add the timeout to the wheel. + lock.readLock().lock(); + try { + int stopIndex = (int) (wheelCursor + relativeIndex & mask); + timeout.stopIndex = stopIndex; + timeout.remainingRounds = remainingRounds; + wheel[stopIndex].add(timeout); + } finally { + lock.readLock().unlock(); + } + } + + private final class Worker implements Runnable { + + private long startTime; + private long tick; + + Worker() { + super(); + } + + public void run() { + List expiredTimeouts = + new ArrayList(); + + startTime = System.currentTimeMillis(); + tick = 1; + + while (!shutdown.get()) { + final long deadline = waitForNextTick(); + if (deadline > 0) { + fetchExpiredTimeouts(expiredTimeouts, deadline); + notifyExpiredTimeouts(expiredTimeouts); + } + } + } + + private void fetchExpiredTimeouts( + List expiredTimeouts, long deadline) { + + // Find the expired timeouts and decrease the round counter + // if necessary. Note that we don't send the notification + // immediately to make sure the listeners are called without + // an exclusive lock. + lock.writeLock().lock(); + try { + int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; + ReusableIterator i = iterators[newWheelCursor]; + fetchExpiredTimeouts(expiredTimeouts, i, deadline); + } finally { + lock.writeLock().unlock(); + } + } + + private void fetchExpiredTimeouts( + List expiredTimeouts, + ReusableIterator i, long deadline) { + + List slipped = null; + i.rewind(); + while (i.hasNext()) { + HashedWheelTimeout timeout = i.next(); + if (timeout.remainingRounds <= 0) { + i.remove(); + if (timeout.deadline <= deadline) { + expiredTimeouts.add(timeout); + } else { + // Handle the case where the timeout is put into a wrong + // place, usually one tick earlier. For now, just add + // it to a temporary list - we will reschedule it in a + // separate loop. + if (slipped == null) { + slipped = new ArrayList(); + } + slipped.add(timeout); + } + } else { + timeout.remainingRounds --; + } + } + + // Reschedule the slipped timeouts. + if (slipped != null) { + for (HashedWheelTimeout timeout: slipped) { + scheduleTimeout(timeout, timeout.deadline - deadline); + } + } + } + + private void notifyExpiredTimeouts( + List expiredTimeouts) { + // Notify the expired timeouts. + for (int i = expiredTimeouts.size() - 1; i >= 0; i --) { + expiredTimeouts.get(i).expire(); + } + + // Clean up the temporary list. + expiredTimeouts.clear(); + } + + private long waitForNextTick() { + long deadline = startTime + tickDuration * tick; + + for (;;) { + final long currentTime = System.currentTimeMillis(); + final long sleepTime = tickDuration * tick - (currentTime - startTime); + + if (sleepTime <= 0) { + break; + } + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + if (shutdown.get()) { + return -1; + } + } + } + + // Increase the tick. 
+ tick ++; + return deadline; + } + } + + private final class HashedWheelTimeout implements Timeout { + + private static final int ST_INIT = 0; + private static final int ST_CANCELLED = 1; + private static final int ST_EXPIRED = 2; + + private final TimerTask task; + final long deadline; + volatile int stopIndex; + volatile long remainingRounds; + private final AtomicInteger state = new AtomicInteger(ST_INIT); + + HashedWheelTimeout(TimerTask task, long deadline) { + this.task = task; + this.deadline = deadline; + } + + public Timer getTimer() { + return HashedWheelTimer.this; + } + + public TimerTask getTask() { + return task; + } + + public void cancel() { + if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) { + // TODO return false + return; + } + + wheel[stopIndex].remove(this); + } + + public boolean isCancelled() { + return state.get() == ST_CANCELLED; + } + + public boolean isExpired() { + return state.get() != ST_INIT; + } + + public void expire() { + if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) { + return; + } + + try { + task.run(this); + } catch (Throwable t) { + logger.warning( + "An exception was thrown by " + + TimerTask.class.getSimpleName() + ".", t); + } + } + + @Override + public String toString() { + long currentTime = System.currentTimeMillis(); + long remaining = deadline - currentTime; + + StringBuilder buf = new StringBuilder(192); + buf.append(getClass().getSimpleName()); + buf.append('('); + + buf.append("deadline: "); + if (remaining > 0) { + buf.append(remaining); + buf.append(" ms later, "); + } else if (remaining < 0) { + buf.append(-remaining); + buf.append(" ms ago, "); + } else { + buf.append("now, "); + } + + if (isCancelled()) { + buf.append (", cancelled"); + } + + return buf.append(')').toString(); + } + } +} diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/MapBackedSet.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/MapBackedSet.java new file mode 100644 index 0000000000..a40e72cead --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/MapBackedSet.java @@ -0,0 +1,74 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.akka.util; + +import java.io.Serializable; +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * A {@link java.util.Map}-backed {@link java.util.Set}. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +final class MapBackedSet extends AbstractSet implements Serializable { + + private static final long serialVersionUID = -6761513279741915432L; + + private final Map map; + + /** + * Creates a new instance which wraps the specified {@code map}. 
+ */ + MapBackedSet(Map map) { + this.map = map; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean contains(Object o) { + return map.containsKey(o); + } + + @Override + public boolean add(E o) { + return map.put(o, Boolean.TRUE) == null; + } + + @Override + public boolean remove(Object o) { + return map.remove(o) != null; + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public Iterator iterator() { + return map.keySet().iterator(); + } +} diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java new file mode 100644 index 0000000000..dbda2110d3 --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java @@ -0,0 +1,56 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.akka.util; + +/** + * A handle associated with a {@link TimerTask} that is returned by a + * {@link Timer}. + * + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +public interface Timeout { + + /** + * Returns the {@link Timer} that created this handle. + */ + Timer getTimer(); + + /** + * Returns the {@link TimerTask} which is associated with this handle. + */ + TimerTask getTask(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been expired. + */ + boolean isExpired(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been cancelled. + */ + boolean isCancelled(); + + /** + * Cancels the {@link TimerTask} associated with this handle. It the + * task has been executed or cancelled already, it will return with no + * side effect. + */ + void cancel(); +} diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java new file mode 100644 index 0000000000..43ddec9604 --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java @@ -0,0 +1,54 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.jboss.netty.akka.util; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Schedules {@link TimerTask}s for one-time future execution in a background + * thread. + * + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + * + * @apiviz.landmark + * @apiviz.has org.jboss.netty.util.TimerTask oneway - - executes + * @apiviz.has org.jboss.netty.util.Timeout oneway - - creates + */ +public interface Timer { + + /** + * Schedules the specified {@link TimerTask} for one-time execution after + * the specified delay. + * + * @return a handle which is associated with the specified task + * + * @throws IllegalStateException if this timer has been + * {@linkplain #stop() stopped} already + */ + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + + /** + * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all + * tasks which were scheduled but not executed yet. + * + * @return the handles associated with the tasks which were canceled by + * this method + */ + Set stop(); +} diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java new file mode 100644 index 0000000000..341f43ad68 --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java @@ -0,0 +1,37 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.akka.util; + +import java.util.concurrent.TimeUnit; + +/** + * A task which is executed after the delay specified with + * {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)}. + * + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +public interface TimerTask { + + /** + * Executed after the delay specified with + * {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)}. + * + * @param timeout a handle which is associated with this task + */ + void run(Timeout timeout) throws Exception; +} diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java new file mode 100644 index 0000000000..90176f1fc8 --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java @@ -0,0 +1,1418 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +package org.jboss.netty.akka.util.internal; + +import java.util.AbstractCollection; +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * An alternative identity-comparing {@link java.util.concurrent.ConcurrentMap} which is similar to + * {@link java.util.concurrent.ConcurrentHashMap}. + * + * @author The Netty Project + * @author Doug Lea + * @author Jason T. Greene + * @author Trustin Lee + * @version $Rev: 2371 $, $Date: 2010-10-19 15:00:42 +0900 (Tue, 19 Oct 2010) $ + * + * @param the type of keys maintained by this map + * @param the type of mapped values + */ +public final class ConcurrentIdentityHashMap extends AbstractMap + implements ConcurrentMap{ + + /** + * The default initial capacity for this table, used when not otherwise + * specified in a constructor. + */ + static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default load factor for this table, used when not otherwise specified + * in a constructor. + */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * The default concurrency level for this table, used when not otherwise + * specified in a constructor. + */ + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The maximum capacity, used if a higher value is implicitly specified by + * either of the constructors with arguments. MUST be a power of two + * <= 1<<30 to ensure that entries are indexable using integers. + */ + static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * The maximum number of segments to allow; used to bound constructor + * arguments. + */ + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + + /** + * Number of unsynchronized retries in size and containsValue methods before + * resorting to locking. This is used to avoid unbounded retries if tables + * undergo continuous modification which would make it impossible to obtain + * an accurate result. + */ + static final int RETRIES_BEFORE_LOCK = 2; + + /* ---------------- Fields -------------- */ + + /** + * Mask value for indexing into segments. The upper bits of a key's hash + * code are used to choose the segment. + */ + final int segmentMask; + + /** + * Shift value for indexing within segments. + */ + final int segmentShift; + + /** + * The segments, each of which is a specialized hash table + */ + final Segment[] segments; + + Set keySet; + Set> entrySet; + Collection values; + + /* ---------------- Small Utilities -------------- */ + + /** + * Applies a supplemental hash function to a given hashCode, which defends + * against poor quality hash functions. 
This is critical because + * ConcurrentReferenceHashMap uses power-of-two length hash tables, that + * otherwise encounter collisions for hashCodes that do not differ in lower + * or upper bits. + */ + private static int hash(int h) { + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += h << 15 ^ 0xffffcd7d; + h ^= h >>> 10; + h += h << 3; + h ^= h >>> 6; + h += (h << 2) + (h << 14); + return h ^ h >>> 16; + } + + /** + * Returns the segment that should be used for key with given hash. + * + * @param hash the hash code for the key + * @return the segment + */ + final Segment segmentFor(int hash) { + return segments[hash >>> segmentShift & segmentMask]; + } + + private int hashOf(Object key) { + return hash(System.identityHashCode(key)); + } + + /** + * ConcurrentReferenceHashMap list entry. Note that this is never exported + * out as a user-visible Map.Entry. + * + * Because the value field is volatile, not final, it is legal wrt + * the Java Memory Model for an unsynchronized reader to see null + * instead of initial value when read via a data race. Although a + * reordering leading to this is not likely to ever actually + * occur, the Segment.readValueUnderLock method is used as a + * backup in case a null (pre-initialized) value is ever seen in + * an unsynchronized access method. + */ + static final class HashEntry { + final Object key; + final int hash; + volatile Object value; + final HashEntry next; + + HashEntry( + K key, int hash, HashEntry next, V value) { + this.hash = hash; + this.next = next; + this.key = key; + this.value = value; + } + + @SuppressWarnings("unchecked") + final K key() { + return (K) key; + } + + @SuppressWarnings("unchecked") + final V value() { + return (V) value; + } + + final void setValue(V value) { + this.value = value; + } + + @SuppressWarnings("unchecked") + static final HashEntry[] newArray(int i) { + return new HashEntry[i]; + } + } + + /** + * Segments are specialized versions of hash tables. This subclasses from + * ReentrantLock opportunistically, just to simplify some locking and avoid + * separate construction. + */ + static final class Segment extends ReentrantLock { + /* + * Segments maintain a table of entry lists that are ALWAYS kept in a + * consistent state, so can be read without locking. Next fields of + * nodes are immutable (final). All list additions are performed at the + * front of each bin. This makes it easy to check changes, and also fast + * to traverse. When nodes would otherwise be changed, new nodes are + * created to replace them. This works well for hash tables since the + * bin lists tend to be short. (The average length is less than two for + * the default load factor threshold.) + * + * Read operations can thus proceed without locking, but rely on + * selected uses of volatiles to ensure that completed write operations + * performed by other threads are noticed. For most purposes, the + * "count" field, tracking the number of elements, serves as that + * volatile variable ensuring visibility. This is convenient because + * this field needs to be read in many read operations anyway: + * + * - All (unsynchronized) read operations must first read the + * "count" field, and should not look at table entries if + * it is 0. + * + * - All (synchronized) write operations should write to + * the "count" field after structurally changing any bin. 
+ * The operations must not take any action that could even + * momentarily cause a concurrent read operation to see + * inconsistent data. This is made easier by the nature of + * the read operations in Map. For example, no operation + * can reveal that the table has grown but the threshold + * has not yet been updated, so there are no atomicity + * requirements for this with respect to reads. + * + * As a guide, all critical volatile reads and writes to the count field + * are marked in code comments. + */ + + private static final long serialVersionUID = 5207829234977119743L; + + /** + * The number of elements in this segment's region. + */ + transient volatile int count; + + /** + * Number of updates that alter the size of the table. This is used + * during bulk-read methods to make sure they see a consistent snapshot: + * If modCounts change during a traversal of segments computing size or + * checking containsValue, then we might have an inconsistent view of + * state so (usually) must retry. + */ + int modCount; + + /** + * The table is rehashed when its size exceeds this threshold. + * (The value of this field is always (capacity * loadFactor).) + */ + int threshold; + + /** + * The per-segment table. + */ + transient volatile HashEntry[] table; + + /** + * The load factor for the hash table. Even though this value is same + * for all segments, it is replicated to avoid needing links to outer + * object. + */ + final float loadFactor; + + Segment(int initialCapacity, float lf) { + loadFactor = lf; + setTable(HashEntry. newArray(initialCapacity)); + } + + @SuppressWarnings("unchecked") + static final Segment[] newArray(int i) { + return new Segment[i]; + } + + private boolean keyEq(Object src, Object dest) { + return src == dest; + } + + /** + * Sets table to new HashEntry array. Call only while holding lock or in + * constructor. + */ + void setTable(HashEntry[] newTable) { + threshold = (int) (newTable.length * loadFactor); + table = newTable; + } + + /** + * Returns properly casted first entry of bin for given hash. + */ + HashEntry getFirst(int hash) { + HashEntry[] tab = table; + return tab[hash & tab.length - 1]; + } + + HashEntry newHashEntry( + K key, int hash, HashEntry next, V value) { + return new HashEntry(key, hash, next, value); + } + + /** + * Reads value field of an entry under lock. Called if value field ever + * appears to be null. This is possible only if a compiler happens to + * reorder a HashEntry initialization with its table assignment, which + * is legal under memory model but is not known to ever occur. 
+ */ + V readValueUnderLock(HashEntry e) { + lock(); + try { + return e.value(); + } finally { + unlock(); + } + } + + /* Specialized implementations of map methods */ + + V get(Object key, int hash) { + if (count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + V opaque = e.value(); + if (opaque != null) { + return opaque; + } + + return readValueUnderLock(e); // recheck + } + e = e.next; + } + } + return null; + } + + boolean containsKey(Object key, int hash) { + if (count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + return true; + } + e = e.next; + } + } + return false; + } + + boolean containsValue(Object value) { + if (count != 0) { // read-volatile + HashEntry[] tab = table; + int len = tab.length; + for (int i = 0; i < len; i ++) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + V opaque = e.value(); + V v; + + if (opaque == null) { + v = readValueUnderLock(e); // recheck + } else { + v = opaque; + } + + if (value.equals(v)) { + return true; + } + } + } + } + return false; + } + + boolean replace(K key, int hash, V oldValue, V newValue) { + lock(); + try { + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + boolean replaced = false; + if (e != null && oldValue.equals(e.value())) { + replaced = true; + e.setValue(newValue); + } + return replaced; + } finally { + unlock(); + } + } + + V replace(K key, int hash, V newValue) { + lock(); + try { + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + oldValue = e.value(); + e.setValue(newValue); + } + return oldValue; + } finally { + unlock(); + } + } + + V put(K key, int hash, V value, boolean onlyIfAbsent) { + lock(); + try { + int c = count; + if (c ++ > threshold) { // ensure capacity + int reduced = rehash(); + if (reduced > 0) { + count = (c -= reduced) - 1; // write-volatile + } + } + + HashEntry[] tab = table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue; + if (e != null) { + oldValue = e.value(); + if (!onlyIfAbsent) { + e.setValue(value); + } + } else { + oldValue = null; + ++ modCount; + tab[index] = newHashEntry(key, hash, first, value); + count = c; // write-volatile + } + return oldValue; + } finally { + unlock(); + } + } + + int rehash() { + HashEntry[] oldTable = table; + int oldCapacity = oldTable.length; + if (oldCapacity >= MAXIMUM_CAPACITY) { + return 0; + } + + /* + * Reclassify nodes in each list to new Map. Because we are using + * power-of-two expansion, the elements from each bin must either + * stay at same index, or move with a power of two offset. We + * eliminate unnecessary node creation by catching cases where old + * nodes can be reused because their next fields won't change. + * Statistically, at the default threshold, only about one-sixth of + * them need cloning when a table doubles. The nodes they replace + * will be garbage collectable as soon as they are no longer + * referenced by any reader thread that may be in the midst of + * traversing table right now. 
+ */ + + HashEntry[] newTable = HashEntry.newArray(oldCapacity << 1); + threshold = (int) (newTable.length * loadFactor); + int sizeMask = newTable.length - 1; + int reduce = 0; + for (int i = 0; i < oldCapacity; i ++) { + // We need to guarantee that any existing reads of old Map can + // proceed. So we cannot yet null out each bin. + HashEntry e = oldTable[i]; + + if (e != null) { + HashEntry next = e.next; + int idx = e.hash & sizeMask; + + // Single node on list + if (next == null) { + newTable[idx] = e; + } else { + // Reuse trailing consecutive sequence at same slot + HashEntry lastRun = e; + int lastIdx = idx; + for (HashEntry last = next; last != null; last = last.next) { + int k = last.hash & sizeMask; + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + newTable[lastIdx] = lastRun; + // Clone all remaining nodes + for (HashEntry p = e; p != lastRun; p = p.next) { + // Skip GC'd weak references + K key = p.key(); + if (key == null) { + reduce ++; + continue; + } + int k = p.hash & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = newHashEntry(key, p.hash, n, p.value()); + } + } + } + } + table = newTable; + return reduce; + } + + /** + * Remove; match on key only if value null, else match both. + */ + V remove(Object key, int hash, Object value, boolean refRemove) { + lock(); + try { + int c = count - 1; + HashEntry[] tab = table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + // a reference remove operation compares the Reference instance + while (e != null && key != e.key && + (refRemove || hash != e.hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + V v = e.value(); + if (value == null || value.equals(v)) { + oldValue = v; + // All entries following removed node can stay in list, + // but all preceding ones need to be cloned. + ++ modCount; + HashEntry newFirst = e.next; + for (HashEntry p = first; p != e; p = p.next) { + K pKey = p.key(); + if (pKey == null) { // Skip GC'd keys + c --; + continue; + } + + newFirst = newHashEntry( + pKey, p.hash, newFirst, p.value()); + } + tab[index] = newFirst; + count = c; // write-volatile + } + } + return oldValue; + } finally { + unlock(); + } + } + + void clear() { + if (count != 0) { + lock(); + try { + HashEntry[] tab = table; + for (int i = 0; i < tab.length; i ++) { + tab[i] = null; + } + ++ modCount; + count = 0; // write-volatile + } finally { + unlock(); + } + } + } + } + + /* ---------------- Public operations -------------- */ + + /** + * Creates a new, empty map with the specified initial capacity, load factor + * and concurrency level. + * + * @param initialCapacity the initial capacity. The implementation performs + * internal sizing to accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of + * elements per bin exceeds this threshold. + * @param concurrencyLevel the estimated number of concurrently updating + * threads. The implementation performs internal + * sizing to try to accommodate this many threads. + * @throws IllegalArgumentException if the initial capacity is negative or + * the load factor or concurrencyLevel are + * nonpositive. 
+ */ + public ConcurrentIdentityHashMap( + int initialCapacity, float loadFactor, + int concurrencyLevel) { + if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) { + throw new IllegalArgumentException(); + } + + if (concurrencyLevel > MAX_SEGMENTS) { + concurrencyLevel = MAX_SEGMENTS; + } + + // Find power-of-two sizes best matching arguments + int sshift = 0; + int ssize = 1; + while (ssize < concurrencyLevel) { + ++ sshift; + ssize <<= 1; + } + segmentShift = 32 - sshift; + segmentMask = ssize - 1; + this.segments = Segment.newArray(ssize); + + if (initialCapacity > MAXIMUM_CAPACITY) { + initialCapacity = MAXIMUM_CAPACITY; + } + int c = initialCapacity / ssize; + if (c * ssize < initialCapacity) { + ++ c; + } + int cap = 1; + while (cap < c) { + cap <<= 1; + } + + for (int i = 0; i < this.segments.length; ++ i) { + this.segments[i] = new Segment(cap, loadFactor); + } + } + + + /** + * Creates a new, empty map with the specified initial capacity and load + * factor and with the default reference types (weak keys, strong values), + * and concurrencyLevel (16). + * + * @param initialCapacity The implementation performs internal sizing to + * accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of + * elements per bin exceeds this threshold. + * @throws IllegalArgumentException if the initial capacity of elements is + * negative or the load factor is + * nonpositive + */ + public ConcurrentIdentityHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with the specified initial capacity, and with + * default reference types (weak keys, strong values), load factor (0.75) + * and concurrencyLevel (16). + * + * @param initialCapacity the initial capacity. The implementation performs + * internal sizing to accommodate this many elements. + * @throws IllegalArgumentException if the initial capacity of elements is + * negative. + */ + public ConcurrentIdentityHashMap(int initialCapacity) { + this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with a default initial capacity (16), reference + * types (weak keys, strong values), default load factor (0.75) and + * concurrencyLevel (16). + */ + public ConcurrentIdentityHashMap() { + this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new map with the same mappings as the given map. The map is + * created with a capacity of 1.5 times the number of mappings in the given + * map or 16 (whichever is greater), and a default load factor (0.75) and + * concurrencyLevel (16). + * + * @param m the map + */ + public ConcurrentIdentityHashMap(Map m) { + this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, + DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, + DEFAULT_CONCURRENCY_LEVEL); + putAll(m); + } + + /** + * Returns true if this map contains no key-value mappings. + * + * @return true if this map contains no key-value mappings + */ + @Override + public boolean isEmpty() { + final Segment[] segments = this.segments; + /* + * We keep track of per-segment modCounts to avoid ABA problems in which + * an element in one segment was added and in another removed during + * traversal, in which case the table was never actually empty at any + * point. 
Note the similar use of modCounts in the size() and + * containsValue() methods, which are the only other methods also + * susceptible to ABA problems. + */ + int[] mc = new int[segments.length]; + int mcsum = 0; + for (int i = 0; i < segments.length; ++ i) { + if (segments[i].count != 0) { + return false; + } else { + mcsum += mc[i] = segments[i].modCount; + } + } + // If mcsum happens to be zero, then we know we got a snapshot before + // any modifications at all were made. This is probably common enough + // to bother tracking. + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++ i) { + if (segments[i].count != 0 || mc[i] != segments[i].modCount) { + return false; + } + } + } + return true; + } + + /** + * Returns the number of key-value mappings in this map. If the map contains + * more than Integer.MAX_VALUE elements, returns + * Integer.MAX_VALUE. + * + * @return the number of key-value mappings in this map + */ + @Override + public int size() { + final Segment[] segments = this.segments; + long sum = 0; + long check = 0; + int[] mc = new int[segments.length]; + // Try a few times to get accurate count. On failure due to continuous + // async changes in table, resort to locking. + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++ k) { + check = 0; + sum = 0; + int mcsum = 0; + for (int i = 0; i < segments.length; ++ i) { + sum += segments[i].count; + mcsum += mc[i] = segments[i].modCount; + } + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++ i) { + check += segments[i].count; + if (mc[i] != segments[i].modCount) { + check = -1; // force retry + break; + } + } + } + if (check == sum) { + break; + } + } + if (check != sum) { // Resort to locking all segments + sum = 0; + for (int i = 0; i < segments.length; ++ i) { + segments[i].lock(); + } + for (int i = 0; i < segments.length; ++ i) { + sum += segments[i].count; + } + for (int i = 0; i < segments.length; ++ i) { + segments[i].unlock(); + } + } + if (sum > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) sum; + } + } + + /** + * Returns the value to which the specified key is mapped, or {@code null} + * if this map contains no mapping for the key. + * + *
+ * <p>
More formally, if this map contains a mapping from a key {@code k} to + * a value {@code v} such that {@code key.equals(k)}, then this method + * returns {@code v}; otherwise it returns {@code null}. (There can be at + * most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ + @Override + public V get(Object key) { + int hash = hashOf(key); + return segmentFor(hash).get(key, hash); + } + + /** + * Tests if the specified object is a key in this table. + * + * @param key possible key + * @return true if and only if the specified object is a key in + * this table, as determined by the equals method; + * false otherwise. + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean containsKey(Object key) { + int hash = hashOf(key); + return segmentFor(hash).containsKey(key, hash); + } + + /** + * Returns true if this map maps one or more keys to the specified + * value. Note: This method requires a full internal traversal of the hash + * table, and so is much slower than method containsKey. + * + * @param value value whose presence in this map is to be tested + * @return true if this map maps one or more keys to the specified + * value + * @throws NullPointerException if the specified value is null + */ + + @Override + public boolean containsValue(Object value) { + if (value == null) { + throw new NullPointerException(); + } + + // See explanation of modCount use above + + final Segment[] segments = this.segments; + int[] mc = new int[segments.length]; + + // Try a few times without locking + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++ k) { + int mcsum = 0; + for (int i = 0; i < segments.length; ++ i) { + mcsum += mc[i] = segments[i].modCount; + if (segments[i].containsValue(value)) { + return true; + } + } + boolean cleanSweep = true; + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++ i) { + if (mc[i] != segments[i].modCount) { + cleanSweep = false; + break; + } + } + } + if (cleanSweep) { + return false; + } + } + // Resort to locking all segments + for (int i = 0; i < segments.length; ++ i) { + segments[i].lock(); + } + boolean found = false; + try { + for (int i = 0; i < segments.length; ++ i) { + if (segments[i].containsValue(value)) { + found = true; + break; + } + } + } finally { + for (int i = 0; i < segments.length; ++ i) { + segments[i].unlock(); + } + } + return found; + } + + /** + * Legacy method testing if some key maps into the specified value in this + * table. This method is identical in functionality to + * {@link #containsValue}, and exists solely to ensure full compatibility + * with class {@link java.util.Hashtable}, which supported this method prior to + * introduction of the Java Collections framework. + * + * @param value a value to search for + * @return true if and only if some key maps to the value + * argument in this table as determined by the equals + * method; false otherwise + * @throws NullPointerException if the specified value is null + */ + public boolean contains(Object value) { + return containsValue(value); + } + + /** + * Maps the specified key to the specified value in this table. Neither the + * key nor the value can be null. + * + *
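Worth noting before reading on: despite the inherited wording above ({@code key.equals(k)}), this class compares keys by reference (hashOf uses System.identityHashCode and keyEq uses ==), so equal-but-distinct keys occupy separate entries. A short sketch, with the type parameters restored that the declaration above lost to formatting:

    ConcurrentIdentityHashMap<String, Integer> map =
        new ConcurrentIdentityHashMap<String, Integer>();
    String a = new String("key");
    String b = new String("key");    // a.equals(b), but a != b
    map.put(a, 1);
    map.put(b, 2);
    // map.size() == 2; map.get(a) == 1; map.get(b) == 2;
    // map.get("key") == null unless the interned literal itself was inserted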
+ * <p>
The value can be retrieved by calling the get method with a + * key that is equal to the original key. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return the previous value associated with key, or null + * if there was no mapping for key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V put(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, false); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + public V putIfAbsent(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, true); + } + + /** + * Copies all of the mappings from the specified map to this one. These + * mappings replace any mappings that this map had for any of the keys + * currently in the specified map. + * + * @param m mappings to be stored in this map + */ + @Override + public void putAll(Map m) { + for (Entry e: m.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + /** + * Removes the key (and its corresponding value) from this map. This method + * does nothing if the key is not in the map. + * + * @param key the key that needs to be removed + * @return the previous value associated with key, or null + * if there was no mapping for key + * @throws NullPointerException if the specified key is null + */ + @Override + public V remove(Object key) { + int hash = hashOf(key); + return segmentFor(hash).remove(key, hash, null, false); + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if the specified key is null + */ + public boolean remove(Object key, Object value) { + int hash = hashOf(key); + if (value == null) { + return false; + } + return segmentFor(hash).remove(key, hash, value, false) != null; + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if any of the arguments are null + */ + public boolean replace(K key, V oldValue, V newValue) { + if (oldValue == null || newValue == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, oldValue, newValue); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + public V replace(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, value); + } + + /** + * Removes all of the mappings from this map. + */ + @Override + public void clear() { + for (int i = 0; i < segments.length; ++ i) { + segments[i].clear(); + } + } + + /** + * Returns a {@link java.util.Set} view of the keys contained in this map. The set is + * backed by the map, so changes to the map are reflected in the set, and + * vice-versa. The set supports element removal, which removes the + * corresponding mapping from this map, via the Iterator.remove, + * Set.remove, removeAll, retainAll, and + * clear operations. It does not support the add or + * addAll operations. + * + *
+ * <p>
The view's iterator is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Set keySet() { + Set ks = keySet; + return ks != null? ks : (keySet = new KeySet()); + } + + /** + * Returns a {@link java.util.Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are reflected + * in the collection, and vice-versa. The collection supports element + * removal, which removes the corresponding mapping from this map, via the + * Iterator.remove, Collection.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + *
+ + /** + * Returns a {@link java.util.Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are reflected + * in the collection, and vice-versa. The collection supports element + * removal, which removes the corresponding mapping from this map, via the + * Iterator.remove, Collection.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + * <p>The view's iterator is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Collection<V> values() { + Collection<V> vs = values; + return vs != null? vs : (values = new Values()); + } + + /** + * Returns a {@link java.util.Set} view of the mappings contained in this map. + * The set is backed by the map, so changes to the map are reflected in the + * set, and vice-versa. The set supports element removal, which removes the + * corresponding mapping from the map, via the Iterator.remove, + * Set.remove, removeAll, retainAll, and + * clear operations. It does not support the add or + * addAll operations. + * + * <p>
The view's iterator is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Set<Map.Entry<K, V>> entrySet() { + Set<Map.Entry<K, V>> es = entrySet; + return es != null? es : (entrySet = new EntrySet()); + } + + /** + * Returns an enumeration of the keys in this table. + * + * @return an enumeration of the keys in this table + * @see #keySet() + */ + public Enumeration<K> keys() { + return new KeyIterator(); + } + + /** + * Returns an enumeration of the values in this table. + * + * @return an enumeration of the values in this table + * @see #values() + */ + public Enumeration<V> elements() { + return new ValueIterator(); + } + + /* ---------------- Iterator Support -------------- */ + + abstract class HashIterator { + int nextSegmentIndex; + int nextTableIndex; + HashEntry<K, V>[] currentTable; + HashEntry<K, V> nextEntry; + HashEntry<K, V> lastReturned; + K currentKey; // Strong reference to weak key (prevents gc) + + HashIterator() { + nextSegmentIndex = segments.length - 1; + nextTableIndex = -1; + advance(); + } + + public void rewind() { + nextSegmentIndex = segments.length - 1; + nextTableIndex = -1; + currentTable = null; + nextEntry = null; + lastReturned = null; + currentKey = null; + advance(); + } + + public boolean hasMoreElements() { + return hasNext(); + } + + final void advance() { + if (nextEntry != null && (nextEntry = nextEntry.next) != null) { + return; + } + + while (nextTableIndex >= 0) { + if ((nextEntry = currentTable[nextTableIndex --]) != null) { + return; + } + } + + while (nextSegmentIndex >= 0) { + Segment<K, V> seg = segments[nextSegmentIndex --]; + if (seg.count != 0) { + currentTable = seg.table; + for (int j = currentTable.length - 1; j >= 0; -- j) { + if ((nextEntry = currentTable[j]) != null) { + nextTableIndex = j - 1; + return; + } + } + } + } + } + + public boolean hasNext() { + while (nextEntry != null) { + if (nextEntry.key() != null) { + return true; + } + advance(); + } + + return false; + } + + HashEntry<K, V> nextEntry() { + do { + if (nextEntry == null) { + throw new NoSuchElementException(); + } + + lastReturned = nextEntry; + currentKey = lastReturned.key(); + advance(); + } while (currentKey == null); // Skip GC'd keys + + return lastReturned; + } + + public void remove() { + if (lastReturned == null) { + throw new IllegalStateException(); + } + ConcurrentIdentityHashMap.this.remove(currentKey); + lastReturned = null; + } + } + + final class KeyIterator + extends HashIterator implements ReusableIterator<K>, Enumeration<K> { + + public K next() { + return super.nextEntry().key(); + } + + public K nextElement() { + return super.nextEntry().key(); + } + } + + final class ValueIterator + extends HashIterator implements ReusableIterator<V>, Enumeration<V> { + + public V next() { + return super.nextEntry().value(); + } + + public V nextElement() { + return super.nextEntry().value(); + } + }
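A minimal sketch of the weak-consistency guarantee spelled out above, using ConcurrentHashMap, whose views document the same iterator contract: mutating the map mid-iteration does not throw.

import java.util.concurrent.ConcurrentHashMap

val m = new ConcurrentHashMap[String, String]()
m.put("a", "1")
m.put("b", "2")
val it = m.keySet.iterator
m.put("c", "3")                       // concurrent modification while iterating
while (it.hasNext) println(it.next()) // no ConcurrentModificationException;
                                      // "a" and "b" are seen, "c" may or may not be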
+ + /* + * This class is needed for JDK5 compatibility. + */ + static class SimpleEntry<K, V> implements Entry<K, V> { + + private static final long serialVersionUID = -8144765946475398746L; + + private final K key; + + private V value; + + public SimpleEntry(K key, V value) { + this.key = key; + this.value = value; + } + + public SimpleEntry(Entry<? extends K, ? extends V> entry) { + this.key = entry.getKey(); + this.value = entry.getValue(); + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Entry)) { + return false; + } + @SuppressWarnings("rawtypes") + Entry e = (Entry) o; + return eq(key, e.getKey()) && eq(value, e.getValue()); + } + + @Override + public int hashCode() { + return (key == null? 0 : key.hashCode()) ^ (value == null? 0 : value.hashCode()); + } + + @Override + public String toString() { + return key + "=" + value; + } + + private static boolean eq(Object o1, Object o2) { + return o1 == null? o2 == null : o1.equals(o2); + } + } + + /** + * Custom Entry class used by EntryIterator.next(), that relays setValue + * changes to the underlying map. + */ + final class WriteThroughEntry extends SimpleEntry<K, V> { + + WriteThroughEntry(K k, V v) { + super(k, v); + } + + /** + * Set our entry's value and write through to the map. The value to + * return is somewhat arbitrary here. Since a WriteThroughEntry does not + * necessarily track asynchronous changes, the most recent "previous" + * value could be different from what we return (or could even have been + * removed, in which case the put will re-establish it). We do not and + * cannot guarantee more. + */ + @Override + public V setValue(V value) { + + if (value == null) { + throw new NullPointerException(); + } + V v = super.setValue(value); + ConcurrentIdentityHashMap.this.put(getKey(), value); + return v; + } + + } + + final class EntryIterator extends HashIterator implements + ReusableIterator<Map.Entry<K, V>> { + public Map.Entry<K, V> next() { + HashEntry<K, V> e = super.nextEntry(); + return new WriteThroughEntry(e.key(), e.value()); + } + }
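The write-through relay is observable from outside the iterator; a minimal sketch (ConcurrentHashMap's entry iterator honours the same contract):

import java.util.concurrent.ConcurrentHashMap

val m = new ConcurrentHashMap[String, String]()
m.put("a", "1")
val e = m.entrySet.iterator.next()
e.setValue("2")            // relayed to the underlying map, not just the Entry
assert(m.get("a") == "2")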
+ + final class KeySet extends AbstractSet<K> { + @Override + public Iterator<K> iterator() { + return new KeyIterator(); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentIdentityHashMap.this.containsKey(o); + } + + @Override + public boolean remove(Object o) { + return ConcurrentIdentityHashMap.this.remove(o) != null; + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } + + final class Values extends AbstractCollection<V> { + @Override + public Iterator<V> iterator() { + return new ValueIterator(); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentIdentityHashMap.this.containsValue(o); + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } + + final class EntrySet extends AbstractSet<Map.Entry<K, V>> { + @Override + public Iterator<Map.Entry<K, V>> iterator() { + return new EntryIterator(); + } + + @Override + public boolean contains(Object o) { + if (!(o instanceof Entry)) { + return false; + } + Entry<?, ?> e = (Entry<?, ?>) o; + V v = ConcurrentIdentityHashMap.this.get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + + @Override + public boolean remove(Object o) { + if (!(o instanceof Entry)) { + return false; + } + Entry<?, ?> e = (Entry<?, ?>) o; + return ConcurrentIdentityHashMap.this.remove(e.getKey(), e.getValue()); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } +}
diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java new file mode 100644 index 0000000000..210edbe65d --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java @@ -0,0 +1,27 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.akka.util.internal; + +import java.util.Iterator; + +/** + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +public interface ReusableIterator<E> extends Iterator<E> { + void rewind(); +}
diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java new file mode 100644 index 0000000000..bf3e2ac571 --- /dev/null +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java @@ -0,0 +1,89 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.akka.util.internal; + +import java.util.regex.Pattern; + +/** + * Accesses the system property swallowing a {@link SecurityException}. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev: 2161 $, $Date: 2010-02-18 11:12:15 +0900 (Thu, 18 Feb 2010) $ + * + */ +public class SystemPropertyUtil { + + /** + * Returns the value of the Java system property with the specified + * {@code key}. + * + * @return the property value. + * {@code null} if there's no such property or if an access to the + * specified property is not allowed.
+ */ + public static String get(String key) { + try { + return System.getProperty(key); + } catch (Exception e) { + return null; + } + } + + /** + * Returns the value of the Java system property with the specified + * {@code key}, while falling back to the specified default value if + * the property access fails. + * + * @return the property value. + * {@code def} if there's no such property or if an access to the + * specified property is not allowed. + */ + public static String get(String key, String def) { + String value = get(key); + if (value == null) { + value = def; + } + return value; + } + + /** + * Returns the value of the Java system property with the specified + * {@code key}, while falling back to the specified default value if + * the property access fails. + * + * @return the property value. + * {@code def} if there's no such property or if an access to the + * specified property is not allowed. + */ + public static int get(String key, int def) { + String value = get(key); + if (value == null) { + return def; + } + + if (Pattern.matches("-?[0-9]+", value)) { + return Integer.parseInt(value); + } else { + return def; + } + } + + private SystemPropertyUtil() { + // Unused + } +} diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index dcc3b74731..d9ad8706b2 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -5,7 +5,6 @@ package akka import akka.config._ import akka.actor._ -import akka.dispatch._ import akka.event._ import akka.util.duration._ import java.net.InetAddress @@ -13,10 +12,8 @@ import com.eaio.uuid.UUID import akka.dispatch.{ Dispatchers, Future } import akka.util.Duration import akka.util.ReflectiveAccess -import akka.routing.Routing import akka.serialization.Serialization -import java.net.InetSocketAddress -import remote.{ RemoteAddress, RemoteSupport } +import remote.{ RemoteAddress } object AkkaApplication { @@ -167,8 +164,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor def port: Int = defaultAddress.port - def address: String = hostname + ":" + port.toString - // this provides basic logging (to stdout) until .start() is called below val mainbus = new MainBus(DebugMainBus) mainbus.startStdoutLogger(AkkaConfig) @@ -176,8 +171,10 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) + implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher - def terminationFuture: Future[ExitStatus] = provider.terminationFuture + + def scheduler = provider.scheduler // TODO think about memory consistency effects when doing funky stuff inside constructor val reflective = new ReflectiveAccess(this) @@ -190,6 +187,8 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO think about memory consistency effects when doing funky stuff inside constructor val provider: ActorRefProvider = reflective.createProvider + def terminationFuture: Future[ExitStatus] = provider.terminationFuture + private class Guardian extends Actor { def receive = { case Terminated(_) ⇒ context.self.stop() @@ -243,9 +242,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO think about memory consistency effects when doing funky stuff inside constructor val serialization = new Serialization(this) - val scheduler = new 
DefaultScheduler - terminationFuture.onComplete(_ ⇒ scheduler.shutdown()) - /** * Create an actor path under the application supervisor (/app). */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f16a9867c8..68d082d4d5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -5,11 +5,9 @@ package akka.actor import akka.dispatch._ -import akka.util._ import scala.annotation.tailrec import scala.collection.immutable.{ Stack, TreeMap } -import scala.collection.JavaConverters -import java.util.concurrent.{ ScheduledFuture, TimeUnit } +import java.util.concurrent.TimeUnit import akka.AkkaApplication import akka.event.Logging.{ Debug, Warning, Error } @@ -79,7 +77,7 @@ private[akka] class ActorCell( final def provider = app.provider - var futureTimeout: Option[ScheduledFuture[AnyRef]] = None + var futureTimeout: Option[Cancellable] = None var childrenRefs = emptyChildrenRefs @@ -348,7 +346,7 @@ private[akka] class ActorCell( final def cancelReceiveTimeout() { if (futureTimeout.isDefined) { - futureTimeout.get.cancel(true) + futureTimeout.get.cancel() futureTimeout = None } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 19223416cf..781dd3e114 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -12,6 +12,7 @@ import akka.AkkaApplication import akka.serialization.Serialization import java.net.InetSocketAddress import akka.remote.RemoteAddress +import java.util.concurrent.TimeUnit /** * ActorRef is an immutable and serializable handle to an Actor. @@ -171,7 +172,7 @@ class LocalActorRef private[akka] ( def name = path.name - def address: String = _app.address + path.toString + def address: String = _app.defaultAddress + path.toString private[this] val actorCell = new ActorCell(_app, this, props, _supervisor, receiveTimeout, hotswap) actorCell.start() @@ -379,7 +380,7 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef { // FIXME (actor path): put this under the sys guardian supervisor val path: ActorPath = app.root / "sys" / name - def address: String = app.address + path.toString + def address: String = app.defaultAddress + path.toString override def isShutdown(): Boolean = true @@ -401,7 +402,7 @@ abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout // FIXME (actor path): put this under the tmp guardian supervisor val path: ActorPath = app.root / "tmp" / name - def address: String = app.address + path.toString + def address: String = app.defaultAddress + path.toString { val callback: Future[Any] ⇒ Unit = { _ ⇒ app.deathWatch.publish(Terminated(AskActorRef.this)); whenDone() } @@ -431,4 +432,4 @@ abstract class AskActorRef(protected val app: AkkaApplication)(timeout: Timeout @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = app.provider.serialize(this) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3c1f185a69..f5662f9f57 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -8,12 +8,13 @@ import akka.config.ConfigurationException import akka.util.ReflectiveAccess import akka.routing._ import 
akka.AkkaApplication -import java.util.concurrent.ConcurrentHashMap import com.eaio.uuid.UUID import akka.AkkaException -import akka.event.{ ActorClassification, DeathWatch, Logging } import akka.dispatch._ import scala.annotation.tailrec +import org.jboss.netty.akka.util.HashedWheelTimer +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap } +import akka.event.{ LoggingAdapter, ActorClassification, DeathWatch, Logging } /** * Interface for all ActorRef providers to implement. @@ -31,6 +32,8 @@ trait ActorRefProvider { */ private[akka] def deployer: Deployer + private[akka] def scheduler: Scheduler + private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef @@ -101,10 +104,17 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { + val log = Logging(app.mainbus, this) + private[akka] val deployer: Deployer = new Deployer(app) val terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher) - val log = Logging(app.mainbus, this) + + private[akka] val scheduler: Scheduler = { //TODO FIXME Make this configurable + val s = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) + terminationFuture.onComplete(_ ⇒ s.stop()) + s + } /** * Top-level anchor for the supervision hierarchy of this actor system. Will @@ -119,7 +129,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { // FIXME (actor path): move the root path to the new root guardian val path = app.root - val address = app.address + path.toString + val address = app.defaultAddress + path.toString override def toString = name @@ -294,3 +304,58 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { } else true } } + +import org.jboss.netty.akka.util.{ HashedWheelTimer, TimerTask } +class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { + + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) + + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) + + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) + + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) + + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) + + private def createSingleTask(runnable: Runnable): TimerTask = + new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } + + private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = + new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! 
message } } + + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { + new TimerTask { + def run(timeout: org.jboss.netty.akka.util.Timeout) { + receiver ! message + timeout.getTimer.newTimeout(this, delay, timeUnit) + } + } + } + + private def createSingleTask(f: () ⇒ Unit): TimerTask = + new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } + + private def createContinuousTask(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimerTask = { + new TimerTask { + def run(timeout: org.jboss.netty.akka.util.Timeout) { + f() + timeout.getTimer.newTimeout(this, delay, timeUnit) + } + } + } + + private[akka] def stop() = hashedWheelTimer.stop() +} + +class DefaultCancellable(timeout: org.jboss.netty.akka.util.Timeout) extends Cancellable { + def cancel() { timeout.cancel() } + + def isCancelled: Boolean = { timeout.isCancelled } +} + diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index a6a119cef6..682f5ae3c6 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -4,11 +4,10 @@ package akka.actor import akka.util._ -import akka.event.Logging import scala.collection.mutable -import java.util.concurrent.ScheduledFuture import akka.AkkaApplication +import akka.event.Logging object FSM { @@ -31,7 +30,7 @@ object FSM { case class TimeoutMarker(generation: Long) case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit app: AkkaApplication) { - private var ref: Option[ScheduledFuture[AnyRef]] = _ + private var ref: Option[Cancellable] = _ def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { @@ -43,7 +42,7 @@ object FSM { def cancel { if (ref.isDefined) { - ref.get.cancel(true) + ref.get.cancel() ref = None } } @@ -393,7 +392,7 @@ trait FSM[S, D] extends ListenerManagement { * FSM State data and current timeout handling */ private var currentState: State = _ - private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + private var timeoutFuture: Option[Cancellable] = None private var generation: Long = 0L /* @@ -458,7 +457,7 @@ trait FSM[S, D] extends ListenerManagement { case t @ Timer(name, msg, repeat, gen) ⇒ if ((timers contains name) && (timers(name).generation == gen)) { if (timeoutFuture.isDefined) { - timeoutFuture.get.cancel(true) + timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 @@ -476,7 +475,7 @@ trait FSM[S, D] extends ListenerManagement { removeListener(actorRef) case value ⇒ { if (timeoutFuture.isDefined) { - timeoutFuture.get.cancel(true) + timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index ae9b86695d..d12dcb6329 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -15,132 +15,40 @@ */ package akka.actor -import akka.AkkaException -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent._ import akka.util.Duration +import akka.AkkaException case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { def this(msg: String) = this(msg, null) } trait JScheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): 
ScheduledFuture[AnyRef] - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable } abstract class Scheduler extends JScheduler { + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] = + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] = + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): ScheduledFuture[AnyRef] = + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = scheduleOnce(receiver, message, delay.length, delay.unit) - def scheduleOnce(f: () ⇒ Unit, delay: Duration): ScheduledFuture[AnyRef] = + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = scheduleOnce(f, delay.length, delay.unit) } -class DefaultScheduler extends Scheduler { - private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = new Runnable { - def run = { - receiver ! message - if (throwWhenReceiverExpired && receiver.isShutdown) throw new ActorKilledException("Receiver was terminated") - } - } +trait Cancellable { + def cancel(): Unit - private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - - /** - * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * send of the message. - */ - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.scheduleAtFixedRate(createSendRunnable(receiver, message, true), initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception ⇒ throw SchedulerException(message + " could not be scheduled on " + receiver, e) - } - } - - /** - * Schedules to run specified function to the receiver after initialDelay and then repeated after delay, - * avoid blocking operations since this is executed in the schedulers thread. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * execution of the function. 
- */ - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = - schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit) - - /** - * Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay, - * avoid blocking operations since this is executed in the schedulers thread. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * execution of the runnable. - */ - def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception ⇒ throw SchedulerException("Failed to schedule a Runnable", e) - } - } - - /** - * Schedules to send the specified message to the receiver after delay. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * send of the message. - */ - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.schedule(createSendRunnable(receiver, message, false), delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception ⇒ throw SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e) - } - } - - /** - * Schedules a function to be run after delay, - * avoid blocking operations since the runnable is executed in the schedulers thread. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * execution of the function. - */ - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = - scheduleOnce(new Runnable { def run = f() }, delay, timeUnit) - - /** - * Schedules a runnable to be run after delay, - * avoid blocking operations since the runnable is executed in the schedulers thread. - * The returned java.util.concurrent.ScheduledFuture can be used to cancel the - * execution of the runnable. 
- */ - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.schedule(runnable, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception ⇒ throw SchedulerException("Failed to scheduleOnce a Runnable", e) - } - } - - private[akka] def shutdown() { service.shutdownNow() } -} - -private object SchedulerThreadFactory extends ThreadFactory { - private val count = new AtomicLong(0) - val threadFactory = Executors.defaultThreadFactory() - - def newThread(r: Runnable): Thread = { - val thread = threadFactory.newThread(r) - thread.setName("akka:scheduler-" + count.incrementAndGet()) - thread.setDaemon(true) - thread - } -} + def isCancelled: Boolean +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 8051bd2336..945c3df5b1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,7 +21,7 @@ import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicReference, AtomicBoolean } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(message: String) = this(message, null) @@ -985,7 +985,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } val timeoutFuture = dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) - onComplete(_ ⇒ timeoutFuture.cancel(true)) + onComplete(_ ⇒ timeoutFuture.cancel()) false } else true } else false diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index a1d5eb57ee..46b7c1d09c 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -29,7 +29,8 @@ object RemoteAddress { trait RemoteAddress extends Serializable { def hostname: String def port: Int - override def toString = "" + hostname + ":" + port + @transient + override lazy val toString = "" + hostname + ":" + port } class RemoteException(message: String) extends AkkaException(message) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index f98b2ffa8e..f158cf1b02 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -4,7 +4,6 @@ package akka.remote -import akka.AkkaApplication import akka.actor._ import akka.actor.Status._ import akka.event.Logging @@ -107,7 +106,6 @@ class Gossiper(remote: Remote) { private val failureDetector = remote.failureDetector private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = Set(address) // FIXME read in list of seeds from config - private val scheduler = new DefaultScheduler private val address = app.defaultAddress private val nodeFingerprint = address.## @@ -124,8 +122,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - scheduler 
schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) - scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) + app.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) + app.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 496dcc051b..de31074295 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -51,6 +51,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def defaultDispatcher = app.dispatcher def defaultTimeout = app.AkkaConfig.ActorTimeout + def scheduler: Scheduler = local.scheduler + private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = actorOf(props, supervisor, supervisor.path / name, systemService)
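Taken together, the new scheduling API introduced by this patch reads as in the sketch below; app is assumed to be an AkkaApplication already in scope. The Cancellable handle replaces the old ScheduledFuture[AnyRef], so cancellation no longer carries a mayInterruptIfRunning flag:

import java.util.concurrent.TimeUnit
import akka.actor.Cancellable

// repeating task: first run immediately, then every 50 ms
val tick: Cancellable = app.scheduler.schedule(() ⇒ println("tick"), 0, 50, TimeUnit.MILLISECONDS)

// one-shot task, fired once after 1 second
val once: Cancellable = app.scheduler.scheduleOnce(() ⇒ println("once"), 1, TimeUnit.SECONDS)

tick.cancel()
assert(tick.isCancelled)

Note that the provider constructs the wheel with a 100 ms tick and 512 buckets, so fire times are quantised to roughly the tick duration; coarser resolution in exchange for cheap scheduling on a single timer thread is the usual HashedWheelTimer trade-off.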