ReplicatedMap.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.tipis;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelException.FaultyMember;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical copy of the
 * map.<br>
 * <br>
 * This map implementation doesn't have a background thread running to replicate changes. If you do have changes without
 * invoking put/remove then you need to invoke one of the following methods:
 * <ul>
 * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
 * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
 * </ul>
 * The <code>boolean</code> value in the <code>replicate</code> method is used to decide whether to only replicate
 * objects that implement the <code>ReplicatedMapEntry</code> interface or to replicate all objects. If an object
 * doesn't implement the <code>ReplicatedMapEntry</code> interface, the entire object gets serialized each time it is
 * replicated; hence a call to <code>replicate(true)</code> will replicate all objects in this map that are using this
 * node as primary. <br>
 * <br>
 * <b>REMEMBER TO CALL <code>breakdown()</code> when you are done with the map to avoid memory leaks.</b><br>
 * <br>
 * TODO implement periodic sync/transfer thread<br>
 * TODO memberDisappeared should do nothing except change map membership; by default it relocates the primary objects
 *
 * @param <K> The type of Key
 * @param <V> The type of Value
 */
public class ReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> {

    private static final long serialVersionUID = 1L;

    // Lazy init to support serialization
    private transient volatile Log log;

    // --------------------------------------------------------------------------
    // CONSTRUCTORS / DESTRUCTORS
    // --------------------------------------------------------------------------
    /**
     * Creates a new map
     *
     * @param owner           The map owner
     * @param channel         The channel to use for communication
     * @param timeout         long - timeout for RPC messages
     * @param mapContextName  String - unique name for this map, to allow multiple maps per channel
     * @param initialCapacity int - the size of this map, see HashMap
     * @param loadFactor      float - load factor, see HashMap
     * @param cls             Class loaders
     */
    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,
            float loadFactor, ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls,
                true);
    }

    /**
     * Creates a new map
     *
     * @param owner           The map owner
     * @param channel         The channel to use for communication
     * @param timeout         long - timeout for RPC messages
     * @param mapContextName  String - unique name for this map, to allow multiple maps per channel
     * @param initialCapacity int - the size of this map, see HashMap
     * @param cls             Class loaders
     */
    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,
            ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, initialCapacity, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, true);
    }

    /**
     * Creates a new map
     *
     * @param owner          The map owner
     * @param channel        The channel to use for communication
     * @param timeout        long - timeout for RPC messages
     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
     * @param cls            Class loaders
     */
    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) {
        super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, true);
    }

    /**
     * Creates a new map
     *
     * @param owner          The map owner
     * @param channel        The channel to use for communication
     * @param timeout        long - timeout for RPC messages
     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
     * @param cls            Class loaders
     * @param terminate      boolean - Flag for whether to terminate this map that failed to start.
     */
    public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls,
            boolean terminate) {
        super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
                Channel.SEND_OPTIONS_DEFAULT, cls, terminate);
    }

    // ------------------------------------------------------------------------------
    // METHODS TO OVERRIDE
    // ------------------------------------------------------------------------------
    /**
     * @return the message type used for state messages by this map, always <code>MapMessage.MSG_STATE_COPY</code>
     */
    @Override
    protected int getStateMessageType() {
        return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
    }

    /**
     * @return the message type used for replication messages by this map, always <code>MapMessage.MSG_COPY</code>
     */
    @Override
    protected int getReplicateMessageType() {
        return AbstractReplicatedMap.MapMessage.MSG_COPY;
    }

    /**
     * Sends the given key/value pair as a <code>MSG_COPY</code> message to every current map member.
     *
     * @param key   The key of the entry to publish
     * @param value The value of the entry to publish
     *
     * @return an empty array if the key or the value is not {@link Serializable} (nothing is sent), <code>null</code>
     *             if there are currently no map members, otherwise the members the entry was sent to excluding any
     *             members that were reported as faulty
     *
     * @throws ChannelException if the send failed without reporting any faulty member, or if every targeted member
     *                              turned out to be faulty
     */
    @Override
    protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
        if (!(key instanceof Serializable && value instanceof Serializable)) {
            return new Member[0];
        }
        // select a backup node
        Member[] backup = getMapMembers();

        if (backup == null || backup.length == 0) {
            return null;
        }

        try {
            // publish the data out to all nodes
            MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key,
                    (Serializable) value, null, channel.getLocalMember(false), backup);

            getChannel().send(backup, msg, getChannelSendOptions());
        } catch (ChannelException e) {
            FaultyMember[] faultyMembers = e.getFaultyMembers();
            if (faultyMembers.length == 0) {
                // no per-member detail available, nothing to recover from
                throw e;
            }
            // Members whose failure cause is a RemoteProcessException are not treated as faulty
            List<Member> faulty = new ArrayList<>();
            for (FaultyMember faultyMember : faultyMembers) {
                if (!(faultyMember.getCause() instanceof RemoteProcessException)) {
                    faulty.add(faultyMember.getMember());
                }
            }
            Member[] realFaultyMembers = faulty.toArray(new Member[0]);
            if (realFaultyMembers.length != 0) {
                backup = excludeFromSet(realFaultyMembers, backup);
                if (backup.length == 0) {
                    // not a single member received the entry
                    throw e;
                }
                // partial success: report it but keep the members that did receive the entry
                if (getLog().isWarnEnabled()) {
                    getLog().warn(sm.getString("replicatedMap.unableReplicate.completely", key,
                            Arrays.toString(backup), Arrays.toString(realFaultyMembers)), e);
                }
            }
        }
        return backup;
    }

    /**
     * Removes the given member from the map membership and updates every entry affected by its disappearance:
     * <ul>
     * <li>entries this node is primary for get their backup nodes refreshed and the remaining members notified</li>
     * <li>entries whose primary was the disappeared member lose their primary, and this node takes over as primary
     * if the entry is a copy and this node is the first of the entry's backup nodes</li>
     * </ul>
     * Failures to notify the remaining members are logged per entry and do not stop the processing of the other
     * entries.
     *
     * @param member The member that disappeared
     */
    @Override
    public void memberDisappeared(Member member) {
        Log log = getLog();
        synchronized (mapMembers) {
            if (mapMembers.remove(member) == null) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member));
                }
                return; // the member was not part of our map.
            }
        }
        if (log.isInfoEnabled()) {
            log.info(sm.getString("replicatedMap.member.disappeared", member));
        }
        long start = System.currentTimeMillis();
        for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
            // look the entry up again, it may have been removed since the iteration started
            MapEntry<K,V> entry = innerMap.get(e.getKey());
            if (entry == null) {
                continue;
            }
            if (entry.isPrimary()) {
                try {
                    Member[] backup = sendPrimaryNotification(entry);
                    entry.setBackupNodes(backup);
                    entry.setPrimary(channel.getLocalMember(false));
                } catch (ChannelException x) {
                    log.error(sm.getString("replicatedMap.unable.relocate", entry.getKey()), x);
                }
            } else if (member.equals(entry.getPrimary())) {
                entry.setPrimary(null);
            }

            // The entry has no primary any more and this node is first in line: take over as primary
            if (entry.getPrimary() == null && entry.isCopy() && entry.getBackupNodes() != null &&
                    entry.getBackupNodes().length > 0 &&
                    entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
                try {
                    entry.setPrimary(channel.getLocalMember(false));
                    entry.setBackup(false);
                    entry.setProxy(false);
                    entry.setCopy(false);
                    Member[] backup = sendPrimaryNotification(entry);
                    entry.setBackupNodes(backup);
                    if (mapOwner != null) {
                        mapOwner.objectMadePrimary(entry.getKey(), entry.getValue());
                    }

                } catch (ChannelException x) {
                    log.error(sm.getString("replicatedMap.unable.relocate", entry.getKey()), x);
                }
            }

        }
        long complete = System.currentTimeMillis() - start;
        if (log.isInfoEnabled()) {
            log.info(sm.getString("replicatedMap.relocate.complete", Long.toString(complete)));
        }
    }

    /**
     * Sends a <code>MSG_NOTIFY_MAPMEMBER</code> message for the given entry, with the local member as sender, to all
     * current map members.
     * <p>
     * Nothing is sent if there are no map members. Nothing is sent either if the key of the entry is not
     * {@link Serializable}: such a key cannot be placed in a message, and casting it blindly would raise a
     * {@link ClassCastException} that aborts the processing of all remaining entries of the caller.
     *
     * @param entry The entry to send the notification for
     *
     * @return the current map members, regardless of whether a message was sent
     *
     * @throws ChannelException if the message could not be sent
     */
    private Member[] sendPrimaryNotification(MapEntry<K,V> entry) throws ChannelException {
        Member[] backup = getMapMembers();
        if (backup.length == 0) {
            return backup;
        }
        Object key = entry.getKey();
        if (key != null && !(key instanceof Serializable)) {
            // publishEntryInfo never sent this entry anywhere either
            return backup;
        }
        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false,
                (Serializable) key, null, null, channel.getLocalMember(false), backup);
        getChannel().send(backup, msg, getChannelSendOptions());
        return backup;
    }

    /**
     * Registers a new map member. If the member was not known before, every entry this node is primary for and that
     * does not yet list the member as a backup node gets its backup nodes replaced by the current map members. The
     * local member is ignored.
     *
     * @param member The member that was added
     */
    @Override
    public void mapMemberAdded(Member member) {
        if (member.equals(getChannel().getLocalMember(false))) {
            return;
        }
        boolean memberAdded = false;
        synchronized (mapMembers) {
            if (!mapMembers.containsKey(member)) {
                mapMembers.put(member, Long.valueOf(System.currentTimeMillis()));
                memberAdded = true;
            }
        }
        if (memberAdded) {
            synchronized (stateMutex) {
                Member[] backup = getMapMembers();
                for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) {
                    // look the entry up again, it may have been removed since the iteration started
                    MapEntry<K,V> entry = innerMap.get(e.getKey());
                    if (entry == null) {
                        continue;
                    }
                    if (entry.isPrimary() && !inSet(member, entry.getBackupNodes())) {
                        entry.setBackupNodes(backup);
                    }
                }
            }
        }
    }


    /**
     * Returns the logger of this class, creating it on first use. The field is transient and therefore
     * <code>null</code> after deserialization, hence the lazy creation.
     *
     * @return the logger, never <code>null</code>
     */
    private Log getLog() {
        // Work on a local copy so the volatile field is read only once on the common path
        Log result = log;
        if (result == null) {
            synchronized (this) {
                result = log;
                if (result == null) {
                    result = LogFactory.getLog(ReplicatedMap.class);
                    log = result;
                }
            }
        }
        return result;
    }
}