001package org.intellimate.izou.output; 002 003import com.google.common.reflect.TypeToken; 004import org.intellimate.izou.events.EventModel; 005import org.intellimate.izou.identification.Identification; 006import org.intellimate.izou.identification.IdentificationManager; 007import org.intellimate.izou.identification.IdentificationManagerM; 008import org.intellimate.izou.identification.IllegalIDException; 009import org.intellimate.izou.main.Main; 010import org.intellimate.izou.resource.ResourceMinimalImpl; 011import org.intellimate.izou.util.AddonThreadPoolUser; 012import org.intellimate.izou.util.IdentifiableSet; 013import org.intellimate.izou.util.IzouModule; 014 015import java.util.*; 016import java.util.concurrent.CompletableFuture; 017import java.util.concurrent.Future; 018import java.util.concurrent.TimeUnit; 019import java.util.concurrent.locks.Condition; 020import java.util.concurrent.locks.Lock; 021import java.util.concurrent.locks.ReentrantLock; 022import java.util.function.BiPredicate; 023import java.util.function.Consumer; 024import java.util.function.Function; 025import java.util.stream.Collectors; 026 027/** 028 * OutputManager manages all output plugins and is the main class anyone outside the output package should talk to. 029 * It can register/remove new output-plugins and add/delete output-extensions 030 */ 031public class OutputManager extends IzouModule implements AddonThreadPoolUser { 032 033 /** 034 * a list that contains all the registered output-plugins of Jarvis 035 */ 036 private IdentifiableSet<OutputPluginModel<?, ?>> outputPlugins; 037 038 /** 039 * HashMap that stores OutputPlugins and the Future objects representing the Task 040 */ 041 private HashMap<String, Future> futureHashMap; 042 043 /** 044 * this HashMap stores all added OutputExtensions 045 */ 046 private HashMap<String, IdentifiableSet<OutputExtensionModel<?, ?>>> outputExtensions; 047 048 049 /** 050 * Creates a new output-manager with a list of output-plugins 051 * 052 * @param main the main instance started from 053 */ 054 public OutputManager(Main main) { 055 super(main); 056 outputPlugins = new IdentifiableSet<>(); 057 futureHashMap = new HashMap<>(); 058 outputExtensions = new HashMap<>(); 059 } 060 061 /** 062 * adds outputPlugin to outputPluginList, starts a new thread for the outputPlugin, and stores the future object in a HashMap 063 * 064 * @param outputPlugin OutputPlugin to add 065 * @throws IllegalIDException not yet implemented 066 */ 067 public void addOutputPlugin(OutputPluginModel<?, ?> outputPlugin) throws IllegalIDException { 068 if (!futureHashMap.containsKey(outputPlugin.getID())) { 069 outputPlugins.add(outputPlugin); 070 futureHashMap.put(outputPlugin.getID(), submit(outputPlugin)); 071 } else { 072 if (futureHashMap.get(outputPlugin.getID()).isDone()) { 073 futureHashMap.remove(outputPlugin.getID()); 074 futureHashMap.put(outputPlugin.getID(), submit(outputPlugin)); 075 } 076 } 077 } 078 079 /** 080 * removes the OutputPlugin and stops the thread 081 * 082 * @param outputPlugin the outputPlugin to remove 083 */ 084 public void removeOutputPlugin(OutputPluginModel outputPlugin) { 085 Future future = futureHashMap.remove(outputPlugin.getID()); 086 if (future != null) { 087 future.cancel(true); 088 } 089 outputPlugins.remove(outputPlugin); 090 } 091 092 /** 093 * adds output extension to desired outputPlugin 094 * <p> 095 * adds output extension to desired outputPlugin, so that the output-plugin can start and stop the outputExtension 096 * task as needed. The outputExtension is specific to the output-plugin 097 * 098 * @param outputExtension the outputExtension to be added 099 * @throws IllegalIDException not yet implemented 100 */ 101 public void addOutputExtension(OutputExtensionModel<?, ?> outputExtension) throws IllegalIDException { 102 if (outputExtensions.containsKey(outputExtension.getPluginId())) { 103 outputExtensions.get(outputExtension.getPluginId()).add(outputExtension); 104 } else { 105 IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensionList = new IdentifiableSet<>(); 106 outputExtensionList.add(outputExtension); 107 outputExtensions.put(outputExtension.getPluginId(), outputExtensionList); 108 } 109 IdentificationManager.getInstance().getIdentification(outputExtension) 110 .ifPresent(id -> outputPlugins.stream() 111 .filter(outputPlugin -> outputPlugin.getID().equals(outputExtension.getPluginId())) 112 .forEach(outputPlugin -> outputPlugin.outputExtensionAdded(id))); 113 } 114 115 /** 116 * removes the output-extension of id: extensionId from outputPluginList 117 * 118 * @param outputExtension the OutputExtension to remove 119 */ 120 public void removeOutputExtension(OutputExtensionModel<?, ?> outputExtension) { 121 IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensions = 122 this.outputExtensions.get(outputExtension.getPluginId()); 123 if (outputExtensions != null) 124 outputExtensions.remove(outputExtension); 125 IdentificationManager.getInstance().getIdentification(outputExtension) 126 .ifPresent(id -> outputPlugins.stream() 127 .filter(outputPlugin -> outputPlugin.getID().equals(outputExtension.getPluginId())) 128 .forEach(outputPlugin -> outputPlugin.outputExtensionRemoved(id))); 129 } 130 131 /** 132 * gets the Event and sends it to the right outputPlugin for further processing 133 * <p> 134 * passDataToOutputPlugins is the main method of OutputManger. It is called whenever the output process has to be started 135 * 136 * @param event an Instance of Event 137 */ 138 public void passDataToOutputPlugins(EventModel event) { 139 IdentificationManagerM identificationManager = IdentificationManager.getInstance(); 140 List<Identification> allIds = outputPlugins.stream() 141 .map(identificationManager::getIdentification) 142 .filter(Optional::isPresent) 143 .map(Optional::get) 144 .collect(Collectors.toList()); 145 146 HashMap<Integer, List<Identification>> outputPluginBehaviour = event.getEventBehaviourController() 147 .getOutputPluginBehaviour(allIds); 148 149 @SuppressWarnings("unchecked") 150 Set<OutputPluginModel> outputPluginsCopy = (Set<OutputPluginModel>) this.outputPlugins.clone(); 151 152 Function<List<Identification>, List<OutputPluginModel>> getOutputPlugin = ids -> ids.stream() 153 .map(id -> outputPluginsCopy.stream() 154 .filter(outputPlugin -> outputPlugin.isOwner(id)) 155 .findFirst() 156 .orElseGet(null)) 157 .filter(Objects::nonNull) 158 .peek(outputPluginsCopy::remove) 159 .collect(Collectors.toList()); 160 161 outputPluginBehaviour.entrySet().stream() 162 .sorted(Comparator.comparingInt((Map.Entry<Integer, List<Identification>> x) -> x.getKey()).reversed()) 163 .flatMap(entry -> getOutputPlugin.apply(entry.getValue()).stream()) 164 .distinct() 165 .forEach(op -> processOutputPlugin(event, op)); 166 167 outputPluginsCopy.forEach(op -> processOutputPlugin(event, op)); 168 } 169 170 private void processOutputPlugin(EventModel event, OutputPluginModel outputPlugin) { 171 //debug("processing outputPlugin: " + outputPlugin.getID() + " for event: " + event.getDescriptors().toString()); 172 final Lock lock = new ReentrantLock(); 173 final Condition processing = lock.newCondition(); 174 175 Consumer<Boolean> consumer = noParam -> { 176 lock.lock(); 177 processing.signal(); 178 lock.unlock(); 179 }; 180 181 ResourceMinimalImpl<Consumer<Boolean>> resource = IdentificationManager.getInstance().getIdentification(this) 182 .map(id -> new ResourceMinimalImpl<>(outputPlugin.getID(), id, consumer, null)) 183 .orElse(new ResourceMinimalImpl<>(outputPlugin.getID(), null, consumer, null)); 184 event.getListResourceContainer().addResource(resource); 185 186 outputPlugin.addToEventList(event); 187 188 boolean finished = false; 189 try { 190 lock.lock(); 191 finished = processing.await(100, TimeUnit.SECONDS); 192 } catch (InterruptedException e) { 193 error("Waiting for OutputPlugins interrupted", e); 194 } finally { 195 lock.unlock(); 196 } 197 if (finished) { 198 //debug("OutputPlugin: " + outputPlugin.getID() + " finished"); 199 } else { 200 error("OutputPlugin: " + outputPlugin.getID() + " timed out"); 201 } 202 } 203 204 /** 205 * returns all the associated OutputExtensions 206 * 207 * @param outputPlugin the OutputPlugin to search for 208 * @return a List of Identifications 209 */ 210 public List<Identification> getAssociatedOutputExtension(OutputPluginModel<?, ?> outputPlugin) { 211 IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensions = this.outputExtensions.get(outputPlugin.getID()); 212 IdentificationManagerM identificationManager = IdentificationManager.getInstance(); 213 return filterType(outputExtensions, outputPlugin).stream() 214 .map(identificationManager::getIdentification) 215 .filter(Optional::isPresent) 216 .map(Optional::get) 217 .collect(Collectors.toList()); 218 } 219 220 /** 221 * checks for the right type 222 * 223 * @param outputExtensions the OutputExtensions to check 224 * @param outputPlugin the OutputPlugin to check against 225 * @return a List of filtered OutputExtensions 226 */ 227 //TODO: TEST!!!! 228 @SuppressWarnings("SimplifiableIfStatement") 229 private List<OutputExtensionModel<?, ?>> filterType(Collection<OutputExtensionModel<?, ?>> outputExtensions, 230 OutputPluginModel<?, ?> outputPlugin) { 231 BiPredicate<TypeToken<?>, TypeToken<?>> isAssignable = (first, second) -> { 232 if (first == null) { 233 return second == null; 234 } else if (second != null) { 235 return first.isAssignableFrom(second); 236 } else { 237 return false; 238 } 239 }; 240 return outputExtensions.stream() 241 .filter(outputExtension -> 242 isAssignable.test(outputExtension.getArgumentType(), outputPlugin.getArgumentType())) 243 .filter(outputExtension -> 244 isAssignable.test(outputExtension.getReturnType(), outputPlugin.getReceivingType())) 245 .collect(Collectors.toList()); 246 } 247 248 /** 249 * starts every associated OutputExtension 250 * 251 * @param outputPlugin the OutputPlugin to generate the Data for 252 * @param t the argument or null 253 * @param event the Event to generate for 254 * @param <T> the type of the argument 255 * @param <X> the return type 256 * @return a List of Future-Objects 257 */ 258 public <T, X> List<CompletableFuture<X>> generateAllOutputExtensions(OutputPluginModel<T, X> outputPlugin, 259 T t, EventModel event) { 260 IdentifiableSet<OutputExtensionModel<?, ?>> extensions = outputExtensions.get(outputPlugin.getID()); 261 if (extensions == null) 262 return new ArrayList<>(); 263 return filterType(extensions, outputPlugin).stream() 264 .map(extension -> { 265 try { 266 //noinspection unchecked 267 return (OutputExtensionModel<X, T>) extension; 268 } catch (ClassCastException e) { 269 return null; 270 } 271 }) 272 .filter(Objects::nonNull) 273 .filter(outputExtension -> outputExtension.canRun(event)) 274 .map(extension -> submit(() -> extension.generate(event, t))) 275 .collect(Collectors.toList()); 276 } 277}