001package org.intellimate.izou.events; 002 003import org.intellimate.izou.util.AddonThreadPoolUser; 004import org.intellimate.izou.util.IzouModule; 005import org.intellimate.izou.identification.Identification; 006import org.intellimate.izou.identification.IllegalIDException; 007import org.intellimate.izou.main.Main; 008import org.intellimate.izou.resource.ResourceModel; 009 010import java.util.*; 011import java.util.concurrent.*; 012import java.util.stream.Collectors; 013 014/** 015 * This class gets all the Events from all registered EventPublisher, generates Resources and passes them to the 016 * OutputManager. Can also be used to fire Events Concurrently. 017 */ 018public class EventDistributor extends IzouModule implements Runnable, AddonThreadPoolUser { 019 private BlockingQueue<EventModel<?>> events = new LinkedBlockingQueue<>(); 020 private ConcurrentHashMap<Identification, EventPublisher> registered = new ConcurrentHashMap<>(); 021 //here are all the Instances to to control the Event-dispatching stored 022 private final ConcurrentLinkedQueue<EventsControllerModel> eventsControllers = new ConcurrentLinkedQueue<>(); 023 //here are all the Listeners stored 024 private final ConcurrentHashMap<String, ArrayList<EventListenerModel>> listeners = new ConcurrentHashMap<>(); 025 //here are all the Listeners stored that get called when an Event finishes processing 026 private final ConcurrentHashMap<String, ArrayList<EventListenerModel>> finishListeners = new ConcurrentHashMap<>(); 027 private boolean stop = false; 028 029 public EventDistributor(Main main) { 030 super(main); 031 main.getThreadPoolManager().getIzouThreadPool().submit(this); 032 } 033 034 /** 035 * fires the event concurrently, this is generally discouraged. 036 * <p> 037 * This method should not be used for normal Events, for for events which obey the following laws:<br> 038 * 1. they are time critical.<br> 039 * 2. addons are not expected to react in any way beside a small update<br> 040 * 3. they are few.<br> 041 * if your event matches the above laws, you may consider firing it concurrently. 042 * </p> 043 * @param eventModel the EventModel 044 */ 045 public void fireEventConcurrently(EventModel<?> eventModel) { 046 if(eventModel == null) return; 047 submit(() -> processEvent(eventModel)); 048 } 049 050 /** 051 * with this method you can register EventPublisher add a Source of Events to the System. 052 * <p> 053 * This method represents a higher level of abstraction! Use the EventManager to fire Events! 054 * This method is intended for use cases where you have an entire new source of events (e.g. network) 055 * @param identification the Identification of the Source 056 * @return An Optional Object which may or may not contains an EventPublisher 057 * @throws IllegalIDException not yet implemented 058 */ 059 public Optional<EventCallable> registerEventPublisher(Identification identification) throws IllegalIDException { 060 if(registered.containsKey(identification)) return Optional.empty(); 061 EventPublisher eventPublisher = new EventPublisher(events); 062 registered.put(identification, eventPublisher); 063 return Optional.of(eventPublisher); 064 } 065 066 /** 067 * with this method you can unregister EventPublisher add a Source of Events to the System. 068 * <p> 069 * This method represents a higher level of abstraction! Use the EventManager to fire Events! 070 * This method is intended for use cases where you have an entire new source of events (e.g. network) 071 * @param identification the Identification of the Source 072 */ 073 public void unregisterEventPublisher(Identification identification) { 074 if(!registered.containsKey(identification)) return; 075 registered.remove(identification); 076 } 077 078 /** 079 * Registers an EventController to control EventDispatching-Behaviour 080 * <p> 081 * Method is thread-safe. 082 * It is expected that this method executes quickly. 083 * 084 * @param controller the EventController Interface to control event-dispatching 085 * @throws IllegalIDException not yet implemented 086 */ 087 public void registerEventsController(EventsControllerModel controller) throws IllegalIDException { 088 eventsControllers.add(controller); 089 } 090 091 /** 092 * Unregisters an EventController 093 * <p> 094 * Method is thread-safe. 095 * 096 * @param controller the EventController Interface to remove 097 */ 098 public void unregisterEventsController(EventsControllerModel controller) { 099 eventsControllers.remove(controller); 100 } 101 102 103 /** 104 * Adds an listener for events. 105 * <p> 106 * Be careful with this method, it will register the listener for ALL the informations found in the Event. If your 107 * event-type is a common event type, it will fire EACH time!. 108 * It will also register for all Descriptors individually! 109 * It will also ignore if this listener is already listening to an Event. 110 * Method is thread-safe. 111 * </p> 112 * @param event the Event to listen to (it will listen to all descriptors individually!) 113 * @param eventListener the ActivatorEventListener-interface for receiving activator events 114 * @throws IllegalIDException not yet implemented 115 */ 116 @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) 117 public void registerEventListener(EventModel event, EventListenerModel eventListener) throws IllegalIDException { 118 registerEventListener(event.getAllInformations(), eventListener); 119 } 120 121 /** 122 * Adds an listener for events. 123 * <p> 124 * It will register for all ids individually! 125 * This method will ignore if this listener is already listening to an Event. 126 * Method is thread-safe. 127 * </p> 128 * @param ids this can be type, or descriptors etc. 129 * @param eventListener the ActivatorEventListener-interface for receiving activator events 130 */ 131 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 132 public void registerEventListener(List<String> ids, EventListenerModel eventListener) { 133 for(String id : ids) { 134 ArrayList<EventListenerModel> listenersList = listeners.get(id); 135 if (listenersList == null) { 136 listeners.put(id, new ArrayList<>()); 137 listenersList = listeners.get(id); 138 } 139 if (!listenersList.contains(eventListener)) { 140 synchronized (listenersList) { 141 listenersList.add(eventListener); 142 } 143 } 144 } 145 } 146 147 /** 148 * unregister an EventListener 149 * 150 * It will unregister for all Descriptors individually! 151 * It will also ignore if this listener is not listening to an Event. 152 * Method is thread-safe. 153 * 154 * @param event the Event to stop listen to 155 * @param eventListener the ActivatorEventListener used to listen for events 156 * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed 157 */ 158 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 159 public void unregisterEventListener(EventModel<EventModel> event, EventListenerModel eventListener) throws IllegalArgumentException { 160 for (String id : event.getAllInformations()) { 161 ArrayList<EventListenerModel> listenersList = listeners.get(id); 162 if (listenersList == null) { 163 return; 164 } 165 synchronized (listenersList) { 166 listenersList.remove(eventListener); 167 } 168 } 169 } 170 171 /** 172 * unregister an EventListener 173 * 174 * It will unregister for all registered descriptors. 175 * It will also ignore if this listener is not listening to an Event. 176 * Method is thread-safe. 177 * 178 * @param eventListener the ActivatorEventListener used to listen for events 179 * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed 180 */ 181 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 182 public void unregisterEventListener(EventListenerModel eventListener) throws IllegalArgumentException { 183 listeners.values().stream() 184 .forEach(list -> { 185 synchronized (list) { 186 list.removeIf(eventListenerModel -> eventListenerModel.equals(eventListener)); 187 } 188 }); 189 } 190 191 /** 192 * Adds an listener for events that gets called when the event finished processing. 193 * <p> 194 * Be careful with this method, it will register the listener for ALL the informations found in the Event. If your 195 * event-type is a common event type, it will fire EACH time!. 196 * It will also register for all Descriptors individually! 197 * It will also ignore if this listener is already listening to an Event. 198 * Method is thread-safe. 199 * </p> 200 * @param event the Event to listen to (it will listen to all descriptors individually!) 201 * @param eventListener the ActivatorEventListener-interface for receiving activator events 202 * @throws IllegalIDException not yet implemented 203 */ 204 @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) 205 public void registerEventFinishedListener(EventModel event, EventListenerModel eventListener) throws IllegalIDException { 206 registerEventFinishedListener(event.getAllInformations(), eventListener); 207 } 208 209 /** 210 * Adds an listener for events that gets called when the event finished processing. 211 * <p> 212 * It will register for all ids individually! 213 * This method will ignore if this listener is already listening to an Event. 214 * Method is thread-safe. 215 * </p> 216 * @param ids this can be type, or descriptors etc. 217 * @param eventListener the ActivatorEventListener-interface for receiving activator events 218 */ 219 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 220 public void registerEventFinishedListener(List<String> ids, EventListenerModel eventListener) { 221 for(String id : ids) { 222 ArrayList<EventListenerModel> listenersList = finishListeners.get(id); 223 if (listenersList == null) { 224 finishListeners.put(id, new ArrayList<>()); 225 listenersList = finishListeners.get(id); 226 } 227 if (!listenersList.contains(eventListener)) { 228 synchronized (listenersList) { 229 listenersList.add(eventListener); 230 } 231 } 232 } 233 } 234 235 /** 236 * unregister an EventListener that got called when the event finished processing. 237 * 238 * It will unregister for all Descriptors individually! 239 * It will also ignore if this listener is not listening to an Event. 240 * Method is thread-safe. 241 * 242 * @param event the Event to stop listen to 243 * @param eventListener the ActivatorEventListener used to listen for events 244 * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed 245 */ 246 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 247 public void unregisterEventFinishedListener(EventModel<EventModel> event, EventListenerModel eventListener) throws IllegalArgumentException { 248 for (String id : event.getAllInformations()) { 249 ArrayList<EventListenerModel> listenersList = finishListeners.get(id); 250 if (listenersList == null) { 251 return; 252 } 253 synchronized (listenersList) { 254 listenersList.remove(eventListener); 255 } 256 } 257 } 258 259 /** 260 * unregister an EventListener that got called when the event finished processing. 261 * 262 * It will unregister for all registered descriptors. 263 * It will also ignore if this listener is not listening to an Event. 264 * Method is thread-safe. 265 * 266 * @param eventListener the ActivatorEventListener used to listen for events 267 * @throws IllegalArgumentException if Listener is already listening to the Event or the id is not allowed 268 */ 269 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") 270 public void unregisterEventFinishedListener(EventListenerModel eventListener) throws IllegalArgumentException { 271 finishListeners.values().stream() 272 .forEach(list -> { 273 synchronized (list) { 274 list.removeIf(eventListenerModel -> eventListenerModel.equals(eventListener)); 275 } 276 }); 277 } 278 279 /** 280 * Checks whether to dispatch an event 281 * 282 * @param event the fired Event 283 * @return true if the event should be fired 284 */ 285 private boolean checkEventsControllers(EventModel event) { 286 List<CompletableFuture<Boolean>> collect = eventsControllers.stream() 287 .map(controller -> submit(() -> controller.controlEventDispatcher(event)).thenApply(result -> { 288 if (!result) 289 debug("Event: " + event + " is canceled by " + controller.getID()); 290 return result; 291 })) 292 .collect(Collectors.toList()); 293 try { 294 collect = timeOut(collect, 1000); 295 } catch (InterruptedException e) { 296 debug("interrupted"); 297 } 298 return collect.stream() 299 .map(future -> { 300 try { 301 return future.get(); 302 } catch (InterruptedException | ExecutionException e) { 303 return null; 304 } 305 }) 306 .filter(Objects::nonNull) 307 .noneMatch(bool -> !bool); 308 } 309 310 public BlockingQueue<EventModel<?>> getEvents() { 311 return events; 312 } 313 314 /** 315 * When an object implementing interface <code>Runnable</code> is used 316 * to create a thread, starting the thread causes the object's 317 * <code>run</code> method to be called in that separately executing 318 * thread. 319 * <p> 320 * The general contract of the method <code>run</code> is that it may 321 * take any action whatsoever. 322 * 323 * @see Thread#run() 324 */ 325 @Override 326 public void run() { 327 while(!stop) { 328 try { 329 EventModel<?> event = events.take(); 330 processEvent(event); 331 } catch (InterruptedException e) { 332 log.warn("interrupted", e); 333 } 334 } 335 } 336 337 /** 338 * process the Event 339 * @param event the event to process 340 */ 341 private void processEvent(EventModel<?> event) { 342 if (!event.getSource().isCreatedFromInstance()) { 343 error("event: " + event + "has invalid source"); 344 return; 345 } 346 debug("EventFired: " + event.toString() + " from " + event.getSource().getID()); 347 submit(() -> event.lifecycleCallback(EventLifeCycle.START)); 348 if (checkEventsControllers(event)) { 349 submit(() -> event.lifecycleCallback(EventLifeCycle.APPROVED)); 350 submit(() -> event.lifecycleCallback(EventLifeCycle.RESOURCE)); 351 List<ResourceModel> resourceList = getMain().getResourceManager().generateResources(event); 352 event.addResources(resourceList); 353 submit(() -> event.lifecycleCallback(EventLifeCycle.LISTENERS)); 354 List<EventListenerModel> listenersTemp = event.getAllInformations().parallelStream() 355 .map(listeners::get) 356 .filter(Objects::nonNull) 357 .flatMap(Collection::stream) 358 .distinct() 359 .collect(Collectors.toList()); 360 361 List<CompletableFuture> futures = listenersTemp.stream() 362 .map(eventListener -> submit(() -> eventListener.eventFired(event))) 363 .collect(Collectors.toList()); 364 try { 365 timeOut(futures, 1000); 366 } catch (InterruptedException e) { 367 error("interrupted", e); 368 } 369 submit(() -> event.lifecycleCallback(EventLifeCycle.OUTPUT)); 370 getMain().getOutputManager().passDataToOutputPlugins(event); 371 submit(() -> event.lifecycleCallback(EventLifeCycle.ENDED)); 372 List<EventListenerModel> finishListenersTemp = event.getAllInformations().parallelStream() 373 .map(finishListeners::get) 374 .filter(Objects::nonNull) 375 .flatMap(Collection::stream) 376 .distinct() 377 .collect(Collectors.toList()); 378 379 futures = finishListenersTemp.stream() 380 .map(eventListener -> submit(() -> eventListener.eventFired(event))) 381 .collect(Collectors.toList()); 382 383 try { 384 timeOut(futures, 1000); 385 } catch (InterruptedException e) { 386 error("interrupted", e); 387 } 388 } else { 389 debug("canceling: " + event.toString() + " from " + event.getSource().getID()); 390 submit(() -> event.lifecycleCallback(EventLifeCycle.CANCELED)); 391 } 392 } 393 394 /** 395 * stops the EventDistributor 396 */ 397 public void stop() { 398 stop = true; 399 } 400 401 /** 402 * This class is used to pass Events to the EventDistributor 403 */ 404 private class EventPublisher implements EventCallable { 405 //the queue where all the Events are stored 406 private final BlockingQueue<EventModel<?>> events; 407 protected EventPublisher(BlockingQueue<EventModel<?>> events) { 408 this.events = events; 409 } 410 411 /** 412 * use this method to fire Events. 413 * @param event the Event to fire 414 */ 415 public void fire(EventModel event) { 416 if(event == null) return; 417 events.add(event); 418 } 419 } 420}