001package org.intellimate.izou.output;
002
003import com.google.common.reflect.TypeToken;
004import org.intellimate.izou.events.EventModel;
005import org.intellimate.izou.identification.Identification;
006import org.intellimate.izou.identification.IdentificationManager;
007import org.intellimate.izou.identification.IdentificationManagerM;
008import org.intellimate.izou.identification.IllegalIDException;
009import org.intellimate.izou.main.Main;
010import org.intellimate.izou.resource.ResourceMinimalImpl;
011import org.intellimate.izou.util.AddonThreadPoolUser;
012import org.intellimate.izou.util.IdentifiableSet;
013import org.intellimate.izou.util.IzouModule;
014
015import java.util.*;
016import java.util.concurrent.CompletableFuture;
017import java.util.concurrent.Future;
018import java.util.concurrent.TimeUnit;
019import java.util.concurrent.locks.Condition;
020import java.util.concurrent.locks.Lock;
021import java.util.concurrent.locks.ReentrantLock;
022import java.util.function.BiPredicate;
023import java.util.function.Consumer;
024import java.util.function.Function;
025import java.util.stream.Collectors;
026
027/**
028 * OutputManager manages all output plugins and is the main class anyone outside the output package should talk to.
029 * It can register/remove new output-plugins and add/delete output-extensions
030 */
031public class OutputManager extends IzouModule implements AddonThreadPoolUser {
032
033    /**
034     * a list that contains all the registered output-plugins of Jarvis
035     */
036    private IdentifiableSet<OutputPluginModel<?, ?>> outputPlugins;
037
038    /**
039     * HashMap that stores OutputPlugins and the Future objects representing the Task
040     */
041    private HashMap<String, Future> futureHashMap;
042
043    /**
044     * this HashMap stores all added OutputExtensions
045     */
046    private HashMap<String, IdentifiableSet<OutputExtensionModel<?, ?>>> outputExtensions;
047
048
049    /**
050     * Creates a new output-manager with a list of output-plugins
051     *
052     * @param main the main instance started from
053     */
054    public OutputManager(Main main) {
055        super(main);
056        outputPlugins = new IdentifiableSet<>();
057        futureHashMap = new HashMap<>();
058        outputExtensions = new HashMap<>();
059    }
060
061    /**
062     * adds outputPlugin to outputPluginList, starts a new thread for the outputPlugin, and stores the future object in a HashMap
063     *
064     * @param outputPlugin OutputPlugin to add
065     * @throws IllegalIDException not yet implemented
066     */
067    public void addOutputPlugin(OutputPluginModel<?, ?> outputPlugin) throws IllegalIDException {
068        if (!futureHashMap.containsKey(outputPlugin.getID())) {
069            outputPlugins.add(outputPlugin);
070            futureHashMap.put(outputPlugin.getID(), submit(outputPlugin));
071        } else {
072            if (futureHashMap.get(outputPlugin.getID()).isDone()) {
073                futureHashMap.remove(outputPlugin.getID());
074                futureHashMap.put(outputPlugin.getID(), submit(outputPlugin));
075            }
076        }
077    }
078
079    /**
080     * removes the OutputPlugin and stops the thread
081     *
082     * @param outputPlugin the outputPlugin to remove
083     */
084    public void removeOutputPlugin(OutputPluginModel outputPlugin) {
085        Future future = futureHashMap.remove(outputPlugin.getID());
086        if (future != null) {
087            future.cancel(true);
088        }
089        outputPlugins.remove(outputPlugin);
090    }
091
092    /**
093     * adds output extension to desired outputPlugin
094     * <p>
095     * adds output extension to desired outputPlugin, so that the output-plugin can start and stop the outputExtension
096     * task as needed. The outputExtension is specific to the output-plugin
097     *
098     * @param outputExtension the outputExtension to be added
099     * @throws IllegalIDException not yet implemented
100     */
101    public void addOutputExtension(OutputExtensionModel<?, ?> outputExtension) throws IllegalIDException {
102        if (outputExtensions.containsKey(outputExtension.getPluginId())) {
103            outputExtensions.get(outputExtension.getPluginId()).add(outputExtension);
104        } else {
105            IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensionList = new IdentifiableSet<>();
106            outputExtensionList.add(outputExtension);
107            outputExtensions.put(outputExtension.getPluginId(), outputExtensionList);
108        }
109        IdentificationManager.getInstance().getIdentification(outputExtension)
110                .ifPresent(id -> outputPlugins.stream()
111                        .filter(outputPlugin -> outputPlugin.getID().equals(outputExtension.getPluginId()))
112                        .forEach(outputPlugin -> outputPlugin.outputExtensionAdded(id)));
113    }
114
115    /**
116     * removes the output-extension of id: extensionId from outputPluginList
117     *
118     * @param outputExtension the OutputExtension to remove
119     */
120    public void removeOutputExtension(OutputExtensionModel<?, ?> outputExtension) {
121        IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensions =
122                this.outputExtensions.get(outputExtension.getPluginId());
123        if (outputExtensions != null)
124            outputExtensions.remove(outputExtension);
125        IdentificationManager.getInstance().getIdentification(outputExtension)
126                .ifPresent(id -> outputPlugins.stream()
127                        .filter(outputPlugin -> outputPlugin.getID().equals(outputExtension.getPluginId()))
128                        .forEach(outputPlugin -> outputPlugin.outputExtensionRemoved(id)));
129    }
130
131    /**
132     * gets the Event and sends it to the right outputPlugin for further processing
133     * <p>
134     * passDataToOutputPlugins is the main method of OutputManger. It is called whenever the output process has to be started
135     *
136     * @param event an Instance of Event
137     */
138    public void passDataToOutputPlugins(EventModel event) {
139        IdentificationManagerM identificationManager = IdentificationManager.getInstance();
140        List<Identification> allIds = outputPlugins.stream()
141                .map(identificationManager::getIdentification)
142                .filter(Optional::isPresent)
143                .map(Optional::get)
144                .collect(Collectors.toList());
145
146        HashMap<Integer, List<Identification>> outputPluginBehaviour = event.getEventBehaviourController()
147                .getOutputPluginBehaviour(allIds);
148
149        @SuppressWarnings("unchecked")
150        Set<OutputPluginModel> outputPluginsCopy = (Set<OutputPluginModel>) this.outputPlugins.clone();
151
152        Function<List<Identification>, List<OutputPluginModel>> getOutputPlugin = ids -> ids.stream()
153                .map(id -> outputPluginsCopy.stream()
154                        .filter(outputPlugin -> outputPlugin.isOwner(id))
155                        .findFirst()
156                        .orElseGet(null))
157                .filter(Objects::nonNull)
158                .peek(outputPluginsCopy::remove)
159                .collect(Collectors.toList());
160
161        outputPluginBehaviour.entrySet().stream()
162                .sorted(Comparator.comparingInt((Map.Entry<Integer, List<Identification>> x) -> x.getKey()).reversed())
163                .flatMap(entry -> getOutputPlugin.apply(entry.getValue()).stream())
164                .distinct()
165                .forEach(op -> processOutputPlugin(event, op));
166
167        outputPluginsCopy.forEach(op -> processOutputPlugin(event, op));
168    }
169
170    private void processOutputPlugin(EventModel event, OutputPluginModel outputPlugin) {
171        //debug("processing outputPlugin: " + outputPlugin.getID() + " for event: " + event.getDescriptors().toString());
172        final Lock lock = new ReentrantLock();
173        final Condition processing = lock.newCondition();
174
175        Consumer<Boolean> consumer = noParam -> {
176            lock.lock();
177            processing.signal();
178            lock.unlock();
179        };
180
181        ResourceMinimalImpl<Consumer<Boolean>> resource = IdentificationManager.getInstance().getIdentification(this)
182                .map(id -> new ResourceMinimalImpl<>(outputPlugin.getID(), id, consumer, null))
183                .orElse(new ResourceMinimalImpl<>(outputPlugin.getID(), null, consumer, null));
184        event.getListResourceContainer().addResource(resource);
185
186        outputPlugin.addToEventList(event);
187
188        boolean finished = false;
189        try {
190            lock.lock();
191            finished = processing.await(100, TimeUnit.SECONDS);
192        } catch (InterruptedException e) {
193            error("Waiting for OutputPlugins interrupted", e);
194        } finally {
195            lock.unlock();
196        }
197        if (finished) {
198            //debug("OutputPlugin: " + outputPlugin.getID() + " finished");
199        } else {
200            error("OutputPlugin: " + outputPlugin.getID() + " timed out");
201        }
202    }
203
204    /**
205     * returns all the associated OutputExtensions
206     *
207     * @param outputPlugin the OutputPlugin to search for
208     * @return a List of Identifications
209     */
210    public List<Identification> getAssociatedOutputExtension(OutputPluginModel<?, ?> outputPlugin) {
211        IdentifiableSet<OutputExtensionModel<?, ?>> outputExtensions = this.outputExtensions.get(outputPlugin.getID());
212        IdentificationManagerM identificationManager = IdentificationManager.getInstance();
213        return filterType(outputExtensions, outputPlugin).stream()
214                .map(identificationManager::getIdentification)
215                .filter(Optional::isPresent)
216                .map(Optional::get)
217                .collect(Collectors.toList());
218    }
219
220    /**
221     * checks for the right type
222     *
223     * @param outputExtensions the OutputExtensions to check
224     * @param outputPlugin     the OutputPlugin to check against
225     * @return a List of filtered OutputExtensions
226     */
227    //TODO: TEST!!!!
228    @SuppressWarnings("SimplifiableIfStatement")
229    private List<OutputExtensionModel<?, ?>> filterType(Collection<OutputExtensionModel<?, ?>> outputExtensions,
230                                                        OutputPluginModel<?, ?> outputPlugin) {
231        BiPredicate<TypeToken<?>, TypeToken<?>> isAssignable = (first, second) -> {
232            if (first == null) {
233                return second == null;
234            } else if (second != null) {
235                return first.isAssignableFrom(second);
236            } else {
237                return false;
238            }
239        };
240        return outputExtensions.stream()
241                .filter(outputExtension ->
242                        isAssignable.test(outputExtension.getArgumentType(), outputPlugin.getArgumentType()))
243                .filter(outputExtension ->
244                        isAssignable.test(outputExtension.getReturnType(), outputPlugin.getReceivingType()))
245                .collect(Collectors.toList());
246    }
247
248    /**
249     * starts every associated OutputExtension
250     *
251     * @param outputPlugin the OutputPlugin to generate the Data for
252     * @param t            the argument or null
253     * @param event        the Event to generate for
254     * @param <T>          the type of the argument
255     * @param <X>          the return type
256     * @return a List of Future-Objects
257     */
258    public <T, X> List<CompletableFuture<X>> generateAllOutputExtensions(OutputPluginModel<T, X> outputPlugin,
259                                                                         T t, EventModel event) {
260        IdentifiableSet<OutputExtensionModel<?, ?>> extensions = outputExtensions.get(outputPlugin.getID());
261        if (extensions == null)
262            return new ArrayList<>();
263        return filterType(extensions, outputPlugin).stream()
264                .map(extension -> {
265                    try {
266                        //noinspection unchecked
267                        return (OutputExtensionModel<X, T>) extension;
268                    } catch (ClassCastException e) {
269                        return null;
270                    }
271                })
272                .filter(Objects::nonNull)
273                .filter(outputExtension -> outputExtension.canRun(event))
274                .map(extension -> submit(() -> extension.generate(event, t)))
275                .collect(Collectors.toList());
276    }
277}