merged with upstream

This commit is contained in:
Jonas Bonér 2011-03-02 18:37:41 +01:00
commit 9acb5112a1
36 changed files with 1385 additions and 889 deletions

View file

@ -0,0 +1,215 @@
/*
* Hex.java
*
* Created 04.07.2003.
*
* eaio: UUID - an implementation of the UUID specification Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com)
* http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
package com.eaio.util.lang;
import java.io.IOException;
/**
* Number-to-hexadecimal and hexadecimal-to-number conversions.
*
* @see <a href="http://johannburkard.de/software/uuid/">UUID</a>
* @author <a href="mailto:jb@eaio.com">Johann Burkard</a>
* @version $Id: Hex.java 1888 2009-03-15 12:43:24Z johann $
*/
public final class Hex {
/**
* No instances needed.
*/
private Hex() {
super();
}
private static final char[] DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f' };
/**
* Turns a <code>short</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the integer
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, short in) {
return append(a, (long) in, 4);
}
/**
* Turns a <code>short</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the integer
* @param length the number of octets to produce
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, short in, int length) {
return append(a, (long) in, length);
}
/**
* Turns an <code>int</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the integer
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, int in) {
return append(a, (long) in, 8);
}
/**
* Turns an <code>int</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the integer
* @param length the number of octets to produce
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, int in, int length) {
return append(a, (long) in, length);
}
/**
* Turns a <code>long</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the long
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, long in) {
return append(a, in, 16);
}
/**
* Turns a <code>long</code> into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param in the long
* @param length the number of octets to produce
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, long in, int length) {
try {
int lim = (length << 2) - 4;
while (lim >= 0) {
a.append(DIGITS[(byte) (in >> lim) & 0x0f]);
lim -= 4;
}
}
catch (IOException ex) {
// Bla
}
return a;
}
/**
* Turns a <code>byte</code> array into hex octets.
*
* @param a the {@link Appendable}, may not be <code>null</code>
* @param bytes the <code>byte</code> array
* @return {@link Appendable}
*/
public static Appendable append(Appendable a, byte[] bytes) {
try {
for (byte b : bytes) {
a.append(DIGITS[(byte) ((b & 0xF0) >> 4)]);
a.append(DIGITS[(byte) (b & 0x0F)]);
}
}
catch (IOException ex) {
// Bla
}
return a;
}
/**
* Parses a <code>long</code> from a hex encoded number. This method will skip all characters that are not 0-9,
* A-F and a-f.
* <p>
* Returns 0 if the {@link CharSequence} does not contain any interesting characters.
*
* @param s the {@link CharSequence} to extract a <code>long</code> from, may not be <code>null</code>
* @return a <code>long</code>
* @throws NullPointerException if the {@link CharSequence} is <code>null</code>
*/
public static long parseLong(CharSequence s) {
long out = 0;
byte shifts = 0;
char c;
for (int i = 0; i < s.length() && shifts < 16; i++) {
c = s.charAt(i);
if ((c > 47) && (c < 58)) {
++shifts;
out <<= 4;
out |= c - 48;
}
else if ((c > 64) && (c < 71)) {
++shifts;
out <<= 4;
out |= c - 55;
}
else if ((c > 96) && (c < 103)) {
++shifts;
out <<= 4;
out |= c - 87;
}
}
return out;
}
/**
* Parses a <code>short</code> from a hex encoded number. This method will skip all characters that are not 0-9,
* A-F and a-f.
* <p>
* Returns 0 if the {@link CharSequence} does not contain any interesting characters.
*
* @param s the {@link CharSequence} to extract a <code>short</code> from, may not be <code>null</code>
* @return a <code>short</code>
* @throws NullPointerException if the {@link CharSequence} is <code>null</code>
*/
public static short parseShort(String s) {
short out = 0;
byte shifts = 0;
char c;
for (int i = 0; i < s.length() && shifts < 4; i++) {
c = s.charAt(i);
if ((c > 47) && (c < 58)) {
++shifts;
out <<= 4;
out |= c - 48;
}
else if ((c > 64) && (c < 71)) {
++shifts;
out <<= 4;
out |= c - 55;
}
else if ((c > 96) && (c < 103)) {
++shifts;
out <<= 4;
out |= c - 87;
}
}
return out;
}
}

View file

@ -0,0 +1,116 @@
/*
* MACAddressParserTest.java
*
* Created 30.01.2006.
*
* eaio: UUID - an implementation of the UUID specification
* Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com) http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
package com.eaio.uuid;
/**
* The MAC address parser attempts to find the following patterns:
* <ul>
* <li>.{1,2}:.{1,2}:.{1,2}:.{1,2}:.{1,2}:.{1,2}</li>
* <li>.{1,2}-.{1,2}-.{1,2}-.{1,2}-.{1,2}-.{1,2}</li>
* </ul>
*
* @see <a href="http://johannburkard.de/software/uuid/">UUID</a>
* @author <a href="mailto:jb@eaio.com">Johann Burkard</a>
* @version $Id: MACAddressParser.java 1888 2009-03-15 12:43:24Z johann $
*/
class MACAddressParser {
/**
* No instances needed.
*/
private MACAddressParser() {
super();
}
/**
* Attempts to find a pattern in the given String.
*
* @param in the String, may not be <code>null</code>
* @return the substring that matches this pattern or <code>null</code>
*/
static String parse(String in) {
String out = in;
// lanscan
int hexStart = out.indexOf("0x");
if (hexStart != -1 && out.indexOf("ETHER") != -1) {
int hexEnd = out.indexOf(' ', hexStart);
if (hexEnd > hexStart + 2) {
out = out.substring(hexStart, hexEnd);
}
}
else {
int octets = 0;
int lastIndex, old, end;
if (out.indexOf('-') > -1) {
out = out.replace('-', ':');
}
lastIndex = out.lastIndexOf(':');
if (lastIndex > out.length() - 2) {
out = null;
}
else {
end = Math.min(out.length(), lastIndex + 3);
++octets;
old = lastIndex;
while (octets != 5 && lastIndex != -1 && lastIndex > 1) {
lastIndex = out.lastIndexOf(':', --lastIndex);
if (old - lastIndex == 3 || old - lastIndex == 2) {
++octets;
old = lastIndex;
}
}
if (octets == 5 && lastIndex > 1) {
out = out.substring(lastIndex - 2, end).trim();
}
else {
out = null;
}
}
}
if (out != null && out.startsWith("0x")) {
out = out.substring(2);
}
return out;
}
}

View file

@ -0,0 +1,311 @@
/*
* UUID.java
*
* Created 07.02.2003
*
* eaio: UUID - an implementation of the UUID specification
* Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com) http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
package com.eaio.uuid;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.omg.CORBA.portable.IDLEntity;
import com.eaio.util.lang.Hex;
/**
* Creates UUIDs according to the DCE Universal Token Identifier specification.
* <p>
* All you need to know:
* <pre>
* UUID u = new UUID();
* </pre>
*
* @see <a href="http://www.opengroup.org/onlinepubs/9629399/apdxa.htm">
* http://www.opengroup.org/onlinepubs/9629399/apdxa.htm
* </a>
* @see <a href="http://www.uddi.org/pubs/draft-leach-uuids-guids-01.txt">
* http://www.uddi.org/pubs/draft-leach-uuids-guids-01.txt
* </a>
* @see <a href="http://johannburkard.de/software/uuid/">UUID</a>
* @author <a href="mailto:jb@eaio.de">Johann Burkard</a>
* @version $Id: UUID.java 1888 2009-03-15 12:43:24Z johann $
*/
public class UUID implements Comparable<UUID>, Serializable, Cloneable,
IDLEntity {
/**
* Hasn't ever changed between versions.
*/
static final long serialVersionUID = 7435962790062944603L;
/**
* The time field of the UUID.
*
* @serial
*/
public long time;
/**
* The clock sequence and node field of the UUID.
*
* @serial
*/
public long clockSeqAndNode;
/**
* Constructor for UUID. Constructs a new, unique UUID.
*
* @see UUIDGen#newTime()
* @see UUIDGen#getClockSeqAndNode()
*/
public UUID() {
this(UUIDGen.newTime(), UUIDGen.getClockSeqAndNode());
}
/**
* Constructor for UUID. Constructs a UUID from two <code>long</code> values.
*
* @param time the upper 64 bits
* @param clockSeqAndNode the lower 64 bits
*/
public UUID(long time, long clockSeqAndNode) {
this.time = time;
this.clockSeqAndNode = clockSeqAndNode;
}
/**
* Copy constructor for UUID. Values of the given UUID are copied.
*
* @param u the UUID, may not be <code>null</code>
*/
public UUID(UUID u) {
this(u.time, u.clockSeqAndNode);
}
/**
* Parses a textual representation of a UUID.
* <p>
* No validation is performed. If the {@link CharSequence} is shorter than 36 characters,
* {@link ArrayIndexOutOfBoundsException}s will be thrown.
*
* @param s the {@link CharSequence}, may not be <code>null</code>
*/
public UUID(CharSequence s) {
this(Hex.parseLong(s.subSequence(0, 18)), Hex.parseLong(s.subSequence(
19, 36)));
}
/**
* Compares this UUID to another Object. Throws a {@link ClassCastException} if
* the other Object is not an instance of the UUID class. Returns a value
* smaller than zero if the other UUID is "larger" than this UUID and a value
* larger than zero if the other UUID is "smaller" than this UUID.
*
* @param t the other UUID, may not be <code>null</code>
* @return a value &lt; 0, 0 or a value &gt; 0
* @see java.lang.Comparable#compareTo(java.lang.Object)
* @throws ClassCastException
*/
public int compareTo(UUID t) {
if (this == t) {
return 0;
}
if (time > t.time) {
return 1;
}
if (time < t.time) {
return -1;
}
if (clockSeqAndNode > t.clockSeqAndNode) {
return 1;
}
if (clockSeqAndNode < t.clockSeqAndNode) {
return -1;
}
return 0;
}
/**
* Tweaked Serialization routine.
*
* @param out the ObjectOutputStream
* @throws IOException
*/
private void writeObject(ObjectOutputStream out) throws IOException {
out.writeLong(time);
out.writeLong(clockSeqAndNode);
}
/**
* Tweaked Serialization routine.
*
* @param in the ObjectInputStream
* @throws IOException
*/
private void readObject(ObjectInputStream in) throws IOException {
time = in.readLong();
clockSeqAndNode = in.readLong();
}
/**
* Returns this UUID as a String.
*
* @return a String, never <code>null</code>
* @see java.lang.Object#toString()
* @see #toAppendable(Appendable)
*/
@Override
public final String toString() {
return toAppendable(null).toString();
}
/**
* Appends a String representation of this to the given {@link StringBuffer} or
* creates a new one if none is given.
*
* @param in the StringBuffer to append to, may be <code>null</code>
* @return a StringBuffer, never <code>null</code>
* @see #toAppendable(Appendable)
*/
public StringBuffer toStringBuffer(StringBuffer in) {
StringBuffer out = in;
if (out == null) {
out = new StringBuffer(36);
}
else {
out.ensureCapacity(out.length() + 36);
}
return (StringBuffer) toAppendable(out);
}
/**
* Appends a String representation of this object to the given {@link Appendable} object.
* <p>
* For reasons I'll probably never understand, Sun has decided to have a number of I/O classes implement
* Appendable which forced them to destroy an otherwise nice and simple interface with {@link IOException}s.
* <p>
* I decided to ignore any possible IOExceptions in this method.
*
* @param a the Appendable object, may be <code>null</code>
* @return an Appendable object, defaults to a {@link StringBuilder} if <code>a</code> is <code>null</code>
*/
public Appendable toAppendable(Appendable a) {
Appendable out = a;
if (out == null) {
out = new StringBuilder(36);
}
try {
Hex.append(out, (int) (time >> 32)).append('-');
Hex.append(out, (short) (time >> 16)).append('-');
Hex.append(out, (short) time).append('-');
Hex.append(out, (short) (clockSeqAndNode >> 48)).append('-');
Hex.append(out, clockSeqAndNode, 12);
}
catch (IOException ex) {
// What were they thinking?
}
return out;
}
/**
* Returns a hash code of this UUID. The hash code is calculated by XOR'ing the
* upper 32 bits of the time and clockSeqAndNode fields and the lower 32 bits of
* the time and clockSeqAndNode fields.
*
* @return an <code>int</code> representing the hash code
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return (int) ((time >> 32) ^ time ^ (clockSeqAndNode >> 32) ^ clockSeqAndNode);
}
/**
* Clones this UUID.
*
* @return a new UUID with identical values, never <code>null</code>
*/
@Override
public Object clone() {
try {
return super.clone();
}
catch (CloneNotSupportedException ex) {
// One of Sun's most epic fails.
return null;
}
}
/**
* Returns the time field of the UUID (upper 64 bits).
*
* @return the time field
*/
public final long getTime() {
return time;
}
/**
* Returns the clock and node field of the UUID (lower 64 bits).
*
* @return the clockSeqAndNode field
*/
public final long getClockSeqAndNode() {
return clockSeqAndNode;
}
/**
* Compares two Objects for equality.
*
* @see java.lang.Object#equals(Object)
* @param obj the Object to compare this UUID with, may be <code>null</code>
* @return <code>true</code> if the other Object is equal to this UUID,
* <code>false</code> if not
*/
@Override
public boolean equals(Object obj) {
if (!(obj instanceof UUID)) {
return false;
}
return compareTo((UUID) obj) == 0;
}
/**
* Returns the nil UUID (a UUID whose values are both set to zero).
* <p>
* Starting with version 2.0, this method does return a new UUID instance every
* time it is called. Earlier versions returned one instance. This has now been
* changed because this UUID has public, non-final instance fields. Returning a
* new instance is therefore more safe.
*
* @return a nil UUID, never <code>null</code>
*/
public static UUID nilUUID() {
return new UUID(0, 0);
}
}

View file

@ -0,0 +1,364 @@
/*
* UUIDGen.java
*
* Created on 09.08.2003.
*
* eaio: UUID - an implementation of the UUID specification
* Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com) http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
package com.eaio.uuid;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
import com.eaio.util.lang.Hex;
/**
* This class contains methods to generate UUID fields. These methods have been
* refactored out of {@link com.eaio.uuid.UUID}.
* <p>
* Starting with version 2, this implementation tries to obtain the MAC address
* of the network card. Under Microsoft Windows, the <code>ifconfig</code>
* command is used which may pop up a command window in Java Virtual Machines
* prior to 1.4 once this class is initialized. The command window is closed
* automatically.
* <p>
* The MAC address code has been tested extensively in Microsoft Windows,
* Linux, Solaris 8, HP-UX 11, but should work in MacOS X and BSDs, too.
* <p>
* If you use JDK 6 or later, the code in {@link InterfaceAddress} will be used.
*
* @see <a href="http://johannburkard.de/software/uuid/">UUID</a>
* @author <a href="mailto:jb@eaio.de">Johann Burkard</a>
* @version $Id: UUIDGen.java 2914 2010-04-23 11:35:00Z johann $
* @see com.eaio.uuid.UUID
*/
public final class UUIDGen {
/**
* No instances needed.
*/
private UUIDGen() {
super();
}
/**
* The last time value. Used to remove duplicate UUIDs.
*/
private static long lastTime = Long.MIN_VALUE;
/**
* The cached MAC address.
*/
private static String macAddress = null;
/**
* The current clock and node value.
*/
private static long clockSeqAndNode = 0x8000000000000000L;
static {
try {
Class.forName("java.net.InterfaceAddress");
macAddress = Class.forName(
"com.eaio.uuid.UUIDGen$HardwareAddressLookup").newInstance().toString();
}
catch (ExceptionInInitializerError err) {
// Ignored.
}
catch (ClassNotFoundException ex) {
// Ignored.
}
catch (LinkageError err) {
// Ignored.
}
catch (IllegalAccessException ex) {
// Ignored.
}
catch (InstantiationException ex) {
// Ignored.
}
catch (SecurityException ex) {
// Ignored.
}
if (macAddress == null) {
Process p = null;
BufferedReader in = null;
try {
String osname = System.getProperty("os.name", "");
if (osname.startsWith("Windows")) {
p = Runtime.getRuntime().exec(
new String[] { "ipconfig", "/all" }, null);
}
// Solaris code must appear before the generic code
else if (osname.startsWith("Solaris")
|| osname.startsWith("SunOS")) {
String hostName = getFirstLineOfCommand(
"uname", "-n" );
if (hostName != null) {
p = Runtime.getRuntime().exec(
new String[] { "/usr/sbin/arp", hostName },
null);
}
}
else if (new File("/usr/sbin/lanscan").exists()) {
p = Runtime.getRuntime().exec(
new String[] { "/usr/sbin/lanscan" }, null);
}
else if (new File("/sbin/ifconfig").exists()) {
p = Runtime.getRuntime().exec(
new String[] { "/sbin/ifconfig", "-a" }, null);
}
if (p != null) {
in = new BufferedReader(new InputStreamReader(
p.getInputStream()), 128);
String l = null;
while ((l = in.readLine()) != null) {
macAddress = MACAddressParser.parse(l);
if (macAddress != null
&& Hex.parseShort(macAddress) != 0xff) {
break;
}
}
}
}
catch (SecurityException ex) {
// Ignore it.
}
catch (IOException ex) {
// Ignore it.
}
finally {
if (p != null) {
if (in != null) {
try {
in.close();
}
catch (IOException ex) {
// Ignore it.
}
}
try {
p.getErrorStream().close();
}
catch (IOException ex) {
// Ignore it.
}
try {
p.getOutputStream().close();
}
catch (IOException ex) {
// Ignore it.
}
p.destroy();
}
}
}
if (macAddress != null) {
clockSeqAndNode |= Hex.parseLong(macAddress);
}
else {
try {
byte[] local = InetAddress.getLocalHost().getAddress();
clockSeqAndNode |= (local[0] << 24) & 0xFF000000L;
clockSeqAndNode |= (local[1] << 16) & 0xFF0000;
clockSeqAndNode |= (local[2] << 8) & 0xFF00;
clockSeqAndNode |= local[3] & 0xFF;
}
catch (UnknownHostException ex) {
clockSeqAndNode |= (long) (Math.random() * 0x7FFFFFFF);
}
}
// Skip the clock sequence generation process and use random instead.
clockSeqAndNode |= (long) (Math.random() * 0x3FFF) << 48;
}
/**
* Returns the current clockSeqAndNode value.
*
* @return the clockSeqAndNode value
* @see UUID#getClockSeqAndNode()
*/
public static long getClockSeqAndNode() {
return clockSeqAndNode;
}
/**
* Generates a new time field. Each time field is unique and larger than the
* previously generated time field.
*
* @return a new time value
* @see UUID#getTime()
*/
public static long newTime() {
return createTime(System.currentTimeMillis());
}
/**
* Creates a new time field from the given timestamp. Note that even identical
* values of <code>currentTimeMillis</code> will produce different time fields.
*
* @param currentTimeMillis the timestamp
* @return a new time value
* @see UUID#getTime()
*/
public static synchronized long createTime(long currentTimeMillis) {
long time;
// UTC time
long timeMillis = (currentTimeMillis * 10000) + 0x01B21DD213814000L;
if (timeMillis > lastTime) {
lastTime = timeMillis;
}
else {
timeMillis = ++lastTime;
}
// time low
time = timeMillis << 32;
// time mid
time |= (timeMillis & 0xFFFF00000000L) >> 16;
// time hi and version
time |= 0x1000 | ((timeMillis >> 48) & 0x0FFF); // version 1
return time;
}
/**
* Returns the MAC address. Not guaranteed to return anything.
*
* @return the MAC address, may be <code>null</code>
*/
public static String getMACAddress() {
return macAddress;
}
/**
* Returns the first line of the shell command.
*
* @param commands the commands to run
* @return the first line of the command
* @throws IOException
*/
static String getFirstLineOfCommand(String... commands) throws IOException {
Process p = null;
BufferedReader reader = null;
try {
p = Runtime.getRuntime().exec(commands);
reader = new BufferedReader(new InputStreamReader(
p.getInputStream()), 128);
return reader.readLine();
}
finally {
if (p != null) {
if (reader != null) {
try {
reader.close();
}
catch (IOException ex) {
// Ignore it.
}
}
try {
p.getErrorStream().close();
}
catch (IOException ex) {
// Ignore it.
}
try {
p.getOutputStream().close();
}
catch (IOException ex) {
// Ignore it.
}
p.destroy();
}
}
}
/**
* Scans MAC addresses for good ones.
*/
static class HardwareAddressLookup {
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
String out = null;
try {
Enumeration<NetworkInterface> ifs = NetworkInterface.getNetworkInterfaces();
if (ifs != null) {
while (ifs.hasMoreElements()) {
NetworkInterface iface = ifs.nextElement();
byte[] hardware = iface.getHardwareAddress();
if (hardware != null && hardware.length == 6
&& hardware[1] != (byte) 0xff) {
out = Hex.append(new StringBuilder(36), hardware).toString();
break;
}
}
}
}
catch (SocketException ex) {
// Ignore it.
}
return out;
}
}
}

View file

@ -0,0 +1,86 @@
package com.eaio.uuid;
/**
* com/eaio/uuid/UUIDHelper.java .
* Generated by the IDL-to-Java compiler (portable), version "3.1"
* from uuid.idl
* Sonntag, 7. März 2004 21.35 Uhr CET
*/
/**
* The UUID struct.
*/
abstract public class UUIDHelper
{
private static String _id = "IDL:com/eaio/uuid/UUID:1.0";
public static void insert (org.omg.CORBA.Any a, com.eaio.uuid.UUID that)
{
org.omg.CORBA.portable.OutputStream out = a.create_output_stream ();
a.type (type ());
write (out, that);
a.read_value (out.create_input_stream (), type ());
}
public static com.eaio.uuid.UUID extract (org.omg.CORBA.Any a)
{
return read (a.create_input_stream ());
}
private static org.omg.CORBA.TypeCode __typeCode = null;
private static boolean __active = false;
synchronized public static org.omg.CORBA.TypeCode type ()
{
if (__typeCode == null)
{
synchronized (org.omg.CORBA.TypeCode.class)
{
if (__typeCode == null)
{
if (__active)
{
return org.omg.CORBA.ORB.init().create_recursive_tc ( _id );
}
__active = true;
org.omg.CORBA.StructMember[] _members0 = new org.omg.CORBA.StructMember [2];
org.omg.CORBA.TypeCode _tcOf_members0 = null;
_tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong);
_members0[0] = new org.omg.CORBA.StructMember (
"time",
_tcOf_members0,
null);
_tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong);
_members0[1] = new org.omg.CORBA.StructMember (
"clockSeqAndNode",
_tcOf_members0,
null);
__typeCode = org.omg.CORBA.ORB.init ().create_struct_tc (com.eaio.uuid.UUIDHelper.id (), "UUID", _members0);
__active = false;
}
}
}
return __typeCode;
}
public static String id ()
{
return _id;
}
public static com.eaio.uuid.UUID read (org.omg.CORBA.portable.InputStream istream)
{
com.eaio.uuid.UUID value = new com.eaio.uuid.UUID ();
value.time = istream.read_longlong ();
value.clockSeqAndNode = istream.read_longlong ();
return value;
}
public static void write (org.omg.CORBA.portable.OutputStream ostream, com.eaio.uuid.UUID value)
{
ostream.write_longlong (value.time);
ostream.write_longlong (value.clockSeqAndNode);
}
}

View file

@ -0,0 +1,42 @@
package com.eaio.uuid;
/**
* com/eaio/uuid/UUIDHolder.java .
* Generated by the IDL-to-Java compiler (portable), version "3.1"
* from uuid.idl
* Sonntag, 7. März 2004 21.35 Uhr CET
*/
/**
* The UUID struct.
*/
public final class UUIDHolder implements org.omg.CORBA.portable.Streamable
{
public com.eaio.uuid.UUID value = null;
public UUIDHolder ()
{
}
public UUIDHolder (com.eaio.uuid.UUID initialValue)
{
value = initialValue;
}
public void _read (org.omg.CORBA.portable.InputStream i)
{
value = com.eaio.uuid.UUIDHelper.read (i);
}
public void _write (org.omg.CORBA.portable.OutputStream o)
{
com.eaio.uuid.UUIDHelper.write (o, value);
}
public org.omg.CORBA.TypeCode _type ()
{
return com.eaio.uuid.UUIDHelper.type ();
}
}

View file

@ -0,0 +1,55 @@
/*
* uuid.idl
*
* Created 19:49 16.12.2003
*
* eaio: UUID - an implementation of the UUID specification
* Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com) http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
module com {
module eaio {
module uuid {
/**
* The UUID struct.
*/
struct UUID {
/**
* The time field of the UUID.
*/
long long time;
/**
* The clock sequence and node field of the UUID.
*/
long long clockSeqAndNode;
};
};
};
};

View file

@ -201,9 +201,6 @@ object Actor extends ListenerManagement {
private[akka] lazy val shutdownHook = { private[akka] lazy val shutdownHook = {
val hook = new Runnable { val hook = new Runnable {
override def run { override def run {
// Shutdown HawtDispatch GlobalQueue
org.fusesource.hawtdispatch.globalQueue.asInstanceOf[org.fusesource.hawtdispatch.internal.GlobalDispatchQueue].shutdown
// Clear Thread.subclassAudits // Clear Thread.subclassAudits
val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
tf.setAccessible(true) tf.setAccessible(true)

View file

@ -710,16 +710,15 @@ class LocalActorRef private[akka] (
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawn(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { def spawn(clazz: Class[_ <: Actor]): ActorRef =
Actor.actorOf(clazz).start Actor.actorOf(clazz).start
}
/** /**
* Atomically create (from actor class), start and make an actor remote. * Atomically create (from actor class), start and make an actor remote.
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = guard.withGuard { def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
ensureRemotingEnabled ensureRemotingEnabled
val ref = Actor.remote.actorOf(clazz, hostname, port) val ref = Actor.remote.actorOf(clazz, hostname, port)
ref.timeout = timeout ref.timeout = timeout
@ -731,8 +730,8 @@ class LocalActorRef private[akka] (
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard { def spawnLink(clazz: Class[_ <: Actor]): ActorRef = {
val actor = Actor.actorOf(clazz) val actor = spawn(clazz)
link(actor) link(actor)
actor.start actor.start
actor actor
@ -743,8 +742,7 @@ class LocalActorRef private[akka] (
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
guard.withGuard {
ensureRemotingEnabled ensureRemotingEnabled
val actor = Actor.remote.actorOf(clazz, hostname, port) val actor = Actor.remote.actorOf(clazz, hostname, port)
actor.timeout = timeout actor.timeout = timeout

View file

@ -61,19 +61,8 @@ object Dispatchers {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
} }
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
* <p/>
* Can be beneficial to use the <code>HawtDispatcher.pin(self)</code> to "pin" an actor to a specific thread.
* <p/>
* See the ScalaDoc for the {@link akka.dispatch.HawtDispatcher} for details.
*/
def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout * Uses the default timeout
@ -128,21 +117,40 @@ object Dispatchers {
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig()) new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
* <p/> * <p/>
* Has a fluent builder interface for configuring its semantics. * Has a fluent builder interface for configuring its semantics.
*/ */
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String): ThreadPoolConfigDispatcherBuilder = def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) =
newExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_TYPE) ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,config),ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
* <p/> * <p/>
* Has a fluent builder interface for configuring its semantics. * Has a fluent builder interface for configuring its semantics.
*/ */
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) = def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) =
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig()) ThreadPoolConfigDispatcherBuilder(config =>
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config),ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config =>
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config =>
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig())
/** /**
* Utility function that tries to load the specified dispatcher config from the akka.conf * Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher * or else use the supplied default dispatcher
@ -156,7 +164,7 @@ object Dispatchers {
* default-dispatcher { * default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt * # GlobalExecutorBasedEventDriven
* keep-alive-time = 60 # Keep alive time for threads * keep-alive-time = 60 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@ -164,7 +172,6 @@ object Dispatchers {
* allow-core-timeout = on # Allow core threads to time out * allow-core-timeout = on # Allow core threads to time out
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
* aggregate = off # Aggregate on/off for HawtDispatchers
* } * }
* ex: from(config.getConfigMap(identifier).get) * ex: from(config.getConfigMap(identifier).get)
* *
@ -211,11 +218,13 @@ object Dispatchers {
threadPoolConfig)).build threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" => case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
name,
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true)) cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)).build
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
} }
} }

View file

@ -101,7 +101,7 @@ class ExecutorBasedEventDrivenDispatcher(
/** /**
* @return the mailbox associated with the actor * @return the mailbox associated with the actor
*/ */
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@ -116,7 +116,7 @@ class ExecutorBasedEventDrivenDispatcher(
} }
} }
private[akka] def start = {} private[akka] def start {}
private[akka] def shutdown { private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
@ -125,9 +125,9 @@ class ExecutorBasedEventDrivenDispatcher(
} }
} }
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { if (mbox.dispatcherLock.tryLock()) {
if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) { if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
try { try {
executorService.get() execute mbox executorService.get() execute mbox
} catch { } catch {
@ -136,8 +136,14 @@ class ExecutorBasedEventDrivenDispatcher(
mbox.dispatcherLock.unlock() mbox.dispatcherLock.unlock()
throw e throw e
} }
} else {
mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock
} }
} }
}
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
override val toString = getClass.getSimpleName + "[" + name + "]" override val toString = getClass.getSimpleName + "[" + name + "]"
@ -148,7 +154,7 @@ class ExecutorBasedEventDrivenDispatcher(
def resume(actorRef: ActorRef) { def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef) val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock mbox.suspended.tryUnlock
registerForExecution(mbox) reRegisterForExecution(mbox)
} }
} }
@ -168,7 +174,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
dispatcherLock.unlock() dispatcherLock.unlock()
} }
if (!self.isEmpty) if (!self.isEmpty)
dispatcher.registerForExecution(this) dispatcher.reRegisterForExecution(this)
} }
/** /**

View file

@ -4,14 +4,12 @@
package akka.dispatch package akka.dispatch
import akka.actor.{ActorRef, Actor, IllegalActorStateException}
import akka.util.{ReflectiveAccess, Switch}
import akka.actor.{Actor, ActorRef, IllegalActorStateException} import java.util.Queue
import akka.util.Switch import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
import java.util.concurrent.atomic.AtomicReference
import jsr166x.{Deque, LinkedBlockingDeque}
/** /**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -21,234 +19,138 @@ import jsr166x.{Deque, LinkedBlockingDeque}
* Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
* best described as "work donating" because the actor of which work is being stolen takes the initiative. * best described as "work donating" because the actor of which work is being stolen takes the initiative.
* <p/> * <p/>
* This dispatcher attempts to redistribute work between actors each time a message is dispatched on a busy actor. Work
* will not be redistributed when actors are busy, but no new messages are dispatched.
* TODO: it would be nice to be able to redistribute work even when no new messages are being dispatched, without impacting dispatching performance ?!
* <p/>
* The preferred way of creating dispatchers is to use * The preferred way of creating dispatchers is to use
* the {@link akka.dispatch.Dispatchers} factory object. * the {@link akka.dispatch.Dispatchers} factory object.
* *
* @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
* @see akka.dispatch.Dispatchers * @see akka.dispatch.Dispatchers
* *
* @author Jan Van Besien * @author Viktor Klang
*/ */
class ExecutorBasedEventDrivenWorkStealingDispatcher( class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String, _name: String,
val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, throughput: Int = Dispatchers.THROUGHPUT,
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher { throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig())
extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig()) def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig()) def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
val name = "akka:event-driven-work-stealing:dispatcher:" + _name def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String, memberType: Class[_ <: Actor]) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, mailboxType: MailboxType) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
/** Type of the actors registered in this dispatcher. */
@volatile private var actorType: Option[Class[_]] = None @volatile private var actorType: Option[Class[_]] = None
private val pooledActors = new CopyOnWriteArrayList[ActorRef] @volatile private var members = Vector[ActorRef]()
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation] with MessageQueue with Runnable]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
private[akka] def dispatch(invocation: MessageInvocation) {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
executorService.get() execute mbox
}
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
* another thread (because then that thread is already processing the mailbox).
*
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
var mailboxWasProcessed = false
// this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock
do {
if (mailbox.dispatcherLock.tryLock) {
try {
mailboxWasProcessed = processMailbox(mailbox)
} finally {
mailbox.dispatcherLock.unlock
}
}
} while ((mailboxWasProcessed && !mailbox.isEmpty))
mailboxWasProcessed
}
/**
* Process the messages in the mailbox of the given actor.
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = try {
if (mailbox.suspended.locked)
return false
var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (mailbox.suspended.locked)
return false
messageInvocation = mailbox.dequeue
}
true
} catch {
case ie: InterruptedException => false
}
private def findThief(receiver: ActorRef): Option[ActorRef] = {
// copy to prevent concurrent modifications having any impact
val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size))
val i = if ( lastThiefIndex > actors.size ) 0 else lastThiefIndex
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
val (thief: Option[ActorRef], index: Int) = doFindThief(receiver, actors, i)
lastThiefIndex = (index + 1) % actors.size
thief
}
/**
* Find a thief to process the receivers messages from the given list of actors.
*
* @param receiver original receiver of the message
* @param actors list of actors to find a thief in
* @param startIndex first index to start looking in the list (i.e. for round robin)
* @return the thief (or None) and the new index to start searching next time
*/
private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = {
for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length
val actor = actors(index)
if (actor != receiver && getMailbox(actor).isEmpty) return (Some(actor), index)
}
(None, startIndex) // nothing found, reuse same start index next time
}
/**
* Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
*/
private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
val mailbox = getMailbox(thief)
if (mailbox.dispatcherLock.tryLock) {
try {
while(donateMessage(receiver, thief)) processMailbox(mailbox)
} finally {
mailbox.dispatcherLock.unlock
}
}
}
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = getMailbox(receiver).pollLast
if (donated ne null) {
if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender)
else thief.postMessageToMailbox(donated.message, None)
true
} else false
}
private[akka] def start = {}
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
old.shutdownNow()
}
}
def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.tryLock
}
def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock
executorService.get() execute mbox
}
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case UnboundedMailbox(blockDequeue) =>
new LinkedBlockingDeque[MessageInvocation] with MessageQueue with Runnable {
final def enqueue(handle: MessageInvocation) {
this add handle
}
final def dequeue(): MessageInvocation = {
if (blockDequeue) this.take()
else this.poll()
}
def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) )
}
}
case BoundedMailbox(blockDequeue, capacity, pushTimeOut) =>
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
}
final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()
def run = if (!tryProcessMailbox(this)) {
// we are not able to process our mailbox (another thread is busy with it), so lets donate some of our mailbox
// to another actor and then process his mailbox in stead.
findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef, _) )
}
}
}
private[akka] override def register(actorRef: ActorRef) = { private[akka] override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef) //Verify actor type conformity
pooledActors add actorRef actorType match {
case None => actorType = Some(actorRef.actor.getClass)
case Some(aType) =>
if (aType != actorRef.actor.getClass)
throw new IllegalActorStateException(String.format(
"Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
actorRef, aType))
}
synchronized { members :+= actorRef } //Update members
super.register(actorRef) super.register(actorRef)
} }
private[akka] override def unregister(actorRef: ActorRef) = { private[akka] override def unregister(actorRef: ActorRef) = {
pooledActors remove actorRef synchronized { members = members.filterNot(actorRef eq) } //Update members
super.unregister(actorRef) super.unregister(actorRef)
} }
private def verifyActorsAreOfSameType(actorOfId: ActorRef) = { override private[akka] def dispatch(invocation: MessageInvocation) = {
actorType match { val mbox = getMailbox(invocation.receiver)
case None => actorType = Some(actorOfId.actor.getClass) if (mbox.dispatcherLock.locked && attemptDonationOf(invocation, mbox)) {
case Some(aType) => //We were busy and we got to donate the message to some other lucky guy, we're done here
if (aType != actorOfId.actor.getClass) } else {
throw new IllegalActorStateException(String.format( mbox enqueue invocation
"Can't register actor {} in a work stealing dispatcher which already knows actors of type {}", registerForExecution(mbox)
actorOfId.actor, aType))
} }
} }
override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
while(donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
super.reRegisterForExecution(mbox)
}
/**
* Returns true if it successfully donated a message
*/
protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
// Starts at is seeded by current time
doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
case null => false
case recipient => donate(donorMbox.dequeue, recipient)
}
}
/**
* Returns true if the donation succeeded or false otherwise
*/
protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
case null => false
case recipient => donate(message, recipient)
}
}
/**
* Rewrites the message and adds that message to the recipients mailbox
* returns true if the message is non-null
*/
protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
if (organ ne null) {
if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
organ.message, recipient.timeout, organ.sender, organ.senderFuture)
else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
else recipient.postMessageToMailbox(organ.message, None)
true
} else false
}
/**
* Returns an available recipient for the message, if any
*/
protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
val prSz = potentialRecipients.size
var i = 0
var recipient: ActorRef = null
while((i < prSz) && (recipient eq null)) {
val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
val mbox = getMailbox(actor)
if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
recipient = actor //Found!
}
i += 1
}
recipient // nothing found, reuse same start index next time
}
} }

View file

@ -1,201 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import akka.actor.ActorRef
import akka.util.Switch
import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.DispatchQueue.QueueType
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.util.concurrent.CountDownLatch
/**
* Holds helper methods for working with actors that are using a HawtDispatcher as it's dispatcher.
*/
object HawtDispatcher {
private val retained = new AtomicInteger()
@volatile private var shutdownLatch: CountDownLatch = _
private def retainNonDaemon = if (retained.getAndIncrement == 0) {
shutdownLatch = new CountDownLatch(1)
new Thread("HawtDispatch Non-Daemon") {
override def run = {
try {
shutdownLatch.await
} catch {
case _ =>
}
}
}.start()
}
private def releaseNonDaemon = if (retained.decrementAndGet == 0) {
shutdownLatch.countDown
shutdownLatch = null
}
/**
* @return the mailbox associated with the actor
*/
private def mailbox(actorRef: ActorRef) = actorRef.mailbox.asInstanceOf[HawtDispatcherMailbox]
/**
* @return the dispatch queue associated with the actor
*/
def queue(actorRef: ActorRef) = mailbox(actorRef).queue
/**
* <p>
* Pins an actor to a random thread queue. Once pinned the actor will always execute
* on the same thread.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
*
* @return true if the actor was pinned
*/
def pin(actorRef: ActorRef) = actorRef.mailbox match {
case x: HawtDispatcherMailbox =>
x.queue.setTargetQueue( getRandomThreadQueue )
true
case _ => false
}
/**
* <p>
* Unpins the actor so that all threads in the hawt dispatch thread pool
* compete to execute him.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
* @return true if the actor was unpinned
*/
def unpin(actorRef: ActorRef) = target(actorRef, globalQueue)
/**
* @return true if the actor was pinned to a thread.
*/
def pinned(actorRef: ActorRef):Boolean = actorRef.mailbox match {
case x: HawtDispatcherMailbox => x.queue.getTargetQueue.getQueueType == QueueType.THREAD_QUEUE
case _ => false
}
/**
* <p>
* Updates the actor's target dispatch queue to the value specified. This allows
* you to do odd things like targeting another serial queue.
* </p>
*
* <p>
* This method can only succeed if the actor it's dispatcher is set to a HawtDispatcher and it has been started
* </p>
* @return true if the actor was unpinned
*/
def target(actorRef: ActorRef, parent: DispatchQueue) = actorRef.mailbox match {
case x: HawtDispatcherMailbox =>
x.queue.setTargetQueue(parent)
true
case _ => false
}
}
/**
* <p>
* A HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed
* on the HawtDispatch fixed sized thread pool. The number of of threads will match
* the number of cores available on your system.
*
* </p>
* <p>
* Actors using this dispatcher are restricted to only executing non blocking
* operations. The actor cannot synchronously call another actor or call 3rd party
* libraries that can block for a long time. You should use non blocking IO APIs
* instead of blocking IO apis to avoid blocking that actor for an extended amount
* of time.
* </p>
*
* <p>
* This dispatcher delivers messages to the actors in the order that they
* were producer at the sender.
* </p>
*
* <p>
* HawtDispatch supports processing Non blocking Socket IO in both the reactor
* and proactor styles. For more details, see the <code>HawtDispacherEchoServer.scala</code>
* example.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher {
import HawtDispatcher._
private[akka] def start { retainNonDaemon }
private[akka] def shutdown { releaseNonDaemon }
private[akka] def dispatch(invocation: MessageInvocation){
mailbox(invocation.receiver).dispatch(invocation)
}
// hawtdispatch does not have a way to get queue sizes, getting an accurate
// size can cause extra contention.. is this really needed?
// TODO: figure out if this can be optional in akka
override def mailboxSize(actorRef: ActorRef) = 0
def createMailbox(actorRef: ActorRef): AnyRef = {
val queue = parent.createQueue(actorRef.toString)
if (aggregate) new AggregatingHawtDispatcherMailbox(queue)
else new HawtDispatcherMailbox(queue)
}
def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume
override def toString = "HawtDispatcher"
}
class HawtDispatcherMailbox(val queue: DispatchQueue) {
def dispatch(invocation: MessageInvocation) {
queue {
invocation.invoke
}
}
def suspend = queue.suspend
def resume = queue.resume
}
class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) {
private val source = createSource(new ListEventAggregator[MessageInvocation](), queue)
source.setEventHandler (^{drain_source} )
source.resume
private def drain_source = source.getData.foreach(_.invoke)
override def suspend = source.suspend
override def resume = source.resume
override def dispatch(invocation: MessageInvocation) {
if (getCurrentQueue eq null) {
// we are being call from a non hawtdispatch thread, can't aggregate
// it's events
super.dispatch(invocation)
} else {
// we are being call from a hawtdispatch thread, use the dispatch source
// so that multiple invocations issues on this thread will aggregate and then once
// the thread runs out of work, they get transferred as a batch to the other thread.
source.merge(invocation)
}
}
}

View file

@ -64,9 +64,7 @@ trait MessageDispatcher {
unregister(actorRef) unregister(actorRef)
} }
private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) { private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
dispatch(invocation)
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
private[akka] def register(actorRef: ActorRef) { private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null) if (actorRef.mailbox eq null)
@ -101,7 +99,7 @@ trait MessageDispatcher {
*/ */
def stopAllAttachedActors { def stopAllAttachedActors {
val i = uuids.iterator val i = uuids.iterator
while(i.hasNext()) { while (i.hasNext()) {
val uuid = i.next() val uuid = i.next()
Actor.registry.actorFor(uuid) match { Actor.registry.actorFor(uuid) match {
case Some(actor) => actor.stop case Some(actor) => actor.stop

View file

@ -46,7 +46,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
}).start }).start
val actor4 = Actor.actorOf(new Actor { val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newHawtDispatcher(true) self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
override def postRestart(cause: Throwable) {countDownLatch.countDown} override def postRestart(cause: Throwable) {countDownLatch.countDown}
protected def receive = { protected def receive = {

View file

@ -303,10 +303,6 @@ class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {
new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor
} }
class HawtDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new HawtDispatcher(false) with MessageDispatcherInterceptor
}
class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec { class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
} }

View file

@ -22,7 +22,6 @@ object DispatchersSpec {
val allowcoretimeout = "allow-core-timeout" val allowcoretimeout = "allow-core-timeout"
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
@ -30,9 +29,7 @@ object DispatchersSpec {
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map( def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher], "ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher], "ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
"Hawt" -> ofType[HawtDispatcher], "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher)
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
"GlobalHawt" -> instance(globalHawtDispatcher)
) )
def validTypes = typesAndValidators.keys.toList def validTypes = typesAndValidators.keys.toList

View file

@ -11,9 +11,10 @@ import Actor._
import akka.dispatch.{MessageQueue, Dispatchers} import akka.dispatch.{MessageQueue, Dispatchers}
object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build def newWorkStealer() = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher",1).build
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
val delayableActorDispatcher, sharedActorDispatcher, parentActorDispatcher = newWorkStealer()
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
self.dispatcher = delayableActorDispatcher self.dispatcher = delayableActorDispatcher

View file

@ -1,71 +0,0 @@
package akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import akka.dispatch.{HawtDispatcher, Dispatchers}
import akka.actor.Actor
import Actor._
object HawtDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = new HawtDispatcher()
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
object OneWayTestActor {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
self.dispatcher = new HawtDispatcher()
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}
}
}
class HawtDispatcherActorSpec extends JUnitSuite {
import HawtDispatcherActorSpec._
private val unit = TimeUnit.MILLISECONDS
@Test def shouldSendOneWay = {
val actor = actorOf[OneWayTestActor].start
val result = actor ! "OneWay"
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
actor.stop
}
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
val result = (actor !! ("Hello", 10000)).as[String]
assert("World" === result.get)
actor.stop
}
@Test def shouldSendReplyAsync = {
val actor = actorOf[TestActor].start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test def shouldSendReceiveException = {
val actor = actorOf[TestActor].start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}

View file

@ -1,206 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.dispatch
import scala.collection.mutable.ListBuffer
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel}
import akka.actor._
import akka.actor.Actor._
import akka.dispatch.HawtDispatcher
import org.fusesource.hawtdispatch._
/**
* This is an example of how to crate an Akka actor based TCP echo server using
* the HawtDispatch dispatcher and NIO event sources.
*/
object HawtDispatcherEchoServer {
private val hawt = new HawtDispatcher
var port=4444;
var useReactorPattern=true
def main(args:Array[String]):Unit = run
def run() = {
val server = actorOf(new Server(port))
server.start
Scheduler.schedule(server, DisplayStats, 1, 5, TimeUnit.SECONDS)
println("Press enter to shutdown.");
System.in.read
server ! Shutdown
}
case object Shutdown
case object DisplayStats
case class SessionClosed(session:ActorRef)
class Server(val port: Int) extends Actor {
self.dispatcher = hawt
var channel:ServerSocketChannel = _
var accept_source:DispatchSource = _
var sessions = ListBuffer[ActorRef]()
override def preStart = {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
// Setup the accept source, it will callback to the handler methods
// via the actor's mailbox so you don't need to worry about
// synchronizing with the local variables
accept_source = createSource(channel, SelectionKey.OP_ACCEPT, HawtDispatcher.queue(self));
accept_source.setEventHandler(^{ accept });
accept_source.setDisposer(^{
channel.close();
println("Closed port: "+port);
});
accept_source.resume
println("Listening on port: "+port);
}
private def accept() = {
var socket = channel.accept();
while( socket!=null ) {
try {
socket.configureBlocking(false);
val session = actorOf(new Session(self, socket))
session.start()
sessions += session
} catch {
case e: Exception =>
socket.close
}
socket = channel.accept();
}
}
def receive = {
case SessionClosed(session) =>
sessions = sessions.filterNot( _ == session )
session.stop
case DisplayStats =>
sessions.foreach { session=>
session ! DisplayStats
}
case Shutdown =>
sessions.foreach { session=>
session.stop
}
sessions.clear
accept_source.release
self.stop
}
}
class Session(val server:ActorRef, val channel: SocketChannel) extends Actor {
self.dispatcher = hawt
val buffer = ByteBuffer.allocate(1024);
val remote_address = channel.socket.getRemoteSocketAddress.toString
var read_source:DispatchSource = _
var write_source:DispatchSource = _
var readCounter = 0L
var writeCounter = 0L
var closed = false
override def preStart = {
if(useReactorPattern) {
// Then we will be using the reactor pattern for handling IO:
// Pin this actor to a single thread. The read/write event sources will poll
// a Selector on the pinned thread. Since the IO events are generated on the same
// thread as where the Actor is pinned to, it can avoid a substantial amount
// thread synchronization. Plus your GC will perform better since all the IO
// processing is done on a single thread.
HawtDispatcher.pin(self)
} else {
// Then we will be using sing the proactor pattern for handling IO:
// Then the actor will not be pinned to a specific thread. The read/write
// event sources will poll a Selector and then asynchronously dispatch the
// event's to the actor via the thread pool.
}
// Setup the sources, they will callback to the handler methods
// via the actor's mailbox so you don't need to worry about
// synchronizing with the local variables
read_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self));
read_source.setEventHandler(^{ read })
read_source.setCancelHandler(^{ close })
write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self));
write_source.setEventHandler(^{ write })
write_source.setCancelHandler(^{ close })
read_source.resume
println("Accepted connection from: "+remote_address);
}
override def postStop = {
closed = true
read_source.release
write_source.release
channel.close
}
private def catchio(func: =>Unit):Unit = {
try {
func
} catch {
case e:IOException => close
}
}
def read():Unit = catchio {
channel.read(buffer) match {
case -1 =>
close // peer disconnected.
case 0 =>
case count:Int =>
readCounter += count
buffer.flip;
read_source.suspend
write_source.resume
write()
}
}
def write() = catchio {
writeCounter += channel.write(buffer)
if (buffer.remaining == 0) {
buffer.clear
write_source.suspend
read_source.resume
}
}
def close() = {
if( !closed ) {
closed = true
server ! SessionClosed(self)
}
}
def receive = {
case DisplayStats =>
println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter))
}
}
}

View file

@ -30,7 +30,6 @@ trait AkkaBaseProject extends BasicScalaProject {
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo) val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
val h2lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo) val h2lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
val hbaseModuleConfig = ModuleConfiguration("org.apache.hbase", AkkaRepo) val hbaseModuleConfig = ModuleConfiguration("org.apache.hbase", AkkaRepo)
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
val memcachedModuleConfig = ModuleConfiguration("spy", "memcached", AkkaRepo) val memcachedModuleConfig = ModuleConfiguration("spy", "memcached", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo) val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo) val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)

View file

@ -36,10 +36,8 @@ akka {
default-dispatcher { default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
# - Hawt
# - ExecutorBasedEventDriven # - ExecutorBasedEventDriven
# - ExecutorBasedEventDrivenWorkStealing # - ExecutorBasedEventDrivenWorkStealing
# - GlobalHawt
# - GlobalExecutorBasedEventDriven # - GlobalExecutorBasedEventDriven
keep-alive-time = 60 # Keep alive time for threads keep-alive-time = 60 # Keep alive time for threads
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
@ -49,7 +47,6 @@ akka {
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline throughput-deadline-time = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
aggregate = off # Aggregate on/off for HawtDispatchers
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property # If positive then a bounded mailbox is used and the capacity is set using the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, # NOTE: setting a mailbox to 'blocking' can be a bit dangerous,
@ -106,7 +103,7 @@ akka {
#If you are using akka.http.AkkaMistServlet #If you are using akka.http.AkkaMistServlet
mist-dispatcher { mist-dispatcher {
#type = "Hawt" # Uncomment if you want to use a different dispatcher than the default one for Comet #type = "GlobalExecutorBasedEventDriven" # Uncomment if you want to use a different dispatcher than the default one for Comet
} }
connection-close = true # toggles the addition of the "Connection" response header with a "close" value connection-close = true # toggles the addition of the "Connection" response header with a "close" value
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint root-actor-id = "_httproot" # the id of the actor to use as the root endpoint

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.eaio</groupId>
<artifactId>uuid</artifactId>
<version>3.2</version>
<packaging>jar</packaging>
</project>

View file

@ -1,46 +0,0 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<packaging>jar</packaging>
<version>2.8.0-1.5.5</version>
<licenses>
<license>
<name>Apache 2</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<dependencies>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>vscaladoc</artifactId>
<version>1.1-md-3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>PublicReleasesRepository</id>
<name>Public Releases Repository</name>
<url>http://maven/content/groups/public/</url>
</repository>
<repository>
<id>PublicSnapshots</id>
<name>Public Snapshots</name>
<url>http://maven/content/groups/public-snapshots/</url>
</repository>
<repository>
<id>ScalaToolsMaven2Repository</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases/</url>
</repository>
</repositories>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>2.8.0.Beta1-1.5-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>2.8.0.RC2-1.5.2-SNAPSHOT</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>2.8.0.RC2-1.5.2-SNAPSHOT</version>
<versioning>
<snapshot>
<localCopy>true</localCopy>
</snapshot>
<lastUpdated>20100519155407</lastUpdated>
</versioning>
</metadata>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>2.8.0.RC3-1.5.2-SNAPSHOT</version>
</project>

View file

@ -110,9 +110,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Versions // Versions
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
lazy val DISPATCH_VERSION = "0.7.4" lazy val JACKSON_VERSION = "1.7.1"
lazy val HAWT_DISPATCH_VERSION = "1.1"
lazy val JACKSON_VERSION = "1.4.3"
lazy val JERSEY_VERSION = "1.3" lazy val JERSEY_VERSION = "1.3"
lazy val MULTIVERSE_VERSION = "0.6.2" lazy val MULTIVERSE_VERSION = "0.6.2"
lazy val SCALATEST_VERSION = "1.3" lazy val SCALATEST_VERSION = "1.3"
@ -130,19 +128,12 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //LGPL 2.1 lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //LGPL 2.1
lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" //ApacheV2
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
lazy val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" //ApacheV2 lazy val commons_io = "commons-io" % "commons-io" % "2.0.1" % "compile" //ApacheV2
lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2
lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2 lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license
@ -150,14 +141,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license
lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" //Eclipse license
lazy val uuid = "com.eaio" % "uuid" % "3.2" % "compile" //MIT license
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2 lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
@ -166,8 +153,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" //CC Public Domain
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1 lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
@ -186,10 +171,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2 lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" //ApacheV2
lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "compile" //ApacheV2
// Test // Test
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2
@ -315,10 +296,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val uuid = Dependencies.uuid
val configgy = Dependencies.configgy val configgy = Dependencies.configgy
val hawtdispatch = Dependencies.hawtdispatch
val jsr166x = Dependencies.jsr166x
// testing // testing
val junit = Dependencies.junit val junit = Dependencies.junit
@ -359,8 +337,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { class AkkaRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val commons_codec = Dependencies.commons_codec val commons_codec = Dependencies.commons_codec
val commons_io = Dependencies.commons_io val commons_io = Dependencies.commons_io
val dispatch_http = Dependencies.dispatch_http
val dispatch_json = Dependencies.dispatch_json
val guicey = Dependencies.guicey val guicey = Dependencies.guicey
val h2_lzf = Dependencies.h2_lzf val h2_lzf = Dependencies.h2_lzf
val jackson = Dependencies.jackson val jackson = Dependencies.jackson