diff options
author | Greg Lewis <glewis@FreeBSD.org> | 2010-08-16 01:35:41 +0000 |
---|---|---|
committer | Greg Lewis <glewis@FreeBSD.org> | 2010-08-16 01:35:41 +0000 |
commit | c9ca7a4ee033705fb11f2aaea3168e99f6ca4962 (patch) | |
tree | cc75e5ff576e4366e39e89ebd386004b04611a44 /java | |
parent | 1b31d1c15e6bb73651d2acc3aec6aaf249307ad9 (diff) | |
download | ports-c9ca7a4ee033705fb11f2aaea3168e99f6ca4962.tar.gz ports-c9ca7a4ee033705fb11f2aaea3168e99f6ca4962.zip |
. Add a new NIO selector that uses kqueue(2) and make it the default. [1]
. Mark as MAKE_JOBS_UNSAFE.
. Pet portlint.
Submitted by: davidxu@ [1]
Notes
Notes:
svn path=/head/; revision=259324
Diffstat (limited to 'java')
-rw-r--r-- | java/openjdk7/Makefile | 3 | ||||
-rw-r--r-- | java/openjdk7/files/patch-zzz-nio-kqueue | 1236 |
2 files changed, 1239 insertions, 0 deletions
diff --git a/java/openjdk7/Makefile b/java/openjdk7/Makefile index ef7394cfcff5..5bfce842054d 100644 --- a/java/openjdk7/Makefile +++ b/java/openjdk7/Makefile @@ -7,6 +7,7 @@ PORTNAME= openjdk PORTVERSION= ${JDK_MAJOR_VERSION}.${JDK_MINOR_VERSION}.${JDK_BUILD_NUMBER} +PORTREVISION= 1 CATEGORIES= java devel MASTER_SITES= http://download.java.net/openjdk/jdk7/promoted/b${JDK_BUILD_NUMBER}/ \ ${MASTER_SITE_APACHE:S,%SUBDIR%/,ant/binaries/:ant,} @@ -15,11 +16,13 @@ DISTFILES= ${JDK_SRC_DISTFILE}${EXTRACT_SUFX} \ MAINTAINER= glewis@FreeBSD.org COMMENT= Java Development Kit 7 + LICENSE= GPLv2 WRKSRC= ${WRKDIR}/${PORTNAME} USE_ZIP= YES +MAKE_JOBS_UNSAFE= YES JDK_MAJOR_VERSION= 7 JDK_MINOR_VERSION= 0 diff --git a/java/openjdk7/files/patch-zzz-nio-kqueue b/java/openjdk7/files/patch-zzz-nio-kqueue new file mode 100644 index 000000000000..61c4783237bb --- /dev/null +++ b/java/openjdk7/files/patch-zzz-nio-kqueue @@ -0,0 +1,1236 @@ +--- jdk/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 0) ++++ jdk/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 16) +@@ -0,0 +1,17 @@ ++package sun.nio.ch; ++ ++import java.io.IOException; ++import java.nio.channels.*; ++import java.nio.channels.spi.*; ++ ++public class KqueueSelectorProvider ++ extends SelectorProviderImpl ++{ ++ public AbstractSelector openSelector() throws IOException { ++ return new KqueueSelectorImpl(this); ++ } ++ ++ public Channel inheritedChannel() throws IOException { ++ return InheritedChannel.getChannel(); ++ } ++} +--- jdk/src/solaris/native/sun/nio/ch/KqueuePort.c (revision 0) ++++ jdk/src/solaris/native/sun/nio/ch/KqueuePort.c (revision 16) +@@ -0,0 +1,67 @@ ++/* ++ * Scratched by davidxu@freebsd.org ++ */ ++ ++#include "jni.h" ++#include "jni_util.h" ++#include "jvm.h" ++#include "jlong.h" ++#include "nio_util.h" ++ ++#include "sun_nio_ch_KqueuePort.h" ++ ++#include <unistd.h> ++#include <sys/types.h> ++#include <sys/socket.h> ++ ++#ifdef __cplusplus ++extern "C" { ++#endif ++JNIEXPORT void JNICALL ++Java_sun_nio_ch_KqueuePort_socketpair ++ (JNIEnv *env, jclass cls, jintArray sv) ++{ ++ int sp[2]; ++ if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) == -1) { ++ JNU_ThrowIOExceptionWithLastError(env, "socketpair failed"); ++ } else { ++ jint res[2]; ++ res[0] = (jint)sp[0]; ++ res[1] = (jint)sp[1]; ++ (*env)->SetIntArrayRegion(env, sv, 0, 2, &res[0]); ++ } ++} ++ ++JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_interrupt ++ (JNIEnv *env, jclass cls, jint fd) ++{ ++ int res; ++ int buf[1]; ++ buf[0] = 1; ++ RESTARTABLE(write(fd, buf, 1), res); ++ if (res < 0) { ++ JNU_ThrowIOExceptionWithLastError(env, "write failed"); ++ } ++} ++ ++JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_drain1 ++ (JNIEnv *env, jclass cls, jint fd) ++{ ++ int res; ++ char buf[1]; ++ RESTARTABLE(read(fd, buf, 1), res); ++ if (res < 0) { ++ JNU_ThrowIOExceptionWithLastError(env, "drain1 failed"); ++ } ++} ++ ++JNIEXPORT void JNICALL Java_sun_nio_ch_KqueuePort_close0 ++ (JNIEnv *env, jclass cls, jint fd) ++{ ++ int res; ++ RESTARTABLE(close(fd), res); ++} ++ ++#ifdef __cplusplus ++} ++#endif +--- jdk/src/solaris/native/sun/nio/ch/Kqueue.c (revision 0) ++++ jdk/src/solaris/native/sun/nio/ch/Kqueue.c (revision 16) +@@ -0,0 +1,174 @@ ++/* ++ * Scratched by davidxu@freebsd.org ++ */ ++ ++#include "jni.h" ++#include "jni_util.h" ++#include "jvm.h" ++#include "jlong.h" ++ ++#include "sun_nio_ch_Kqueue.h" ++ ++#include <errno.h> ++#include <sys/types.h> ++#include <sys/event.h> ++#include <sys/time.h> ++ ++#ifdef __cplusplus ++extern "C" { ++#endif ++ ++static int ++restartable_kevent(int kqfd, struct kevent *changelist, int nchanges, ++ struct kevent *eventlist, int nevents); ++ ++static int ++timeout_kevent(int kqfd, struct kevent *changelist, int nchanges, ++ struct kevent *eventlist, int nevents, int timo); ++ ++JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_kqueue ++ (JNIEnv *env, jclass cls) ++{ ++ int kqfd = kqueue(); ++ if (kqfd < 0) { ++ JNU_ThrowIOExceptionWithLastError(env, "Error opening kqueue"); ++ return -1; ++ } ++ return kqfd; ++} ++ ++JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_keventChange ++ (JNIEnv *env, jclass cls, jint kqfd, jint fd, jshort flags, jshort filter) ++{ ++ struct kevent ev; ++ struct timespec ts; ++ ++ ev.ident = fd; ++ ev.flags = flags; ++ ev.filter = filter; ++ ev.fflags = 0; ++ ev.data = 0; ++ ev.udata = NULL; ++ ts.tv_sec = 0; ++ ts.tv_nsec = 0; ++ if (kevent(kqfd, &ev, 1, NULL, 0, &ts) < 0) ++ JNU_ThrowIOExceptionWithLastError(env, "Error changing kevent"); ++ return (0); ++} ++ ++JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_kevent ++ (JNIEnv *env, jclass cls, jint kqfd , jlong changelist_addr, jint nchanges, ++ jlong eventlist_addr, jint nevents, jlong timeout) ++{ ++ struct kevent *changelist = (struct kevent *)jlong_to_ptr(changelist_addr); ++ struct kevent *eventlist = (struct kevent *)jlong_to_ptr(eventlist_addr); ++ int result; ++ ++ if (timeout < 0) { ++ result = restartable_kevent(kqfd, changelist, nchanges, ++ eventlist, nevents); ++ } else { ++ result = timeout_kevent(kqfd, changelist, nchanges, eventlist, ++ nevents, timeout); ++ } ++ ++ if (result < 0) { ++ JNU_ThrowIOExceptionWithLastError(env, "Error reading driver"); ++ return -1; ++ } ++ return result; ++} ++ ++static int ++restartable_kevent(int kqfd, struct kevent *changelist, int nchanges, ++ struct kevent *eventlist, int nevents) ++{ ++ int result; ++ ++ for (;;) { ++ result = kevent(kqfd, changelist, nchanges, eventlist, ++ nevents, NULL); ++ if (result == -1 && errno == EINTR) { ++ continue; ++ } else { ++ return result; ++ } ++ } ++} ++ ++static int ++timeout_kevent(int kqfd, struct kevent *changelist, int nchanges, ++ struct kevent *eventlist, int nevents, int timo) ++{ ++ struct timeval timeout, now, end; ++ int result; ++ ++ timeout.tv_sec = timo / 1000; ++ timeout.tv_usec = (timo % 1000) * 1000; ++ gettimeofday(&now, NULL); ++ timeradd(&now, &timeout, &end); ++ ++ for (;;) { ++ struct timespec ts; ++ ++ ts.tv_sec = timeout.tv_sec; ++ ts.tv_nsec = timeout.tv_usec * 1000; ++ result = kevent(kqfd, changelist, nchanges, eventlist, nevents, ++ &ts); ++ if (result == -1 && (errno == EINTR)) { ++ gettimeofday(&now, NULL); ++ if (timercmp(&now, &end, >=)) ++ return 0; ++ timersub(&end, &now, &timeout); ++ } else { ++ return result; ++ } ++ } ++} ++ ++JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_keventSize ++ (JNIEnv *env, jclass cls) ++{ ++ return sizeof(struct kevent); ++} ++ ++JNIEXPORT void JNICALL Java_sun_nio_ch_Kqueue_putKevent ++ (JNIEnv *env, jclass cls, jlong address, jint index, jint fd, jshort flags, jshort filter) ++{ ++ struct kevent *ev = (struct kevent *)jlong_to_ptr(address); ++ ++ ev[index].ident = fd; ++ ev[index].flags = flags; ++ ev[index].filter = filter; ++ ev[index].fflags = 0; ++ ev[index].data = 0; ++ ev[index].udata = NULL; ++} ++ ++JNIEXPORT jshort JNICALL Java_sun_nio_ch_Kqueue_getKeventFilter ++ (JNIEnv *env, jclass cls, jlong address, jint index) ++{ ++ struct kevent *ev = (struct kevent *)jlong_to_ptr(address); ++ ++ return ev[index].filter; ++} ++ ++JNIEXPORT jshort JNICALL Java_sun_nio_ch_Kqueue_getKeventFlags ++ (JNIEnv *env, jclass cls, jlong address, jint index) ++{ ++ struct kevent *ev = (struct kevent *)jlong_to_ptr(address); ++ ++ return ev[index].flags; ++} ++ ++JNIEXPORT jint JNICALL Java_sun_nio_ch_Kqueue_getKeventIdent ++ (JNIEnv *env, jclass cls, jlong address, jint index) ++{ ++ struct kevent *ev = (struct kevent *)jlong_to_ptr(address); ++ ++ return (int)ev[index].ident; ++} ++ ++#ifdef __cplusplus ++} ++#endif +--- jdk/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 0) ++++ jdk/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 16) +@@ -0,0 +1,30 @@ ++/* ++ * Scratched by davidxu@freebsd.org ++ */ ++ ++#include "jni.h" ++#include "jni_util.h" ++#include "jvm.h" ++#include "jlong.h" ++#include "nio_util.h" ++ ++#include "sun_nio_ch_KqueueArrayWrapper.h" ++ ++#ifdef __cplusplus ++extern "C" { ++#endif ++ ++JNIEXPORT void JNICALL Java_sun_nio_ch_KqueueArrayWrapper_interrupt ++ (JNIEnv *env, jclass cls, jint fd) ++{ ++ int fakebuf[1]; ++ fakebuf[0] = 1; ++ if (write(fd, fakebuf, 1) < 0) { ++ JNU_ThrowIOExceptionWithLastError(env, ++ "Write to interrupt fd failed"); ++ } ++} ++ ++#ifdef __cplusplus ++} ++#endif +--- jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 1) ++++ jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 16) +@@ -47,6 +47,10 @@ + public static SelectorProvider create() { + String osname = AccessController.doPrivileged( + new GetPropertyAction("os.name")); ++ if ("FreeBSD".equals(osname)) { ++ return new sun.nio.ch.KqueueSelectorProvider(); ++ } ++ + if ("SunOS".equals(osname)) { + return new sun.nio.ch.DevPollSelectorProvider(); + } +--- jdk/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 0) ++++ jdk/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 16) +@@ -0,0 +1,204 @@ ++/* ++ * scratched by davidxu@freebsd.org ++ */ ++ ++package sun.nio.ch; ++ ++import java.io.IOException; ++import java.nio.channels.*; ++import java.nio.channels.spi.*; ++import java.util.*; ++import sun.misc.*; ++ ++ ++/** ++ * An implementation of Selector for FreeBSD. ++ */ ++class KqueueSelectorImpl ++ extends SelectorImpl ++{ ++ ++ // File descriptors used for interrupt ++ protected int fd0; ++ protected int fd1; ++ ++ // The kqueue object ++ KqueueArrayWrapper kqueueWrapper; ++ ++ // The number of valid channels in this Selector's poll array ++ private int totalChannels; ++ ++ // Maps from file descriptors to keys ++ private HashMap fdToKey; ++ ++ // True if this Selector has been closed ++ private boolean closed = false; ++ ++ // Lock for interrupt triggering and clearing ++ private Object interruptLock = new Object(); ++ private boolean interruptTriggered = false; ++ ++ private BitSet updatedSet; ++ ++ /** ++ * Package private constructor called by factory method in ++ * the abstract superclass Selector. ++ */ ++ KqueueSelectorImpl(SelectorProvider sp) { ++ super(sp); ++ int[] fdes = new int[2]; ++ IOUtil.initPipe(fdes, false); ++ fd0 = fdes[0]; ++ fd1 = fdes[1]; ++ kqueueWrapper = new KqueueArrayWrapper(); ++ totalChannels = 1; ++ kqueueWrapper.initInterrupt(fd0, fd1); ++ updatedSet = new BitSet(); ++ fdToKey = new HashMap(); ++ } ++ ++ protected int doSelect(long timeout) ++ throws IOException ++ { ++ if (closed) ++ throw new ClosedSelectorException(); ++ processDeregisterQueue(); ++ try { ++ begin(); ++ kqueueWrapper.poll(timeout); ++ } finally { ++ end(); ++ } ++ processDeregisterQueue(); ++ int numKeysUpdated = updateSelectedKeys(); ++ if (kqueueWrapper.interrupted()) { ++ // Clear the wakeup pipe ++ synchronized (interruptLock) { ++ kqueueWrapper.clearInterrupted(); ++ IOUtil.drain(fd0); ++ interruptTriggered = false; ++ } ++ } ++ return numKeysUpdated; ++ } ++ ++ /** ++ * Update the keys whose fd's have been selected by the kqueue. ++ * Add the ready keys to the ready queue. ++ */ ++ private int updateSelectedKeys() { ++ int entries = kqueueWrapper.updated; ++ int numKeysUpdated = 0; ++ SelectionKeyImpl ski; ++ int fd; ++ int i; ++ ++ updatedSet.clear(); ++ for (i = 0; i < entries; i++) { ++ fd = kqueueWrapper.getDescriptor(i); ++ ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd)); ++ // ski is null in the case of an interrupt ++ if (ski != null) ++ ski.nioReadyOps(0); ++ } ++ ++ for (i = 0; i < entries; i++) { ++ fd = kqueueWrapper.getDescriptor(i); ++ ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd)); ++ // ski is null in the case of an interrupt ++ if (ski != null) { ++ int rOps = kqueueWrapper.getReventOps(i); ++ if (selectedKeys.contains(ski)) { ++ if (ski.channel.translateAndUpdateReadyOps(rOps, ski)) { ++ if (!updatedSet.get(fd)) { ++ updatedSet.set(fd); ++ numKeysUpdated++; ++ } ++ } ++ } else { ++ ski.channel.translateAndSetReadyOps(rOps, ski); ++ if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { ++ selectedKeys.add(ski); ++ if (!updatedSet.get(fd)) { ++ updatedSet.set(fd); ++ numKeysUpdated++; ++ } ++ } ++ } ++ } ++ } ++ return numKeysUpdated; ++ } ++ ++ protected void implClose() throws IOException { ++ if (!closed) { ++ closed = true; ++ FileDispatcherImpl.closeIntFD(fd0); ++ FileDispatcherImpl.closeIntFD(fd1); ++ if (kqueueWrapper != null) { ++ kqueueWrapper.release(fd0); ++ kqueueWrapper.closeKqueueFD(); ++ kqueueWrapper = null; ++ selectedKeys = null; ++ ++ // Deregister channels ++ Iterator i = keys.iterator(); ++ while (i.hasNext()) { ++ SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); ++ deregister(ski); ++ SelectableChannel selch = ski.channel(); ++ if (!selch.isOpen() && !selch.isRegistered()) ++ ((SelChImpl)selch).kill(); ++ i.remove(); ++ } ++ totalChannels = 0; ++ ++ } ++ fd0 = -1; ++ fd1 = -1; ++ } ++ } ++ ++ protected void implRegister(SelectionKeyImpl ski) { ++ int fd = IOUtil.fdVal(ski.channel.getFD()); ++ fdToKey.put(new Integer(fd), ski); ++ totalChannels++; ++ keys.add(ski); ++ } ++ ++ protected void implDereg(SelectionKeyImpl ski) throws IOException { ++ int i = ski.getIndex(); ++ assert (i >= 0); ++ int fd = ski.channel.getFDVal(); ++ fdToKey.remove(new Integer(fd)); ++ kqueueWrapper.release(fd); ++ totalChannels--; ++ ski.setIndex(-1); ++ keys.remove(ski); ++ selectedKeys.remove(ski); ++ deregister((AbstractSelectionKey)ski); ++ SelectableChannel selch = ski.channel(); ++ if (!selch.isOpen() && !selch.isRegistered()) ++ ((SelChImpl)selch).kill(); ++ } ++ ++ void putEventOps(SelectionKeyImpl sk, int ops) { ++ int fd = IOUtil.fdVal(sk.channel.getFD()); ++ kqueueWrapper.setInterest(fd, ops); ++ } ++ ++ public Selector wakeup() { ++ synchronized (interruptLock) { ++ if (!interruptTriggered) { ++ kqueueWrapper.interrupt(); ++ interruptTriggered = true; ++ } ++ } ++ return this; ++ } ++ ++ static { ++ Util.load(); ++ } ++ ++} +--- jdk/src/solaris/classes/sun/nio/ch/Kqueue.java (revision 0) ++++ jdk/src/solaris/classes/sun/nio/ch/Kqueue.java (revision 16) +@@ -0,0 +1,49 @@ ++/* ++ * Scratched by davidxu@freebsd.org ++ */ ++ ++package sun.nio.ch; ++ ++import java.io.IOException; ++import sun.misc.Unsafe; ++ ++class Kqueue { ++ // Kevent filters ++ static final short EVFILT_READ = -1; ++ static final short EVFILT_WRITE = -2; ++ ++ // Kevent flags ++ static final short EV_ADD = 0x0001; ++ static final short EV_DELETE = 0x0002; ++ static final short EV_ONESHOT = 0x0010; ++ static final short EV_ERROR = 0x4000; ++ ++ private static final Unsafe unsafe = Unsafe.getUnsafe(); ++ static final int SIZEOF_KEVENT = keventSize(); ++ ++ private Kqueue() {} ++ ++ /** ++ * Allocates a poll array to handle up to {@code count} events. ++ */ ++ static long allocatePollArray(int count) { ++ return unsafe.allocateMemory(count * SIZEOF_KEVENT); ++ } ++ ++ /** ++ * Free a poll array ++ */ ++ static void freePollArray(long address) { ++ unsafe.freeMemory(address); ++ } ++ ++ static native int kqueue(); ++ static native int keventChange(int kqfd, int fd, short flags, short filter); ++ static native int kevent(int kqfd, long changeList, int nchanges, long eventList, ++ int nevents, long timeout); ++ static native int keventSize(); ++ static native void putKevent(long address, int index, int fd, short flag, short filter); ++ static native short getKeventFilter(long address, int index); ++ static native short getKeventFlags(long address, int index); ++ static native int getKeventIdent(long address, int index); ++} +--- jdk/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 0) ++++ jdk/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 16) +@@ -0,0 +1,211 @@ ++/* ++ * Scratched by davidxu@freebsd.org ++ */ ++ ++package sun.nio.ch; ++ ++import sun.misc.*; ++import java.io.IOException; ++import java.util.HashMap; ++import java.util.Set; ++import java.util.Arrays; ++import static sun.nio.ch.Kqueue.*; ++ ++class KqueueArrayWrapper { ++ ++ // Event masks copied from class AbstractPollArrayWrapper ++ static final short POLLIN = 0x0001; ++ static final short POLLOUT = 0x0004; ++ static final short POLLERR = 0x0008; ++ static final short POLLHUP = 0x0010; ++ static final short POLLNVAL = 0x0020; ++ static final short POLLREMOVE = 0x0800; ++ ++ // Zero mask to unregister events from kqueue ++ static final Integer ZERO_MASK = new Integer(0); ++ ++ // Capacity increment of some arrays ++ static final int capacityIncr = 100; ++ ++ KqueueArrayWrapper() { ++ int allocationSize; ++ ++ // initial size of event array ++ pollKeventSize = capacityIncr * 2; ++ allocationSize = pollKeventSize * SIZEOF_KEVENT; ++ pollKeventArray = new AllocatedNativeObject(allocationSize, true); ++ kqfd = kqueue(); ++ } ++ ++ // Machinery for remembering fd registration changes ++ private HashMap<Integer, Integer> updateMap = new HashMap<Integer, Integer>(); ++ private int[] oldMasks = new int[capacityIncr]; ++ ++ // kevent array to receive ++ private AllocatedNativeObject pollKeventArray; ++ ++ // current size of pollKeventArray ++ int pollKeventSize; ++ ++ // the pollKeventSize should be larger than this ++ int nextKeventSize; ++ ++ // The fd of the kqueue() ++ int kqfd; ++ ++ // The fd of the interrupt line going out ++ int outgoingInterruptFD; ++ ++ // The fd of the interrupt line coming in ++ int incomingInterruptFD; ++ ++ // The index of the interrupt FD ++ int interruptedIndex; ++ ++ // Number of updated kevent entries ++ int updated; ++ ++ // ensure some array sizes are large enough with a given file handle ++ void ensureFd(int fd) { ++ ensureNextEventFd(fd); ++ if (oldMasks.length < fd+1) ++ oldMasks = Arrays.copyOf(oldMasks, fd+capacityIncr); ++ } ++ ++ void ensureNextEventFd(int fd) { ++ // each file handle may have two filters, read and write. ++ if (nextKeventSize / 2 < fd+1) ++ nextKeventSize = (fd+1) * 2; ++ } ++ ++ void resizeEventBuffer() { ++ if (nextKeventSize > pollKeventSize) { ++ pollKeventArray.free(); ++ pollKeventSize = nextKeventSize + capacityIncr * 2; ++ int allocationSize = pollKeventSize * SIZEOF_KEVENT; ++ pollKeventArray = new AllocatedNativeObject(allocationSize, true); ++ } ++ } ++ ++ void initInterrupt(int fd0, int fd1) { ++ outgoingInterruptFD = fd1; ++ incomingInterruptFD = fd0; ++ ensureFd(fd0); ++ keventChange(kqfd, fd0, EV_ADD, EVFILT_READ); ++ } ++ ++ int getReventOps(int i) { ++ short filter = getKeventFilter(pollKeventArray.address(), i); ++ short flags = getKeventFlags(pollKeventArray.address(), i); ++ if ((flags & EV_ERROR) != 0) ++ return POLLERR; ++ if (filter == EVFILT_READ) ++ return POLLIN; ++ if (filter == EVFILT_WRITE) ++ return POLLOUT; ++ return (0); ++ } ++ ++ int getDescriptor(int i) { ++ return getKeventIdent(pollKeventArray.address(), i); ++ } ++ ++ void setInterest(int fd, int mask) { ++ if (fd <0) ++ throw new IllegalArgumentException("file handle less than 0"); ++ synchronized (updateMap) { ++ ensureFd(fd); ++ updateMap.put(new Integer(fd), new Integer(mask)); ++ } ++ } ++ ++ void release(int fd) { ++ synchronized (updateMap) { ++ updateMap.put(new Integer(fd), ZERO_MASK); ++ } ++ } ++ ++ void closeKqueueFD() throws IOException { ++ FileDispatcherImpl.closeIntFD(kqfd); ++ pollKeventArray.free(); ++ } ++ ++ int poll(long timeout) { ++ int changeCount = updateRegistrations(); ++ updated = kevent(kqfd, pollKeventArray.address(), changeCount, ++ pollKeventArray.address(), pollKeventSize, timeout); ++ for (int i = 0; i < updated; i++) { ++ if (getDescriptor(i) == incomingInterruptFD) { ++ interruptedIndex = i; ++ interrupted = true; ++ break; ++ } ++ } ++ return updated; ++ } ++ ++ int updateRegistrations() { ++ int index = 0; ++ synchronized (updateMap) { ++ resizeEventBuffer(); ++ ++ Set<Integer> s = updateMap.keySet(); ++ /* ++ * Because resizeEventBuffer may reallocate event buffer, ++ * we must retrieve fresh address here. ++ */ ++ long address = pollKeventArray.address(); ++ ++ for (Integer fd : s) { ++ Integer newmask = updateMap.get(fd); ++ int oldmask = oldMasks[fd]; ++ if ((oldmask & POLLIN) != 0) { ++ if ((newmask & POLLIN) == 0) { ++ putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_READ); ++ index++; ++ } ++ } else { ++ if ((newmask & POLLIN) != 0) { ++ putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_READ); ++ index++; ++ } ++ } ++ ++ if ((oldmask & POLLOUT) != 0) { ++ if ((newmask & POLLOUT) == 0) { ++ putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_WRITE); ++ index++; ++ } ++ } else { ++ if ((newmask & POLLOUT) != 0) { ++ putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_WRITE); ++ index++; ++ } ++ } ++ oldMasks[fd] = newmask; ++ } ++ updateMap.clear(); ++ } ++ return index; ++ } ++ ++ boolean interrupted = false; ++ ++ public void interrupt() { ++ interrupt(outgoingInterruptFD); ++ } ++ ++ public int interruptedIndex() { ++ return interruptedIndex; ++ } ++ ++ boolean interrupted() { ++ return interrupted; ++ } ++ ++ void clearInterrupted() { ++ interrupted = false; ++ } ++ ++ private static native void interrupt(int fd); ++} +--- jdk/src/solaris/classes/sun/nio/ch/DefaultAsynchronousChannelProvider.java (revision 1) ++++ jdk/src/solaris/classes/sun/nio/ch/DefaultAsynchronousChannelProvider.java (revision 16) +@@ -46,6 +46,8 @@ + public static AsynchronousChannelProvider create() { + String osname = AccessController + .doPrivileged(new GetPropertyAction("os.name")); ++ if (osname.equals("FreeBSD")) ++ return new FreeBSDAsynchronousChannelProvider(); + if (osname.equals("SunOS")) + return new SolarisAsynchronousChannelProvider(); + if (osname.equals("Linux")) +--- jdk/src/solaris/classes/sun/nio/ch/KqueuePort.java (revision 0) ++++ jdk/src/solaris/classes/sun/nio/ch/KqueuePort.java (revision 16) +@@ -0,0 +1,321 @@ ++/* ++ * Scratched by davidxu@FreeBSD.org ++ */ ++ ++package sun.nio.ch; ++ ++import java.nio.channels.spi.AsynchronousChannelProvider; ++import java.io.IOException; ++import java.util.concurrent.ArrayBlockingQueue; ++import java.util.concurrent.RejectedExecutionException; ++import java.util.concurrent.atomic.AtomicInteger; ++import static sun.nio.ch.Kqueue.*; ++ ++/** ++ * AsynchronousChannelGroup implementation based on the FreeBSD kqueue facility. ++ */ ++ ++final class KqueuePort ++ extends Port ++{ ++/* ++ // Kevent filters ++ static final short EVFILT_READ = -1; ++ static final short EVFILT_WRITE = -2; ++ ++ // Kevent flags ++ static final char EV_ADD = 0x0001; ++ static final char EV_DELETE = 0x0002; ++ static final char EV_EOF = 0x8000; ++ static final char EV_ERROR = 0x4000; ++*/ ++ ++ // maximum number of events to poll at a time ++ private static final int MAX_KEVENTS = 512; ++ ++ // kqueue file descriptor ++ private final int kqfd; ++ ++ // true if kqueue closed ++ private boolean closed; ++ ++ // socket pair used for wakeup ++ private final int sp[]; ++ ++ // number of wakeups pending ++ private final AtomicInteger wakeupCount = new AtomicInteger(); ++ ++ // address of the poll array passed to kevent() ++ private final long address; ++ ++ // encapsulates an event for a channel ++ static class Event { ++ final PollableChannel channel; ++ final int events; ++ ++ Event(PollableChannel channel, int events) { ++ this.channel = channel; ++ this.events = events; ++ } ++ ++ PollableChannel channel() { return channel; } ++ int events() { return events; } ++ } ++ ++ // queue of events for cases that a polling thread dequeues more than one ++ // event ++ private final ArrayBlockingQueue<Event> queue; ++ private final Event NEED_TO_POLL = new Event(null, 0); ++ private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); ++ ++ KqueuePort(AsynchronousChannelProvider provider, ThreadPool pool) ++ throws IOException ++ { ++ super(provider, pool); ++ ++ // open kqueue ++ this.kqfd = kqueue(); ++ ++ // create socket pair for wakeup mechanism ++ int[] sv = new int[2]; ++ try { ++ socketpair(sv); ++ // register one end with epoll ++ keventChange(kqfd, sv[0], EV_ADD, EVFILT_READ); ++ } catch (IOException x) { ++ close0(kqfd); ++ throw x; ++ } ++ this.sp = sv; ++ ++ // allocate the poll array ++ this.address = allocatePollArray(MAX_KEVENTS); ++ ++ // create the queue and offer the special event to ensure that the first ++ // threads polls ++ this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS); ++ this.queue.offer(NEED_TO_POLL); ++ } ++ ++ KqueuePort start() { ++ startThreads(new EventHandlerTask()); ++ return this; ++ } ++ ++ /** ++ * Release all resources ++ */ ++ private void implClose() { ++ synchronized (this) { ++ if (closed) ++ return; ++ closed = true; ++ } ++ freePollArray(address); ++ close0(sp[0]); ++ close0(sp[1]); ++ close0(kqfd); ++ } ++ ++ private void wakeup() { ++ if (wakeupCount.incrementAndGet() == 1) { ++ // write byte to socketpair to force wakeup ++ try { ++ interrupt(sp[1]); ++ } catch (IOException x) { ++ throw new AssertionError(x); ++ } ++ } ++ } ++ ++ @Override ++ void executeOnHandlerTask(Runnable task) { ++ synchronized (this) { ++ if (closed) ++ throw new RejectedExecutionException(); ++ offerTask(task); ++ wakeup(); ++ } ++ } ++ ++ @Override ++ void shutdownHandlerTasks() { ++ /* ++ * If no tasks are running then just release resources; otherwise ++ * write to the one end of the socketpair to wakeup any polling threads. ++ */ ++ int nThreads = threadCount(); ++ if (nThreads == 0) { ++ implClose(); ++ } else { ++ // send interrupt to each thread ++ while (nThreads-- > 0) { ++ wakeup(); ++ } ++ } ++ } ++ ++ // invoke by clients to register a file descriptor ++ @Override ++ void startPoll(int fd, int events) { ++ ++ if ((events & POLLIN) != 0) ++ keventChange(kqfd, fd, (short)(EV_ONESHOT|EV_ADD), EVFILT_READ); ++ if ((events & POLLOUT) != 0) ++ keventChange(kqfd, fd, (short)(EV_ONESHOT|EV_ADD), EVFILT_WRITE); ++ } ++ ++ /* ++ * Task to process events from kevent and dispatch to the channel's ++ * onEvent handler. ++ * ++ * Events are retreived from kevent in batch and offered to a BlockingQueue ++ * where they are consumed by handler threads. A special "NEED_TO_POLL" ++ * event is used to signal one consumer to re-poll when all events have ++ * been consumed. ++ */ ++ private class EventHandlerTask implements Runnable { ++ private Event poll() throws IOException { ++ try { ++ for (;;) { ++ int n = kevent(kqfd, 0, 0, address, MAX_KEVENTS, -1); ++ /* ++ * 'n' events have been read. Here we map them to their ++ * corresponding channel in batch and queue n-1 so that ++ * they can be handled by other handler threads. The last ++ * event is handled by this thread (and so is not queued). ++ */ ++ fdToChannelLock.readLock().lock(); ++ try { ++ while (n-- > 0) { ++ int fd = getKeventIdent(address, n); ++ ++ // wakeup ++ if (fd == sp[0]) { ++ if (wakeupCount.decrementAndGet() == 0) { ++ // no more wakeups so drain pipe ++ drain1(sp[0]); ++ } ++ ++ // queue special event if there are more events ++ // to handle. ++ if (n > 0) { ++ queue.offer(EXECUTE_TASK_OR_SHUTDOWN); ++ continue; ++ } ++ return EXECUTE_TASK_OR_SHUTDOWN; ++ } ++ ++ PollableChannel channel = fdToChannel.get(fd); ++ if (channel != null) { ++ int events = getEvents(address, n); ++ Event ev = new Event(channel, events); ++ ++ // n-1 events are queued; This thread handles ++ // the last one except for the wakeup ++ if (n > 0) { ++ queue.offer(ev); ++ } else { ++ return ev; ++ } ++ } ++ } ++ } finally { ++ fdToChannelLock.readLock().unlock(); ++ } ++ } ++ } finally { ++ // to ensure that some thread will poll when all events have ++ // been consumed ++ queue.offer(NEED_TO_POLL); ++ } ++ } ++ ++ public void run() { ++ Invoker.GroupAndInvokeCount myGroupAndInvokeCount = ++ Invoker.getGroupAndInvokeCount(); ++ final boolean isPooledThread = (myGroupAndInvokeCount != null); ++ boolean replaceMe = false; ++ Event ev; ++ try { ++ for (;;) { ++ // reset invoke count ++ if (isPooledThread) ++ myGroupAndInvokeCount.resetInvokeCount(); ++ ++ try { ++ replaceMe = false; ++ ev = queue.take(); ++ ++ // no events and this thread has been "selected" to ++ // poll for more. ++ if (ev == NEED_TO_POLL) { ++ try { ++ ev = poll(); ++ } catch (IOException x) { ++ x.printStackTrace(); ++ return; ++ } ++ } ++ } catch (InterruptedException x) { ++ continue; ++ } ++ ++ // handle wakeup to execute task or shutdown ++ if (ev == EXECUTE_TASK_OR_SHUTDOWN) { ++ Runnable task = pollTask(); ++ if (task == null) { ++ // shutdown request ++ return; ++ } ++ // run task (may throw error/exception) ++ replaceMe = true; ++ task.run(); ++ continue; ++ } ++ ++ // process event ++ try { ++ ev.channel().onEvent(ev.events(), isPooledThread); ++ } catch (Error x) { ++ replaceMe = true; throw x; ++ } catch (RuntimeException x) { ++ replaceMe = true; throw x; ++ } ++ } ++ } finally { ++ // last handler to exit when shutdown releases resources ++ int remaining = threadExit(this, replaceMe); ++ if (remaining == 0 && isShutdown()) { ++ implClose(); ++ } ++ } ++ } ++ } ++ ++ static int getEvents(long address, int index) { ++ short filter = getKeventFilter(address, index); ++ short flags = getKeventFlags(address, index); ++ if ((flags & EV_ERROR) != 0) ++ return POLLERR; ++ if (filter == EVFILT_READ) ++ return POLLIN; ++ if (filter == EVFILT_WRITE) ++ return POLLOUT; ++ return (0); ++ } ++ ++ // -- Native methods -- ++ ++ private static native void socketpair(int[] sv) throws IOException; ++ ++ private static native void interrupt(int fd) throws IOException; ++ ++ private static native void drain1(int fd) throws IOException; ++ ++ private static native void close0(int fd); ++ ++ static { ++ Util.load(); ++ } ++} +--- jdk/src/solaris/classes/sun/nio/ch/FreeBSDAsynchronousChannelProvider.java (revision 0) ++++ jdk/src/solaris/classes/sun/nio/ch/FreeBSDAsynchronousChannelProvider.java (revision 16) +@@ -0,0 +1,75 @@ ++ ++package sun.nio.ch; ++ ++import java.nio.channels.*; ++import java.nio.channels.spi.AsynchronousChannelProvider; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.ThreadFactory; ++import java.net.ProtocolFamily; ++import java.io.IOException; ++ ++public class FreeBSDAsynchronousChannelProvider ++ extends AsynchronousChannelProvider ++{ ++ private static volatile KqueuePort defaultPort; ++ ++ private KqueuePort defaultEventPort() throws IOException { ++ if (defaultPort == null) { ++ synchronized (FreeBSDAsynchronousChannelProvider.class) { ++ if (defaultPort == null) { ++ defaultPort = new KqueuePort(this, ThreadPool.getDefault()).start(); ++ } ++ } ++ } ++ return defaultPort; ++ } ++ ++ public FreeBSDAsynchronousChannelProvider() { ++ } ++ ++ @Override ++ public AsynchronousChannelGroup openAsynchronousChannelGroup(int nThreads, ThreadFactory factory) ++ throws IOException ++ { ++ return new KqueuePort(this, ThreadPool.create(nThreads, factory)).start(); ++ } ++ ++ @Override ++ public AsynchronousChannelGroup openAsynchronousChannelGroup(ExecutorService executor, int initialSize) ++ throws IOException ++ { ++ return new KqueuePort(this, ThreadPool.wrap(executor, initialSize)).start(); ++ } ++ ++ private Port toPort(AsynchronousChannelGroup group) throws IOException { ++ if (group == null) { ++ return defaultEventPort(); ++ } else { ++ if (!(group instanceof KqueuePort)) ++ throw new IllegalChannelGroupException(); ++ return (Port)group; ++ } ++ } ++ ++ @Override ++ public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) ++ throws IOException ++ { ++ return new UnixAsynchronousServerSocketChannelImpl(toPort(group)); ++ } ++ ++ @Override ++ public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) ++ throws IOException ++ { ++ return new UnixAsynchronousSocketChannelImpl(toPort(group)); ++ } ++ ++ @Override ++ public AsynchronousDatagramChannel openAsynchronousDatagramChannel(ProtocolFamily family, ++ AsynchronousChannelGroup group) ++ throws IOException ++ { ++ return new SimpleAsynchronousDatagramChannelImpl(family, toPort(group)); ++ } ++} +$FreeBSD$ + +--- jdk/make/java/nio/Makefile (revision 1) ++++ jdk/make/java/nio/Makefile (revision 16) +@@ -264,7 +264,12 @@ + ifeq ($(PLATFORM), bsd) + FILES_java += \ + sun/nio/ch/AbstractPollSelectorImpl.java \ ++ sun/nio/ch/FreeBSDAsynchronousChannelProvider.java \ + sun/nio/ch/InheritedChannel.java \ ++ sun/nio/ch/Kqueue.java \ ++ sun/nio/ch/KqueueArrayWrapper.java \ ++ sun/nio/ch/KqueueSelectorProvider.java \ ++ sun/nio/ch/KqueueSelectorImpl.java \ + sun/nio/ch/PollSelectorProvider.java \ + sun/nio/ch/PollSelectorImpl.java \ + sun/nio/ch/Port.java \ +@@ -299,6 +304,9 @@ + + FILES_c += \ + InheritedChannel.c \ ++ Kqueue.c \ ++ KqueueArrayWrapper.c \ ++ KqueuePort.c \ + NativeThread.c \ + PollArrayWrapper.c \ + UnixAsynchronousServerSocketChannelImpl.c \ +@@ -311,6 +319,9 @@ + + FILES_export += \ + sun/nio/ch/InheritedChannel.java \ ++ sun/nio/ch/Kqueue.java \ ++ sun/nio/ch/KqueueArrayWrapper.java \ ++ sun/nio/ch/KqueuePort.java \ + sun/nio/ch/NativeThread.java \ + sun/nio/ch/UnixAsynchronousServerSocketChannelImpl.java \ + sun/nio/ch/UnixAsynchronousSocketChannelImpl.java \ |