001package org.intellimate.izou.sdk.util;
002
003import java.util.Collection;
004import java.util.List;
005import java.util.concurrent.Callable;
006import java.util.concurrent.CompletableFuture;
007import java.util.concurrent.Future;
008import java.util.concurrent.TimeoutException;
009import java.util.function.Supplier;
010import java.util.stream.Collectors;
011
012/**
013 * provides various methods to simplify the interaction with the ThreadPool
014 * @author Leander Kurscheidt
015 * @version 1.0
016 */
017public interface ThreadPoolUser extends ContextProvider {
018    /**
019     * submits the Runnable to the AddOns Thread-Pool
020     * @param runnable the runnable to submit
021     * @return the new CompletableFuture
022     * @see java.util.concurrent.ExecutorService#submit(Callable)
023     */
024    default CompletableFuture<Void> submit(Runnable runnable) {
025        return submitRun(runnable);
026    }
027
028    /**
029     * submits the Runnable to the AddOns Thread-Pool
030     * @param runnable the runnable to submit
031     * @return the new CompletableFuture
032     * @see java.util.concurrent.ExecutorService#submit(Callable)
033     */
034    default CompletableFuture<Void> submitRun(Runnable runnable) {
035        return CompletableFuture.runAsync(runnable, getContext().getThreadPool().getThreadPool())
036                .whenComplete((u, ex) -> {
037                    if (ex != null) {
038                        getContext().getThreadPool().handleThrowable(ex, runnable);
039                    }
040                });
041    }
042
043    /**
044     * submits the Supplier to the AddOns Thread-Pool
045     * @param supplier the supplier executed
046     * @param <U> the return type
047     * @return the new CompletableFuture
048     * @see java.util.concurrent.ExecutorService#submit(Callable)
049     */
050    default <U> CompletableFuture<U> submit(Supplier<U> supplier) {
051        return CompletableFuture.supplyAsync(supplier, getContext().getThreadPool().getThreadPool())
052                .whenComplete((u, ex) -> {
053                    if (ex != null) {
054                        getContext().getThreadPool().handleThrowable(ex, supplier);
055                    }
056                });
057    }
058
059    /**
060     * submits the Callable to the AddOns Thread-Pool
061     * @param x the Callable to submit
062     * @param <U> the type to return
063     * @param <X> the Callable
064     * @return an Future-Object
065     * @see java.util.concurrent.ExecutorService#submit(Callable)
066     */
067    default <U, X extends AddOnModule & Callable<U>> Future<U>  submit(X x) {
068        return getContext().getThreadPool().getThreadPool().submit(x);
069    }
070
071    /**
072     * Times out the collection of futures.
073     * @param futures the collection of futures
074     * @param milliseconds the limit in milliseconds (everything under 20 milliseconds makes no sense)
075     * @param <U> the return type of the futures
076     * @param <V> the type of the futures
077     * @return a List of futures
078     * @throws InterruptedException if the process was interrupted
079     */
080    default <U, V extends Future<U>> List<V> timeOut(Collection<? extends V> futures,
081                                                     int milliseconds) throws InterruptedException {
082        //Timeout
083        int start = 0;
084        boolean notFinished = true;
085        while ( (start < milliseconds) && notFinished) {
086            notFinished = futures.stream()
087                    .anyMatch(future -> !future.isDone());
088            start = start + 10;
089            Thread.sleep(10);
090        }
091        //cancel all running tasks
092        if(notFinished) {
093            futures.stream()
094                    .filter(future -> !future.isDone())
095                    .peek(future -> error(future.toString()+ " timed out"))
096                    .forEach(future -> future.cancel(true));
097        }
098        return futures.stream()
099                .filter(Future::isDone)
100                .collect(Collectors.<V>toList());
101    }
102
103    /**
104     * Creates a new CompletableFuture, which inherit the CompletionStage from the supplied future or an exceptional
105     * completion with the TimeoutException.
106     * <p>
107     * It will NOT cancel/interrupt the original future!
108     * </p>
109     * @param future the future
110     * @param milliseconds the limit in milliseconds
111     * @param <U> the return type of the futures
112     * @return an new CompletableFuture
113     */
114    default <U> CompletableFuture<U> timeOut(CompletableFuture<U> future, int milliseconds) {
115        CompletableFuture<U> combiner = new CompletableFuture<>();
116        Future<?> timout = getContext().getThreadPool().getThreadPool().submit(() -> {
117            try {
118                Thread.sleep(milliseconds);
119            } catch (InterruptedException ignored) {}
120            if (!combiner.isDone())
121                combiner.completeExceptionally(new TimeoutException("timout of " + milliseconds + " exceeded"));
122        });
123        future.whenComplete((result, throwable) -> {
124            if (!combiner.isDone()) {
125                if (result != null) {
126                    combiner.complete(result);
127                } else {
128                    combiner.completeExceptionally(throwable);
129                }
130                timout.cancel(true);
131            }
132        });
133        return combiner;
134    }
135}