001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.wicket.protocol.ws.api;
018
019import static java.util.Collections.singletonList;
020
021import java.util.Collection;
022
023import org.apache.wicket.Application;
024import org.apache.wicket.protocol.ws.WebSocketSettings;
025import org.apache.wicket.protocol.ws.api.message.ConnectedMessage;
026import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
027import org.apache.wicket.protocol.ws.api.registry.IKey;
028import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
029import org.apache.wicket.protocol.ws.concurrent.Executor;
030import org.apache.wicket.util.lang.Args;
031
032/**
033 * Allows pushing events for processing to Pages that have active web sockets.
034 *
035 * @since 6.4
036 * @author Mikko Tiihonen
037 */
038public class WebSocketPushBroadcaster
039{
040        private final IWebSocketConnectionRegistry registry;
041
042        public WebSocketPushBroadcaster(IWebSocketConnectionRegistry registry)
043        {
044                Args.notNull(registry, "registry");
045
046                this.registry = registry;
047        }
048
049        /**
050         * Processes the given message in the page and session identified by the given Web Socket connection.
051         * The message is sent as an event to the Page and components of the session allowing the components
052         * to be updated.
053         *
054         * This method can be invoked from any thread, even a non-wicket thread. By default all processing
055         * is done in the caller thread. Use
056         * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)}
057         * to move processing to background threads.
058         *
059         * If the given connection is no longer open then the broadcast is silently ignored.
060         *
061         * @param connection
062         *                      The Web Socket connection that identifies the page and session
063         * @param message
064         *                      The push message event
065         */
066        public void broadcast(ConnectedMessage connection, IWebSocketPushMessage message)
067        {
068                Args.notNull(connection, "connection");
069                Args.notNull(message, "message");
070
071                Application application = connection.getApplication();
072                String sessionId = connection.getSessionId();
073                IKey key = connection.getKey();
074                IWebSocketConnection wsConnection = registry.getConnection(application, sessionId, key);
075                if (wsConnection == null)
076                {
077                        return;
078                }
079                process(application, singletonList(wsConnection), message);
080        }
081
082        /**
083         * Processes the given message in all pages that have active Web Socket connections.
084         * The message is sent as an event to the Page and components of the session allowing the components
085         * to be updated.
086         *
087         * This method can be invoked from any thread, even a non-wicket thread. By default all processing
088         * is done in the caller thread. Use
089         * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)}
090         * to move processing to background threads.
091         *
092         * If some connections are not in valid state they are silently ignored.
093         *
094         * @param application
095         *                      The wicket application
096         * @param message
097         *                      The push message event
098         */
099        public void broadcastAll(Application application, IWebSocketPushMessage message)
100        {
101                Args.notNull(application, "application");
102                Args.notNull(message, "message");
103
104                Collection<IWebSocketConnection> wsConnections = registry.getConnections(application);
105                if (wsConnections == null)
106                {
107                        return;
108                }
109                process(application, wsConnections, message);
110        }
111
112        /**
113         * Processes the given message in all pages in a given session that have active Web Socket connections.
114         * The message is sent as an event to the Page and components of the session allowing the components
115         * to be updated.
116         *
117         * This method can be invoked from any thread, even a non-wicket thread. By default all processing
118         * is done in the caller thread. Use
119         * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)}
120         * to move processing to background threads.
121         *
122         * If some connections are not in valid state they are silently ignored.
123         *
124         * @param application
125         *                      The wicket application
126         *
127         * @param sessionId
128         *         The session ID
129         * @param message
130         *                      The push message event
131         */
132        public void broadcastAllInSession(Application application, String sessionId, IWebSocketPushMessage message)
133        {
134                Args.notNull(application, "application");
135                Args.notNull(message, "message");
136                Args.notNull(sessionId, "sessionId");
137
138                Collection<IWebSocketConnection> wsConnections = registry.getConnections(application, sessionId);
139                if (wsConnections == null || wsConnections.isEmpty())
140                {
141                        return;
142                }
143                process(application, wsConnections, message);
144        }
145
146        /**
147         * Processes the given message in all pages in a given session that have active Web Socket connections and match
148         * the given filter. The message is sent as an event to the Page and components of the session allowing the components
149         * to be updated.
150         *
151         * This method can be invoked from any thread, even a non-wicket thread. By default all processing
152         * is done in the caller thread. Use
153         * {@link WebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)}
154         * to move processing to background threads.
155         *
156         * If some connections are not in valid state they are silently ignored.
157         *
158         * @param application
159         *                      The wicket application
160         *
161         * @param connectionsFilter
162         *         the {@link org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry.IConnectionsFilter}
163         * @param message
164         *                      The push message event
165         */
166        public void broadcastAllMatchingFilter(Application application, IWebSocketConnectionRegistry.IConnectionsFilter connectionsFilter, IWebSocketPushMessage message)
167        {
168                Args.notNull(application, "application");
169                Args.notNull(message, "message");
170                Args.notNull(connectionsFilter, "connectionsFilter");
171
172                Collection<IWebSocketConnection> wsConnections = registry.getConnections(application, connectionsFilter);
173                if (wsConnections == null  || wsConnections.isEmpty())
174                {
175                        return;
176                }
177                process(application, wsConnections, message);
178        }
179
180        private void process(final Application application, final Collection<IWebSocketConnection> wsConnections,
181                             final IWebSocketPushMessage message)
182        {
183                WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(application);
184                Executor executor = webSocketSettings.getWebSocketPushMessageExecutor();
185                for (final IWebSocketConnection wsConnection : wsConnections)
186                {
187                        executor.run(new Runnable()
188                        {
189                                @Override
190                                public void run()
191                                {
192                                        if (webSocketSettings.isAsynchronousPush())
193                                        {
194                                                wsConnection.sendMessageAsync(message, webSocketSettings.getAsynchronousPushTimeout());
195                                        }
196                                        else
197                                        {
198                                                wsConnection.sendMessage(message);
199                                        }
200                                }
201                        });
202                }
203        }
204}