001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.wicket.protocol.ws.api;
018
019import javax.servlet.http.HttpServletRequest;
020import javax.servlet.http.HttpSession;
021
022import org.apache.wicket.Application;
023import org.apache.wicket.MarkupContainer;
024import org.apache.wicket.Page;
025import org.apache.wicket.Session;
026import org.apache.wicket.ThreadContext;
027import org.apache.wicket.markup.IMarkupResourceStreamProvider;
028import org.apache.wicket.markup.html.WebPage;
029import org.apache.wicket.page.IPageManager;
030import org.apache.wicket.protocol.http.WebApplication;
031import org.apache.wicket.protocol.http.WicketFilter;
032import org.apache.wicket.protocol.ws.WebSocketSettings;
033import org.apache.wicket.protocol.ws.api.event.WebSocketAbortedPayload;
034import org.apache.wicket.protocol.ws.api.event.WebSocketBinaryPayload;
035import org.apache.wicket.protocol.ws.api.event.WebSocketClosedPayload;
036import org.apache.wicket.protocol.ws.api.event.WebSocketConnectedPayload;
037import org.apache.wicket.protocol.ws.api.event.WebSocketErrorPayload;
038import org.apache.wicket.protocol.ws.api.event.WebSocketPayload;
039import org.apache.wicket.protocol.ws.api.event.WebSocketPushPayload;
040import org.apache.wicket.protocol.ws.api.event.WebSocketTextPayload;
041import org.apache.wicket.protocol.ws.api.message.AbortedMessage;
042import org.apache.wicket.protocol.ws.api.message.BinaryMessage;
043import org.apache.wicket.protocol.ws.api.message.ClosedMessage;
044import org.apache.wicket.protocol.ws.api.message.ConnectedMessage;
045import org.apache.wicket.protocol.ws.api.message.ErrorMessage;
046import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage;
047import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
048import org.apache.wicket.protocol.ws.api.message.TextMessage;
049import org.apache.wicket.protocol.ws.api.registry.IKey;
050import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
051import org.apache.wicket.protocol.ws.api.registry.PageIdKey;
052import org.apache.wicket.protocol.ws.api.registry.ResourceNameKey;
053import org.apache.wicket.protocol.ws.api.registry.ResourceNameTokenKey;
054import org.apache.wicket.request.IRequestHandler;
055import org.apache.wicket.request.Url;
056import org.apache.wicket.request.cycle.IRequestCycleListener;
057import org.apache.wicket.request.cycle.RequestCycle;
058import org.apache.wicket.request.cycle.RequestCycleContext;
059import org.apache.wicket.request.http.WebRequest;
060import org.apache.wicket.request.http.WebResponse;
061import org.apache.wicket.session.ISessionStore;
062import org.apache.wicket.util.lang.Args;
063import org.apache.wicket.util.lang.Checks;
064import org.apache.wicket.util.resource.IResourceStream;
065import org.apache.wicket.util.resource.StringResourceStream;
066import org.apache.wicket.util.string.Strings;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070/**
071 * The base implementation of IWebSocketProcessor. Provides the common logic
072 * for registering a web socket connection and broadcasting its events.
073 *
074 * @since 6.0
075 */
076public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
077{
078        private static final Logger LOG = LoggerFactory.getLogger(AbstractWebSocketProcessor.class);
079
080        /**
081         * A pageId indicating that the endpoint is WebSocketResource
082         */
083        static final int NO_PAGE_ID = -1;
084
085        private final WebRequest webRequest;
086        private final int pageId;
087        private final String context;
088        private final String resourceName;
089        private final String connectionToken;
090        private final Url baseUrl;
091        private final WebApplication application;
092        private final String sessionId;
093        private final WebSocketSettings webSocketSettings;
094        private final IWebSocketConnectionRegistry connectionRegistry;
095        private final IWebSocketConnectionFilter connectionFilter;
096        private final HttpServletRequest servletRequest;
097
098        /**
099         * Constructor.
100         *
101         * @param request
102         *      the http request that was used to create this {@link IWebSocketProcessor}
103         * @param application
104         *      the current Wicket Application
105         */
106        public AbstractWebSocketProcessor(final HttpServletRequest request, final WebApplication application)
107        {
108                final HttpSession httpSession = request.getSession(true);
109                if (httpSession == null)
110                {
111                        throw new IllegalStateException("There is no HTTP Session bound. Without a session Wicket won't be " +
112                                        "able to find the stored page to update its components");
113                }
114                this.sessionId = httpSession.getId();
115                String pageId = request.getParameter("pageId");
116                this.context = request.getParameter("context");
117                this.resourceName = request.getParameter("resourceName");
118                this.connectionToken = request.getParameter("connectionToken");
119                if (Strings.isEmpty(pageId) && Strings.isEmpty(resourceName))
120                {
121                        throw new IllegalArgumentException("The request should have either 'pageId' or 'resourceName' parameter!");
122                }
123                if (Strings.isEmpty(pageId) == false)
124                {
125                        this.pageId = Integer.parseInt(pageId, 10);
126                }
127                else
128                {
129                        this.pageId = NO_PAGE_ID;
130                }
131
132                String baseUrl = request.getParameter(WebRequest.PARAM_AJAX_BASE_URL);
133                Checks.notNull(baseUrl, String.format("Request parameter '%s' is required!", WebRequest.PARAM_AJAX_BASE_URL));
134                this.baseUrl = Url.parse(baseUrl);
135
136                WicketFilter wicketFilter = application.getWicketFilter();
137                this.servletRequest = new ServletRequestCopy(request);
138
139                this.application = Args.notNull(application, "application");
140
141                this.webSocketSettings = WebSocketSettings.Holder.get(application);
142
143                this.webRequest = webSocketSettings.newWebSocketRequest(request, wicketFilter.getFilterPath());
144
145                this.connectionRegistry = webSocketSettings.getConnectionRegistry();
146
147                this.connectionFilter = webSocketSettings.getConnectionFilter();
148        }
149
150        @Override
151        public void onMessage(final String message)
152        {
153                broadcastMessage(new TextMessage(getApplication(), getSessionId(), getRegistryKey(), message));
154        }
155
156        @Override
157        public void onMessage(byte[] data, int offset, int length)
158        {
159                BinaryMessage binaryMessage = new BinaryMessage(getApplication(), getSessionId(), getRegistryKey(), data, offset, length);
160                broadcastMessage(binaryMessage);
161        }
162
163        /**
164         * A helper that registers the opened connection in the application-level registry.
165         *
166         * @param connection
167         *            the web socket connection to use to communicate with the client
168         * @see #onOpen(Object)
169         */
170        protected final void onConnect(final IWebSocketConnection connection) {
171                IKey key = getRegistryKey();
172                connectionRegistry.setConnection(getApplication(), getSessionId(), key, connection);
173
174                if (connectionFilter != null)
175                {
176                        ConnectionRejected connectionRejected = connectionFilter.doFilter(servletRequest);
177                        if (connectionRejected != null)
178                        {
179                                broadcastMessage(new AbortedMessage(getApplication(), getSessionId(), key));
180                                connectionRegistry.removeConnection(getApplication(), getSessionId(), key);
181                                connection.close(connectionRejected.getCode(), connectionRejected.getReason());
182                                return;
183                        }
184                }
185
186                broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush(), webSocketSettings.getAsynchronousPushTimeout());
187        }
188
189        @Override
190        public void onClose(int closeCode, String message)
191        {
192                IKey key = getRegistryKey();
193                if (webSocketSettings.shouldNotifyOnCloseEvent(closeCode)) {
194                        broadcastMessage(new ClosedMessage(getApplication(), getSessionId(), key, closeCode, message));
195                }
196                connectionRegistry.removeConnection(getApplication(), getSessionId(), key);
197        }
198
199        @Override
200        public void onError(Throwable t)
201        {
202                if (webSocketSettings.shouldNotifyOnErrorEvent(t)) {
203                        IKey key = getRegistryKey();
204                        broadcastMessage(new ErrorMessage(getApplication(), getSessionId(), key, t));
205                }
206        }
207
208        public final void broadcastMessage(final IWebSocketMessage message)
209        {
210                IKey key = getRegistryKey();
211                IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
212                broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush(), webSocketSettings.getAsynchronousPushTimeout());
213        }
214
215        /**
216         * Exports the Wicket thread locals and broadcasts the received message from the client to all
217         * interested components and behaviors in the page with id {@code #pageId}
218         * <p>
219         *     Note: ConnectedMessage and ClosedMessage messages are notification-only. I.e. whatever the
220         *     components/behaviors write in the WebSocketRequestHandler will be ignored because the protocol
221         *     doesn't expect response from the user.
222         * </p>
223         *
224         * @param message
225         *      the message to broadcast
226     * @param connection
227     *      the {@link org.apache.wicket.protocol.ws.api.IWebSocketConnection}
228     * @param asynchronousPush
229     *      whether asynchronous pus is used or not
230     * @param timeout
231     *      The time ut to use for operation (in milliseconds). A negative value means use default timeout
232     *      (specified by container).
233         */
234        public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush, long timeout)
235        {
236                if (connection != null && (connection.isOpen() || isSpecialMessage(message)))
237                {
238                        Application oldApplication = ThreadContext.getApplication();
239                        Session oldSession = ThreadContext.getSession();
240                        RequestCycle oldRequestCycle = ThreadContext.getRequestCycle();
241
242                        WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush, timeout);
243                        try
244                        {
245                                WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper());
246                                RequestCycle requestCycle = createRequestCycle(requestMapper, webResponse);
247                                ThreadContext.setRequestCycle(requestCycle);
248
249                                ThreadContext.setApplication(application);
250
251                                Session session;
252                                if (oldSession == null || message instanceof IWebSocketPushMessage)
253                                {
254                                        ISessionStore sessionStore = application.getSessionStore();
255                                        session = sessionStore.lookup(webRequest);
256                                        ThreadContext.setSession(session);
257                                }
258                                else
259                                {
260                                        session = oldSession;
261                                }
262
263                                if (session == null)
264                                {
265                                        connectionRegistry.removeConnection(application, sessionId, connection.getKey());
266                                        LOG.debug("No Session could be found for session id '{}' and key '{}'!", sessionId, connection.getKey());
267                                        return;
268                                }
269
270                                IPageManager pageManager = session.getPageManager();
271                                Page page = getPage(pageManager);
272
273                                if (page != null)
274                                {
275                                        WebSocketRequestHandler requestHandler = webSocketSettings.newWebSocketRequestHandler(page, connection);
276
277                                        WebSocketPayload<?> payload = createEventPayload(message, requestHandler);
278
279                                        if (!(message instanceof ConnectedMessage || isSpecialMessage(message))) {
280                                                requestCycle.scheduleRequestHandlerAfterCurrent(requestHandler);
281                                        }
282
283                                        IRequestHandler broadcastingHandler = new WebSocketMessageBroadcastHandler(pageId, resourceName, payload);
284                                        requestMapper.setHandler(broadcastingHandler);
285                                        requestCycle.processRequestAndDetach();
286                                }
287                                else
288                                {
289                                        LOG.debug("Page with id '{}' has been expired. No message will be broadcast!", pageId);
290                                }
291                        }
292                        catch (Exception x)
293                        {
294                                LOG.error("An error occurred during processing of a WebSocket message", x);
295                        }
296                        finally
297                        {
298                                try
299                                {
300                                        webResponse.close();
301                                }
302                                finally
303                                {
304                                        ThreadContext.setApplication(oldApplication);
305                                        ThreadContext.setRequestCycle(oldRequestCycle);
306                                        ThreadContext.setSession(oldSession);
307                                }
308                        }
309                }
310                else
311                {
312                        LOG.debug("Either there is no connection({}) or it is closed.", connection);
313                }
314        }
315
316        private static boolean isSpecialMessage(IWebSocketMessage message)
317        {
318                return message instanceof ClosedMessage || message instanceof ErrorMessage || message instanceof AbortedMessage;
319        }
320
321        private RequestCycle createRequestCycle(WebSocketRequestMapper requestMapper, WebResponse webResponse)
322        {
323                RequestCycleContext context = new RequestCycleContext(webRequest, webResponse,
324                                requestMapper, application.getExceptionMapperProvider().get());
325
326                RequestCycle requestCycle = application.getRequestCycleProvider().apply(context);
327                requestCycle.getListeners().add(application.getRequestCycleListeners());
328                requestCycle.getListeners().add(new IRequestCycleListener()
329                {
330                        @Override
331                        public void onDetach(final RequestCycle requestCycle)
332                        {
333                                if (Session.exists())
334                                {
335                                        Session.get().getPageManager().detach();
336                                }
337                        }
338                });
339                requestCycle.getUrlRenderer().setBaseUrl(baseUrl);
340                return requestCycle;
341        }
342
343        /**
344         * @param pageManager
345         *      the page manager to use when finding a page by id
346         * @return the page to use when creating WebSocketRequestHandler
347         */
348        private Page getPage(IPageManager pageManager)
349        {
350                Page page;
351                if (pageId != -1)
352                {
353                        page = (Page) pageManager.getPage(pageId);
354                }
355                else
356                {
357                        page = new WebSocketResourcePage();
358                }
359                return page;
360        }
361
362        protected final WebApplication getApplication()
363        {
364                return application;
365        }
366
367        protected final String getSessionId()
368        {
369                return sessionId;
370        }
371
372        private WebSocketPayload<?> createEventPayload(IWebSocketMessage message, WebSocketRequestHandler handler)
373        {
374                final WebSocketPayload<?> payload;
375                if (message instanceof TextMessage)
376                {
377                        payload = new WebSocketTextPayload((TextMessage) message, handler);
378                }
379                else if (message instanceof BinaryMessage)
380                {
381                        payload = new WebSocketBinaryPayload((BinaryMessage) message, handler);
382                }
383                else if (message instanceof ConnectedMessage)
384                {
385                        payload = new WebSocketConnectedPayload((ConnectedMessage) message, handler);
386                }
387                else if (message instanceof ClosedMessage)
388                {
389                        payload = new WebSocketClosedPayload((ClosedMessage) message, handler);
390                }
391                else if (message instanceof ErrorMessage)
392                {
393                        payload = new WebSocketErrorPayload((ErrorMessage) message, handler);
394                }
395                else if (message instanceof AbortedMessage)
396                {
397                        payload = new WebSocketAbortedPayload((AbortedMessage) message, handler);
398                }
399                else if (message instanceof IWebSocketPushMessage)
400                {
401                        payload = new WebSocketPushPayload((IWebSocketPushMessage) message, handler);
402                }
403                else
404                {
405                        throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName());
406                }
407                return payload;
408        }
409
410        protected IKey getRegistryKey()
411        {
412                IKey key;
413                if (Strings.isEmpty(resourceName))
414                {
415                        key = new PageIdKey(pageId, context);
416                }
417                else
418                {
419                        if (Strings.isEmpty(connectionToken))
420                        {
421                                key = new ResourceNameKey(resourceName, context);
422                        } else {
423                                key = new ResourceNameTokenKey(resourceName, connectionToken, context);
424                        }
425                }
426                return key;
427        }
428
429        /**
430         * A dummy page that is used to create a new WebSocketRequestHandler for
431         * web socket connections to WebSocketResource
432         */
433        private static class WebSocketResourcePage extends WebPage implements IMarkupResourceStreamProvider
434        {
435                private WebSocketResourcePage()
436                {
437                        setStatelessHint(true);
438                }
439
440                @Override
441                public IResourceStream getMarkupResourceStream(MarkupContainer container, Class<?> containerClass)
442                {
443                        return new StringResourceStream("");
444                }
445        }
446}