001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.wicket.pageStore;
018
019import java.io.Serializable;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.function.Supplier;
028
029import org.apache.wicket.MetaDataKey;
030import org.apache.wicket.WicketRuntimeException;
031import org.apache.wicket.page.IManageablePage;
032import org.apache.wicket.util.lang.Args;
033import org.apache.wicket.util.lang.Classes;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Facade for {@link IPageStore} moving {@link #addPage(IPageContext, IManageablePage)} to a worker thread.
039 * <p>
 * Creates a {@link PendingAdd} for {@link #addPage(IPageContext, IManageablePage)} and puts it into a {@link #queue}.
 * Later {@link PageAddingRunnable} reads in a blocking manner from {@link #queue} and performs the add.
042 * <p>
043 * It starts only one instance of {@link PageAddingRunnable} because all we need is to make the page
044 * storing asynchronous. We don't want to write concurrently in the wrapped {@link IPageStore},
045 * though it may happen in the extreme case when the queue is full. These cases should be avoided.
046 * 
047 * @author Matej Knopp
048 * @author manuelbarzi
049 */
050public class AsynchronousPageStore extends DelegatingPageStore
051{
052
053        /** Log for reporting. */
054        private static final Logger log = LoggerFactory.getLogger(AsynchronousPageStore.class);
055
056        /**
         * The time to wait when offering a {@link PendingAdd} to the {@link #queue}. In millis.
058         */
059        private static final long OFFER_WAIT = 30L;
060
061        /**
062         * The time to wait for an entry to save with the wrapped {@link IPageStore} . In millis.
063         */
064        private static final long POLL_WAIT = 1000L;
065
066        /**
067         * The page saving thread.
068         */
069        private final Thread pageSavingThread;
070
071        /**
         * The queue where the entries which have to be saved are temporarily stored.
073         */
074        private final BlockingQueue<PendingAdd> queue;
075
076        /**
         * A map 'pageId:::sessionId' -> {@link PendingAdd}. Used for fast retrieval of {@link PendingAdd}s which
         * are not yet stored by the wrapped {@link IPageStore}.
079         */
080        private final ConcurrentMap<String, PendingAdd> queueMap;
081
082        /**
083         * Construct.
084         * 
085         * @param delegate
086         *            the wrapped {@link IPageStore} that actually saved the page
087         * @param capacity
088         *            the capacity of the queue that delays the saving
089         */
090        public AsynchronousPageStore(final IPageStore delegate, final int capacity)
091        {
092                super(delegate);
093                
094                queue = new LinkedBlockingQueue<>(capacity);
095                queueMap = new ConcurrentHashMap<>();
096
097                PageAddingRunnable savingRunnable = new PageAddingRunnable(delegate, queue, queueMap);
098                pageSavingThread = new Thread(savingRunnable, "Wicket-AsyncPageStore-PageSavingThread");
099                pageSavingThread.setDaemon(true);
100                pageSavingThread.start();
101        }
102
103        /**
104         * 
105         * @param sessionId
106         * @param pageId
107         * @return generated key
108         */
109        private static String getKey(final String sessionId, final int pageId)
110        {
111                return pageId + ":::" + sessionId;
112        }
113
114        /**
115         * An add of a page that is pending its asynchronous execution.
116         * <p>
117         * Used as an isolating {@link IPageContext} for the delegation to 
118         * {@link IPageStore#addPage(IPageContext, IManageablePage)}.
119         */
120        private static class PendingAdd implements IPageContext
121        {
122                private final IPageContext context;
123                
124                private final IManageablePage page;
125
126                private final String sessionId;
127
128                /**
129                 * Is this context passed to an asynchronously called {@link IPageStore#addPage(IPageContext, IManageablePage)}.
130                 */
131                private boolean asynchronous = false;
132
133                /**
134                 * Cache of session attributes which may filled in {@link IPageStore#canBeAsynchronous(IPageContext)},
135                 * so these are available asynchronously later on.
136                 */
137                private final Map<String, Serializable> attributeCache = new HashMap<>();
138
139                /**
140                 * Cache of session data which may filled in {@link IPageStore#canBeAsynchronous(IPageContext)},
141                 * so these are available asynchronously later on.
142                 */
143                private final Map<MetaDataKey<?>, Serializable> dataCache = new HashMap<>();
144
145                public PendingAdd(final IPageContext context, final IManageablePage page)
146                {
147                        this.context = Args.notNull(context, "context");
148                        this.page = Args.notNull(page, "page");
149                        
150                        this.sessionId = context.getSessionId(true);
151                }
152
153                /**
154                 * @return generated key
155                 */
156                private String getKey()
157                {
158                        return AsynchronousPageStore.getKey(sessionId, page.getPageId());
159                }
160
161                @Override
162                public String toString()
163                {
164                        return "PendingAdd [sessionId=" + sessionId + ", pageId=" + page.getPageId() + ", pageClass=" + Classes.name(page.getClass()) + "]";
165                }
166
167                /**
168                 * Prevents access to request when called asynchronously.
169                 */
170                @Override
171                public <T> T getRequestData(MetaDataKey<T> key, Supplier<T> value)
172                {
173                        if (asynchronous)
174                        {
175                                throw new WicketRuntimeException("request data not available asynchronuously");
176                        }
177                        
178                        return context.getRequestData(key, value);
179                }
180
181                /**
182                 * Prevents changing of session attributes when called asynchronously.
183                 * <p>
184                 * All values accessed from {@link IPageStore#canBeAsynchronous(IPageContext)} are still
185                 * available.
186                 */
187                @SuppressWarnings("unchecked")
188                @Override
189                public <T extends Serializable> T getSessionAttribute(String key, Supplier<T> defaultValue)
190                {
191                        T value;
192                        
193                        if (asynchronous)
194                        {
195                                value = (T)attributeCache.get(key);
196                                if (value == null && defaultValue.get() != null)
197                                {
198                                                throw new WicketRuntimeException("session attribute can not be changed asynchronuously");
199                                }
200                        } else {
201                                value = context.getSessionAttribute(key, defaultValue);
202                                if (value != null)
203                                {
204                                        attributeCache.put(key, value);
205                                }
206                        }
207                        
208                        return value;
209                }
210                
211                /**
212                 * Prevents changing of session data when called asynchronously.
213                 */
214                @Override
215                public <T extends Serializable> T getSessionData(MetaDataKey<T> key, Supplier<T> defaultValue)
216                {
217                        T value;
218                        
219                        if (asynchronous)
220                        {
221                                value = (T)dataCache.get(key);
222                                if (value == null && defaultValue.get() != null)
223                                {
224                                        throw new WicketRuntimeException("session data can not be changed asynchronuously");
225                                }
226                        }
227                        else
228                        {
229                                value = context.getSessionData(key, defaultValue);
230                                if (value != null)
231                                {
232                                        dataCache.put(key, value);
233                                }
234                        }
235                        
236                        return value;
237                }
238
239                /**
240                 * Returns id of session.
241                 */
242                @Override
243                public String getSessionId(boolean bind)
244                {
245                        return sessionId;
246                }
247        }
248
249        /**
250         * The consumer of {@link PendingAdd}s.
251         */
252        private static class PageAddingRunnable implements Runnable
253        {
254                private static final Logger log = LoggerFactory.getLogger(PageAddingRunnable.class);
255
256                private final BlockingQueue<PendingAdd> queue;
257
258                private final ConcurrentMap<String, PendingAdd> map;
259
260                private final IPageStore delegate;
261
262                private PageAddingRunnable(IPageStore delegate, BlockingQueue<PendingAdd> queue,
263                                           ConcurrentMap<String, PendingAdd> map)
264                {
265                        this.delegate = delegate;
266                        this.queue = queue;
267                        this.map = map;
268                }
269
270                @Override
271                public void run()
272                {
273                        while (!Thread.interrupted())
274                        {
275                                PendingAdd add = null;
276                                try
277                                {
278                                        add = queue.poll(POLL_WAIT, TimeUnit.MILLISECONDS);
279                                }
280                                catch (InterruptedException e)
281                                {
282                                        log.debug("PageAddingRunnable:: Interrupted...");
283                                        Thread.currentThread().interrupt();
284                                }
285
286                                if (add != null)
287                                {
288                                        try
289                                        {
290                                                log.debug("Saving asynchronously: {}...", add);
291                                                add.asynchronous = true;
292                                                delegate.addPage(add, add.page);
293                                        }
294                                        catch (Exception x)
295                                        {
296                                                log.error("An error occurred while saving asynchronously '{}'", add, x);
297                                        }
298                                        finally
299                                        {
300                                                map.remove(add.getKey());
301                                        }
302                                }
303                        }
304                }
305        }
306
307        @Override
308        public void destroy()
309        {
310                if (pageSavingThread.isAlive())
311                {
312                        pageSavingThread.interrupt();
313                        try
314                        {
315                                pageSavingThread.join();
316                        }
317                        catch (InterruptedException e)
318                        {
319                                log.error(e.getMessage(), e);
320                        }
321                }
322
323                super.destroy();
324        }
325
326        @Override
327        public IManageablePage getPage(IPageContext context, int pageId)
328        {
329                String sessionId = context.getSessionId(false);
330                if (sessionId == null) {
331                        return null;
332                }
333                
334                PendingAdd entry = queueMap.get(getKey(sessionId, pageId));
335                if (entry != null)
336                {
337                        log.debug("Returning the page of a non-stored entry with page id '{}'", pageId);
338                        return entry.page;
339                }
340                IManageablePage page = getDelegate().getPage(context, pageId);
341
342                log.debug("Returning the page of a stored entry with page id '{}'", pageId);
343
344                return page;
345        }
346
347        @Override
348        public void removePage(IPageContext context, IManageablePage page)
349        {
350                String sessionId = context.getSessionId(false);
351                if (sessionId == null) {
352                        return;
353                }
354
355                String key = getKey(sessionId, page.getPageId());
356                PendingAdd entry = queueMap.remove(key);
357                if (entry != null)
358                {
359                        queue.remove(entry);
360                }
361
362                getDelegate().removePage(context, page);
363        }
364
365        @Override
366        public void addPage(IPageContext context, IManageablePage page)
367        {
368                PendingAdd add = new PendingAdd(context, page);
369                if (getDelegate().canBeAsynchronous(add))
370                {
371                        String key = add.getKey();
372                        queueMap.put(key, add);
373                        try
374                        {
375                                if (queue.offer(add, OFFER_WAIT, TimeUnit.MILLISECONDS))
376                                {
377                                        log.debug("Offered for storing asynchronously page with id '{}'", page.getPageId());
378                                        return;
379                                }
380                                else
381                                {
382                                        log.debug("Storing synchronously page with id '{}'", page.getPageId());
383                                        queueMap.remove(key);
384                                }
385                        }
386                        catch (InterruptedException e)
387                        {
388                                log.error(e.getMessage(), e);
389                                queueMap.remove(key);
390                        }
391                }
392                else
393                {
394                        log.warn("Delegated page store '{}' can not be asynchronous", getDelegate().getClass().getName());
395                }
396                
397                getDelegate().addPage(context, page);
398        }
399
400        @Override
401        public void removeAllPages(IPageContext context)
402        {
403                String sessionId = context.getSessionId(false);
404                if (sessionId == null) {
405                        return;
406                }
407
408                queue.removeIf(add -> {
409                        if (add.sessionId.equals(sessionId)) {
410                                queueMap.remove(add.getKey());
411                                return true;
412                        }
413                        
414                        return false;
415                });
416                
417                getDelegate().removeAllPages(context);
418        }
419}