MemberImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.membership;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.util.StringManager;

/**
 * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast member. Carries
 * the host, and port of the this or other cluster nodes.
 */
public class MemberImpl implements Member, java.io.Externalizable {

    public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] { 84, 82, 73, 66, 69, 83, 45, 66, 1, 0 };
    public static final transient byte[] TRIBES_MBR_END = new byte[] { 84, 82, 73, 66, 69, 83, 45, 69, 1, 0 };
    protected static final StringManager sm = StringManager.getManager(Constants.Package);

    /**
     * The listen host for this member
     */
    protected volatile byte[] host = new byte[0];
    protected transient volatile String hostname;
    /**
     * The tcp listen port for this member
     */
    protected volatile int port;
    /**
     * The udp listen port for this member
     */
    protected volatile int udpPort = -1;

    /**
     * The tcp/SSL listen port for this member
     */
    protected volatile int securePort = -1;

    /**
     * Counter for how many broadcast messages have been sent from this member
     */
    protected AtomicInteger msgCount = new AtomicInteger(0);

    /**
     * The number of milliseconds since this member was created, is kept track of using the start time
     */
    protected volatile long memberAliveTime = 0;

    /**
     * For the local member only
     */
    protected transient long serviceStartTime;

    /**
     * To avoid serialization over and over again, once the local dataPkg has been set, we use that to transmit data
     */
    protected transient byte[] dataPkg = null;

    /**
     * Unique session Id for this member
     */
    protected volatile byte[] uniqueId = new byte[16];

    /**
     * Custom payload that an app framework can broadcast Also used to transport stop command.
     */
    protected volatile byte[] payload = new byte[0];

    /**
     * Command, so that the custom payload doesn't have to be used This is for internal tribes use, such as
     * SHUTDOWN_COMMAND
     */
    protected volatile byte[] command = new byte[0];

    /**
     * Domain if we want to filter based on domain.
     */
    protected volatile byte[] domain = new byte[0];

    /**
     * The flag indicating that this member is a local member.
     */
    protected volatile boolean local = false;

    /**
     * Empty constructor for serialization
     */
    public MemberImpl() {

    }

    /**
     * Construct a new member object.
     *
     * @param host      - the tcp listen host
     * @param port      - the tcp listen port
     * @param aliveTime - the number of milliseconds since this member was created
     *
     * @throws IOException If there is an error converting the host name to an IP address
     */
    public MemberImpl(String host, int port, long aliveTime) throws IOException {
        setHostname(host);
        this.port = port;
        this.memberAliveTime = aliveTime;
    }

    public MemberImpl(String host, int port, long aliveTime, byte[] payload) throws IOException {
        this(host, port, aliveTime);
        setPayload(payload);
    }

    @Override
    public boolean isReady() {
        return SenderState.getSenderState(this).isReady();
    }

    @Override
    public boolean isSuspect() {
        return SenderState.getSenderState(this).isSuspect();
    }

    @Override
    public boolean isFailing() {
        return SenderState.getSenderState(this).isFailing();
    }

    /**
     * Increment the message count.
     */
    protected void inc() {
        msgCount.incrementAndGet();
    }

    /**
     * Create a data package to send over the wire representing this member. This is faster than serialization.
     *
     * @return - the bytes for this member deserialized
     */
    public byte[] getData() {
        return getData(true);
    }


    @Override
    public byte[] getData(boolean getalive) {
        return getData(getalive, false);
    }


    @Override
    public synchronized int getDataLength() {
        return TRIBES_MBR_BEGIN.length + // start pkg
                4 + // data length
                8 + // alive time
                4 + // port
                4 + // secure port
                4 + // udp port
                1 + // host length
                host.length + // host
                4 + // command length
                command.length + // command
                4 + // domain length
                domain.length + // domain
                16 + // unique id
                4 + // payload length
                payload.length + // payload
                TRIBES_MBR_END.length; // end pkg
    }


    @Override
    public synchronized byte[] getData(boolean getalive, boolean reset) {
        if (reset) {
            dataPkg = null;
        }
        // Look in cache first
        if (dataPkg != null) {
            if (getalive) {
                // You'd be surprised, but System.currentTimeMillis
                // shows up on the profiler
                long alive = System.currentTimeMillis() - getServiceStartTime();
                byte[] result = dataPkg.clone();
                XByteBuffer.toBytes(alive, result, TRIBES_MBR_BEGIN.length + 4);
                dataPkg = result;
            }
            return dataPkg;
        }

        // package looks like
        // start package TRIBES_MBR_BEGIN.length
        // package length - 4 bytes
        // alive - 8 bytes
        // port - 4 bytes
        // secure port - 4 bytes
        // udp port - 4 bytes
        // host length - 1 byte
        // host - hl bytes
        // clen - 4 bytes
        // command - clen bytes
        // dlen - 4 bytes
        // domain - dlen bytes
        // uniqueId - 16 bytes
        // payload length - 4 bytes
        // payload plen bytes
        // end package TRIBES_MBR_END.length
        long alive = System.currentTimeMillis() - getServiceStartTime();
        byte[] data = new byte[getDataLength()];

        int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);

        int pos = 0;

        // TRIBES_MBR_BEGIN
        System.arraycopy(TRIBES_MBR_BEGIN, 0, data, pos, TRIBES_MBR_BEGIN.length);
        pos += TRIBES_MBR_BEGIN.length;

        // body length
        XByteBuffer.toBytes(bodylength, data, pos);
        pos += 4;

        // alive data
        XByteBuffer.toBytes(alive, data, pos);
        pos += 8;
        // port
        XByteBuffer.toBytes(port, data, pos);
        pos += 4;
        // secure port
        XByteBuffer.toBytes(securePort, data, pos);
        pos += 4;
        // udp port
        XByteBuffer.toBytes(udpPort, data, pos);
        pos += 4;
        // host length
        data[pos++] = (byte) host.length;
        // host
        System.arraycopy(host, 0, data, pos, host.length);
        pos += host.length;
        // clen - 4 bytes
        XByteBuffer.toBytes(command.length, data, pos);
        pos += 4;
        // command - clen bytes
        System.arraycopy(command, 0, data, pos, command.length);
        pos += command.length;
        // dlen - 4 bytes
        XByteBuffer.toBytes(domain.length, data, pos);
        pos += 4;
        // domain - dlen bytes
        System.arraycopy(domain, 0, data, pos, domain.length);
        pos += domain.length;
        // unique Id
        System.arraycopy(uniqueId, 0, data, pos, uniqueId.length);
        pos += uniqueId.length;
        // payload
        XByteBuffer.toBytes(payload.length, data, pos);
        pos += 4;
        System.arraycopy(payload, 0, data, pos, payload.length);
        pos += payload.length;

        // TRIBES_MBR_END
        System.arraycopy(TRIBES_MBR_END, 0, data, pos, TRIBES_MBR_END.length);
        pos += TRIBES_MBR_END.length;

        // create local data
        dataPkg = data;
        return data;
    }

    /**
     * Deserializes a member from data sent over the wire.
     *
     * @param data   The bytes received
     * @param member The member object to populate
     *
     * @return The populated member object.
     */
    public static Member getMember(byte[] data, MemberImpl member) {
        return getMember(data, 0, data.length, member);
    }

    public static Member getMember(byte[] data, int offset, int length, MemberImpl member) {
        // package looks like
        // start package TRIBES_MBR_BEGIN.length
        // package length - 4 bytes
        // alive - 8 bytes
        // port - 4 bytes
        // secure port - 4 bytes
        // udp port - 4 bytes
        // host length - 1 byte
        // host - hl bytes
        // clen - 4 bytes
        // command - clen bytes
        // dlen - 4 bytes
        // domain - dlen bytes
        // uniqueId - 16 bytes
        // payload length - 4 bytes
        // payload plen bytes
        // end package TRIBES_MBR_END.length

        int pos = offset;

        if (XByteBuffer.firstIndexOf(data, offset, TRIBES_MBR_BEGIN) != pos) {
            throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.begin",
                    org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)));
        }

        if (length < (TRIBES_MBR_BEGIN.length + 4)) {
            throw new ArrayIndexOutOfBoundsException(sm.getString("memberImpl.package.small"));
        }

        pos += TRIBES_MBR_BEGIN.length;

        int bodylength = XByteBuffer.toInt(data, pos);
        pos += 4;

        if (length < (bodylength + 4 + TRIBES_MBR_BEGIN.length + TRIBES_MBR_END.length)) {
            throw new ArrayIndexOutOfBoundsException(sm.getString("memberImpl.notEnough.bytes"));
        }

        int endpos = pos + bodylength;
        if (XByteBuffer.firstIndexOf(data, endpos, TRIBES_MBR_END) != endpos) {
            throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.end",
                    org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)));
        }


        byte[] alived = new byte[8];
        System.arraycopy(data, pos, alived, 0, 8);
        pos += 8;
        byte[] portd = new byte[4];
        System.arraycopy(data, pos, portd, 0, 4);
        pos += 4;

        byte[] sportd = new byte[4];
        System.arraycopy(data, pos, sportd, 0, 4);
        pos += 4;

        byte[] uportd = new byte[4];
        System.arraycopy(data, pos, uportd, 0, 4);
        pos += 4;


        byte hl = data[pos++];
        byte[] addr = new byte[hl];
        System.arraycopy(data, pos, addr, 0, hl);
        pos += hl;

        int cl = XByteBuffer.toInt(data, pos);
        pos += 4;

        byte[] command = new byte[cl];
        System.arraycopy(data, pos, command, 0, command.length);
        pos += command.length;

        int dl = XByteBuffer.toInt(data, pos);
        pos += 4;

        byte[] domain = new byte[dl];
        System.arraycopy(data, pos, domain, 0, domain.length);
        pos += domain.length;

        byte[] uniqueId = new byte[16];
        System.arraycopy(data, pos, uniqueId, 0, 16);
        pos += 16;

        int pl = XByteBuffer.toInt(data, pos);
        pos += 4;

        byte[] payload = new byte[pl];
        System.arraycopy(data, pos, payload, 0, payload.length);
        pos += payload.length;

        synchronized (member) {
            member.setHost(addr);
            member.setPort(XByteBuffer.toInt(portd, 0));
            member.setSecurePort(XByteBuffer.toInt(sportd, 0));
            member.setUdpPort(XByteBuffer.toInt(uportd, 0));
            member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
            member.setUniqueId(uniqueId);
            member.payload = payload;
            member.domain = domain;
            member.command = command;

            member.dataPkg = new byte[length];
            System.arraycopy(data, offset, member.dataPkg, 0, length);
        }

        return member;
    }

    public static Member getMember(byte[] data) {
        return getMember(data, new MemberImpl());
    }

    public static Member getMember(byte[] data, int offset, int length) {
        return getMember(data, offset, length, new MemberImpl());
    }

    @Override
    public String getName() {
        return "tcp://" + getHostname() + ":" + getPort();
    }

    @Override
    public int getPort() {
        return this.port;
    }

    @Override
    public byte[] getHost() {
        return host;
    }

    public String getHostname() {
        if (this.hostname != null) {
            return hostname;
        } else {
            byte[] host = this.host;
            this.hostname = org.apache.catalina.tribes.util.Arrays.toString(host, 0, host.length, true);
            return this.hostname;
        }
    }

    public int getMsgCount() {
        return msgCount.get();
    }

    @Override
    public long getMemberAliveTime() {
        return memberAliveTime;
    }

    public long getServiceStartTime() {
        return serviceStartTime;
    }

    @Override
    public byte[] getUniqueId() {
        return uniqueId;
    }

    @Override
    public byte[] getPayload() {
        return payload;
    }

    @Override
    public byte[] getCommand() {
        return command;
    }

    @Override
    public byte[] getDomain() {
        return domain;
    }

    @Override
    public int getSecurePort() {
        return securePort;
    }

    @Override
    public int getUdpPort() {
        return udpPort;
    }

    @Override
    public void setMemberAliveTime(long time) {
        memberAliveTime = time;
    }


    @Override
    public String toString() {
        StringBuilder buf = new StringBuilder(getClass().getName());
        buf.append('[');
        buf.append(getName()).append(',');
        buf.append(getHostname()).append(',');
        buf.append(port).append(", alive=");
        buf.append(memberAliveTime).append(", ");
        buf.append("securePort=").append(securePort).append(", ");
        buf.append("UDP Port=").append(udpPort).append(", ");
        buf.append("id=").append(bToS(this.uniqueId)).append(", ");
        buf.append("payload=").append(bToS(this.payload, 8)).append(", ");
        buf.append("command=").append(bToS(this.command, 8)).append(", ");
        buf.append("domain=").append(bToS(this.domain, 8));
        buf.append(']');
        return buf.toString();
    }

    public static String bToS(byte[] data) {
        return bToS(data, data.length);
    }

    public static String bToS(byte[] data, int max) {
        StringBuilder buf = new StringBuilder(4 * 16);
        buf.append('{');
        for (int i = 0; data != null && i < data.length; i++) {
            buf.append(String.valueOf(data[i])).append(' ');
            if (i == max) {
                buf.append("...(" + data.length + ")");
                break;
            }
        }
        buf.append('}');
        return buf.toString();
    }

    @Override
    public int hashCode() {
        return getHost()[0] + getHost()[1] + getHost()[2] + getHost()[3];
    }

    /**
     * Returns true if the param o is a McastMember with the same name
     *
     * @param o The object to test for equality
     */
    @Override
    public boolean equals(Object o) {
        if (o instanceof MemberImpl) {
            return Arrays.equals(this.getHost(), ((MemberImpl) o).getHost()) &&
                    this.getPort() == ((MemberImpl) o).getPort() &&
                    Arrays.equals(this.getUniqueId(), ((MemberImpl) o).getUniqueId());
        } else {
            return false;
        }
    }

    public synchronized void setHost(byte[] host) {
        this.host = host;
    }

    public void setHostname(String host) throws IOException {
        hostname = host;
        synchronized (this) {
            this.host = java.net.InetAddress.getByName(host).getAddress();
        }
    }

    public void setMsgCount(int msgCount) {
        this.msgCount.set(msgCount);
    }

    public synchronized void setPort(int port) {
        this.port = port;
        this.dataPkg = null;
    }

    public void setServiceStartTime(long serviceStartTime) {
        this.serviceStartTime = serviceStartTime;
    }

    public synchronized void setUniqueId(byte[] uniqueId) {
        this.uniqueId = uniqueId != null ? uniqueId : new byte[16];
        getData(true, true);
    }

    @Override
    public synchronized void setPayload(byte[] payload) {
        // longs to avoid any possibility of overflow
        long oldPayloadLength = this.payload.length;
        long newPayloadLength = 0;
        if (payload != null) {
            newPayloadLength = payload.length;
        }
        if (newPayloadLength > oldPayloadLength) {
            // It is possible that the max packet size will be exceeded
            if ((newPayloadLength - oldPayloadLength +
                    getData(false, false).length) > McastServiceImpl.MAX_PACKET_SIZE) {
                throw new IllegalArgumentException(sm.getString("memberImpl.large.payload"));
            }
        }
        this.payload = payload != null ? payload : new byte[0];
        getData(true, true);
    }

    @Override
    public synchronized void setCommand(byte[] command) {
        this.command = command != null ? command : new byte[0];
        getData(true, true);
    }

    public synchronized void setDomain(byte[] domain) {
        this.domain = domain != null ? domain : new byte[0];
        getData(true, true);
    }

    public synchronized void setSecurePort(int securePort) {
        this.securePort = securePort;
        this.dataPkg = null;
    }

    public synchronized void setUdpPort(int port) {
        this.udpPort = port;
        this.dataPkg = null;
    }

    @Override
    public boolean isLocal() {
        return local;
    }

    @Override
    public void setLocal(boolean local) {
        this.local = local;
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        int length = in.readInt();
        byte[] message = new byte[length];
        in.readFully(message);
        getMember(message, this);

    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        byte[] data = this.getData();
        out.writeInt(data.length);
        out.write(data);
    }

}