001package org.intellimate.izou.events;
002
003import org.intellimate.izou.util.AddonThreadPoolUser;
004import org.intellimate.izou.util.IzouModule;
005import org.intellimate.izou.identification.Identification;
006import org.intellimate.izou.identification.IllegalIDException;
007import org.intellimate.izou.main.Main;
008import org.intellimate.izou.resource.ResourceModel;
009
010import java.util.*;
011import java.util.concurrent.*;
012import java.util.stream.Collectors;
013
014/**
015 * This class gets all the Events from all registered EventPublisher, generates Resources and passes them to the
016 * OutputManager. Can also be used to fire Events Concurrently.
017 */
018public class EventDistributor extends IzouModule implements Runnable, AddonThreadPoolUser {
019    private BlockingQueue<EventModel<?>> events = new LinkedBlockingQueue<>();
020    private ConcurrentHashMap<Identification, EventPublisher> registered = new ConcurrentHashMap<>();
021    //here are all the Instances to to control the Event-dispatching stored
022    private final ConcurrentLinkedQueue<EventsControllerModel> eventsControllers = new ConcurrentLinkedQueue<>();
023    //here are all the Listeners stored
024    private final ConcurrentHashMap<String, ArrayList<EventListenerModel>> listeners = new ConcurrentHashMap<>();
025    //here are all the Listeners stored that get called when an Event finishes processing
026    private final ConcurrentHashMap<String, ArrayList<EventListenerModel>> finishListeners = new ConcurrentHashMap<>();
027    private boolean stop = false;
028
029    public EventDistributor(Main main) {
030        super(main);
031        main.getThreadPoolManager().getIzouThreadPool().submit(this);
032    }
033
034    /**
035     * fires the event concurrently, this is generally discouraged.
036     * <p>
037     * This method should not be used for normal Events, for for events which obey the following laws:<br>
038     * 1. they are time critical.<br>
039     * 2. addons are not expected to react in any way beside a small update<br>
040     * 3. they are few.<br>
041     * if your event matches the above laws, you may consider firing it concurrently.
042     * </p>
043     * @param eventModel the EventModel
044     */
045    public void fireEventConcurrently(EventModel<?> eventModel) {
046        if(eventModel == null) return;
047        submit(() -> processEvent(eventModel));
048    }
049
050    /**
051     * with this method you can register EventPublisher add a Source of Events to the System.
052     * <p>
053     * This method represents a higher level of abstraction! Use the EventManager to fire Events!
054     * This method is intended for use cases where you have an entire new source of events (e.g. network)
055     * @param identification the Identification of the Source
056     * @return An Optional Object which may or may not contains an EventPublisher
057     * @throws IllegalIDException not yet implemented
058     */
059    public Optional<EventCallable> registerEventPublisher(Identification identification) throws IllegalIDException {
060        if(registered.containsKey(identification)) return Optional.empty();
061        EventPublisher eventPublisher = new EventPublisher(events);
062        registered.put(identification, eventPublisher);
063        return Optional.of(eventPublisher);
064    }
065
066    /**
067     * with this method you can unregister EventPublisher add a Source of Events to the System.
068     * <p>
069     * This method represents a higher level of abstraction! Use the EventManager to fire Events!
070     * This method is intended for use cases where you have an entire new source of events (e.g. network)
071     * @param identification the Identification of the Source
072     */
073    public void unregisterEventPublisher(Identification identification) {
074        if(!registered.containsKey(identification)) return;
075        registered.remove(identification);
076    }
077
078    /**
079     * Registers an EventController to control EventDispatching-Behaviour
080     * <p>
081     * Method is thread-safe.
082     * It is expected that this method executes quickly.
083     *
084     * @param controller the EventController Interface to control event-dispatching
085     * @throws IllegalIDException not yet implemented
086     */
087    public void registerEventsController(EventsControllerModel controller) throws IllegalIDException  {
088        eventsControllers.add(controller);
089    }
090
091    /**
092     * Unregisters an EventController
093     * <p>
094     * Method is thread-safe.
095     *
096     * @param controller the EventController Interface to remove
097     */
098    public void unregisterEventsController(EventsControllerModel controller) {
099        eventsControllers.remove(controller);
100    }
101
102
103    /**
104     * Adds an listener for events.
105     * <p>
106     * Be careful with this method, it will register the listener for ALL the informations found in the Event. If your
107     * event-type is a common event type, it will fire EACH time!.
108     * It will also register for all Descriptors individually!
109     * It will also ignore if this listener is already listening to an Event.
110     * Method is thread-safe.
111     * </p>
112     * @param event the Event to listen to (it will listen to all descriptors individually!)
113     * @param eventListener the ActivatorEventListener-interface for receiving activator events
114     * @throws IllegalIDException not yet implemented
115     */
116    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
117    public void registerEventListener(EventModel event, EventListenerModel eventListener) throws IllegalIDException {
118        registerEventListener(event.getAllInformations(), eventListener);
119    }
120
121    /**
122     * Adds an listener for events.
123     * <p>
124     * It will register for all ids individually!
125     * This method will ignore if this listener is already listening to an Event.
126     * Method is thread-safe.
127     * </p>
128     * @param ids this can be type, or descriptors etc.
129     * @param eventListener the ActivatorEventListener-interface for receiving activator events
130     */
131    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
132    public void registerEventListener(List<String> ids, EventListenerModel eventListener) {
133        for(String id : ids) {
134            ArrayList<EventListenerModel> listenersList = listeners.get(id);
135            if (listenersList == null) {
136                listeners.put(id, new ArrayList<>());
137                listenersList = listeners.get(id);
138            }
139            if (!listenersList.contains(eventListener)) {
140                synchronized (listenersList) {
141                    listenersList.add(eventListener);
142                }
143            }
144        }
145    }
146
147    /**
148     * unregister an EventListener
149     *
150     * It will unregister for all Descriptors individually!
151     * It will also ignore if this listener is not listening to an Event.
152     * Method is thread-safe.
153     *
154     * @param event the Event to stop listen to
155     * @param eventListener the ActivatorEventListener used to listen for events
156     * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed
157     */
158    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
159    public void unregisterEventListener(EventModel<EventModel> event, EventListenerModel eventListener) throws IllegalArgumentException {
160        for (String id : event.getAllInformations()) {
161            ArrayList<EventListenerModel> listenersList = listeners.get(id);
162            if (listenersList == null) {
163                return;
164            }
165            synchronized (listenersList) {
166                listenersList.remove(eventListener);
167            }
168        }
169    }
170
171    /**
172     * unregister an EventListener
173     *
174     * It will unregister for all registered descriptors.
175     * It will also ignore if this listener is not listening to an Event.
176     * Method is thread-safe.
177     *
178     * @param eventListener the ActivatorEventListener used to listen for events
179     * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed
180     */
181    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
182    public void unregisterEventListener(EventListenerModel eventListener) throws IllegalArgumentException {
183        listeners.values().stream()
184                .forEach(list -> {
185                    synchronized (list) {
186                        list.removeIf(eventListenerModel -> eventListenerModel.equals(eventListener));
187                    }
188                });
189    }
190
191    /**
192     * Adds an listener for events that gets called when the event finished processing.
193     * <p>
194     * Be careful with this method, it will register the listener for ALL the informations found in the Event. If your
195     * event-type is a common event type, it will fire EACH time!.
196     * It will also register for all Descriptors individually!
197     * It will also ignore if this listener is already listening to an Event.
198     * Method is thread-safe.
199     * </p>
200     * @param event the Event to listen to (it will listen to all descriptors individually!)
201     * @param eventListener the ActivatorEventListener-interface for receiving activator events
202     * @throws IllegalIDException not yet implemented
203     */
204    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
205    public void registerEventFinishedListener(EventModel event, EventListenerModel eventListener) throws IllegalIDException {
206        registerEventFinishedListener(event.getAllInformations(), eventListener);
207    }
208
209    /**
210     * Adds an listener for events that gets called when the event finished processing.
211     * <p>
212     * It will register for all ids individually!
213     * This method will ignore if this listener is already listening to an Event.
214     * Method is thread-safe.
215     * </p>
216     * @param ids this can be type, or descriptors etc.
217     * @param eventListener the ActivatorEventListener-interface for receiving activator events
218     */
219    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
220    public void registerEventFinishedListener(List<String> ids, EventListenerModel eventListener) {
221        for(String id : ids) {
222            ArrayList<EventListenerModel> listenersList = finishListeners.get(id);
223            if (listenersList == null) {
224                finishListeners.put(id, new ArrayList<>());
225                listenersList = finishListeners.get(id);
226            }
227            if (!listenersList.contains(eventListener)) {
228                synchronized (listenersList) {
229                    listenersList.add(eventListener);
230                }
231            }
232        }
233    }
234
235    /**
236     * unregister an EventListener that got called when the event finished processing.
237     *
238     * It will unregister for all Descriptors individually!
239     * It will also ignore if this listener is not listening to an Event.
240     * Method is thread-safe.
241     *
242     * @param event the Event to stop listen to
243     * @param eventListener the ActivatorEventListener used to listen for events
244     * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed
245     */
246    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
247    public void unregisterEventFinishedListener(EventModel<EventModel> event, EventListenerModel eventListener) throws IllegalArgumentException {
248        for (String id : event.getAllInformations()) {
249            ArrayList<EventListenerModel> listenersList = finishListeners.get(id);
250            if (listenersList == null) {
251                return;
252            }
253            synchronized (listenersList) {
254                listenersList.remove(eventListener);
255            }
256        }
257    }
258
259    /**
260     * unregister an EventListener that got called when the event finished processing.
261     *
262     * It will unregister for all registered descriptors.
263     * It will also ignore if this listener is not listening to an Event.
264     * Method is thread-safe.
265     *
266     * @param eventListener the ActivatorEventListener used to listen for events
267     * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed
268     */
269    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
270    public void unregisterEventFinishedListener(EventListenerModel eventListener) throws IllegalArgumentException {
271        finishListeners.values().stream()
272                .forEach(list -> {
273                    synchronized (list) {
274                        list.removeIf(eventListenerModel -> eventListenerModel.equals(eventListener));
275                    }
276                });
277    }
278
279    /**
280     * Checks whether to dispatch an event
281     *
282     * @param event the fired Event
283     * @return true if the event should be fired
284     */
285    private boolean checkEventsControllers(EventModel event) {
286        List<CompletableFuture<Boolean>> collect = eventsControllers.stream()
287                .map(controller -> submit(() -> controller.controlEventDispatcher(event)).thenApply(result -> {
288                    if (!result)
289                        debug("Event: " + event + " is canceled by " + controller.getID());
290                    return result;
291                }))
292                .collect(Collectors.toList());
293        try {
294            collect = timeOut(collect, 1000);
295        } catch (InterruptedException e) {
296            debug("interrupted");
297        }
298        return collect.stream()
299                .map(future -> {
300                    try {
301                        return future.get();
302                    } catch (InterruptedException | ExecutionException e) {
303                        return null;
304                    }
305                })
306                .filter(Objects::nonNull)
307                .noneMatch(bool -> !bool);
308    }
309
310    public BlockingQueue<EventModel<?>> getEvents() {
311        return events;
312    }
313
314    /**
315     * When an object implementing interface <code>Runnable</code> is used
316     * to create a thread, starting the thread causes the object's
317     * <code>run</code> method to be called in that separately executing
318     * thread.
319     * <p>
320     * The general contract of the method <code>run</code> is that it may
321     * take any action whatsoever.
322     *
323     * @see Thread#run()
324     */
325    @Override
326    public void run() {
327        while(!stop) {
328            try {
329                EventModel<?> event = events.take();
330                processEvent(event);
331            } catch (InterruptedException e) {
332                log.warn("interrupted", e);
333            }
334        }
335    }
336
337    /**
338     * process the Event
339     * @param event the event to process
340     */
341    private void processEvent(EventModel<?> event) {
342        if (!event.getSource().isCreatedFromInstance()) {
343            error("event: " + event + "has invalid source");
344            return;
345        }
346        debug("EventFired: " + event.toString() + " from " + event.getSource().getID());
347        submit(() -> event.lifecycleCallback(EventLifeCycle.START));
348        if (checkEventsControllers(event)) {
349            submit(() -> event.lifecycleCallback(EventLifeCycle.APPROVED));
350            submit(() -> event.lifecycleCallback(EventLifeCycle.RESOURCE));
351            List<ResourceModel> resourceList = getMain().getResourceManager().generateResources(event);
352            event.addResources(resourceList);
353            submit(() -> event.lifecycleCallback(EventLifeCycle.LISTENERS));
354            List<EventListenerModel> listenersTemp = event.getAllInformations().parallelStream()
355                    .map(listeners::get)
356                    .filter(Objects::nonNull)
357                    .flatMap(Collection::stream)
358                    .distinct()
359                    .collect(Collectors.toList());
360
361            List<CompletableFuture> futures = listenersTemp.stream()
362                    .map(eventListener -> submit(() -> eventListener.eventFired(event)))
363                    .collect(Collectors.toList());
364            try {
365                timeOut(futures, 1000);
366            } catch (InterruptedException e) {
367                error("interrupted", e);
368            }
369            submit(() -> event.lifecycleCallback(EventLifeCycle.OUTPUT));
370            getMain().getOutputManager().passDataToOutputPlugins(event);
371            submit(() -> event.lifecycleCallback(EventLifeCycle.ENDED));
372            List<EventListenerModel> finishListenersTemp = event.getAllInformations().parallelStream()
373                    .map(finishListeners::get)
374                    .filter(Objects::nonNull)
375                    .flatMap(Collection::stream)
376                    .distinct()
377                    .collect(Collectors.toList());
378
379            futures = finishListenersTemp.stream()
380                    .map(eventListener -> submit(() -> eventListener.eventFired(event)))
381                    .collect(Collectors.toList());
382
383            try {
384                timeOut(futures, 1000);
385            } catch (InterruptedException e) {
386                error("interrupted", e);
387            }
388        } else {
389            debug("canceling: " + event.toString() + " from " + event.getSource().getID());
390            submit(() -> event.lifecycleCallback(EventLifeCycle.CANCELED));
391        }
392    }
393
394    /**
395     * stops the EventDistributor
396     */
397    public void stop() {
398        stop = true;
399    }
400
401    /**
402     * This class is used to pass Events to the EventDistributor
403     */
404    private class EventPublisher implements EventCallable {
405        //the queue where all the Events are stored
406        private final BlockingQueue<EventModel<?>> events;
407        protected EventPublisher(BlockingQueue<EventModel<?>> events) {
408            this.events = events;
409        }
410
411        /**
412         * use this method to fire Events.
413         * @param event the Event to fire
414         */
415        public void fire(EventModel event) {
416            if(event == null) return;
417            events.add(event);
418        }
419    }
420}