AsynchronousFileChannel

异步的文件通道,FileChannel属于"同步非阻塞",AsynchronousFileChannel才是真正的异步。

AsynchronousFileChannel

从主要方法read/write的返回值为Future可以看出其"异步"的端倪。

open

1public static AsynchronousFileChannel open(Path file,
2                                           Set<? extends OpenOption> options,
3                                           ExecutorService executor,
4                                           FileAttribute<?>... attrs) {
5    FileSystemProvider provider = file.getFileSystem().provider();
6    return provider.newAsynchronousFileChannel(file, options, executor, attrs);
7}

另外一个重载的简化方法声明如下:

1public static AsynchronousFileChannel open(Path file, OpenOption... options) {}

所做的处理便是将可变参数options手动转为Set,executor为null,文件属性为NO_ATTRIBUTES,其实就是一个空的数组。

忽略调用关系,对于Linux来说最终由sun.nio.ch.SimpleAsynchronousFileChannelImpl的open方法实现:

1public static AsynchronousFileChannel open(FileDescriptor fdo,
2                                               boolean reading,
3                                               boolean writing,
4                                               ThreadPool pool) {
5    // Executor is either default or based on pool parameters
6    ExecutorService executor = (pool == null) ? DefaultExecutorHolder.defaultExecutor : pool.executor();
7    return new SimpleAsynchronousFileChannelImpl(fdo, reading, writing, executor);
8}

默认情况下pool参数为null,ThreadPool位于sun.nio.ch包下,其实是对Java线程池ExecutorService的一层包装。DefaultExecutorHolder是SimpleAsynchronousFileChannelImpl的嵌套类,这其实是一个单例模式:

1private static class DefaultExecutorHolder {
2    static final ExecutorService defaultExecutor = ThreadPool.createDefault().executor();
3}

createDefault方法:

 1static ThreadPool createDefault() {
 2    int initialSize = getDefaultThreadPoolInitialSize();
 3    if (initialSize < 0)
 4        initialSize = Runtime.getRuntime().availableProcessors();
 5    ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();
 6    if (threadFactory == null)
 7        threadFactory = defaultThreadFactory;
 8    ExecutorService executor = Executors.newCachedThreadPool(threadFactory);
 9    return new ThreadPool(executor, false, initialSize);
10}

初始大小

提供AIO操作的线程池初始大小由环境变量java.nio.channels.DefaultThreadPool.initialSize决定,不过从源码来看,此参数只是被保存到了ThreadPool内部,没有看到其真正发挥作用的地方。

线程工厂

默认的线程工厂如下:

1private static final ThreadFactory defaultThreadFactory = new ThreadFactory() {
2     @Override
3     public Thread newThread(Runnable r) {
4         Thread t = new Thread(r);
5         t.setDaemon(true);
6         return t;
7    }
8 };

采用了守护线程。我们可以通过参数java.nio.channels.DefaultThreadPool.threadFactory自定义使用的线程工厂,参数的值为线程工厂的完整类名,getDefaultThreadPoolThreadFactory使用反射的方法将其初始化。

异步的奥秘

我们以读为例,实现位于SimpleAsynchronousFileChannelImpl.implRead(简略版):

 1@Override
 2<A> Future<Integer> implRead(final ByteBuffer dst,
 3                             final long position,
 4                             final A attachment,
 5                             final CompletionHandler<Integer,? super A> handler) {
 6    //创建Future
 7    final PendingFuture<Integer,A> result = (handler == null) ?
 8        new PendingFuture<Integer,A>(this) : null;
 9    Runnable task = new Runnable() {
10        public void run() {
11            int n = 0;
12            Throwable exc = null;
13            int ti = threads.add();
14            try {
15                begin();
16                do {
17                    n = IOUtil.read(fdObj, dst, position, nd);
18                } while ((n == IOStatus.INTERRUPTED) && isOpen());
19                if (n < 0 && !isOpen())
20                    throw new AsynchronousCloseException();
21            } catch (IOException x) {
22                if (!isOpen())
23                    x = new AsynchronousCloseException();
24                exc = x;
25            } finally {
26                end();
27                threads.remove(ti);
28            }
29            if (handler == null) {
30                result.setResult(n, exc);
31            } else {
32                //调用回调函数
33                Invoker.invokeUnchecked(handler, attachment, n, exc);
34            }
35        }
36    };
37    executor.execute(task);
38    return result;
39}

很简单,就是使用线程池去执行IO操作,执行完毕后再通过Future或回调函数通知我们。

回调接口类图如下:

CompletionHandler