001package org.intellimate.izou.sdk.output; 002 003import com.google.common.reflect.TypeToken; 004import org.intellimate.izou.events.EventModel; 005import org.intellimate.izou.identification.Identification; 006import org.intellimate.izou.output.OutputPluginModel; 007import org.intellimate.izou.resource.ResourceModel; 008import org.intellimate.izou.sdk.Context; 009import org.intellimate.izou.sdk.util.AddOnModule; 010import org.intellimate.izou.sdk.util.ThreadPoolUser; 011 012import java.util.List; 013import java.util.Optional; 014import java.util.concurrent.BlockingQueue; 015import java.util.concurrent.CompletableFuture; 016import java.util.concurrent.ExecutionException; 017import java.util.concurrent.LinkedBlockingDeque; 018import java.util.function.Consumer; 019import java.util.stream.Collectors; 020 021/** 022 * The OutputPlugin class gets Event and then starts threads filled with output-extension tasks to create the final 023 * output and then render it on its own medium 024 * @param <T> the type of the Argument 025 * @param <X> the return type 026 */ 027@SuppressWarnings("UnusedDeclaration") 028public abstract class OutputPluginArgument<T, X> extends AddOnModule implements OutputPluginModel<T, X>, ThreadPoolUser { 029 030 /** 031 * here are the events stored before they get processed 032 */ 033 private final BlockingQueue<EventModel> eventBlockingQueue = new LinkedBlockingDeque<>(); 034 /** 035 * the type argument for the Data you want to receive 036 */ 037 private final TypeToken<X> receivingTypeToken; 038 /** 039 * the type argument for the Data you want to give the OutputExtensions as an argument 040 */ 041 private final TypeToken<T> argumentTypeToken; 042 043 /** 044 * setting this boolean to true stops the while-loop 045 */ 046 private boolean stop = false; 047 048 /** 049 * signals whether the OutputPlugin is working 050 */ 051 private boolean isWorking = false; 052 053 /** 054 * creates a new output-plugin with a new id 055 * 056 * @param context context 057 * @param id the id of the new output-plugin 058 */ 059 public OutputPluginArgument(Context context, String id) { 060 super(context, id); 061 this.receivingTypeToken = new TypeToken<X>(getClass()) {}; 062 this.argumentTypeToken = new TypeToken<T>(getClass()) {}; 063 } 064 065 /** 066 * get the outputExtensionList 067 * 068 * @return gets the list of output-extensions in the output-plugin 069 */ 070 public List<Identification> getOutputExtensionList() { 071 return getContext().getOutput().getAssociatedOutputExtension(this); 072 } 073 074 /** 075 * gets the blocking-queue that stores the backlog of Events 076 * 077 * @return blocking-queue that stores Events 078 */ 079 public BlockingQueue<EventModel> getEventBlockingQueue() { 080 return eventBlockingQueue; 081 } 082 083 /** 084 * callback method to notify that an OutputExtension was added 085 * 086 * @param identification the Identification of the OutputExtension added 087 */ 088 @Override 089 public void outputExtensionAdded(Identification identification) {} 090 091 /** 092 * callback method to notify that an OutputExtension was added 093 * 094 * @param identification the Identification of the OutputExtension added 095 */ 096 @Override 097 public void outputExtensionRemoved(Identification identification) {} 098 099 /** 100 * returns the Type of the one wants to receive from the OutputExtensions 101 * 102 * @return the type of the generic 103 */ 104 @Override 105 public TypeToken<X> getReceivingType() { 106 return receivingTypeToken; 107 } 108 109 /** 110 * returns the Type of the argument for the OutputExtensions, or null if none 111 * 112 * @return the type of the Argument 113 */ 114 @Override 115 public TypeToken<T> getArgumentType() { 116 return argumentTypeToken; 117 } 118 119 /** 120 * Adds an event to blockingQueue 121 * 122 * @param event the event to add 123 * @throws IllegalStateException raised if problems adding an event to blockingQueue 124 */ 125 @Override 126 public void addToEventList(EventModel event) { 127 eventBlockingQueue.add(event); 128 } 129 130 @Override 131 public boolean isRunning() { 132 return isWorking; 133 } 134 135 @Override 136 public void stop() { 137 Thread.currentThread().interrupt(); 138 } 139 140 /** 141 * @param event the current processed Event 142 */ 143 public void isDone(EventModel event) { 144 Optional<ResourceModel> resource = event.getListResourceContainer().provideResource(getID()).stream() 145 .filter(resourceS -> resourceS.getProvider().getID() 146 .equals(getContext().getOutput().getManagerIdentification().getID())) 147 .findFirst(); 148 if(!resource.isPresent()) return; 149 if(resource.get().getResource() instanceof Consumer) { 150 Consumer consumer = (Consumer) resource.get().getResource(); 151 //noinspection unchecked 152 consumer.accept(null); 153 } 154 } 155 156 /** 157 * terminates the outputPlugin (will not listen to events anymore) 158 */ 159 public void terminate() { 160 stop = true; 161 eventBlockingQueue.notify(); 162 } 163 164 public void handleFutures(List<CompletableFuture<X>> futures, EventModel eventModel) { 165 List<X> result = futures.stream() 166 .map(future -> { 167 try { 168 return future.get(); 169 } catch (InterruptedException e) { 170 getContext().getLogger().error("interrupted", e); 171 throw new RuntimeException(e); 172 } catch (ExecutionException e) { 173 getContext().getLogger().error("future finished exceptionally", e); 174 return null; 175 } 176 }) 177 .collect(Collectors.toList()); 178 isWorking = true; 179 renderFinalOutput(result, eventModel); 180 isWorking = false; 181 } 182 183 /** 184 * main method for outputPlugin, runs the data-conversion and output-renderer 185 * 186 * it will instruct the outputManager to let the outputExtensions generate the data. Wait for the time specified 187 * in getTimeoutLimit() (standard is 1000 milliseconds) abd the call renderFinalOutput() with the resulting data. 188 */ 189 @Override 190 public void run() { 191 while (!stop) { 192 EventModel event; 193 try { 194 event = blockingQueueHandling(); //gets the new Event if one was added to the blockingQueue 195 } catch (InterruptedException e) { 196 getContext().getLogger().warn(e); 197 continue; 198 } 199 200 List<CompletableFuture<X>> outputExtensions = getContext().getOutput() 201 .generateAllOutputExtensions(this, getArgument(), event); 202 203 try { 204 outputExtensions = timeOut(outputExtensions, getTimeoutLimit()); 205 } catch (InterruptedException e) { 206 getContext().getLogger().warn(e); 207 } 208 209 handleFutures(outputExtensions, event); 210 211 //notifies output-manager when done processing 212 isDone(event); 213 } 214 } 215 216 /** 217 * Default implementation waits until a new Event has been received and then processes it. 218 * 219 * This method is made to be overwritten as seen fit by the developer 220 * 221 * @throws InterruptedException if interrupted while waiting 222 * @return the recently added Event-instance to be processed by the outputPlugin 223 */ 224 public EventModel blockingQueueHandling() throws InterruptedException { 225 return eventBlockingQueue.take(); 226 } 227 228 /** 229 * gets the timeout-limit in Milliseconds 230 * @return timeout in milliseconds 231 */ 232 public int getTimeoutLimit() { 233 return 1000; 234 } 235 236 /** 237 * method that uses tDoneList to generate a final output that will then be rendered. 238 * @param data the data generated 239 * @param eventModel the Event which caused the whole thing 240 */ 241 public abstract void renderFinalOutput(List<X> data, EventModel eventModel); 242 243 /** 244 * returns the argument for the OutputExtensions 245 * @return the argument 246 */ 247 public abstract T getArgument(); 248}