AbstractReplicatedMap.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.tipis;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelException.FaultyMember;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcCallback;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

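/**
 * Base class for maps whose entries are shared with other instances of the same map over a Tribes {@link Channel}.
 *
 * @param <K> The type of Key
 * @param <V> The type of Value
 */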
public abstract class AbstractReplicatedMap<K, V>
        implements Map<K,V>, Serializable, RpcCallback, ChannelListener, MembershipListener, Heartbeat {

    private static final long serialVersionUID = 1L;

    protected static final StringManager sm = StringManager.getManager(AbstractReplicatedMap.class);

    private final Log log = LogFactory.getLog(AbstractReplicatedMap.class); // must not be static

    /**
     * The default initial capacity - MUST be a power of two.
     */
    public static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * The load factor used when none specified in constructor.
     **/
    public static final float DEFAULT_LOAD_FACTOR = 0.75f;


    // ------------------------------------------------------------------------------
    // INSTANCE VARIABLES
    // ------------------------------------------------------------------------------
    /**
     * Local storage of this map: every key is associated with a {@code MapEntry} wrapper that carries the value
     * together with its replication details (primary, backup nodes, proxy/backup/copy flags).
     */
    protected final ConcurrentMap<K,MapEntry<K,V>> innerMap;

    /**
     * @return the message type placed in the state request that {@link #transferState()} sends to another map member
     */
    protected abstract int getStateMessageType();

    /**
     * @return the message type placed in the diff and complete messages that {@link #replicate(Object, boolean)}
     *             sends to the backup nodes of an entry
     */
    protected abstract int getReplicateMessageType();


    /**
     * Timeout for RPC messages, how long we will wait for a reply
     */
    protected transient long rpcTimeout = 5000;
    /**
     * Reference to the channel for sending messages
     */
    protected transient Channel channel;
    /**
     * The RpcChannel to send RPC messages through
     */
    protected transient RpcChannel rpcChannel;
    /**
     * The Map context name makes this map unique, this allows us to have more than one map shared through one channel
     */
    protected transient byte[] mapContextName;
    /**
     * Has the state been transferred
     */
    protected transient boolean stateTransferred = false;
    /**
     * Simple lock object for transfers
     */
    protected final transient Object stateMutex = new Object();
    /**
     * A list of members in our map
     */
    protected final transient HashMap<Member,Long> mapMembers = new HashMap<>();
    /**
     * Our default send options
     */
    protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
    /**
     * The owner of this map, for example a SessionManager
     */
    protected transient MapOwner mapOwner;
    /**
     * External class loaders, needed for serialization and deserialization to be performed successfully.
     */
    protected transient ClassLoader[] externalLoaders;

    /**
     * The node we are currently backing up data to, this index will rotate on a round robin basis
     */
    protected transient int currentNode = 0;

    /**
     * Since the map keeps internal membership, this is the timeout for a ping message to be responded to. If a remote
     * map doesn't respond within this timeframe, it's considered dead.
     */
    protected transient long accessTimeout = 5000;

    /**
     * Readable string of the mapContextName value
     */
    protected transient String mapname = "";

    /**
     * State of this map
     */
    private transient volatile State state = State.NEW;

    // ------------------------------------------------------------------------------
    // map owner interface
    // ------------------------------------------------------------------------------

    /**
     * Callback contract for the owner of a replicated map. The owner is stored in the {@code mapOwner} field and is
     * handed to incoming {@code ReplicatedMapEntry} values through {@code setOwner}.
     */
    public interface MapOwner {
        /**
         * NOTE(review): judging by the name this is called when an entry becomes primary on this node; the call site
         * is not part of this section of the file - confirm before relying on it.
         *
         * @param key   key of the entry
         * @param value value of the entry
         */
        void objectMadePrimary(Object key, Object value);
    }

    // ------------------------------------------------------------------------------
    // CONSTRUCTORS
    // ------------------------------------------------------------------------------

    /**
     * Builds the map: allocates the backing store and then runs {@code init} to attach the map to the channel.
     *
     * @param owner              the owner of this map
     * @param channel            channel used to communicate with the other maps
     * @param timeout            how long to wait for replies to RPC messages
     * @param mapContextName     name identifying this map, so that one channel can carry several maps
     * @param initialCapacity    initial capacity handed to the backing {@code ConcurrentHashMap}
     * @param loadFactor         load factor handed to the backing {@code ConcurrentHashMap}
     * @param channelSendOptions options used when sending on the channel
     * @param cls                class loaders made available for deserialization of objects
     * @param terminate          when {@code true}, a map that fails to start is broken down and a
     *                               {@code RuntimeException} is thrown
     */
    public AbstractReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName,
            int initialCapacity, float loadFactor, int channelSendOptions, ClassLoader[] cls, boolean terminate) {
        // the third argument is the concurrency level of the ConcurrentHashMap
        this.innerMap = new ConcurrentHashMap<>(initialCapacity, loadFactor, 15);
        init(owner, channel, mapContextName, timeout, channelSendOptions, cls, terminate);
    }

    /**
     * Turns a single member into an array.
     *
     * @param m the member to wrap, may be {@code null}
     *
     * @return a one-element array holding {@code m}, or an empty array when {@code m} is {@code null}
     */
    protected Member[] wrap(Member m) {
        return (m != null) ? new Member[] { m } : new Member[0];
    }

    /**
     * Initializes the map: stores the configuration, creates the RPC channel, registers this map as channel listener
     * and membership listener, and then announces the map to the other members, which includes the initial state
     * transfer.
     * <p>
     * If the announcement fails with a {@link ChannelException}, the failure is logged together with its cause. When
     * {@code terminate} is set the map is additionally broken down and a {@code RuntimeException} wrapping the cause
     * is thrown; otherwise initialization carries on and the map is still marked as initialized.
     *
     * @param owner              the owner of this map
     * @param channel            channel used to communicate with the other maps
     * @param mapContextName     name identifying this map on the channel
     * @param timeout            how long to wait for replies to RPC messages
     * @param channelSendOptions options used when sending on the channel
     * @param cls                class loaders made available for deserialization of objects
     * @param terminate          - Flag for whether to terminate this map that failed to start.
     */
    protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,
            ClassLoader[] cls, boolean terminate) {
        long start = System.currentTimeMillis();
        if (log.isInfoEnabled()) {
            log.info(sm.getString("abstractReplicatedMap.init.start", mapContextName));
        }
        this.mapOwner = owner;
        this.externalLoaders = cls;
        this.channelSendOptions = channelSendOptions;
        this.channel = channel;
        this.rpcTimeout = timeout;

        this.mapname = mapContextName;
        // unique context is more efficient if it is stored as bytes
        this.mapContextName = mapContextName.getBytes(StandardCharsets.ISO_8859_1);
        if (log.isTraceEnabled()) {
            log.trace(
                    "Created Lazy Map with name:" + mapContextName + ", bytes:" + Arrays.toString(this.mapContextName));
        }

        // create an rpc channel and add the map as a listener
        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
        // add this map as a message listener
        this.channel.addChannelListener(this);
        // listen for membership notifications
        this.channel.addMembershipListener(this);

        try {
            // broadcast our map, this just notifies other members of our existence
            broadcast(MapMessage.MSG_INIT, true);
            // transfer state from another map
            transferState();
            // state is transferred, we are ready for messaging
            broadcast(MapMessage.MSG_START, true);
        } catch (ChannelException x) {
            // Log the cause as well: when terminate is false nothing else ever reports this exception.
            log.warn(sm.getString("abstractReplicatedMap.unableSend.startMessage"), x);
            if (terminate) {
                breakdown();
                throw new RuntimeException(sm.getString("abstractReplicatedMap.unableStart"), x);
            }
        }
        this.state = State.INITIALIZED;
        long complete = System.currentTimeMillis() - start;
        if (log.isInfoEnabled()) {
            log.info(sm.getString("abstractReplicatedMap.init.completed", mapContextName, Long.toString(complete)));
        }
    }


    /**
     * Pings every member of the channel (not only the members already known to this map) to tell them that this map
     * is alive, processes the replies, and finally expires map members that have not been heard from recently.
     * <p>
     * The RPC call itself waits up to {@code accessTimeout} for the replies; the {@code timeout} argument is only used
     * for the expiry check at the end of the method.
     *
     * @param timeout maximum age in milliseconds of a map member's last recorded access; members older than this are
     *                    passed to {@code memberDisappeared}
     *
     * @throws ChannelException Send error; members reported as faulty by the exception are passed to
     *                              {@code memberDisappeared} before the exception is rethrown
     */
    protected void ping(long timeout) throws ChannelException {
        MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PING, false, null, null, null,
                channel.getLocalMember(false), null);
        if (channel.getMembers().length > 0) {
            try {
                // send a ping, wait for all nodes to reply
                Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions),
                        (int) accessTimeout);
                for (Response response : resp) {
                    MapMessage mapMsg = (MapMessage) response.getMessage();
                    try {
                        mapMsg.deserialize(getExternalLoaders());
                        Member member = response.getSource();
                        // the reply carries the State of the remote map as its value
                        State state = (State) mapMsg.getValue();
                        if (state.isAvailable()) {
                            // available: (re)register the member and refresh its timestamp
                            memberAlive(member);
                        } else if (state == State.STATETRANSFERRED) {
                            // not yet available: only refresh the timestamp of a member we already track,
                            // never add it as a new map member
                            synchronized (mapMembers) {
                                if (log.isInfoEnabled()) {
                                    log.info(sm.getString("abstractReplicatedMap.ping.stateTransferredMember", member));
                                }
                                if (mapMembers.containsKey(member)) {
                                    mapMembers.put(member, Long.valueOf(System.currentTimeMillis()));
                                }
                            }
                        } else {
                            // any other state: just report it, the timestamp is left alone
                            if (log.isInfoEnabled()) {
                                log.info(sm.getString("abstractReplicatedMap.mapMember.unavailable", member));
                            }
                        }
                    } catch (ClassNotFoundException | IOException e) {
                        // a reply that cannot be deserialized is logged and skipped
                        log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"), e);
                    }
                }
            } catch (ChannelException ce) {
                // Handle known failed members
                FaultyMember[] faultyMembers = ce.getFaultyMembers();
                for (FaultyMember faultyMember : faultyMembers) {
                    memberDisappeared(faultyMember.getMember());
                }
                // rethrown, so the expiry check below is skipped in this case
                throw ce;
            }
        }
        // update our map of members, expire some if we didn't receive a ping back
        synchronized (mapMembers) {
            // iterate over a copy of the keys rather than over the live key set
            Member[] members = mapMembers.keySet().toArray(new Member[0]);
            long now = System.currentTimeMillis();
            for (Member member : members) {
                long access = mapMembers.get(member).longValue();
                if ((now - access) > timeout) {
                    log.warn(sm.getString("abstractReplicatedMap.ping.timeout", member, mapname));
                    memberDisappeared(member);
                }
            }
        } // synch
    }

    /**
     * Handles a sign of life from a member: the member is registered through {@link #mapMemberAdded(Member)} and its
     * last-access timestamp is set to the current time.
     *
     * @param member the member that reported in
     */
    protected void memberAlive(Member member) {
        mapMemberAdded(member);
        synchronized (mapMembers) {
            final long now = System.currentTimeMillis();
            mapMembers.put(member, Long.valueOf(now));
        }
    }

    /**
     * Sends a map membership message of the given type to all members of the channel. Nothing is sent when the
     * channel has no members.
     * <p>
     * With {@code rpc} set, the message is sent through the RPC channel and the method waits for the first reply;
     * every returned response registers its source as a map member and is then processed by
     * {@link #messageReceived(Serializable, Member)}. Without {@code rpc} the message is sent on the plain channel and
     * no reply is awaited.
     *
     * @param msgtype the {@code MapMessage} type to send
     * @param rpc     {@code true} to send through the RPC channel and process the replies
     *
     * @throws ChannelException Send error
     */
    protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
        // One snapshot of the membership serves both the emptiness check and the send, so the destination
        // cannot differ from what was checked.
        final Member[] members = channel.getMembers();
        if (members.length == 0) {
            // No destination.
            return;
        }
        MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null,
                channel.getLocalMember(false), null);
        if (!rpc) {
            channel.send(members, msg, channelSendOptions);
            return;
        }
        // send out a map membership message, only wait for the first reply
        Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
        if (resp.length == 0) {
            log.warn(sm.getString("abstractReplicatedMap.broadcast.noReplies"));
            return;
        }
        for (Response response : resp) {
            mapMemberAdded(response.getSource());
            messageReceived(response.getMessage(), response.getSource());
        }
    }

    /**
     * Shuts this map down: marks it destroyed, breaks down the RPC channel, tells the other members that the map has
     * stopped, deregisters from the channel and clears all local state.
     */
    public void breakdown() {
        state = State.DESTROYED;
        if (rpcChannel != null) {
            rpcChannel.breakdown();
        }
        if (channel != null) {
            try {
                broadcast(MapMessage.MSG_STOP, false);
            } catch (Exception ignore) {
                // a failure to send the stop message does not interrupt the shutdown
            }
            // deregister from the channel
            channel.removeChannelListener(this);
            channel.removeMembershipListener(this);
        }
        rpcChannel = null;
        channel = null;
        synchronized (mapMembers) {
            mapMembers.clear();
        }
        innerMap.clear();
        stateTransferred = false;
        externalLoaders = null;
    }

    /**
     * Hash code computed from the bytes of the map context name, the same data {@link #equals(Object)} compares.
     */
    @Override
    public int hashCode() {
        return Arrays.hashCode(this.mapContextName);
    }

    /**
     * Two maps are equal when they are instances of exactly the same class and their map context names consist of
     * the same bytes.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof AbstractReplicatedMap) || !o.getClass().equals(this.getClass())) {
            return false;
        }
        // only the context name is read, so the type arguments of the other map are irrelevant
        AbstractReplicatedMap<?,?> that = (AbstractReplicatedMap<?,?>) o;
        return Arrays.equals(mapContextName, that.mapContextName);
    }

    // ------------------------------------------------------------------------------
    // GROUP COM INTERFACES
    // ------------------------------------------------------------------------------
    /**
     * Copies the keys of the given member table into a new array.
     *
     * @param members table keyed by member
     *
     * @return a new array holding the keys of {@code members}
     */
    public Member[] getMapMembers(HashMap<Member,Long> members) {
        Set<Member> keys = members.keySet();
        return keys.toArray(new Member[0]);
    }

    /**
     * @return a snapshot of the members currently registered with this map, taken while holding the lock on the
     *             member table
     */
    public Member[] getMapMembers() {
        final Member[] snapshot;
        synchronized (mapMembers) {
            snapshot = getMapMembers(mapMembers);
        }
        return snapshot;
    }

    /**
     * Returns the registered map members minus the given ones.
     *
     * @param exclude members to leave out
     *
     * @return the remaining map members, or {@code null} when {@code exclude} is {@code null}
     */
    public Member[] getMapMembersExcl(Member[] exclude) {
        if (exclude == null) {
            return null;
        }
        synchronized (mapMembers) {
            // work on a copy so the live member table is left untouched
            @SuppressWarnings("unchecked") // mapMembers has the correct type
            HashMap<Member,Long> remaining = (HashMap<Member,Long>) mapMembers.clone();
            for (int i = 0; i < exclude.length; i++) {
                remaining.remove(exclude[i]);
            }
            return getMapMembers(remaining);
        }
    }


    /**
     * Replicates the entry stored under {@code key} to its backup nodes. Nothing happens unless the entry exists, is
     * serializable, is primary on this node and has at least one backup node; proxies and backups are therefore
     * never replicated from here.
     * <p>
     * Exactly one message is built, in this order of preference: a diff message (value is a diffable
     * {@code ReplicatedMapEntry} and it is dirty or {@code complete} is set), a complete message carrying the whole
     * value (only when {@code complete} is set), or an access message carrying neither value nor diff. Send failures
     * are logged and not propagated.
     *
     * @param key      The object to replicate
     * @param complete - if set to true, the object is replicated to its backup; if set to false, only objects that
     *                     implement ReplicatedMapEntry and whose isDirty() or isAccessReplicate() returns true will
     *                     be replicated
     */
    public void replicate(Object key, boolean complete) {
        if (log.isTraceEnabled()) {
            log.trace("Replicate invoked on key:" + key);
        }
        MapEntry<K,V> entry = innerMap.get(key);
        if (entry == null) {
            return;
        }
        if (!entry.isSerializable()) {
            return;
        }
        if (entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
            // check to see if we need to replicate this object isDirty()||complete || isAccessReplicate()
            ReplicatedMapEntry rentry = null;
            if (entry.getValue() instanceof ReplicatedMapEntry) {
                rentry = (ReplicatedMapEntry) entry.getValue();
            }
            // values that are not ReplicatedMapEntry instances are neither dirty nor access-replicated
            boolean isDirty = rentry != null && rentry.isDirty();
            boolean isAccess = rentry != null && rentry.isAccessReplicate();
            boolean repl = complete || isDirty || isAccess;

            if (!repl) {
                if (log.isTraceEnabled()) {
                    log.trace("Not replicating:" + key + ", no change made");
                }

                return;
            }
            // check to see if the message is diffable
            MapMessage msg = null;
            if (rentry != null && rentry.isDiffable() && (isDirty || complete)) {
                // the diff is read and reset while holding the entry's lock
                rentry.lock();
                try {
                    // construct a diff message
                    msg = new MapMessage(mapContextName, getReplicateMessageType(), true, (Serializable) entry.getKey(),
                            null, rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes());
                    rentry.resetDiff();
                } catch (IOException x) {
                    // msg stays null, so one of the fallbacks below is used instead
                    log.error(sm.getString("abstractReplicatedMap.unable.diffObject"), x);
                } finally {
                    rentry.unlock();
                }
            }
            if (msg == null && complete) {
                // construct a complete
                msg = new MapMessage(mapContextName, getReplicateMessageType(), false, (Serializable) entry.getKey(),
                        (Serializable) entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes());
            }
            if (msg == null) {
                // construct a access message
                msg = new MapMessage(mapContextName, MapMessage.MSG_ACCESS, false, (Serializable) entry.getKey(), null,
                        null, entry.getPrimary(), entry.getBackupNodes());
            }
            try {
                // channel and backup nodes are checked again right before sending
                if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
                    if (rentry != null) {
                        // the timestamp is recorded before the send is attempted
                        rentry.setLastTimeReplicated(System.currentTimeMillis());
                    }
                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);
                }
            } catch (ChannelException x) {
                log.error(sm.getString("abstractReplicatedMap.unable.replicate"), x);
            }
        } // end if

    }

    /**
     * Runs {@link #replicate(Object, boolean)} for every key currently in the map. Intended to be driven by a
     * periodic thread. For maps whose values do not implement ReplicatedMapEntry, call this sparingly with
     * {@code complete} set, since every such call transfers the complete values.
     *
     * @param complete passed through to {@link #replicate(Object, boolean)} for each key
     */
    public void replicate(boolean complete) {
        for (K key : innerMap.keySet()) {
            replicate(key, complete);
        }
    }

    /**
     * Asks the first known map member for its state and replays every message of the reply through
     * {@link #messageReceived(Serializable, Member)}. {@code stateTransferred} is set only when a reply was processed;
     * the map state is moved to {@code STATETRANSFERRED} in any case, including when there was no member to ask or
     * when the transfer failed with one of the exceptions handled here, which are logged.
     */
    public void transferState() {
        try {
            final Member[] candidates = getMapMembers();
            if (candidates.length > 0 && candidates[0] != null) {
                final Member source = candidates[0];
                MapMessage request =
                        new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null);
                Response[] replies = rpcChannel.send(new Member[] { source }, request, RpcChannel.FIRST_REPLY,
                        channelSendOptions, rpcTimeout);
                if (replies.length == 0) {
                    log.warn(sm.getString("abstractReplicatedMap.transferState.noReplies"));
                } else {
                    synchronized (stateMutex) {
                        MapMessage reply = (MapMessage) replies[0].getMessage();
                        reply.deserialize(getExternalLoaders());
                        // the reply value is the list of messages describing the remote entries
                        ArrayList<?> entries = (ArrayList<?>) reply.getValue();
                        for (Object item : entries) {
                            messageReceived((Serializable) item, replies[0].getSource());
                        }
                    }
                    stateTransferred = true;
                }
            }
        } catch (ChannelException | ClassNotFoundException | IOException x) {
            log.error(sm.getString("abstractReplicatedMap.unable.transferState"), x);
        }
        state = State.STATETRANSFERRED;
    }

    /**
     * Answers an RPC request addressed to this map. The incoming message is filled in and returned as the reply:
     * <ul>
     * <li>init: the local member is set as primary</li>
     * <li>start: the local member is set as primary and the sender is registered as a map member</li>
     * <li>retrieve backup: the value stored under the requested key is attached</li>
     * <li>state / state copy: a list with one proxy or copy message per serializable entry is attached</li>
     * <li>ping: the state of this map is attached and the local member is set as primary</li>
     * </ul>
     *
     * @param msg    the request
     * @param sender the member that sent the request
     *
     * @return the reply, or {@code null} when the message is not a {@code MapMessage}, is of any other type, or asks
     *             for the backup of an entry that is missing or not serializable
     */
    @Override
    public Serializable replyRequest(Serializable msg, final Member sender) {
        if (!(msg instanceof MapMessage)) {
            return null;
        }
        final MapMessage request = (MapMessage) msg;
        final int type = request.getMsgType();

        if (type == MapMessage.MSG_INIT) {
            // map init request
            request.setPrimary(channel.getLocalMember(false));
            return request;
        } else if (type == MapMessage.MSG_START) {
            // map start request
            request.setPrimary(channel.getLocalMember(false));
            mapMemberAdded(sender);
            return request;
        } else if (type == MapMessage.MSG_RETRIEVE_BACKUP) {
            // backup request
            MapEntry<K,V> requested = innerMap.get(request.getKey());
            if (requested != null && requested.isSerializable()) {
                request.setValue((Serializable) requested.getValue());
                return request;
            }
            return null;
        } else if (type == MapMessage.MSG_STATE || type == MapMessage.MSG_STATE_COPY) {
            // state transfer request
            synchronized (stateMutex) { // make sure we don't do two things at the same time
                final boolean copy = (type == MapMessage.MSG_STATE_COPY);
                ArrayList<MapMessage> entries = new ArrayList<>();
                for (Entry<K,MapEntry<K,V>> slot : innerMap.entrySet()) {
                    // look the entry up again by key, it may be gone by now
                    MapEntry<K,V> entry = innerMap.get(slot.getKey());
                    if (entry == null || !entry.isSerializable()) {
                        continue;
                    }
                    // copies carry the value, proxies only describe where the entry lives
                    entries.add(new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY,
                            false, (Serializable) entry.getKey(), copy ? (Serializable) entry.getValue() : null, null,
                            entry.getPrimary(), entry.getBackupNodes()));
                }
                request.setValue(entries);
                return request;
            }
        } else if (type == MapMessage.MSG_PING) {
            // ping
            request.setValue(state);
            request.setPrimary(channel.getLocalMember(false));
            return request;
        }

        return null;
    }

    /**
     * Handles a left over membership message. Start messages register the primary named in the message as a map
     * member, init messages mark it alive, and ping messages mark it alive when the reported state is available.
     * Every other message type is ignored. Messages that cannot be deserialized are logged and dropped.
     *
     * @param msg    the left over message; anything that is not a {@code MapMessage} is ignored
     * @param sender the member the message came from
     */
    @Override
    public void leftOver(Serializable msg, Member sender) {
        if (msg instanceof MapMessage) {
            final MapMessage leftover = (MapMessage) msg;
            try {
                leftover.deserialize(getExternalLoaders());
                final int type = leftover.getMsgType();
                if (type == MapMessage.MSG_START) {
                    mapMemberAdded(leftover.getPrimary());
                } else if (type == MapMessage.MSG_INIT) {
                    memberAlive(leftover.getPrimary());
                } else if (type == MapMessage.MSG_PING) {
                    Member member = leftover.getPrimary();
                    if (log.isInfoEnabled()) {
                        log.info(sm.getString("abstractReplicatedMap.leftOver.pingMsg", member));
                    }
                    // the ping reply carries the state of the remote map as its value
                    State remoteState = (State) leftover.getValue();
                    if (remoteState.isAvailable()) {
                        memberAlive(member);
                    }
                } else if (log.isInfoEnabled()) {
                    // other messages are ignored.
                    log.info(sm.getString("abstractReplicatedMap.leftOver.ignored", leftover.getTypeDesc()));
                }
            } catch (IOException | ClassNotFoundException x) {
                log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"), x);
            }
        }
    }

    /**
     * Applies an incoming map message to the local state. Messages that are not {@code MapMessage} instances are
     * ignored, and a message that cannot be deserialized is logged and dropped. Depending on the message type:
     * <ul>
     * <li>start: the primary named in the message is registered as a map member</li>
     * <li>stop: the primary named in the message is passed to {@code memberDisappeared}</li>
     * <li>proxy: the entry is created if necessary and marked as a proxy</li>
     * <li>remove: the entry is removed</li>
     * <li>backup / copy: the entry is created or updated, either by applying a diff or by replacing the value</li>
     * <li>access / notify map member: backup nodes and primary of an existing entry are updated and the value, if it
     * is a {@code ReplicatedMapEntry}, is told that it was accessed</li>
     * </ul>
     *
     * @param msg    the received message
     * @param sender the member the message came from
     */
    @SuppressWarnings("unchecked")
    @Override
    public void messageReceived(Serializable msg, Member sender) {
        if (!(msg instanceof MapMessage)) {
            return;
        }

        MapMessage mapmsg = (MapMessage) msg;
        if (log.isTraceEnabled()) {
            log.trace("Map[" + mapname + "] received message:" + mapmsg);
        }

        try {
            mapmsg.deserialize(getExternalLoaders());
        } catch (IOException | ClassNotFoundException x) {
            log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"), x);
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg);
        }
        // membership: a remote map has started
        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
            mapMemberAdded(mapmsg.getPrimary());
        }

        // membership: a remote map has stopped
        if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
            memberDisappeared(mapmsg.getPrimary());
        }

        // proxy: record where the entry lives
        if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
            MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
            if (entry == null) {
                entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
                // if another thread inserted an entry in the meantime, continue with that one
                MapEntry<K,V> old = innerMap.putIfAbsent(entry.getKey(), entry);
                if (old != null) {
                    entry = old;
                }
            }
            entry.setProxy(true);
            entry.setBackup(false);
            entry.setCopy(false);
            entry.setBackupNodes(mapmsg.getBackupNodes());
            entry.setPrimary(mapmsg.getPrimary());
        }

        if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
            innerMap.remove(mapmsg.getKey());
        }

        // backup / copy: store the value sent by the primary, or bring the existing value up to date
        if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
            MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
            if (entry == null) {
                // unknown key: build a fresh entry from the message
                entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
                entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                entry.setProxy(false);
                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                entry.setPrimary(mapmsg.getPrimary());
                if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                    ((ReplicatedMapEntry) mapmsg.getValue()).setOwner(getMapOwner());
                }
            } else {
                // known key: update the flags, then the value
                entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                entry.setProxy(false);
                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                entry.setBackupNodes(mapmsg.getBackupNodes());
                entry.setPrimary(mapmsg.getPrimary());
                if (entry.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
                    if (mapmsg.isDiff()) {
                        // apply the diff to the existing value while holding its lock
                        diff.lock();
                        try {
                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
                        } catch (Exception x) {
                            log.error(sm.getString("abstractReplicatedMap.unableApply.diff", entry.getKey()), x);
                        } finally {
                            diff.unlock();
                        }
                    } else {
                        if (mapmsg.getValue() != null) {
                            // complete value received: it replaces the existing one
                            if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                                ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg.getValue();
                                re.setOwner(getMapOwner());
                                entry.setValue((V) re);
                            } else {
                                entry.setValue((V) mapmsg.getValue());
                            }
                        } else {
                            // no value in the message: keep the existing value, only set its owner
                            ((ReplicatedMapEntry) entry.getValue()).setOwner(getMapOwner());
                        }
                    } // end if
                } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
                    ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg.getValue();
                    re.setOwner(getMapOwner());
                    entry.setValue((V) re);
                } else {
                    // plain value: only overwrite when the message actually carries one
                    if (mapmsg.getValue() != null) {
                        entry.setValue((V) mapmsg.getValue());
                    }
                } // end if
            } // end if
            // store the entry under its key; this also inserts an entry created above
            innerMap.put(entry.getKey(), entry);
        } // end if

        // access: refresh the replication details of a known entry and record the access
        if (mapmsg.getMsgType() == MapMessage.MSG_ACCESS) {
            MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
            if (entry != null) {
                entry.setBackupNodes(mapmsg.getBackupNodes());
                entry.setPrimary(mapmsg.getPrimary());
                if (entry.getValue() instanceof ReplicatedMapEntry) {
                    ((ReplicatedMapEntry) entry.getValue()).accessEntry();
                }
            }
        }

        // notify map member: handled exactly like an access message
        if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
            MapEntry<K,V> entry = innerMap.get(mapmsg.getKey());
            if (entry != null) {
                entry.setBackupNodes(mapmsg.getBackupNodes());
                entry.setPrimary(mapmsg.getPrimary());
                if (entry.getValue() instanceof ReplicatedMapEntry) {
                    ((ReplicatedMapEntry) entry.getValue()).accessEntry();
                }
            }
        }
    }

    /**
     * Decides whether a message is meant for this map: it has to be a {@code MapMessage} whose map id matches the
     * context name of this map.
     *
     * @param msg    the message to inspect
     * @param sender the member the message came from
     *
     * @return {@code true} when the message belongs to this map
     */
    @Override
    public boolean accept(Serializable msg, Member sender) {
        if (!(msg instanceof MapMessage)) {
            return false;
        }
        if (log.isTraceEnabled()) {
            log.trace("Map[" + mapname + "] accepting...." + msg);
        }
        final boolean sameMap = Arrays.equals(mapContextName, ((MapMessage) msg).getMapId());
        if (log.isTraceEnabled()) {
            log.trace("Msg[" + mapname + "] accepted[" + sameMap + "]...." + msg);
        }
        return sameMap;
    }

    /**
     * Registers a remote member as a participant of this map and, if it is new, publishes every local primary entry
     * that currently has no backup nodes.
     * <p>
     * The local member is ignored, as is a member the channel cannot resolve (a warning is logged in that case).
     *
     * @param member the member that announced itself for this map
     */
    public void mapMemberAdded(Member member) {
        if (member.equals(getChannel().getLocalMember(false))) {
            return;
        }
        // Resolve the member through the channel; registration uses the channel's instance.
        Member mapMember = getChannel().getMember(member);
        if (mapMember == null) {
            log.warn(sm.getString("abstractReplicatedMap.mapMemberAdded.nullMember", member));
            return;
        }
        boolean newlyRegistered;
        synchronized (mapMembers) {
            newlyRegistered = !mapMembers.containsKey(mapMember);
            if (newlyRegistered) {
                if (log.isInfoEnabled()) {
                    log.info(sm.getString("abstractReplicatedMap.mapMemberAdded.added", mapMember));
                }
                mapMembers.put(mapMember, Long.valueOf(System.currentTimeMillis()));
            }
        }
        if (!newlyRegistered) {
            return;
        }
        synchronized (stateMutex) {
            for (Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
                // Look the entry up again; it may have been removed since the iterator saw it.
                MapEntry<K,V> entry = innerMap.get(candidate.getKey());
                if (entry == null || !entry.isPrimary()) {
                    continue;
                }
                if (entry.getBackupNodes() != null && entry.getBackupNodes().length != 0) {
                    // already has a backup
                    continue;
                }
                try {
                    Member[] selected = publishEntryInfo(entry.getKey(), entry.getValue());
                    entry.setBackupNodes(selected);
                    entry.setPrimary(channel.getLocalMember(false));
                } catch (ChannelException x) {
                    log.error(sm.getString("abstractReplicatedMap.unableSelect.backup"), x);
                }
            }
        }
    }

    /**
     * Tests whether a member occurs in an array of members.
     *
     * @param m   the member to look for
     * @param set the members to search; may be {@code null}
     *
     * @return {@code true} if {@code m.equals(..)} holds for at least one element of {@code set}; {@code false} if
     *             none matches or {@code set} is {@code null}
     */
    public boolean inSet(Member m, Member[] set) {
        if (set == null) {
            return false;
        }
        for (Member candidate : set) {
            if (m.equals(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a new array holding the elements of {@code set} that are not equal to any element of {@code mbrs}.
     * The relative order of the surviving elements is kept.
     *
     * @param mbrs the members to leave out
     * @param set  the members to filter
     *
     * @return the filtered members; never {@code null}
     */
    public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
        List<Member> kept = new ArrayList<>();
        candidates:
        for (Member candidate : set) {
            for (Member excluded : mbrs) {
                if (excluded.equals(candidate)) {
                    continue candidates;
                }
            }
            kept.add(candidate);
        }
        return kept.toArray(new Member[0]);
    }

    /**
     * Membership callback for a member that joined. This implementation does nothing; members are entered into
     * {@code mapMembers} by {@link #mapMemberAdded(Member)} instead.
     *
     * @param member the member that joined
     */
    @Override
    public void memberAdded(Member member) {
        // do nothing
    }

    /**
     * Membership callback for a member that left.
     * <p>
     * The member is removed from {@code mapMembers}; if it was not registered there the method returns without
     * touching any entry. Otherwise every entry of {@code innerMap} is inspected and, depending on the roles of this
     * node and of the departed member for that entry, one or more of the following happens:
     * <ul>
     * <li>this node is primary and the departed member was a backup: the entry is published again and the returned
     * backup nodes are stored;</li>
     * <li>the departed member was the primary: the primary reference is cleared;</li>
     * <li>the entry is a proxy without primary whose single backup node was the departed member: the entry is removed
     * from the map;</li>
     * <li>the entry is a backup without primary whose single backup node is the local member: this node becomes
     * primary, publishes the entry and informs the map owner, if one is set.</li>
     * </ul>
     * Channel errors while publishing are logged and do not abort the loop.
     *
     * @param member the member that left
     */
    @Override
    public void memberDisappeared(Member member) {
        boolean removed = false;
        synchronized (mapMembers) {
            // remove(..) yields the previous mapping, so non-null means the member was registered
            removed = (mapMembers.remove(member) != null);
            if (!removed) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member));
                }
                return; // the member was not part of our map.
            }
        }
        if (log.isInfoEnabled()) {
            log.info(sm.getString("replicatedMap.member.disappeared", member));
        }
        // timestamp used only for the elapsed-time log message at the end
        long start = System.currentTimeMillis();
        // explicit iterator because orphaned proxies are removed through it below
        Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator();
        while (i.hasNext()) {
            Map.Entry<K,MapEntry<K,V>> e = i.next();
            // look the entry up again; skip it if it is no longer in the map
            MapEntry<K,V> entry = innerMap.get(e.getKey());
            if (entry == null) {
                continue;
            }
            if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
                // we own the entry and lost one of its backups: publish it again to obtain new backup nodes
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("abstractReplicatedMap.newBackup"));
                }
                try {
                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                    entry.setBackupNodes(backup);
                    entry.setPrimary(channel.getLocalMember(false));
                } catch (ChannelException x) {
                    log.error(sm.getString("abstractReplicatedMap.unable.relocate", entry.getKey()), x);
                }
            } else if (member.equals(entry.getPrimary())) {
                // the departed member was the primary of this entry
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("abstractReplicatedMap.primaryDisappeared"));
                }
                entry.setPrimary(null);
            } // end if

            // second pass over the same entry, evaluated after the primary may have been cleared above
            if (entry.isProxy() && entry.getPrimary() == null && entry.getBackupNodes() != null &&
                    entry.getBackupNodes().length == 1 && entry.getBackupNodes()[0].equals(member)) {
                // remove proxies that have no backup nor primaries
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("abstractReplicatedMap.removeOrphan"));
                }
                i.remove();
            } else if (entry.getPrimary() == null && entry.isBackup() && entry.getBackupNodes() != null &&
                    entry.getBackupNodes().length == 1 &&
                    entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
                // we hold the only backup and there is no primary: promote the local copy to primary
                try {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("abstractReplicatedMap.newPrimary"));
                    }
                    entry.setPrimary(channel.getLocalMember(false));
                    // clearing all three flags is what makes isPrimary() return true
                    entry.setBackup(false);
                    entry.setProxy(false);
                    entry.setCopy(false);
                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                    entry.setBackupNodes(backup);
                    if (mapOwner != null) {
                        mapOwner.objectMadePrimary(entry.getKey(), entry.getValue());
                    }

                } catch (ChannelException x) {
                    log.error(sm.getString("abstractReplicatedMap.unable.relocate", entry.getKey()), x);
                }
            }

        } // while
        long complete = System.currentTimeMillis() - start;
        if (log.isInfoEnabled()) {
            log.info(sm.getString("abstractReplicatedMap.relocate.complete", Long.toString(complete)));
        }
    }

    /**
     * Returns the next index of a round-robin walk over the registered map members and advances the internal
     * cursor ({@code currentNode}). When the cursor has run past the member count it wraps around to index 0.
     *
     * @return the selected index, or {@code -1} if no map members are registered
     */
    public int getNextBackupIndex() {
        synchronized (mapMembers) {
            final int memberCount = mapMembers.size();
            if (memberCount == 0) {
                return -1;
            }
            final int candidate = currentNode;
            if (candidate >= memberCount) {
                // wrap around: hand out index 0 now, index 1 next time
                currentNode = 1;
                return 0;
            }
            currentNode = candidate + 1;
            return candidate;
        }
    }

    /**
     * Picks the map member at the index supplied by {@link #getNextBackupIndex()}.
     *
     * @return the selected member, or {@code null} if there are no map members
     */
    public Member getNextBackupNode() {
        Member[] candidates = getMapMembers();
        int index = getNextBackupIndex();
        if (index == -1 || candidates.length == 0) {
            return null;
        }
        // The member array was read before the index was computed; fall back to the first member if the index
        // does not fit it.
        return candidates[index < candidates.length ? index : 0];
    }

    /**
     * Publish info about a map pair (key/value) to other nodes in the cluster.
     * <p>
     * Callers in this class store the returned array as the entry's backup nodes.
     *
     * @param key   the key of the entry to publish
     * @param value the value of the entry to publish
     *
     * @return the backup nodes for the entry
     *
     * @throws ChannelException Cluster error
     */
    protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;

    /**
     * Periodic callback: pings with the configured access timeout, but only while the map state reports itself as
     * available. Any exception is logged and not propagated.
     */
    @Override
    public void heartbeat() {
        try {
            if (!this.state.isAvailable()) {
                return;
            }
            ping(accessTimeout);
        } catch (Exception x) {
            log.error(sm.getString("abstractReplicatedMap.heartbeat.failed"), x);
        }
    }

    // ------------------------------------------------------------------------------
    // METHODS TO OVERRIDE
    // ------------------------------------------------------------------------------

    /**
     * Removes the entry for {@code key}; equivalent to {@code remove(key, true)}, i.e. with notification enabled.
     *
     * @param key the key to remove
     *
     * @return the value of the removed entry, or {@code null} if there was none
     */
    @Override
    public V remove(Object key) {
        return remove(key, true);
    }

    /**
     * Removes the entry for {@code key} from the local map and optionally tells the other map members to do the
     * same by sending a {@code MSG_REMOVE} message. A failure to send is logged; the local removal stands.
     *
     * @param key    the key to remove
     * @param notify {@code true} to send the removal to the other map members (only done if there are any)
     *
     * @return the value of the removed entry, or {@code null} if there was none
     */
    public V remove(Object key, boolean notify) {
        MapEntry<K,V> removed = innerMap.remove(key);

        try {
            boolean hasPeers = getMapMembers().length > 0;
            if (hasPeers && notify) {
                MapMessage removal = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false,
                        (Serializable) key, null, null, null, null);
                getChannel().send(getMapMembers(), removal, getChannelSendOptions());
            }
        } catch (ChannelException x) {
            log.error(sm.getString("abstractReplicatedMap.unable.remove"), x);
        }
        if (removed == null) {
            return null;
        }
        return removed.getValue();
    }

    /**
     * Returns the internal entry object for {@code key} exactly as stored, without any of the processing that
     * {@link #get(Object)} performs.
     *
     * @param key the key to look up
     *
     * @return the stored entry, or {@code null} if the key is not present
     */
    public MapEntry<K,V> getInternal(Object key) {
        return innerMap.get(key);
    }

    /**
     * Returns the value stored for {@code key}, first making the local entry the primary one if it is not already.
     * <p>
     * For a non-primary entry the steps depend on its role:
     * <ul>
     * <li><b>backup</b>: the entry is published again and the returned nodes become its backup nodes;</li>
     * <li><b>proxy</b>: the value is requested from the entry's backup nodes (first reply wins), the backup nodes are
     * notified of the new primary and all other map members are sent a {@code MSG_PROXY} message;</li>
     * <li><b>copy</b>: all map members are notified of the new primary and become the backup nodes.</li>
     * </ul>
     * Afterwards the entry is flagged as primary and the map owner, if any, is informed.
     *
     * @param key the key to look up
     *
     * @return the value; {@code null} if there is no entry for the key, if a proxy's value could not be retrieved,
     *             or if an error occurred while making the entry primary (the error is logged)
     */
    @SuppressWarnings("unchecked")
    @Override
    public V get(Object key) {
        MapEntry<K,V> entry = innerMap.get(key);
        if (log.isTraceEnabled()) {
            log.trace("Requesting id:" + key + " entry:" + entry);
        }
        if (entry == null) {
            return null;
        }
        if (!entry.isPrimary()) {
            // the entry is not primary here: take over as primary, fetching the value first if we only hold a proxy
            try {
                Member[] backup = null;
                MapMessage msg = null;
                if (entry.isBackup()) {
                    // we already hold the value; publish it to obtain the backup nodes
                    backup = publishEntryInfo(key, entry.getValue());
                } else if (entry.isProxy()) {
                    // we hold no value: ask the entry's backup nodes for it and use the first reply
                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key,
                            null, null, null, null);
                    Response[] resp = getRpcChannel().send(entry.getBackupNodes(), msg, RpcChannel.FIRST_REPLY,
                            getChannelSendOptions(), getRpcTimeout());
                    if (resp == null || resp.length == 0 || resp[0].getMessage() == null) {
                        // no responses
                        log.warn(sm.getString("abstractReplicatedMap.unable.retrieve", key));
                        return null;
                    }
                    msg = (MapMessage) resp[0].getMessage();
                    msg.deserialize(getExternalLoaders());
                    backup = entry.getBackupNodes();
                    // a reply without a value leaves the current local value untouched
                    if (msg.getValue() != null) {
                        entry.setValue((V) msg.getValue());
                    }

                    // tell the backup nodes that this node is now the primary
                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false,
                            (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup);
                    if (backup != null && backup.length > 0) {
                        getChannel().send(backup, msg, getChannelSendOptions());
                    }

                    // invalidate the previous primary: every member except the backups gets a proxy message
                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null,
                            null, channel.getLocalMember(false), backup);
                    Member[] dest = getMapMembersExcl(backup);
                    if (dest != null && dest.length > 0) {
                        getChannel().send(dest, msg, getChannelSendOptions());
                    }
                    if (entry.getValue() instanceof ReplicatedMapEntry) {
                        ReplicatedMapEntry val = (ReplicatedMapEntry) entry.getValue();
                        val.setOwner(getMapOwner());
                    }
                } else if (entry.isCopy()) {
                    // every map member is treated as a backup; just announce the new primary
                    backup = getMapMembers();
                    if (backup.length > 0) {
                        msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false,
                                (Serializable) key, null, null, channel.getLocalMember(false), backup);
                        getChannel().send(backup, msg, getChannelSendOptions());
                    }
                }
                // record the new role locally; clearing all three flags makes isPrimary() true
                entry.setPrimary(channel.getLocalMember(false));
                entry.setBackupNodes(backup);
                entry.setBackup(false);
                entry.setProxy(false);
                entry.setCopy(false);
                if (getMapOwner() != null) {
                    getMapOwner().objectMadePrimary(key, entry.getValue());
                }

            } catch (RuntimeException | ChannelException | ClassNotFoundException | IOException x) {
                // any failure while taking over is logged and reported to the caller as "no value"
                log.error(sm.getString("abstractReplicatedMap.unable.get"), x);
                return null;
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("Requesting id:" + key + " result:" + entry.getValue());
        }
        return entry.getValue();
    }


    /**
     * Debugging aid that dumps the map name, its size, the map members and every stored entry to
     * {@code System.out}. Exceptions raised while printing are swallowed and only reported at trace level.
     *
     * @param header text printed in the first line of the dump
     */
    protected void printMap(String header) {
        try {
            System.out.println("\nDEBUG MAP:" + header);
            String contextName = new String(mapContextName, StandardCharsets.ISO_8859_1);
            System.out.println("Map[" + contextName + ", Map Size:" + innerMap.size());

            int memberNo = 0;
            for (Member mbr : getMapMembers()) {
                memberNo++;
                System.out.println("Mbr[" + memberNo + "=" + mbr.getName());
            }

            int entryNo = 0;
            for (Map.Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
                entryNo++;
                System.out.println(entryNo + ". " + innerMap.get(e.getKey()));
            }
            System.out.println("EndMap]\n\n");
        } catch (Exception ignore) {
            if (log.isTraceEnabled()) {
                log.trace("Error printing map", ignore);
            }
        }
    }

    /**
     * Returns true if the key has an entry in the map. The entry can be a proxy or a backup entry; invoking
     * <code>get(key)</code> will make this entry primary for the group.
     *
     * @param key the key to test
     *
     * @return {@code true} if the internal map holds an entry for the key, whatever its role
     */
    @Override
    public boolean containsKey(Object key) {
        return innerMap.containsKey(key);
    }

    /**
     * Stores a value; equivalent to {@code put(key, value, true)}, i.e. with notification enabled.
     *
     * @param key   the key
     * @param value the value
     *
     * @return the value previously stored for the key, or {@code null} if there was none
     */
    @Override
    public V put(K key, V value) {
        return put(key, value, true);
    }

    /**
     * Stores a value as a primary entry owned by the local member.
     * <p>
     * An existing mapping for the key is removed first through {@link #remove(Object)}, which delegates to
     * {@code remove(key, true)} and therefore notifies regardless of the {@code notify} argument given here. A
     * failure to publish the new entry is logged; the entry is stored locally either way.
     *
     * @param key    the key
     * @param value  the value
     * @param notify {@code true} to publish the new entry and record the returned backup nodes
     *
     * @return the value previously stored for the key, or {@code null} if there was none
     */
    public V put(K key, V value, boolean notify) {
        MapEntry<K,V> fresh = new MapEntry<>(key, value);
        fresh.setBackup(false);
        fresh.setProxy(false);
        fresh.setCopy(false);
        fresh.setPrimary(channel.getLocalMember(false));

        // make sure that any old values get removed
        V previous = containsKey(key) ? remove(key) : null;

        if (notify) {
            try {
                fresh.setBackupNodes(publishEntryInfo(key, value));
            } catch (ChannelException x) {
                log.error(sm.getString("abstractReplicatedMap.unable.put"), x);
            }
        }
        innerMap.put(key, fresh);
        return previous;
    }


    /**
     * Copies every mapping of {@code m} into this map by calling {@link #put(Object, Object)} once per mapping, in
     * the iteration order of {@code m.entrySet()}.
     *
     * @param m the mappings to store
     */
    @Override
    public void putAll(Map<? extends K,? extends V> m) {
        for (Entry<? extends K,? extends V> source : m.entrySet()) {
            K key = source.getKey();
            V value = source.getValue();
            put(key, value);
        }
    }

    /**
     * Clears the map; equivalent to {@code clear(true)}, i.e. with notification enabled.
     */
    @Override
    public void clear() {
        clear(true);
    }

    /**
     * Clears the map.
     *
     * @param notify if {@code true}, each key returned by {@link #keySet()} (the active keys) is removed through
     *                   {@link #remove(Object)}; if {@code false}, the internal map is simply emptied
     */
    public void clear(boolean notify) {
        if (!notify) {
            innerMap.clear();
            return;
        }
        // only delete active keys
        for (K activeKey : keySet()) {
            remove(activeKey);
        }
    }

    /**
     * Tests whether any active (non-proxy) entry holds a value equal to the argument.
     *
     * @param value the value to search for; must not be {@code null}
     *
     * @return {@code true} if an active entry's value equals {@code value}
     *
     * @throws NullPointerException if {@code value} is {@code null}
     */
    @Override
    public boolean containsValue(Object value) {
        Objects.requireNonNull(value);
        for (Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
            MapEntry<K,V> current = innerMap.get(candidate.getKey());
            if (current == null || !current.isActive()) {
                continue;
            }
            if (value.equals(current.getValue())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the entire contents of the map, including entries that {@link #entrySet()} filters out.
     * Map.Entry.getValue() will return a {@link MapEntry} object containing all the information about the object.
     * The set is the internal map's own entry set, not a copy.
     *
     * @return the entry set of the internal map
     */
    public Set<Map.Entry<K,MapEntry<K,V>>> entrySetFull() {
        return innerMap.entrySet();
    }

    /**
     * Returns all keys of the internal map, including those that {@link #keySet()} filters out. The set is the
     * internal map's own key set, not a copy.
     *
     * @return the key set of the internal map
     */
    public Set<K> keySetFull() {
        return innerMap.keySet();
    }

    /**
     * Returns the number of entries in the internal map, regardless of their role.
     *
     * @return the size of the internal map
     */
    public int sizeFull() {
        return innerMap.size();
    }

    /**
     * Returns an unmodifiable snapshot of the active (non-proxy) entries, in the iteration order of the internal
     * map. Later changes to the map are not reflected in the returned set.
     *
     * @return the active entries
     */
    @Override
    public Set<Map.Entry<K,V>> entrySet() {
        LinkedHashSet<Map.Entry<K,V>> active = new LinkedHashSet<>(innerMap.size());
        for (Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
            MapEntry<K,V> current = innerMap.get(candidate.getKey());
            if (current == null || !current.isActive()) {
                continue;
            }
            active.add(current);
        }
        return Collections.unmodifiableSet(active);
    }

    /**
     * Returns an unmodifiable snapshot of the keys whose entries are active (non-proxy), in the iteration order of
     * the internal map. Later changes to the map are not reflected in the returned set.
     *
     * @return the keys of the active entries
     */
    @Override
    public Set<K> keySet() {
        LinkedHashSet<K> activeKeys = new LinkedHashSet<>(innerMap.size());
        for (Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
            K key = candidate.getKey();
            MapEntry<K,V> current = innerMap.get(key);
            if (current == null || !current.isActive()) {
                continue;
            }
            activeKeys.add(key);
        }
        return Collections.unmodifiableSet(activeKeys);
    }


    /**
     * Counts the entries that are active (non-proxy) and hold a non-null value. The count is computed by walking
     * the internal map on every call.
     *
     * @return the number of active entries with a value
     */
    @Override
    public int size() {
        int active = 0;
        for (Map.Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
            if (candidate == null) {
                continue;
            }
            MapEntry<K,V> current = innerMap.get(candidate.getKey());
            if (current == null || !current.isActive() || current.getValue() == null) {
                continue;
            }
            active++;
        }
        return active;
    }

    /**
     * Tells whether {@link #size()} is zero, i.e. whether there is no active entry with a value.
     *
     * @return {@code true} if {@code size() == 0}
     */
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * Returns an unmodifiable snapshot of the non-null values of all active (non-proxy) entries, in the iteration
     * order of the internal map.
     *
     * @return the values of the active entries
     */
    @Override
    public Collection<V> values() {
        List<V> activeValues = new ArrayList<>();
        for (Entry<K,MapEntry<K,V>> candidate : innerMap.entrySet()) {
            MapEntry<K,V> current = innerMap.get(candidate.getKey());
            if (current == null || !current.isActive()) {
                continue;
            }
            V held = current.getValue();
            if (held != null) {
                activeValues.add(held);
            }
        }
        return Collections.unmodifiableCollection(activeValues);
    }


    // ------------------------------------------------------------------------------
    // Map Entry class
    // ------------------------------------------------------------------------------
    /**
     * A single entry of the replicated map: the key/value pair plus the replication bookkeeping for it (role flags,
     * the primary member and the backup members).
     * <p>
     * The role is encoded in three flags, {@code backup}, {@code proxy} and {@code copy}. An entry with none of them
     * set is a primary entry; an entry without the {@code proxy} flag is "active".
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class MapEntry<K, V> implements Map.Entry<K,V> {
        private boolean backup;
        private boolean proxy;
        private boolean copy;
        private Member[] backupNodes;
        private Member primary;
        private K key;
        private V value;

        /**
         * Creates an entry with all role flags cleared, i.e. a primary entry.
         *
         * @param key   the key
         * @param value the value
         */
        public MapEntry(K key, V value) {
            setKey(key);
            setValue(value);

        }

        /**
         * @return {@code true} if the key is {@code null} or implements {@link Serializable}
         */
        public boolean isKeySerializable() {
            return (key == null) || (key instanceof Serializable);
        }

        /**
         * @return {@code true} if the value is {@code null} or implements {@link Serializable}
         */
        public boolean isValueSerializable() {
            return (value == null) || (value instanceof Serializable);
        }

        /**
         * @return {@code true} if both key and value pass their serializability checks
         */
        public boolean isSerializable() {
            return isKeySerializable() && isValueSerializable();
        }

        /**
         * @return the {@code backup} role flag
         */
        public boolean isBackup() {
            return backup;
        }

        /**
         * @param backup new state of the {@code backup} role flag
         */
        public void setBackup(boolean backup) {
            this.backup = backup;
        }

        /**
         * @return the {@code proxy} role flag
         */
        public boolean isProxy() {
            return proxy;
        }

        /**
         * @return {@code true} if none of the {@code proxy}, {@code backup} and {@code copy} flags is set
         */
        public boolean isPrimary() {
            return (!proxy && !backup && !copy);
        }

        /**
         * @return {@code true} if the entry is not a proxy
         */
        public boolean isActive() {
            return !proxy;
        }

        /**
         * @param proxy new state of the {@code proxy} role flag
         */
        public void setProxy(boolean proxy) {
            this.proxy = proxy;
        }

        /**
         * @return the {@code copy} role flag
         */
        public boolean isCopy() {
            return copy;
        }

        /**
         * @param copy new state of the {@code copy} role flag
         */
        public void setCopy(boolean copy) {
            this.copy = copy;
        }

        /**
         * @return {@code true} if the value is a {@link ReplicatedMapEntry} that reports itself as diffable
         */
        public boolean isDiffable() {
            return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry) value).isDiffable();
        }

        /**
         * @param nodes the members holding a backup of this entry; stored as given, without copying
         */
        public void setBackupNodes(Member[] nodes) {
            this.backupNodes = nodes;
        }

        /**
         * @return the members holding a backup of this entry (the stored array itself); may be {@code null}
         */
        public Member[] getBackupNodes() {
            return backupNodes;
        }

        /**
         * @param m the member that is primary for this entry; may be {@code null}
         */
        public void setPrimary(Member m) {
            primary = m;
        }

        /**
         * @return the member recorded as primary for this entry; may be {@code null}
         */
        public Member getPrimary() {
            return primary;
        }

        @Override
        public V getValue() {
            return value;
        }

        /**
         * Replaces the value held by this entry object. Only the entry is changed; nothing is sent anywhere.
         *
         * @param value the new value
         *
         * @return the previous value
         */
        @Override
        public V setValue(V value) {
            V old = this.value;
            this.value = value;
            return old;
        }

        @Override
        public K getKey() {
            return key;
        }

        /**
         * Replaces the key held by this entry object.
         *
         * @param key the new key
         *
         * @return the previous key
         */
        public K setKey(K key) {
            K old = this.key;
            this.key = key;
            return old;
        }

        /**
         * Returns the hash code of the key, or {@code 0} for a {@code null} key.
         */
        @Override
        public int hashCode() {
            // null-safe: a null key is tolerated elsewhere in this class (see isKeySerializable())
            return Objects.hashCode(key);
        }

        /**
         * Compares this entry with another object.
         * <p>
         * An entry is always equal to itself. Apart from that, the argument is compared against this entry's
         * <em>key</em> ({@code key.equals(o)}), not against another entry's key/value pair; that long-standing
         * behaviour is kept for compatibility. An entry with a {@code null} key is equal only to itself.
         *
         * @param o the object to compare with
         *
         * @return {@code true} if {@code o} is this entry, or if the key reports itself equal to {@code o}
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return key != null && key.equals(o);
        }

        /**
         * apply a diff, or an entire object
         * <p>
         * If the current value is diffable and {@code diff} is set, the bytes are applied to the existing value as
         * a diff while holding the value's lock. Otherwise a {@code length} of zero clears the value and turns the
         * entry into a proxy, and any other length replaces the value with the object deserialized from the bytes.
         *
         * @param data   byte[]
         * @param offset int
         * @param length int
         * @param diff   boolean
         *
         * @throws IOException            IO error
         * @throws ClassNotFoundException Deserialization error
         */
        @SuppressWarnings("unchecked")
        public void apply(byte[] data, int offset, int length, boolean diff)
                throws IOException, ClassNotFoundException {
            if (isDiffable() && diff) {
                ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
                rentry.lock();
                try {
                    rentry.applyDiff(data, offset, length);
                } finally {
                    rentry.unlock();
                }
            } else if (length == 0) {
                // no payload: drop the value and mark the entry as proxy
                value = null;
                proxy = true;
            } else {
                value = (V) XByteBuffer.deserialize(data, offset, length);
            }
        }

        @Override
        public String toString() {
            StringBuilder buf = new StringBuilder("MapEntry[key:");
            buf.append(getKey()).append("; ");
            buf.append("value:").append(getValue()).append("; ");
            buf.append("primary:").append(isPrimary()).append("; ");
            buf.append("backup:").append(isBackup()).append("; ");
            buf.append("proxy:").append(isProxy()).append(";]");
            return buf.toString();
        }

    }

    // ------------------------------------------------------------------------------
    // map message to send to and from other maps
    // ------------------------------------------------------------------------------

    /**
     * Message exchanged between the maps of the cluster members.
     * <p>
     * Key and value travel in serialized form ({@code keydata} / {@code valuedata}); the object references
     * themselves are {@code transient} and are rebuilt lazily by {@link #key(ClassLoader[])} and
     * {@link #value(ClassLoader[])} on the receiving side.
     */
    public static class MapMessage implements Serializable, Cloneable {
        private static final long serialVersionUID = 1L;
        public static final int MSG_BACKUP = 1;
        public static final int MSG_RETRIEVE_BACKUP = 2;
        public static final int MSG_PROXY = 3;
        public static final int MSG_REMOVE = 4;
        public static final int MSG_STATE = 5;
        public static final int MSG_START = 6;
        public static final int MSG_STOP = 7;
        public static final int MSG_INIT = 8;
        public static final int MSG_COPY = 9;
        public static final int MSG_STATE_COPY = 10;
        public static final int MSG_ACCESS = 11;
        public static final int MSG_NOTIFY_MAPMEMBER = 12;
        public static final int MSG_PING = 13;

        private final byte[] mapId;
        private final int msgtype;
        private final boolean diff;
        private transient Serializable key;
        private transient Serializable value;
        private byte[] valuedata;
        private byte[] keydata;
        private final byte[] diffvalue;
        private final Member[] nodes;
        private Member primary;

        /**
         * Returns a description made of the map id, the message type name, the key and the value. Key and value
         * are printed as currently held; they are not deserialized for this purpose.
         */
        @Override
        public String toString() {
            StringBuilder buf = new StringBuilder("MapMessage[context=");
            // Decode with a fixed charset (the one printMap() uses for the map context name) instead of the
            // platform default, so the output does not depend on the JVM's configuration.
            buf.append(new String(mapId, StandardCharsets.ISO_8859_1));
            buf.append("; type=");
            buf.append(getTypeDesc());
            buf.append("; key=");
            buf.append(key);
            buf.append("; value=");
            buf.append(value);
            buf.append(']');
            return buf.toString();
        }

        /**
         * @return the name of the {@code MSG_*} constant matching the message type, or {@code "UNKNOWN"}
         */
        public String getTypeDesc() {
            switch (msgtype) {
                case MSG_BACKUP:
                    return "MSG_BACKUP";
                case MSG_RETRIEVE_BACKUP:
                    return "MSG_RETRIEVE_BACKUP";
                case MSG_PROXY:
                    return "MSG_PROXY";
                case MSG_REMOVE:
                    return "MSG_REMOVE";
                case MSG_STATE:
                    return "MSG_STATE";
                case MSG_START:
                    return "MSG_START";
                case MSG_STOP:
                    return "MSG_STOP";
                case MSG_INIT:
                    return "MSG_INIT";
                case MSG_STATE_COPY:
                    return "MSG_STATE_COPY";
                case MSG_COPY:
                    return "MSG_COPY";
                case MSG_ACCESS:
                    return "MSG_ACCESS";
                case MSG_NOTIFY_MAPMEMBER:
                    return "MSG_NOTIFY_MAPMEMBER";
                case MSG_PING:
                    return "MSG_PING";
                default:
                    return "UNKNOWN";
            }
        }

        /**
         * Creates a message. A non-null key or value is serialized immediately.
         *
         * @param mapId     id of the map the message is meant for
         * @param msgtype   one of the {@code MSG_*} constants
         * @param diff      whether {@code diffvalue} carries a diff
         * @param key       the key, may be {@code null}
         * @param value     the value, may be {@code null}
         * @param diffvalue the diff bytes, may be {@code null}
         * @param primary   the primary member, may be {@code null}
         * @param nodes     the backup nodes, may be {@code null}
         *
         * @throws RuntimeException if key or value cannot be serialized
         */
        public MapMessage(byte[] mapId, int msgtype, boolean diff, Serializable key, Serializable value,
                byte[] diffvalue, Member primary, Member[] nodes) {
            this.mapId = mapId;
            this.msgtype = msgtype;
            this.diff = diff;
            this.key = key;
            this.value = value;
            this.diffvalue = diffvalue;
            this.nodes = nodes;
            this.primary = primary;
            // the setters additionally fill keydata / valuedata
            setValue(value);
            setKey(key);
        }

        /**
         * Deserializes key and value with the given class loaders, unless they are already available.
         *
         * @param cls class loaders passed on to the deserializer
         *
         * @throws IOException            IO error
         * @throws ClassNotFoundException Deserialization error
         */
        public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
            key(cls);
            value(cls);
        }

        /**
         * @return the message type, one of the {@code MSG_*} constants
         */
        public int getMsgType() {
            return msgtype;
        }

        /**
         * @return the diff flag given at construction
         */
        public boolean isDiff() {
            return diff;
        }

        /**
         * Returns the key, deserializing it without explicit class loaders if necessary.
         *
         * @return the key, or {@code null} if there is none
         *
         * @throws RuntimeException if deserialization fails (the original exception is the cause)
         */
        public Serializable getKey() {
            try {
                return key(null);
            } catch (Exception x) {
                throw new RuntimeException(sm.getString("mapMessage.deserialize.error.key"), x);
            }
        }

        /**
         * Returns the key, deserializing it from the key bytes on first use. After a successful deserialization
         * the key bytes are released.
         *
         * @param cls class loaders passed on to the deserializer
         *
         * @return the key, or {@code null} if neither a key object nor key bytes are present
         *
         * @throws IOException            IO error
         * @throws ClassNotFoundException Deserialization error
         */
        public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
            if (key != null) {
                return key;
            }
            if (keydata == null || keydata.length == 0) {
                return null;
            }
            key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls);
            keydata = null;
            return key;
        }

        /**
         * @return the serialized key bytes as currently held; {@code null} once the key has been deserialized
         */
        public byte[] getKeyData() {
            return keydata;
        }

        /**
         * Returns the value, deserializing it without explicit class loaders if necessary.
         *
         * @return the value, or {@code null} if there is none
         *
         * @throws RuntimeException if deserialization fails (the original exception is the cause)
         */
        public Serializable getValue() {
            try {
                return value(null);
            } catch (Exception x) {
                throw new RuntimeException(sm.getString("mapMessage.deserialize.error.value"), x);
            }
        }

        /**
         * Returns the value, deserializing it from the value bytes on first use. After a successful
         * deserialization the value bytes are released.
         *
         * @param cls class loaders passed on to the deserializer
         *
         * @return the value, or {@code null} if neither a value object nor value bytes are present
         *
         * @throws IOException            IO error
         * @throws ClassNotFoundException Deserialization error
         */
        public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException {
            if (value != null) {
                return value;
            }
            if (valuedata == null || valuedata.length == 0) {
                return null;
            }
            value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls);
            valuedata = null;
            return value;
        }

        /**
         * @return the serialized value bytes as currently held; {@code null} once the value has been deserialized
         */
        public byte[] getValueData() {
            return valuedata;
        }

        /**
         * @return the diff bytes given at construction
         */
        public byte[] getDiffValue() {
            return diffvalue;
        }

        /**
         * @return the backup nodes given at construction
         */
        public Member[] getBackupNodes() {
            return nodes;
        }

        /**
         * @return the primary member carried by this message
         */
        public Member getPrimary() {
            return primary;
        }

        private void setPrimary(Member m) {
            primary = m;
        }

        /**
         * @return the id of the map this message is meant for
         */
        public byte[] getMapId() {
            return mapId;
        }

        /**
         * Sets the value and, if it is not {@code null}, serializes it into the value bytes. For a {@code null}
         * value the existing value bytes are left as they are.
         *
         * @param value the new value
         *
         * @throws RuntimeException if serialization fails
         */
        public void setValue(Serializable value) {
            try {
                if (value != null) {
                    valuedata = XByteBuffer.serialize(value);
                }
                this.value = value;
            } catch (IOException x) {
                throw new RuntimeException(x);
            }
        }

        /**
         * Sets the key and, if it is not {@code null}, serializes it into the key bytes. For a {@code null} key
         * the existing key bytes are left as they are.
         *
         * @param key the new key
         *
         * @throws RuntimeException if serialization fails
         */
        public void setKey(Serializable key) {
            try {
                if (key != null) {
                    keydata = XByteBuffer.serialize(key);
                }
                this.key = key;
            } catch (IOException x) {
                throw new RuntimeException(x);
            }
        }

        /**
         * Returns a shallow copy of this message, as produced by {@link Object#clone()}.
         */
        @Override
        public MapMessage clone() {
            try {
                return (MapMessage) super.clone();
            } catch (CloneNotSupportedException e) {
                // Not possible: the class implements Cloneable. Keep the cause in case it ever happens.
                throw new AssertionError(e);
            }
        }
    } // MapMessage


    /**
     * @return the channel used by this map
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * @return the context name of this map as bytes; incoming messages are matched against it in
     *             {@link #accept(Serializable, Member)}
     */
    public byte[] getMapContextName() {
        return mapContextName;
    }

    /**
     * @return the RPC channel used by this map
     */
    public RpcChannel getRpcChannel() {
        return rpcChannel;
    }

    /**
     * @return the timeout passed to RPC sends
     */
    public long getRpcTimeout() {
        return rpcTimeout;
    }

    /**
     * @return the object this map synchronizes on as {@code stateMutex}
     */
    public Object getStateMutex() {
        return stateMutex;
    }

    /**
     * @return the current value of the {@code stateTransferred} flag
     */
    public boolean isStateTransferred() {
        return stateTransferred;
    }

    /**
     * @return the map owner that is informed when an entry becomes primary; may be {@code null}
     */
    public MapOwner getMapOwner() {
        return mapOwner;
    }

    /**
     * @return the class loaders handed to message deserialization
     */
    public ClassLoader[] getExternalLoaders() {
        return externalLoaders;
    }

    /**
     * @return the options passed along with every channel send made by this map
     */
    public int getChannelSendOptions() {
        return channelSendOptions;
    }

    /**
     * @return the access timeout; {@link #heartbeat()} passes it to the ping
     */
    public long getAccessTimeout() {
        return accessTimeout;
    }

    /**
     * @param mapOwner the map owner to inform when an entry becomes primary; may be {@code null}
     */
    public void setMapOwner(MapOwner mapOwner) {
        this.mapOwner = mapOwner;
    }

    /**
     * @param externalLoaders the class loaders to hand to message deserialization
     */
    public void setExternalLoaders(ClassLoader[] externalLoaders) {
        this.externalLoaders = externalLoaders;
    }

    /**
     * @param channelSendOptions the options to pass along with channel sends
     */
    public void setChannelSendOptions(int channelSendOptions) {
        this.channelSendOptions = channelSendOptions;
    }

    /**
     * @param accessTimeout the access timeout to use for the heartbeat ping
     */
    public void setAccessTimeout(long accessTimeout) {
        this.accessTimeout = accessTimeout;
    }

    /**
     * Lifecycle states of the map. Each state carries a flag telling whether the map counts as available in that
     * state; of the states declared here only {@link #INITIALIZED} does.
     */
    private enum State {
        NEW(false),
        STATETRANSFERRED(false),
        INITIALIZED(true),
        DESTROYED(false);

        // whether the map counts as available in this state
        private final boolean available;

        State(boolean available) {
            this.available = available;
        }

        /**
         * @return {@code true} if the map counts as available in this state
         */
        public boolean isAvailable() {
            return available;
        }
    }
}