AsynchronousFileChannel
异步的文件通道,FileChannel属于"同步非阻塞",AsynchronousFileChannel才是真正的异步。
从主要方法read/write的返回值为Future可以看出其"异步"的端倪。
open
1public static AsynchronousFileChannel open(Path file,
2 Set<? extends OpenOption> options,
3 ExecutorService executor,
4 FileAttribute<?>... attrs) {
5 FileSystemProvider provider = file.getFileSystem().provider();
6 return provider.newAsynchronousFileChannel(file, options, executor, attrs);
7}
另外一个重载的简化方法声明如下:
1public static AsynchronousFileChannel open(Path file, OpenOption... options) {}
所做的处理便是将可变参数options手动转为Set,executor为null,文件属性为NO_ATTRIBUTES,其实就是一个空的数组。
忽略调用关系,对于Linux来说最终由sun.nio.ch.SimpleAsynchronousFileChannelImpl的open方法实现:
1public static AsynchronousFileChannel open(FileDescriptor fdo,
2 boolean reading,
3 boolean writing,
4 ThreadPool pool) {
5 // Executor is either default or based on pool parameters
6 ExecutorService executor = (pool == null) ? DefaultExecutorHolder.defaultExecutor : pool.executor();
7 return new SimpleAsynchronousFileChannelImpl(fdo, reading, writing, executor);
8}
默认情况下pool参数为null,ThreadPool位于sun.nio.ch包下,其实是对Java线程池ExecutorService的一层包装。DefaultExecutorHolder是SimpleAsynchronousFileChannelImpl的嵌套类,这其实是一个单例模式:
1private static class DefaultExecutorHolder {
2 static final ExecutorService defaultExecutor = ThreadPool.createDefault().executor();
3}
createDefault方法:
1static ThreadPool createDefault() {
2 int initialSize = getDefaultThreadPoolInitialSize();
3 if (initialSize < 0)
4 initialSize = Runtime.getRuntime().availableProcessors();
5 ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();
6 if (threadFactory == null)
7 threadFactory = defaultThreadFactory;
8 ExecutorService executor = Executors.newCachedThreadPool(threadFactory);
9 return new ThreadPool(executor, false, initialSize);
10}
初始大小
提供AIO操作的线程池初始大小由环境变量java.nio.channels.DefaultThreadPool.initialSize决定,不过从源码来看,此参数只是被保存到了ThreadPool内部,没有看到其真正发挥作用的地方。
线程工厂
默认的线程工厂如下:
1private static final ThreadFactory defaultThreadFactory = new ThreadFactory() {
2 @Override
3 public Thread newThread(Runnable r) {
4 Thread t = new Thread(r);
5 t.setDaemon(true);
6 return t;
7 }
8 };
采用了守护线程。我们可以通过参数java.nio.channels.DefaultThreadPool.threadFactory自定义使用的线程工厂,参数的值为线程工厂的完整类名,getDefaultThreadPoolThreadFactory使用反射的方法将其初始化。
异步的奥秘
我们以读为例,实现位于SimpleAsynchronousFileChannelImpl.implRead(简略版):
1@Override
2<A> Future<Integer> implRead(final ByteBuffer dst,
3 final long position,
4 final A attachment,
5 final CompletionHandler<Integer,? super A> handler) {
6 //创建Future
7 final PendingFuture<Integer,A> result = (handler == null) ?
8 new PendingFuture<Integer,A>(this) : null;
9 Runnable task = new Runnable() {
10 public void run() {
11 int n = 0;
12 Throwable exc = null;
13 int ti = threads.add();
14 try {
15 begin();
16 do {
17 n = IOUtil.read(fdObj, dst, position, nd);
18 } while ((n == IOStatus.INTERRUPTED) && isOpen());
19 if (n < 0 && !isOpen())
20 throw new AsynchronousCloseException();
21 } catch (IOException x) {
22 if (!isOpen())
23 x = new AsynchronousCloseException();
24 exc = x;
25 } finally {
26 end();
27 threads.remove(ti);
28 }
29 if (handler == null) {
30 result.setResult(n, exc);
31 } else {
32 //调用回调函数
33 Invoker.invokeUnchecked(handler, attachment, n, exc);
34 }
35 }
36 };
37 executor.execute(task);
38 return result;
39}
很简单,就是使用线程池去执行IO操作,执行完毕后再通过Future或回调函数通知我们。
回调接口类图如下: