001package org.intellimate.izou.sdk.util; 002 003import java.util.Collection; 004import java.util.List; 005import java.util.concurrent.Callable; 006import java.util.concurrent.CompletableFuture; 007import java.util.concurrent.Future; 008import java.util.concurrent.TimeoutException; 009import java.util.function.Supplier; 010import java.util.stream.Collectors; 011 012/** 013 * provides various methods to simplify the interaction with the ThreadPool 014 * @author Leander Kurscheidt 015 * @version 1.0 016 */ 017public interface ThreadPoolUser extends ContextProvider { 018 /** 019 * submits the Runnable to the AddOns Thread-Pool 020 * @param runnable the runnable to submit 021 * @return the new CompletableFuture 022 * @see java.util.concurrent.ExecutorService#submit(Callable) 023 */ 024 default CompletableFuture<Void> submit(Runnable runnable) { 025 return submitRun(runnable); 026 } 027 028 /** 029 * submits the Runnable to the AddOns Thread-Pool 030 * @param runnable the runnable to submit 031 * @return the new CompletableFuture 032 * @see java.util.concurrent.ExecutorService#submit(Callable) 033 */ 034 default CompletableFuture<Void> submitRun(Runnable runnable) { 035 return CompletableFuture.runAsync(runnable, getContext().getThreadPool().getThreadPool()) 036 .whenComplete((u, ex) -> { 037 if (ex != null) { 038 getContext().getThreadPool().handleThrowable(ex, runnable); 039 } 040 }); 041 } 042 043 /** 044 * submits the Supplier to the AddOns Thread-Pool 045 * @param supplier the supplier executed 046 * @param <U> the return type 047 * @return the new CompletableFuture 048 * @see java.util.concurrent.ExecutorService#submit(Callable) 049 */ 050 default <U> CompletableFuture<U> submit(Supplier<U> supplier) { 051 return CompletableFuture.supplyAsync(supplier, getContext().getThreadPool().getThreadPool()) 052 .whenComplete((u, ex) -> { 053 if (ex != null) { 054 getContext().getThreadPool().handleThrowable(ex, supplier); 055 } 056 }); 057 } 058 059 /** 060 * submits the Callable to the AddOns Thread-Pool 061 * @param x the Callable to submit 062 * @param <U> the type to return 063 * @param <X> the Callable 064 * @return an Future-Object 065 * @see java.util.concurrent.ExecutorService#submit(Callable) 066 */ 067 default <U, X extends AddOnModule & Callable<U>> Future<U> submit(X x) { 068 return getContext().getThreadPool().getThreadPool().submit(x); 069 } 070 071 /** 072 * Times out the collection of futures. 073 * @param futures the collection of futures 074 * @param milliseconds the limit in milliseconds (everything under 20 milliseconds makes no sense) 075 * @param <U> the return type of the futures 076 * @param <V> the type of the futures 077 * @return a List of futures 078 * @throws InterruptedException if the process was interrupted 079 */ 080 default <U, V extends Future<U>> List<V> timeOut(Collection<? extends V> futures, 081 int milliseconds) throws InterruptedException { 082 //Timeout 083 int start = 0; 084 boolean notFinished = true; 085 while ( (start < milliseconds) && notFinished) { 086 notFinished = futures.stream() 087 .anyMatch(future -> !future.isDone()); 088 start = start + 10; 089 Thread.sleep(10); 090 } 091 //cancel all running tasks 092 if(notFinished) { 093 futures.stream() 094 .filter(future -> !future.isDone()) 095 .peek(future -> error(future.toString()+ " timed out")) 096 .forEach(future -> future.cancel(true)); 097 } 098 return futures.stream() 099 .filter(Future::isDone) 100 .collect(Collectors.<V>toList()); 101 } 102 103 /** 104 * Creates a new CompletableFuture, which inherit the CompletionStage from the supplied future or an exceptional 105 * completion with the TimeoutException. 106 * <p> 107 * It will NOT cancel/interrupt the original future! 108 * </p> 109 * @param future the future 110 * @param milliseconds the limit in milliseconds 111 * @param <U> the return type of the futures 112 * @return an new CompletableFuture 113 */ 114 default <U> CompletableFuture<U> timeOut(CompletableFuture<U> future, int milliseconds) { 115 CompletableFuture<U> combiner = new CompletableFuture<>(); 116 Future<?> timout = getContext().getThreadPool().getThreadPool().submit(() -> { 117 try { 118 Thread.sleep(milliseconds); 119 } catch (InterruptedException ignored) {} 120 if (!combiner.isDone()) 121 combiner.completeExceptionally(new TimeoutException("timout of " + milliseconds + " exceeded")); 122 }); 123 future.whenComplete((result, throwable) -> { 124 if (!combiner.isDone()) { 125 if (result != null) { 126 combiner.complete(result); 127 } else { 128 combiner.completeExceptionally(throwable); 129 } 130 timout.cancel(true); 131 } 132 }); 133 return combiner; 134 } 135}