001package org.intellimate.izou.util;
002
003import java.util.Collection;
004import java.util.List;
005import java.util.concurrent.CompletableFuture;
006import java.util.concurrent.Future;
007import java.util.function.Supplier;
008import java.util.stream.Collectors;
009
010/**
011 * ALWAYS implement this interface when you have to put some Tasks into the ThreadPool
012 * @author Leander Kurscheidt
013 * @version 1.0
014 */
015public interface AddonThreadPoolUser extends MainProvider {
016
017    /**
018     * submits the Runnable to the AddOns Thread-Pool
019     * @param runnable the runnable to submit
020     * @return the new CompletableFuture
021     */
022    default CompletableFuture<Void> submit(Runnable runnable) {
023        return CompletableFuture.runAsync(runnable, getMain().getThreadPoolManager().getAddOnsThreadPool())
024                .whenComplete((u, ex) -> {
025                    if (ex != null) {
026                        getMain().getThreadPoolManager().handleThrowable(ex, runnable);
027                    }
028                });
029    }
030
031    /**
032     * submits the Supplier to the AddOns Thread-Pool 
033     * @param supplier the supplier executed
034     * @param <U> the return type
035     * @return the new CompletableFuture
036     */
037    default <U> CompletableFuture<U> submit(Supplier<U> supplier) {
038        return CompletableFuture.supplyAsync(supplier, getMain().getThreadPoolManager().getAddOnsThreadPool())
039                .whenComplete((u, ex) -> {
040                    if (ex != null) {
041                        getMain().getThreadPoolManager().handleThrowable(ex, supplier);
042                    }
043                });
044    }
045
046    /**
047     * times out the collection of futures
048     * @param futures the collection of futures
049     * @param milliseconds the limit in milliseconds (everything under 20 milliseconds makes no sense)
050     * @param <U> the return type of the futures
051     * @param <V> the type of the futures
052     * @return a List of futures
053     * @throws InterruptedException if the process was interrupted
054     */
055    default <U, V extends Future<U>> List<V> timeOut(Collection<? extends V> futures,
056                                                   int milliseconds) throws InterruptedException {
057        //Timeout
058        int start = 0;
059        boolean notFinished = true;
060        while ( (start < milliseconds) && notFinished) {
061            notFinished = futures.stream()
062                    .anyMatch(future -> !future.isDone());
063            start = start + 10;
064            try {
065                Thread.sleep(10);
066            } catch (InterruptedException e) {
067                throw e;
068            }
069        }
070
071        //cancel all running tasks
072        if(notFinished) {
073            futures.stream()
074                    .filter(future -> !future.isDone())
075                    .peek(future -> error(future.toString()+ " timed out",
076                            new Exception(future.toString() + " timed out")))
077                    .forEach(future -> future.cancel(true));
078        }
079        return futures.stream()
080                .filter(Future::isDone)
081                .collect(Collectors.<V>toList());
082    }
083}