001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.wicket.protocol.ws.api; 018 019import static java.util.Collections.singletonList; 020 021import java.util.Collection; 022 023import org.apache.wicket.Application; 024import org.apache.wicket.protocol.ws.WebSocketSettings; 025import org.apache.wicket.protocol.ws.api.message.ConnectedMessage; 026import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; 027import org.apache.wicket.protocol.ws.api.registry.IKey; 028import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry; 029import org.apache.wicket.protocol.ws.concurrent.Executor; 030import org.apache.wicket.util.lang.Args; 031 032/** 033 * Allows pushing events for processing to Pages that have active web sockets. 034 * 035 * @since 6.4 036 * @author Mikko Tiihonen 037 */ 038public class WebSocketPushBroadcaster 039{ 040 private final IWebSocketConnectionRegistry registry; 041 042 public WebSocketPushBroadcaster(IWebSocketConnectionRegistry registry) 043 { 044 Args.notNull(registry, "registry"); 045 046 this.registry = registry; 047 } 048 049 /** 050 * Processes the given message in the page and session identified by the given Web Socket connection. 051 * The message is sent as an event to the Page and components of the session allowing the components 052 * to be updated. 053 * 054 * This method can be invoked from any thread, even a non-wicket thread. By default all processing 055 * is done in the caller thread. Use 056 * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} 057 * to move processing to background threads. 058 * 059 * If the given connection is no longer open then the broadcast is silently ignored. 060 * 061 * @param connection 062 * The Web Socket connection that identifies the page and session 063 * @param message 064 * The push message event 065 */ 066 public void broadcast(ConnectedMessage connection, IWebSocketPushMessage message) 067 { 068 Args.notNull(connection, "connection"); 069 Args.notNull(message, "message"); 070 071 Application application = connection.getApplication(); 072 String sessionId = connection.getSessionId(); 073 IKey key = connection.getKey(); 074 IWebSocketConnection wsConnection = registry.getConnection(application, sessionId, key); 075 if (wsConnection == null) 076 { 077 return; 078 } 079 process(application, singletonList(wsConnection), message); 080 } 081 082 /** 083 * Processes the given message in all pages that have active Web Socket connections. 084 * The message is sent as an event to the Page and components of the session allowing the components 085 * to be updated. 086 * 087 * This method can be invoked from any thread, even a non-wicket thread. By default all processing 088 * is done in the caller thread. Use 089 * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} 090 * to move processing to background threads. 091 * 092 * If some connections are not in valid state they are silently ignored. 093 * 094 * @param application 095 * The wicket application 096 * @param message 097 * The push message event 098 */ 099 public void broadcastAll(Application application, IWebSocketPushMessage message) 100 { 101 Args.notNull(application, "application"); 102 Args.notNull(message, "message"); 103 104 Collection<IWebSocketConnection> wsConnections = registry.getConnections(application); 105 if (wsConnections == null) 106 { 107 return; 108 } 109 process(application, wsConnections, message); 110 } 111 112 /** 113 * Processes the given message in all pages in a given session that have active Web Socket connections. 114 * The message is sent as an event to the Page and components of the session allowing the components 115 * to be updated. 116 * 117 * This method can be invoked from any thread, even a non-wicket thread. By default all processing 118 * is done in the caller thread. Use 119 * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} 120 * to move processing to background threads. 121 * 122 * If some connections are not in valid state they are silently ignored. 123 * 124 * @param application 125 * The wicket application 126 * 127 * @param sessionId 128 * The session ID 129 * @param message 130 * The push message event 131 */ 132 public void broadcastAllInSession(Application application, String sessionId, IWebSocketPushMessage message) 133 { 134 Args.notNull(application, "application"); 135 Args.notNull(message, "message"); 136 Args.notNull(sessionId, "sessionId"); 137 138 Collection<IWebSocketConnection> wsConnections = registry.getConnections(application, sessionId); 139 if (wsConnections == null || wsConnections.isEmpty()) 140 { 141 return; 142 } 143 process(application, wsConnections, message); 144 } 145 146 /** 147 * Processes the given message in all pages in a given session that have active Web Socket connections and match 148 * the given filter. The message is sent as an event to the Page and components of the session allowing the components 149 * to be updated. 150 * 151 * This method can be invoked from any thread, even a non-wicket thread. By default all processing 152 * is done in the caller thread. Use 153 * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} 154 * to move processing to background threads. 155 * 156 * If some connections are not in valid state they are silently ignored. 157 * 158 * @param application 159 * The wicket application 160 * 161 * @param connectionsFilter 162 * the {@link org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry.IConnectionsFilter} 163 * @param message 164 * The push message event 165 */ 166 public void broadcastAllMatchingFilter(Application application, IWebSocketConnectionRegistry.IConnectionsFilter connectionsFilter, IWebSocketPushMessage message) 167 { 168 Args.notNull(application, "application"); 169 Args.notNull(message, "message"); 170 Args.notNull(connectionsFilter, "connectionsFilter"); 171 172 Collection<IWebSocketConnection> wsConnections = registry.getConnections(application, connectionsFilter); 173 if (wsConnections == null || wsConnections.isEmpty()) 174 { 175 return; 176 } 177 process(application, wsConnections, message); 178 } 179 180 private void process(final Application application, final Collection<IWebSocketConnection> wsConnections, 181 final IWebSocketPushMessage message) 182 { 183 WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(application); 184 Executor executor = webSocketSettings.getWebSocketPushMessageExecutor(); 185 for (final IWebSocketConnection wsConnection : wsConnections) 186 { 187 executor.run(new Runnable() 188 { 189 @Override 190 public void run() 191 { 192 if (webSocketSettings.isAsynchronousPush()) 193 { 194 wsConnection.sendMessageAsync(message, webSocketSettings.getAsynchronousPushTimeout()); 195 } 196 else 197 { 198 wsConnection.sendMessage(message); 199 } 200 } 201 }); 202 } 203 } 204}