001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.wicket.protocol.ws.api; 018 019import javax.servlet.http.HttpServletRequest; 020import javax.servlet.http.HttpSession; 021 022import org.apache.wicket.Application; 023import org.apache.wicket.MarkupContainer; 024import org.apache.wicket.Page; 025import org.apache.wicket.Session; 026import org.apache.wicket.ThreadContext; 027import org.apache.wicket.markup.IMarkupResourceStreamProvider; 028import org.apache.wicket.markup.html.WebPage; 029import org.apache.wicket.page.IPageManager; 030import org.apache.wicket.protocol.http.WebApplication; 031import org.apache.wicket.protocol.http.WicketFilter; 032import org.apache.wicket.protocol.ws.WebSocketSettings; 033import org.apache.wicket.protocol.ws.api.event.WebSocketAbortedPayload; 034import org.apache.wicket.protocol.ws.api.event.WebSocketBinaryPayload; 035import org.apache.wicket.protocol.ws.api.event.WebSocketClosedPayload; 036import org.apache.wicket.protocol.ws.api.event.WebSocketConnectedPayload; 037import org.apache.wicket.protocol.ws.api.event.WebSocketErrorPayload; 038import org.apache.wicket.protocol.ws.api.event.WebSocketPayload; 039import org.apache.wicket.protocol.ws.api.event.WebSocketPushPayload; 040import org.apache.wicket.protocol.ws.api.event.WebSocketTextPayload; 041import org.apache.wicket.protocol.ws.api.message.AbortedMessage; 042import org.apache.wicket.protocol.ws.api.message.BinaryMessage; 043import org.apache.wicket.protocol.ws.api.message.ClosedMessage; 044import org.apache.wicket.protocol.ws.api.message.ConnectedMessage; 045import org.apache.wicket.protocol.ws.api.message.ErrorMessage; 046import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage; 047import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; 048import org.apache.wicket.protocol.ws.api.message.TextMessage; 049import org.apache.wicket.protocol.ws.api.registry.IKey; 050import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry; 051import org.apache.wicket.protocol.ws.api.registry.PageIdKey; 052import org.apache.wicket.protocol.ws.api.registry.ResourceNameKey; 053import org.apache.wicket.protocol.ws.api.registry.ResourceNameTokenKey; 054import org.apache.wicket.request.IRequestHandler; 055import org.apache.wicket.request.Url; 056import org.apache.wicket.request.cycle.IRequestCycleListener; 057import org.apache.wicket.request.cycle.RequestCycle; 058import org.apache.wicket.request.cycle.RequestCycleContext; 059import org.apache.wicket.request.http.WebRequest; 060import org.apache.wicket.request.http.WebResponse; 061import org.apache.wicket.session.ISessionStore; 062import org.apache.wicket.util.lang.Args; 063import org.apache.wicket.util.lang.Checks; 064import org.apache.wicket.util.resource.IResourceStream; 065import org.apache.wicket.util.resource.StringResourceStream; 066import org.apache.wicket.util.string.Strings; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070/** 071 * The base implementation of IWebSocketProcessor. Provides the common logic 072 * for registering a web socket connection and broadcasting its events. 073 * 074 * @since 6.0 075 */ 076public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor 077{ 078 private static final Logger LOG = LoggerFactory.getLogger(AbstractWebSocketProcessor.class); 079 080 /** 081 * A pageId indicating that the endpoint is WebSocketResource 082 */ 083 static final int NO_PAGE_ID = -1; 084 085 private final WebRequest webRequest; 086 private final int pageId; 087 private final String context; 088 private final String resourceName; 089 private final String connectionToken; 090 private final Url baseUrl; 091 private final WebApplication application; 092 private final String sessionId; 093 private final WebSocketSettings webSocketSettings; 094 private final IWebSocketConnectionRegistry connectionRegistry; 095 private final IWebSocketConnectionFilter connectionFilter; 096 private final HttpServletRequest servletRequest; 097 098 /** 099 * Constructor. 100 * 101 * @param request 102 * the http request that was used to create this {@link IWebSocketProcessor} 103 * @param application 104 * the current Wicket Application 105 */ 106 public AbstractWebSocketProcessor(final HttpServletRequest request, final WebApplication application) 107 { 108 final HttpSession httpSession = request.getSession(true); 109 if (httpSession == null) 110 { 111 throw new IllegalStateException("There is no HTTP Session bound. Without a session Wicket won't be " + 112 "able to find the stored page to update its components"); 113 } 114 this.sessionId = httpSession.getId(); 115 String pageId = request.getParameter("pageId"); 116 this.context = request.getParameter("context"); 117 this.resourceName = request.getParameter("resourceName"); 118 this.connectionToken = request.getParameter("connectionToken"); 119 if (Strings.isEmpty(pageId) && Strings.isEmpty(resourceName)) 120 { 121 throw new IllegalArgumentException("The request should have either 'pageId' or 'resourceName' parameter!"); 122 } 123 if (Strings.isEmpty(pageId) == false) 124 { 125 this.pageId = Integer.parseInt(pageId, 10); 126 } 127 else 128 { 129 this.pageId = NO_PAGE_ID; 130 } 131 132 String baseUrl = request.getParameter(WebRequest.PARAM_AJAX_BASE_URL); 133 Checks.notNull(baseUrl, String.format("Request parameter '%s' is required!", WebRequest.PARAM_AJAX_BASE_URL)); 134 this.baseUrl = Url.parse(baseUrl); 135 136 WicketFilter wicketFilter = application.getWicketFilter(); 137 this.servletRequest = new ServletRequestCopy(request); 138 139 this.application = Args.notNull(application, "application"); 140 141 this.webSocketSettings = WebSocketSettings.Holder.get(application); 142 143 this.webRequest = webSocketSettings.newWebSocketRequest(request, wicketFilter.getFilterPath()); 144 145 this.connectionRegistry = webSocketSettings.getConnectionRegistry(); 146 147 this.connectionFilter = webSocketSettings.getConnectionFilter(); 148 } 149 150 @Override 151 public void onMessage(final String message) 152 { 153 broadcastMessage(new TextMessage(getApplication(), getSessionId(), getRegistryKey(), message)); 154 } 155 156 @Override 157 public void onMessage(byte[] data, int offset, int length) 158 { 159 BinaryMessage binaryMessage = new BinaryMessage(getApplication(), getSessionId(), getRegistryKey(), data, offset, length); 160 broadcastMessage(binaryMessage); 161 } 162 163 /** 164 * A helper that registers the opened connection in the application-level registry. 165 * 166 * @param connection 167 * the web socket connection to use to communicate with the client 168 * @see #onOpen(Object) 169 */ 170 protected final void onConnect(final IWebSocketConnection connection) { 171 IKey key = getRegistryKey(); 172 connectionRegistry.setConnection(getApplication(), getSessionId(), key, connection); 173 174 if (connectionFilter != null) 175 { 176 ConnectionRejected connectionRejected = connectionFilter.doFilter(servletRequest); 177 if (connectionRejected != null) 178 { 179 broadcastMessage(new AbortedMessage(getApplication(), getSessionId(), key)); 180 connectionRegistry.removeConnection(getApplication(), getSessionId(), key); 181 connection.close(connectionRejected.getCode(), connectionRejected.getReason()); 182 return; 183 } 184 } 185 186 broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush(), webSocketSettings.getAsynchronousPushTimeout()); 187 } 188 189 @Override 190 public void onClose(int closeCode, String message) 191 { 192 IKey key = getRegistryKey(); 193 if (webSocketSettings.shouldNotifyOnCloseEvent(closeCode)) { 194 broadcastMessage(new ClosedMessage(getApplication(), getSessionId(), key, closeCode, message)); 195 } 196 connectionRegistry.removeConnection(getApplication(), getSessionId(), key); 197 } 198 199 @Override 200 public void onError(Throwable t) 201 { 202 if (webSocketSettings.shouldNotifyOnErrorEvent(t)) { 203 IKey key = getRegistryKey(); 204 broadcastMessage(new ErrorMessage(getApplication(), getSessionId(), key, t)); 205 } 206 } 207 208 public final void broadcastMessage(final IWebSocketMessage message) 209 { 210 IKey key = getRegistryKey(); 211 IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key); 212 broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush(), webSocketSettings.getAsynchronousPushTimeout()); 213 } 214 215 /** 216 * Exports the Wicket thread locals and broadcasts the received message from the client to all 217 * interested components and behaviors in the page with id {@code #pageId} 218 * <p> 219 * Note: ConnectedMessage and ClosedMessage messages are notification-only. I.e. whatever the 220 * components/behaviors write in the WebSocketRequestHandler will be ignored because the protocol 221 * doesn't expect response from the user. 222 * </p> 223 * 224 * @param message 225 * the message to broadcast 226 * @param connection 227 * the {@link org.apache.wicket.protocol.ws.api.IWebSocketConnection} 228 * @param asynchronousPush 229 * whether asynchronous pus is used or not 230 * @param timeout 231 * The time ut to use for operation (in milliseconds). A negative value means use default timeout 232 * (specified by container). 233 */ 234 public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush, long timeout) 235 { 236 if (connection != null && (connection.isOpen() || isSpecialMessage(message))) 237 { 238 Application oldApplication = ThreadContext.getApplication(); 239 Session oldSession = ThreadContext.getSession(); 240 RequestCycle oldRequestCycle = ThreadContext.getRequestCycle(); 241 242 WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush, timeout); 243 try 244 { 245 WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper()); 246 RequestCycle requestCycle = createRequestCycle(requestMapper, webResponse); 247 ThreadContext.setRequestCycle(requestCycle); 248 249 ThreadContext.setApplication(application); 250 251 Session session; 252 if (oldSession == null || message instanceof IWebSocketPushMessage) 253 { 254 ISessionStore sessionStore = application.getSessionStore(); 255 session = sessionStore.lookup(webRequest); 256 ThreadContext.setSession(session); 257 } 258 else 259 { 260 session = oldSession; 261 } 262 263 if (session == null) 264 { 265 connectionRegistry.removeConnection(application, sessionId, connection.getKey()); 266 LOG.debug("No Session could be found for session id '{}' and key '{}'!", sessionId, connection.getKey()); 267 return; 268 } 269 270 IPageManager pageManager = session.getPageManager(); 271 Page page = getPage(pageManager); 272 273 if (page != null) 274 { 275 WebSocketRequestHandler requestHandler = webSocketSettings.newWebSocketRequestHandler(page, connection); 276 277 WebSocketPayload<?> payload = createEventPayload(message, requestHandler); 278 279 if (!(message instanceof ConnectedMessage || isSpecialMessage(message))) { 280 requestCycle.scheduleRequestHandlerAfterCurrent(requestHandler); 281 } 282 283 IRequestHandler broadcastingHandler = new WebSocketMessageBroadcastHandler(pageId, resourceName, payload); 284 requestMapper.setHandler(broadcastingHandler); 285 requestCycle.processRequestAndDetach(); 286 } 287 else 288 { 289 LOG.debug("Page with id '{}' has been expired. No message will be broadcast!", pageId); 290 } 291 } 292 catch (Exception x) 293 { 294 LOG.error("An error occurred during processing of a WebSocket message", x); 295 } 296 finally 297 { 298 try 299 { 300 webResponse.close(); 301 } 302 finally 303 { 304 ThreadContext.setApplication(oldApplication); 305 ThreadContext.setRequestCycle(oldRequestCycle); 306 ThreadContext.setSession(oldSession); 307 } 308 } 309 } 310 else 311 { 312 LOG.debug("Either there is no connection({}) or it is closed.", connection); 313 } 314 } 315 316 private static boolean isSpecialMessage(IWebSocketMessage message) 317 { 318 return message instanceof ClosedMessage || message instanceof ErrorMessage || message instanceof AbortedMessage; 319 } 320 321 private RequestCycle createRequestCycle(WebSocketRequestMapper requestMapper, WebResponse webResponse) 322 { 323 RequestCycleContext context = new RequestCycleContext(webRequest, webResponse, 324 requestMapper, application.getExceptionMapperProvider().get()); 325 326 RequestCycle requestCycle = application.getRequestCycleProvider().apply(context); 327 requestCycle.getListeners().add(application.getRequestCycleListeners()); 328 requestCycle.getListeners().add(new IRequestCycleListener() 329 { 330 @Override 331 public void onDetach(final RequestCycle requestCycle) 332 { 333 if (Session.exists()) 334 { 335 Session.get().getPageManager().detach(); 336 } 337 } 338 }); 339 requestCycle.getUrlRenderer().setBaseUrl(baseUrl); 340 return requestCycle; 341 } 342 343 /** 344 * @param pageManager 345 * the page manager to use when finding a page by id 346 * @return the page to use when creating WebSocketRequestHandler 347 */ 348 private Page getPage(IPageManager pageManager) 349 { 350 Page page; 351 if (pageId != -1) 352 { 353 page = (Page) pageManager.getPage(pageId); 354 } 355 else 356 { 357 page = new WebSocketResourcePage(); 358 } 359 return page; 360 } 361 362 protected final WebApplication getApplication() 363 { 364 return application; 365 } 366 367 protected final String getSessionId() 368 { 369 return sessionId; 370 } 371 372 private WebSocketPayload<?> createEventPayload(IWebSocketMessage message, WebSocketRequestHandler handler) 373 { 374 final WebSocketPayload<?> payload; 375 if (message instanceof TextMessage) 376 { 377 payload = new WebSocketTextPayload((TextMessage) message, handler); 378 } 379 else if (message instanceof BinaryMessage) 380 { 381 payload = new WebSocketBinaryPayload((BinaryMessage) message, handler); 382 } 383 else if (message instanceof ConnectedMessage) 384 { 385 payload = new WebSocketConnectedPayload((ConnectedMessage) message, handler); 386 } 387 else if (message instanceof ClosedMessage) 388 { 389 payload = new WebSocketClosedPayload((ClosedMessage) message, handler); 390 } 391 else if (message instanceof ErrorMessage) 392 { 393 payload = new WebSocketErrorPayload((ErrorMessage) message, handler); 394 } 395 else if (message instanceof AbortedMessage) 396 { 397 payload = new WebSocketAbortedPayload((AbortedMessage) message, handler); 398 } 399 else if (message instanceof IWebSocketPushMessage) 400 { 401 payload = new WebSocketPushPayload((IWebSocketPushMessage) message, handler); 402 } 403 else 404 { 405 throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName()); 406 } 407 return payload; 408 } 409 410 protected IKey getRegistryKey() 411 { 412 IKey key; 413 if (Strings.isEmpty(resourceName)) 414 { 415 key = new PageIdKey(pageId, context); 416 } 417 else 418 { 419 if (Strings.isEmpty(connectionToken)) 420 { 421 key = new ResourceNameKey(resourceName, context); 422 } else { 423 key = new ResourceNameTokenKey(resourceName, connectionToken, context); 424 } 425 } 426 return key; 427 } 428 429 /** 430 * A dummy page that is used to create a new WebSocketRequestHandler for 431 * web socket connections to WebSocketResource 432 */ 433 private static class WebSocketResourcePage extends WebPage implements IMarkupResourceStreamProvider 434 { 435 private WebSocketResourcePage() 436 { 437 setStatelessHint(true); 438 } 439 440 @Override 441 public IResourceStream getMarkupResourceStream(MarkupContainer container, Class<?> containerClass) 442 { 443 return new StringResourceStream(""); 444 } 445 } 446}