001package org.intellimate.izou.util; 002 003import java.util.Collection; 004import java.util.List; 005import java.util.concurrent.CompletableFuture; 006import java.util.concurrent.Future; 007import java.util.function.Supplier; 008import java.util.stream.Collectors; 009 010/** 011 * ALWAYS implement this interface when you have to put some Tasks into the ThreadPool 012 * @author Leander Kurscheidt 013 * @version 1.0 014 */ 015public interface AddonThreadPoolUser extends MainProvider { 016 017 /** 018 * submits the Runnable to the AddOns Thread-Pool 019 * @param runnable the runnable to submit 020 * @return the new CompletableFuture 021 */ 022 default CompletableFuture<Void> submit(Runnable runnable) { 023 return CompletableFuture.runAsync(runnable, getMain().getThreadPoolManager().getAddOnsThreadPool()) 024 .whenComplete((u, ex) -> { 025 if (ex != null) { 026 getMain().getThreadPoolManager().handleThrowable(ex, runnable); 027 } 028 }); 029 } 030 031 /** 032 * submits the Supplier to the AddOns Thread-Pool 033 * @param supplier the supplier executed 034 * @param <U> the return type 035 * @return the new CompletableFuture 036 */ 037 default <U> CompletableFuture<U> submit(Supplier<U> supplier) { 038 return CompletableFuture.supplyAsync(supplier, getMain().getThreadPoolManager().getAddOnsThreadPool()) 039 .whenComplete((u, ex) -> { 040 if (ex != null) { 041 getMain().getThreadPoolManager().handleThrowable(ex, supplier); 042 } 043 }); 044 } 045 046 /** 047 * times out the collection of futures 048 * @param futures the collection of futures 049 * @param milliseconds the limit in milliseconds (everything under 20 milliseconds makes no sense) 050 * @param <U> the return type of the futures 051 * @param <V> the type of the futures 052 * @return a List of futures 053 * @throws InterruptedException if the process was interrupted 054 */ 055 default <U, V extends Future<U>> List<V> timeOut(Collection<? extends V> futures, 056 int milliseconds) throws InterruptedException { 057 //Timeout 058 int start = 0; 059 boolean notFinished = true; 060 while ( (start < milliseconds) && notFinished) { 061 notFinished = futures.stream() 062 .anyMatch(future -> !future.isDone()); 063 start = start + 10; 064 try { 065 Thread.sleep(10); 066 } catch (InterruptedException e) { 067 throw e; 068 } 069 } 070 071 //cancel all running tasks 072 if(notFinished) { 073 futures.stream() 074 .filter(future -> !future.isDone()) 075 .peek(future -> error(future.toString()+ " timed out", 076 new Exception(future.toString() + " timed out"))) 077 .forEach(future -> future.cancel(true)); 078 } 079 return futures.stream() 080 .filter(Future::isDone) 081 .collect(Collectors.<V>toList()); 082 } 083}