001package org.intellimate.izou.sdk.output;
002
003import com.google.common.reflect.TypeToken;
004import org.intellimate.izou.events.EventModel;
005import org.intellimate.izou.identification.Identification;
006import org.intellimate.izou.output.OutputPluginModel;
007import org.intellimate.izou.resource.ResourceModel;
008import org.intellimate.izou.sdk.Context;
009import org.intellimate.izou.sdk.util.AddOnModule;
010import org.intellimate.izou.sdk.util.ThreadPoolUser;
011
012import java.util.List;
013import java.util.Optional;
014import java.util.concurrent.BlockingQueue;
015import java.util.concurrent.CompletableFuture;
016import java.util.concurrent.ExecutionException;
017import java.util.concurrent.LinkedBlockingDeque;
018import java.util.function.Consumer;
019import java.util.stream.Collectors;
020
021/**
022 * The OutputPlugin class gets Event and then starts threads filled with output-extension tasks to create the final
023 * output and then render it on its own medium
024 * @param <T> the type of the Argument
025 * @param <X> the return type
026 */
027@SuppressWarnings("UnusedDeclaration")
028public abstract class OutputPluginArgument<T, X> extends AddOnModule implements OutputPluginModel<T, X>, ThreadPoolUser {
029
030    /**
031     * here are the events stored before they get processed
032     */
033    private final BlockingQueue<EventModel> eventBlockingQueue = new LinkedBlockingDeque<>();
034    /**
035     * the type argument for the Data you want to receive
036     */
037    private final TypeToken<X> receivingTypeToken;
038    /**
039     * the type argument for the Data you want to give the OutputExtensions as an argument
040     */
041    private final TypeToken<T> argumentTypeToken;
042
043    /**
044     * setting this boolean to true stops the while-loop
045     */
046    private boolean stop = false;
047
048    /**
049     * signals whether the OutputPlugin is working
050     */
051    private boolean isWorking = false;
052
053    /**
054     * creates a new output-plugin with a new id
055     *
056     * @param context context
057     * @param id the id of the new output-plugin
058     */
059    public OutputPluginArgument(Context context, String id) {
060        super(context, id);
061        this.receivingTypeToken = new TypeToken<X>(getClass()) {};
062        this.argumentTypeToken = new TypeToken<T>(getClass()) {};
063    }
064
065    /**
066     * get the outputExtensionList
067     *
068     * @return gets the list of output-extensions in the output-plugin
069     */
070    public List<Identification> getOutputExtensionList() {
071        return getContext().getOutput().getAssociatedOutputExtension(this);
072    }
073
074    /**
075     * gets the blocking-queue that stores the backlog of Events
076     *
077     * @return blocking-queue that stores Events
078     */
079    public BlockingQueue<EventModel> getEventBlockingQueue() {
080        return eventBlockingQueue;
081    }
082
083    /**
084     * callback method to notify that an OutputExtension was added
085     *
086     * @param identification the Identification of the OutputExtension added
087     */
088    @Override
089    public void outputExtensionAdded(Identification identification) {}
090
091    /**
092     * callback method to notify that an OutputExtension was added
093     *
094     * @param identification the Identification of the OutputExtension added
095     */
096    @Override
097    public void outputExtensionRemoved(Identification identification) {}
098
099    /**
100     * returns the Type of the one wants to receive from the OutputExtensions
101     *
102     * @return the type of the generic
103     */
104    @Override
105    public TypeToken<X> getReceivingType() {
106        return receivingTypeToken;
107    }
108
109    /**
110     * returns the Type of the argument for the OutputExtensions, or null if none
111     *
112     * @return the type of the Argument
113     */
114    @Override
115    public TypeToken<T> getArgumentType() {
116        return argumentTypeToken;
117    }
118
119    /**
120     * Adds an event to blockingQueue
121     *
122     * @param event the event to add
123     * @throws IllegalStateException raised if problems adding an event to blockingQueue
124     */
125    @Override
126    public void addToEventList(EventModel event) {
127        eventBlockingQueue.add(event);
128    }
129
130    @Override
131    public boolean isRunning() {
132        return isWorking;
133    }
134
135    @Override
136    public void stop() {
137        Thread.currentThread().interrupt();
138    }
139
140    /**
141     * @param event the current processed Event
142     */
143    public void isDone(EventModel event) {
144        Optional<ResourceModel> resource = event.getListResourceContainer().provideResource(getID()).stream()
145                .filter(resourceS -> resourceS.getProvider().getID()
146                        .equals(getContext().getOutput().getManagerIdentification().getID()))
147                .findFirst();
148        if(!resource.isPresent()) return;
149        if(resource.get().getResource() instanceof Consumer) {
150            Consumer consumer = (Consumer) resource.get().getResource();
151            //noinspection unchecked
152            consumer.accept(null);
153        }
154    }
155
156    /**
157     * terminates the outputPlugin (will not listen to events anymore)
158     */
159    public void terminate() {
160        stop = true;
161        eventBlockingQueue.notify();
162    }
163
164    public void handleFutures(List<CompletableFuture<X>> futures, EventModel eventModel) {
165        List<X> result = futures.stream()
166                .map(future -> {
167                    try {
168                        return future.get();
169                    } catch (InterruptedException e) {
170                        getContext().getLogger().error("interrupted", e);
171                        throw new RuntimeException(e);
172                    } catch (ExecutionException e) {
173                        getContext().getLogger().error("future finished exceptionally", e);
174                        return null;
175                    }
176                })
177                .collect(Collectors.toList());
178        isWorking = true;
179        renderFinalOutput(result, eventModel);
180        isWorking = false;
181    }
182
183    /**
184     * main method for outputPlugin, runs the data-conversion and output-renderer
185     *
186     * it will instruct the outputManager to let the outputExtensions generate the data. Wait for the time specified
187     * in getTimeoutLimit() (standard is 1000 milliseconds) abd the call renderFinalOutput() with the resulting data.
188     */
189    @Override
190    public void run() {
191        while (!stop) {
192            EventModel event;
193            try {
194                event = blockingQueueHandling();  //gets the new Event if one was added to the blockingQueue
195            } catch (InterruptedException e) {
196                getContext().getLogger().warn(e);
197                continue;
198            }
199
200            List<CompletableFuture<X>> outputExtensions = getContext().getOutput()
201                    .generateAllOutputExtensions(this, getArgument(), event);
202
203            try {
204                outputExtensions = timeOut(outputExtensions, getTimeoutLimit());
205            } catch (InterruptedException e) {
206                getContext().getLogger().warn(e);
207            }
208
209            handleFutures(outputExtensions, event);
210
211            //notifies output-manager when done processing
212            isDone(event);
213        }
214    }
215
216    /**
217     * Default implementation waits until a new Event has been received and then processes it.
218     *
219     * This method is made to be overwritten as seen fit by the developer
220     *
221     * @throws InterruptedException if interrupted while waiting
222     * @return the recently added Event-instance to be processed by the outputPlugin
223     */
224    public EventModel blockingQueueHandling() throws InterruptedException {
225        return eventBlockingQueue.take();
226    }
227
228    /**
229     * gets the timeout-limit in Milliseconds
230     * @return timeout in milliseconds
231     */
232    public int getTimeoutLimit() {
233        return 1000;
234    }
235
236    /**
237     * method that uses tDoneList to generate a final output that will then be rendered.
238     * @param data the data generated
239     * @param eventModel the Event which caused the whole thing
240     */
241    public abstract void renderFinalOutput(List<X> data, EventModel eventModel);
242
243    /**
244     * returns the argument for the OutputExtensions
245     * @return the argument
246     */
247    public abstract T getArgument();
248}