MultipointBioSender.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.transport.bio;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.MultiPointSender;

/**
 * @deprecated This will be removed in Tomcat 10
 */
@Deprecated
public class MultipointBioSender extends AbstractSender implements MultiPointSender {
    public MultipointBioSender() {
        // NO-OP
    }

    protected final HashMap<Member,BioSender> bioSenders = new HashMap<>();

    @Override
    public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
        byte[] data = XByteBuffer.createDataPackage((ChannelData) msg);
        BioSender[] senders = setupForSend(destination);
        ChannelException cx = null;
        for (int i = 0; i < senders.length; i++) {
            try {
                senders[i].sendMessage(data,
                        (msg.getOptions() & Channel.SEND_OPTIONS_USE_ACK) == Channel.SEND_OPTIONS_USE_ACK);
            } catch (Exception x) {
                if (cx == null) {
                    cx = new ChannelException(x);
                }
                cx.addFaultyMember(destination[i], x);
            }
        }
        if (cx != null) {
            throw cx;
        }
    }


    protected BioSender[] setupForSend(Member[] destination) throws ChannelException {
        ChannelException cx = null;
        BioSender[] result = new BioSender[destination.length];
        for (int i = 0; i < destination.length; i++) {
            try {
                BioSender sender = bioSenders.get(destination[i]);
                if (sender == null) {
                    sender = new BioSender();
                    AbstractSender.transferProperties(this, sender);
                    sender.setDestination(destination[i]);
                    bioSenders.put(destination[i], sender);
                }
                result[i] = sender;
                if (!result[i].isConnected()) {
                    result[i].connect();
                }
                result[i].keepalive();
            } catch (Exception x) {
                if (cx == null) {
                    cx = new ChannelException(x);
                }
                cx.addFaultyMember(destination[i], x);
            }
        }
        if (cx != null) {
            throw cx;
        } else {
            return result;
        }
    }

    @Override
    public void connect() throws IOException {
        // do nothing, we connect on demand
        setConnected(true);
    }


    private synchronized void close() throws ChannelException {
        ChannelException x = null;
        Object[] members = bioSenders.keySet().toArray();
        for (int i = 0; i < members.length; i++) {
            Member mbr = (Member) members[i];
            try {
                BioSender sender = bioSenders.get(mbr);
                sender.disconnect();
            } catch (Exception e) {
                if (x == null) {
                    x = new ChannelException(e);
                }
                x.addFaultyMember(mbr, e);
            }
            bioSenders.remove(mbr);
        }
        if (x != null) {
            throw x;
        }
    }

    @Override
    public void add(Member member) {
        // NO-OP
        // Members are defined by the array of members specified in the call to
        // sendMessage()
    }

    @Override
    public void remove(Member member) {
        // disconnect senders
        BioSender sender = bioSenders.remove(member);
        if (sender != null) {
            sender.disconnect();
        }
    }


    @Override
    public synchronized void disconnect() {
        try {
            close();
        } catch (Exception x) {
            // Ignore
        }
        setConnected(false);
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            disconnect();
        } catch (Exception e) {
            // Ignore
        }
        super.finalize();
    }


    @Override
    public boolean keepalive() {
        boolean result = false;
        @SuppressWarnings("unchecked")
        Map.Entry<Member,BioSender>[] entries = bioSenders.entrySet().toArray(new Map.Entry[0]);
        for (int i = 0; i < entries.length; i++) {
            BioSender sender = entries[i].getValue();
            if (sender.keepalive()) {
                bioSenders.remove(entries[i].getKey());
            }
        }
        return result;
    }

}