Membership.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.membership;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import org.apache.catalina.tribes.Member;
/**
* A <b>membership</b> implementation using simple multicast. This is the representation of a multicast membership. This
* class is responsible for maintaining a list of active cluster nodes in the cluster. If a node fails to send out a
* heartbeat, the node will be dismissed.
*
* @author Peter Rossbach
*/
public class Membership implements Cloneable {
protected static final Member[] EMPTY_MEMBERS = new Member[0];
// Non-final to support clone()
private Object membersLock = new Object();
/**
* The local member.
*/
protected final Member local;
/**
* A map of all the members in the cluster.
*/
protected HashMap<Member,MbrEntry> map = new HashMap<>(); // Guarded by membersLock
/**
* A list of all the members in the cluster.
*/
protected volatile Member[] members = EMPTY_MEMBERS; // Guarded by membersLock
/**
* Comparator for sorting members by alive time.
*/
protected final Comparator<Member> memberComparator;
@Override
public Membership clone() {
synchronized (membersLock) {
Membership clone;
try {
clone = (Membership) super.clone();
} catch (CloneNotSupportedException e) {
// Can't happen
throw new AssertionError();
}
// Standard clone() method will copy the map object. Replace that
// with a new map but with the same contents.
@SuppressWarnings("unchecked")
final HashMap<Member,MbrEntry> tmpclone = (HashMap<Member,MbrEntry>) map.clone();
clone.map = tmpclone;
// Standard clone() method will copy the array object. Replace that
// with a new array but with the same contents.
clone.members = members.clone();
// Standard clone() method will copy the lock object. Replace that
// with a new object.
clone.membersLock = new Object();
return clone;
}
}
/**
* Constructs a new membership
*
* @param local - has to be the name of the local member. Used to filter the local member from the cluster
* membership
* @param includeLocal - TBA
*/
public Membership(Member local, boolean includeLocal) {
this(local, Comparator.comparingLong(Member::getMemberAliveTime).reversed(), includeLocal);
}
public Membership(Member local) {
this(local, false);
}
public Membership(Member local, Comparator<Member> comp) {
this(local, comp, false);
}
public Membership(Member local, Comparator<Member> comp, boolean includeLocal) {
this.local = local;
this.memberComparator = comp;
if (includeLocal) {
addMember(local);
}
}
/**
* Reset the membership and start over fresh. i.e., delete all the members and wait for them to ping again and join
* this membership.
*/
public void reset() {
synchronized (membersLock) {
map.clear();
members = EMPTY_MEMBERS;
}
}
/**
* Notify the membership that this member has announced itself.
*
* @param member - the member that just pinged us
*
* @return - true if this member is new to the cluster, false otherwise.<br>
* - false if this member is the local member or updated.
*/
public boolean memberAlive(Member member) {
// Ignore ourselves
if (member.equals(local)) {
return false;
}
boolean result = false;
synchronized (membersLock) {
MbrEntry entry = map.get(member);
if (entry == null) {
entry = addMember(member);
result = true;
} else {
// Update the member alive time
Member updateMember = entry.getMember();
if (updateMember.getMemberAliveTime() != member.getMemberAliveTime()) {
// Update fields that can change
updateMember.setMemberAliveTime(member.getMemberAliveTime());
updateMember.setPayload(member.getPayload());
updateMember.setCommand(member.getCommand());
// Re-order. Can't sort in place since a call to
// getMembers() may then receive an intermediate result.
Member[] newMembers = members.clone();
Arrays.sort(newMembers, memberComparator);
members = newMembers;
}
}
entry.accessed();
}
return result;
}
/**
* Add a member to this component and sort array with memberComparator
*
* @param member The member to add
*
* @return The member entry created for this new member.
*/
public MbrEntry addMember(Member member) {
MbrEntry entry = new MbrEntry(member);
synchronized (membersLock) {
if (!map.containsKey(member)) {
map.put(member, entry);
Member results[] = new Member[members.length + 1];
System.arraycopy(members, 0, results, 0, members.length);
results[members.length] = member;
Arrays.sort(results, memberComparator);
members = results;
}
}
return entry;
}
/**
* Remove a member from this component.
*
* @param member The member to remove
*/
public void removeMember(Member member) {
synchronized (membersLock) {
map.remove(member);
int n = -1;
for (int i = 0; i < members.length; i++) {
if (members[i] == member || members[i].equals(member)) {
n = i;
break;
}
}
if (n < 0) {
return;
}
Member results[] = new Member[members.length - 1];
int j = 0;
for (int i = 0; i < members.length; i++) {
if (i != n) {
results[j++] = members[i];
}
}
members = results;
}
}
/**
* Runs a refresh cycle and returns a list of members that has expired. This also removes the members from the
* membership, in such a way that getMembers() = getMembers() - expire()
*
* @param maxtime - the max time a member can remain unannounced before it is considered dead.
*
* @return the list of expired members
*/
public Member[] expire(long maxtime) {
synchronized (membersLock) {
if (!hasMembers()) {
return EMPTY_MEMBERS;
}
ArrayList<Member> list = null;
for (MbrEntry entry : map.values()) {
if (entry.hasExpired(maxtime)) {
if (list == null) {
// Only need a list when members are expired (smaller gc)
list = new ArrayList<>();
}
list.add(entry.getMember());
}
}
if (list != null) {
Member[] result = list.toArray(new Member[0]);
for (Member member : result) {
removeMember(member);
}
return result;
} else {
return EMPTY_MEMBERS;
}
}
}
/**
* Returning that service has members or not.
*
* @return <code>true</code> if there are one or more members, otherwise <code>false</code>
*/
public boolean hasMembers() {
return members.length > 0;
}
public Member getMember(Member mbr) {
Member[] members = this.members;
if (members.length > 0) {
for (Member member : members) {
if (member.equals(mbr)) {
return member;
}
}
}
return null;
}
public boolean contains(Member mbr) {
return getMember(mbr) != null;
}
/**
* Returning a list of all the members in the membership. We not need a copy: add and remove generate new arrays.
*
* @return An array of the current members
*/
public Member[] getMembers() {
return members;
}
// --------------------------------------------- Inner Class
/**
* Inner class that represents a member entry
*/
protected static class MbrEntry {
protected final Member mbr;
protected long lastHeardFrom;
public MbrEntry(Member mbr) {
this.mbr = mbr;
}
/**
* Indicate that this member has been accessed.
*/
public void accessed() {
lastHeardFrom = System.currentTimeMillis();
}
/**
* Obtain the member associated with this entry.
*
* @return The member for this entry.
*/
public Member getMember() {
return mbr;
}
/**
* Check if this member has expired.
*
* @param maxtime The time threshold
*
* @return <code>true</code> if the member has expired, otherwise <code>false</code>
*/
public boolean hasExpired(long maxtime) {
return !mbr.isLocal() && (System.currentTimeMillis() - lastHeardFrom) > maxtime;
}
}
}