/**
 * Creates a {@code FutureTask} that stores the given callable for later execution.
 *
 * @param callable the task to store; must not be {@code null}
 * @throws NullPointerException if {@code callable} is {@code null}
 */
public FutureTask(Callable<V> callable) {
    if (callable == null) {
        throw new NullPointerException();
    }
    this.callable = callable;
    // state is assigned after callable on purpose.
    // NOTE(review): this presumably relies on state being volatile - confirm against the field declaration.
    this.state = NEW; // ensure visibility of callable
}
/**
 * Creates a {@code FutureTask} from a runnable and a fixed result.
 *
 * <p>The runnable is converted to a {@code Callable} through
 * {@code Executors.callable(runnable, result)} and stored in {@code callable}.
 *
 * @param runnable the task to wrap
 * @param result the value passed to {@code Executors.callable} as the result
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    // state is assigned after callable on purpose.
    // NOTE(review): this presumably relies on state being volatile - confirm against the field declaration.
    this.state = NEW; // ensure visibility of callable
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
publicclassFutureTaskExample{
publicstaticvoidmain(String[] args)throws Exception { FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { @Override public String call()throws Exception { log.info("do something in callable"); Thread.sleep(5000); return"Done"; } });
new Thread(futureTask).start(); log.info("do something in main"); Thread.sleep(1000); String result = futureTask.get(); log.info("result:{}", result); } }
/**
 * Sample task: logs a message, sleeps for five seconds and returns the string {@code "Done"}.
 */
static class MyCallable implements Callable<String> {

    /**
     * Runs the simulated work.
     *
     * @return the literal {@code "Done"}
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public String call() throws Exception {
        log.info("do something in callable");
        Thread.sleep(5000);
        final String outcome = "Done";
        return outcome;
    }
}
/**
 * Submits a {@code MyCallable} to a cached thread pool, does some work in the main thread,
 * then blocks on {@code future.get()} and logs the returned value.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if sleeping is interrupted or the task fails
 */
public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        // get() blocks until the submitted task has produced its result.
        String result = future.get();
        log.info("result:{}", result);
    } finally {
        // Without shutdown() the pool's idle worker thread keeps the JVM alive after main
        // returns; shutdown() lets already-submitted work finish and then releases the threads.
        executorService.shutdown();
    }
}
} // closes the enclosing example class, whose header is not part of this excerpt
运行结果
1 2 3
11:29:00.230 [main] INFO com.mmall.concurrency.aqs.FutureExample - do something in main 11:29:00.230 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.FutureExample - do something in callable 11:29:05.234 [main] INFO com.mmall.concurrency.aqs.FutureExample - result:Done
/**
 * Wraps an anonymous {@code Callable} in a {@code FutureTask}, runs it on a new thread,
 * and logs the value returned by {@code futureTask.get()}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if sleeping is interrupted or the task fails
 */
public static void main(String[] args) throws Exception {
    Callable<String> job = new Callable<String>() {
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    };
    FutureTask<String> futureTask = new FutureTask<String>(job);

    Thread worker = new Thread(futureTask);
    worker.start();

    log.info("do something in main");
    Thread.sleep(1000);

    // Blocks here until the worker thread has completed the task.
    String result = futureTask.get();
    log.info("result:{}", result);
}
} // closes the enclosing example class, whose header is not part of this excerpt
/**
 * Translates a completed state value into a return value or an exception.
 *
 * <p>When {@code s} is {@code NORMAL}, {@code outcome} is returned as the result. When
 * {@code s} is {@code CANCELLED} or greater, a {@link CancellationException} is thrown.
 * For any other value, {@code outcome} is treated as the failure cause and wrapped in an
 * {@link ExecutionException}.
 *
 * @param s completed state value
 * @return the stored outcome, cast to {@code V}
 * @throws ExecutionException if the state is neither normal nor cancelled/interrupted
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    final Object stored = outcome;
    if (s == NORMAL) {
        return (V) stored;
    }
    if (s < CANCELLED) {
        throw new ExecutionException((Throwable) stored);
    }
    throw new CancellationException();
}
/**
 * Creates a {@code FutureTask} from a runnable and a fixed result.
 *
 * <p>The runnable is converted to a {@code Callable} through
 * {@code Executors.callable(runnable, result)} and stored in {@code callable}.
 *
 * @param runnable the task to wrap
 * @param result the value passed to {@code Executors.callable} as the result
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    // state is assigned after callable on purpose.
    // NOTE(review): this presumably relies on state being volatile - confirm against the field declaration.
    this.state = NEW; // ensure visibility of callable
}
/**
 * Wraps a {@code Runnable} and a fixed result in a {@code RunnableAdapter}.
 *
 * @param task the runnable to wrap; must not be {@code null}
 * @param result the value handed to the adapter as its result
 * @param <T> the type of the result
 * @return a new {@code RunnableAdapter} built from {@code task} and {@code result}
 * @throws NullPointerException if {@code task} is {@code null}
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task != null) {
        return new RunnableAdapter<T>(task, result);
    }
    throw new NullPointerException();
}
/**
 * A {@code Callable} that runs a given {@code Runnable} and then returns a given result.
 *
 * @param <T> the type of the result
 */
static final class RunnableAdapter<T> implements Callable<T> {
    /** The runnable executed on every {@link #call()}. */
    final Runnable task;

    /** The value returned by every {@link #call()}. */
    final T result;

    /**
     * Stores the runnable and the result without validating either.
     *
     * @param task the runnable to execute
     * @param result the value to return after the runnable has run
     */
    RunnableAdapter(Runnable task, T result) {
        this.result = result;
        this.task = task;
    }

    /**
     * Runs the wrapped runnable, then hands back the stored result.
     *
     * @return the result supplied at construction time
     */
    @Override
    public T call() {
        this.task.run();
        return this.result;
    }
}
还有一个指定超时时间的get():
1 2 3 4 5 6 7 8 9 10
/**
 * Returns the result, waiting at most the given time if the state has not yet moved past
 * {@code COMPLETING}.
 *
 * <p>If the current state is at most {@code COMPLETING}, {@code awaitDone} is called with
 * the timeout converted to nanoseconds; if the state it returns is still at most
 * {@code COMPLETING}, a {@link TimeoutException} is thrown. Otherwise the state is handed
 * to {@code report}.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}; must not be {@code null}
 * @return whatever {@code report} returns for the final state
 * @throws NullPointerException if {@code unit} is {@code null}
 * @throws InterruptedException declared by this method; presumably raised by {@code awaitDone} - confirm
 * @throws ExecutionException declared by this method and by {@code report}
 * @throws TimeoutException if the state is still at most {@code COMPLETING} after waiting
 */
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null) {
        throw new NullPointerException();
    }
    int current = state;
    if (current <= COMPLETING) {
        // Only wait when the task has not settled yet.
        current = awaitDone(true, unit.toNanos(timeout));
        if (current <= COMPLETING) {
            throw new TimeoutException();
        }
    }
    return report(current);
}
setException():当计算失败(不包括计算被取消)时,由内部的 run() 调用,用于记录导致失败的异常:
1 2 3 4 5 6 7
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients. Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined.
 */
/**
 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
 * A {@code ForkJoinTask} is a thread-like entity that is much
 * lighter weight than a normal thread. Huge numbers of tasks and
 * subtasks may be hosted by a small number of actual threads in a
 * ForkJoinPool, at the price of some usage limitations.
 *
 * <p>A "main" {@code ForkJoinTask} begins execution when it is
 * explicitly submitted to a {@link ForkJoinPool}, or, if not already
 * engaged in a ForkJoin computation, commenced in the {@link
 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
 * related methods. Once started, it will usually in turn start other
 * subtasks. As indicated by the name of this class, many programs
 * using {@code ForkJoinTask} employ only methods {@link #fork} and
 * {@link #join}, or derivatives such as {@link
 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
 * provides a number of other methods that can come into play in
 * advanced usages, as well as extension mechanics that allow support
 * of new forms of fork/join processing.
 *
 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
 * The efficiency of {@code ForkJoinTask}s stems from a set of
 * restrictions (that are only partially statically enforceable)
 * reflecting their main use as computational tasks calculating pure
 * functions or operating on purely isolated objects. The primary
 * coordination mechanisms are {@link #fork}, that arranges
 * asynchronous execution, and {@link #join}, that doesn't proceed
 * until the task's result has been computed. Computations should
 * ideally avoid {@code synchronized} methods or blocks, and should
 * minimize other blocking synchronization apart from joining other
 * tasks or using synchronizers such as Phasers that are advertised to
 * cooperate with fork/join scheduling.
 */