001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.wicket.pageStore; 018 019import java.io.Serializable; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.function.Supplier; 028 029import org.apache.wicket.MetaDataKey; 030import org.apache.wicket.WicketRuntimeException; 031import org.apache.wicket.page.IManageablePage; 032import org.apache.wicket.util.lang.Args; 033import org.apache.wicket.util.lang.Classes; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Facade for {@link IPageStore} moving {@link #addPage(IPageContext, IManageablePage)} to a worker thread. 039 * <p> 040 * Creates an {@link PendingAdd} for {@link #addPage(IPageContext, IManageablePage)} and puts ito a {@link #queue}. 041 * Later {@link PageAddingRunnable} reads in blocking manner from {@link #queue} and performs the add. 042 * <p> 043 * It starts only one instance of {@link PageAddingRunnable} because all we need is to make the page 044 * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore}, 045 * though it may happen in the extreme case when the queue is full. These cases should be avoided. 046 * 047 * @author Matej Knopp 048 * @author manuelbarzi 049 */ 050public class AsynchronousPageStore extends DelegatingPageStore 051{ 052 053 /** Log for reporting. */ 054 private static final Logger log = LoggerFactory.getLogger(AsynchronousPageStore.class); 055 056 /** 057 * The time to wait when adding an {@link PendingAdd} into the entries. In millis. 058 */ 059 private static final long OFFER_WAIT = 30L; 060 061 /** 062 * The time to wait for an entry to save with the wrapped {@link IPageStore} . In millis. 063 */ 064 private static final long POLL_WAIT = 1000L; 065 066 /** 067 * The page saving thread. 068 */ 069 private final Thread pageSavingThread; 070 071 /** 072 * The queue where the entries which have to be saved are temporary stored 073 */ 074 private final BlockingQueue<PendingAdd> queue; 075 076 /** 077 * A map 'sessionId:::pageId' -> {@link PendingAdd}. Used for fast retrieval of {@link PendingAdd}s which 078 * are not yet stored by the wrapped {@link IPageStore} 079 */ 080 private final ConcurrentMap<String, PendingAdd> queueMap; 081 082 /** 083 * Construct. 084 * 085 * @param delegate 086 * the wrapped {@link IPageStore} that actually saved the page 087 * @param capacity 088 * the capacity of the queue that delays the saving 089 */ 090 public AsynchronousPageStore(final IPageStore delegate, final int capacity) 091 { 092 super(delegate); 093 094 queue = new LinkedBlockingQueue<>(capacity); 095 queueMap = new ConcurrentHashMap<>(); 096 097 PageAddingRunnable savingRunnable = new PageAddingRunnable(delegate, queue, queueMap); 098 pageSavingThread = new Thread(savingRunnable, "Wicket-AsyncPageStore-PageSavingThread"); 099 pageSavingThread.setDaemon(true); 100 pageSavingThread.start(); 101 } 102 103 /** 104 * 105 * @param sessionId 106 * @param pageId 107 * @return generated key 108 */ 109 private static String getKey(final String sessionId, final int pageId) 110 { 111 return pageId + ":::" + sessionId; 112 } 113 114 /** 115 * An add of a page that is pending its asynchronous execution. 116 * <p> 117 * Used as an isolating {@link IPageContext} for the delegation to 118 * {@link IPageStore#addPage(IPageContext, IManageablePage)}. 119 */ 120 private static class PendingAdd implements IPageContext 121 { 122 private final IPageContext context; 123 124 private final IManageablePage page; 125 126 private final String sessionId; 127 128 /** 129 * Is this context passed to an asynchronously called {@link IPageStore#addPage(IPageContext, IManageablePage)}. 130 */ 131 private boolean asynchronous = false; 132 133 /** 134 * Cache of session attributes which may filled in {@link IPageStore#canBeAsynchronous(IPageContext)}, 135 * so these are available asynchronously later on. 136 */ 137 private final Map<String, Serializable> attributeCache = new HashMap<>(); 138 139 /** 140 * Cache of session data which may filled in {@link IPageStore#canBeAsynchronous(IPageContext)}, 141 * so these are available asynchronously later on. 142 */ 143 private final Map<MetaDataKey<?>, Serializable> dataCache = new HashMap<>(); 144 145 public PendingAdd(final IPageContext context, final IManageablePage page) 146 { 147 this.context = Args.notNull(context, "context"); 148 this.page = Args.notNull(page, "page"); 149 150 this.sessionId = context.getSessionId(true); 151 } 152 153 /** 154 * @return generated key 155 */ 156 private String getKey() 157 { 158 return AsynchronousPageStore.getKey(sessionId, page.getPageId()); 159 } 160 161 @Override 162 public String toString() 163 { 164 return "PendingAdd [sessionId=" + sessionId + ", pageId=" + page.getPageId() + ", pageClass=" + Classes.name(page.getClass()) + "]"; 165 } 166 167 /** 168 * Prevents access to request when called asynchronously. 169 */ 170 @Override 171 public <T> T getRequestData(MetaDataKey<T> key, Supplier<T> value) 172 { 173 if (asynchronous) 174 { 175 throw new WicketRuntimeException("request data not available asynchronuously"); 176 } 177 178 return context.getRequestData(key, value); 179 } 180 181 /** 182 * Prevents changing of session attributes when called asynchronously. 183 * <p> 184 * All values accessed from {@link IPageStore#canBeAsynchronous(IPageContext)} are still 185 * available. 186 */ 187 @SuppressWarnings("unchecked") 188 @Override 189 public <T extends Serializable> T getSessionAttribute(String key, Supplier<T> defaultValue) 190 { 191 T value; 192 193 if (asynchronous) 194 { 195 value = (T)attributeCache.get(key); 196 if (value == null && defaultValue.get() != null) 197 { 198 throw new WicketRuntimeException("session attribute can not be changed asynchronuously"); 199 } 200 } else { 201 value = context.getSessionAttribute(key, defaultValue); 202 if (value != null) 203 { 204 attributeCache.put(key, value); 205 } 206 } 207 208 return value; 209 } 210 211 /** 212 * Prevents changing of session data when called asynchronously. 213 */ 214 @Override 215 public <T extends Serializable> T getSessionData(MetaDataKey<T> key, Supplier<T> defaultValue) 216 { 217 T value; 218 219 if (asynchronous) 220 { 221 value = (T)dataCache.get(key); 222 if (value == null && defaultValue.get() != null) 223 { 224 throw new WicketRuntimeException("session data can not be changed asynchronuously"); 225 } 226 } 227 else 228 { 229 value = context.getSessionData(key, defaultValue); 230 if (value != null) 231 { 232 dataCache.put(key, value); 233 } 234 } 235 236 return value; 237 } 238 239 /** 240 * Returns id of session. 241 */ 242 @Override 243 public String getSessionId(boolean bind) 244 { 245 return sessionId; 246 } 247 } 248 249 /** 250 * The consumer of {@link PendingAdd}s. 251 */ 252 private static class PageAddingRunnable implements Runnable 253 { 254 private static final Logger log = LoggerFactory.getLogger(PageAddingRunnable.class); 255 256 private final BlockingQueue<PendingAdd> queue; 257 258 private final ConcurrentMap<String, PendingAdd> map; 259 260 private final IPageStore delegate; 261 262 private PageAddingRunnable(IPageStore delegate, BlockingQueue<PendingAdd> queue, 263 ConcurrentMap<String, PendingAdd> map) 264 { 265 this.delegate = delegate; 266 this.queue = queue; 267 this.map = map; 268 } 269 270 @Override 271 public void run() 272 { 273 while (!Thread.interrupted()) 274 { 275 PendingAdd add = null; 276 try 277 { 278 add = queue.poll(POLL_WAIT, TimeUnit.MILLISECONDS); 279 } 280 catch (InterruptedException e) 281 { 282 log.debug("PageAddingRunnable:: Interrupted..."); 283 Thread.currentThread().interrupt(); 284 } 285 286 if (add != null) 287 { 288 try 289 { 290 log.debug("Saving asynchronously: {}...", add); 291 add.asynchronous = true; 292 delegate.addPage(add, add.page); 293 } 294 catch (Exception x) 295 { 296 log.error("An error occurred while saving asynchronously '{}'", add, x); 297 } 298 finally 299 { 300 map.remove(add.getKey()); 301 } 302 } 303 } 304 } 305 } 306 307 @Override 308 public void destroy() 309 { 310 if (pageSavingThread.isAlive()) 311 { 312 pageSavingThread.interrupt(); 313 try 314 { 315 pageSavingThread.join(); 316 } 317 catch (InterruptedException e) 318 { 319 log.error(e.getMessage(), e); 320 } 321 } 322 323 super.destroy(); 324 } 325 326 @Override 327 public IManageablePage getPage(IPageContext context, int pageId) 328 { 329 String sessionId = context.getSessionId(false); 330 if (sessionId == null) { 331 return null; 332 } 333 334 PendingAdd entry = queueMap.get(getKey(sessionId, pageId)); 335 if (entry != null) 336 { 337 log.debug("Returning the page of a non-stored entry with page id '{}'", pageId); 338 return entry.page; 339 } 340 IManageablePage page = getDelegate().getPage(context, pageId); 341 342 log.debug("Returning the page of a stored entry with page id '{}'", pageId); 343 344 return page; 345 } 346 347 @Override 348 public void removePage(IPageContext context, IManageablePage page) 349 { 350 String sessionId = context.getSessionId(false); 351 if (sessionId == null) { 352 return; 353 } 354 355 String key = getKey(sessionId, page.getPageId()); 356 PendingAdd entry = queueMap.remove(key); 357 if (entry != null) 358 { 359 queue.remove(entry); 360 } 361 362 getDelegate().removePage(context, page); 363 } 364 365 @Override 366 public void addPage(IPageContext context, IManageablePage page) 367 { 368 PendingAdd add = new PendingAdd(context, page); 369 if (getDelegate().canBeAsynchronous(add)) 370 { 371 String key = add.getKey(); 372 queueMap.put(key, add); 373 try 374 { 375 if (queue.offer(add, OFFER_WAIT, TimeUnit.MILLISECONDS)) 376 { 377 log.debug("Offered for storing asynchronously page with id '{}'", page.getPageId()); 378 return; 379 } 380 else 381 { 382 log.debug("Storing synchronously page with id '{}'", page.getPageId()); 383 queueMap.remove(key); 384 } 385 } 386 catch (InterruptedException e) 387 { 388 log.error(e.getMessage(), e); 389 queueMap.remove(key); 390 } 391 } 392 else 393 { 394 log.warn("Delegated page store '{}' can not be asynchronous", getDelegate().getClass().getName()); 395 } 396 397 getDelegate().addPage(context, page); 398 } 399 400 @Override 401 public void removeAllPages(IPageContext context) 402 { 403 String sessionId = context.getSessionId(false); 404 if (sessionId == null) { 405 return; 406 } 407 408 queue.removeIf(add -> { 409 if (add.sessionId.equals(sessionId)) { 410 queueMap.remove(add.getKey()); 411 return true; 412 } 413 414 return false; 415 }); 416 417 getDelegate().removeAllPages(context); 418 } 419}