java-nio-wakeup-trick

探索NIO wakeup内部原理

最近在重新梳理对Kafka的知识,看一遍源码加深印象。源码实现中,Kafka的客户端与服务端都是基于Java NIO机制提高网络IO的效率(当然,这不是唯一因素)。

这里就不再重复说明阻塞/非阻塞,同步/异步IO模型之间的区别了,也不赘述非阻塞IO在不同平台上的实现,例如select,epoll,kqueue等,这些都可以在网络上找到很丰富的文档以及示例。

wakeup 接口

Java NIO里面,我觉得蛮有意思的是这么一个接口,蛮trick的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 /**
* Causes the first selection operation that has not yet returned to return
* immediately.
*
* <p> If another thread is currently blocked in an invocation of the
* {@link #select()} or {@link #select(long)} methods then that invocation
* will return immediately. If no selection operation is currently in
* progress then the next invocation of one of these methods will return
* immediately unless the {@link #selectNow()} method is invoked in the
* meantime. In any case the value returned by that invocation may be
* non-zero. Subsequent invocations of the {@link #select()} or {@link
* #select(long)} methods will block as usual unless this method is invoked
* again in the meantime.
*
* <p> Invoking this method more than once between two successive selection
* operations has the same effect as invoking it just once. </p>
*
* @return This selector
*/
public abstract Selector wakeup();

从上面的注释可以看出,这个接口可以立刻中断正在执行的select()调用,或者接下来第一次被调用的select()调用。

这么神奇?我印象中的非阻塞实现好像不支持“唤醒”功能,Java NIO是怎么做到的?

本着好学求知的精神,我转向Google寻求帮助,终于在StackOverflow上找到了一个相似的问题:

如何在没有IO事件时中断epoll_wait

里面提到了一个技巧:self-pipe,通过管道文件IO的方式唤醒确实是可以做到的,JDK源码也证实了这一点。

源码验证

java-nio-wakeup-flow

1
2
3
4
5
6
7
public void interrupt() {
interrupt(this.interruptFD);
}

private native int poll0(long var1, int var3, long var4);

private static native void interrupt(int var0);

从图中可以看出,最终的wakeup调用了类sun.nio.ch.PollArrayWrapper的interrupt本地方法。

进一步,我们找到了这个本地方法的具体C语言实现,具体定义在src/linux/native/sun/nio/ch/PollArrayWrapper.c文件中:

1
2
3
4
5
6
7
8
9
10
JNIEXPORT void JNICALL
Java_sun_nio_ch_PollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,
"Write to interrupt fd failed");
}
}

通过往管道FD中写入一个整数1的方式,触发了管道FD的可读事件,进而中断了NIO的select()调用。

self-pipe 还是一个蛮有意思的trick。