DNSMembershipProvider.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.membership.cloud;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * A {@link org.apache.catalina.tribes.MembershipProvider} that uses DNS to retrieve the members of a cluster.<br>
 * <p>
 * <strong>Configuration example for Kubernetes</strong>
 * </p>
 * {@code server.xml }
 *
 * <pre>
 * {@code
 * <Server ...
 *
 *   <Service ...
 *
 *     <Engine ...
 *
 *       <Host ...
 *
 *         <Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster">
 *           <Channel className="org.apache.catalina.tribes.group.GroupChannel">
 *             <Membership className="org.apache.catalina.tribes.membership.cloud.CloudMembershipService"
 *                 membershipProviderClassName="org.apache.catalina.tribes.membership.cloud.DNSMembershipProvider"/>
 *           </Channel>
 *         </Cluster>
 *         ...
 *  }
 *  </pre>
 *
 * Minimal example for the Service {@code my-tomcat-app-membership}; note the <strong>selector</strong>.<br>
 * {@code dns-membership-service.yml }
 *
 * <pre>
 * {@code
 * apiVersion: v1
 * kind: Service
 * metadata:
 *   annotations:
 *     service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
 *     description: "The service for tomcat cluster membership."
 *   name: my-tomcat-app-membership
 * spec:
 *   clusterIP: None
 *   selector:
 *     app: my-tomcat-app
 * }
 * </pre>
 *
 * First Tomcat pod minimal example, note the <strong>labels</strong> that must correspond to the
 * <strong>selector</strong> in the service.<br>
 * {@code tomcat1.yml }
 *
 * <pre>
 * {@code
 * apiVersion: v1
 * kind: Pod
 * metadata:
 *   name: tomcat1
 *   labels:
 *     app: my-tomcat-app
 * spec:
 *   containers:
 *   - name: tomcat
 *     image: tomcat
 *     ports:
 *     - containerPort: 8080
 * }
 * </pre>
 *
 * Environment variable configuration<br>
 * {@code DNS_MEMBERSHIP_SERVICE_NAME=my-tomcat-app-membership }
 */

public class DNSMembershipProvider extends CloudMembershipProvider {
    private static final Log log = LogFactory.getLog(DNSMembershipProvider.class);

    /** URL-encoded DNS name that is resolved to discover cluster members; assigned in {@link #start(int)}. */
    private String dnsServiceName;

    /**
     * Starts the provider. Nothing happens unless {@code level} includes {@link MembershipService#MBR_RX}.
     * Otherwise the parent is started, the DNS name is read from the {@code DNS_MEMBERSHIP_SERVICE_NAME}
     * environment setting (falling back to {@code getNamespace()} when that is not set) and an initial
     * {@code heartbeat()} is triggered.
     *
     * @param level the start level flags
     *
     * @throws Exception if the parent start, the encoding of the name or the initial heartbeat fails
     */
    @Override
    public void start(int level) throws Exception {
        if ((level & MembershipService.MBR_RX) == 0) {
            return;
        }

        super.start(level);

        // Determine the DNS name to resolve; fall back to the namespace when no explicit name is configured
        dnsServiceName = getEnv("DNS_MEMBERSHIP_SERVICE_NAME");
        if (dnsServiceName == null) {
            dnsServiceName = getNamespace();
        }

        if (log.isDebugEnabled()) {
            log.debug(sm.getString("cloudMembershipProvider.start", dnsServiceName));
        }
        dnsServiceName = URLEncoder.encode(dnsServiceName, "UTF-8");

        // Fetch initial members
        heartbeat();
    }

    /**
     * Stops the provider by delegating to the parent implementation.
     *
     * @param level the stop level flags
     *
     * @return the value returned by the parent implementation
     *
     * @throws Exception if the parent implementation fails
     */
    @Override
    public boolean stop(int level) throws Exception {
        return super.stop(level);
    }

    /**
     * Resolves the configured DNS name and turns every returned address, except the local one, into a member.
     * The unique ID of each member is the digest of the textual IP address. When the local address is found
     * and the local member still carries {@link CloudMembershipService#INITIAL_ID}, its unique ID is replaced
     * by the digest of the local IP.
     *
     * @return the members found, never {@code null}; empty when the DNS lookup fails
     */
    @Override
    protected Member[] fetchMembers() {
        InetAddress[] inetAddresses;
        try {
            inetAddresses = InetAddress.getAllByName(dnsServiceName);
        } catch (UnknownHostException exception) {
            log.warn(sm.getString("dnsMembershipProvider.dnsError", dnsServiceName), exception);
            return new Member[0];
        }

        List<MemberImpl> members = new ArrayList<>();
        for (InetAddress inetAddress : inetAddresses) {
            String ip = inetAddress.getHostAddress();
            // Explicit charset so that the resulting ID does not depend on the platform default
            byte[] id = md5.digest(ip.getBytes(StandardCharsets.UTF_8));
            // We found ourselves, ignore
            if (ip.equals(localIp)) {
                // Update the UID on initial lookup
                Member localMember = service.getLocalMember(false);
                // NOTE(review): reference comparison kept from the original code; presumably INITIAL_ID is
                // a shared sentinel array - confirm against CloudMembershipService
                if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID &&
                        localMember instanceof MemberImpl) {
                    ((MemberImpl) localMember).setUniqueId(id);
                }
                continue;
            }
            long aliveTime = -1;
            MemberImpl member;
            try {
                member = new MemberImpl(ip, port, aliveTime);
            } catch (IOException e) {
                log.error(sm.getString("kubernetesMembershipProvider.memberError"), e);
                continue;
            }
            member.setUniqueId(id);
            members.add(member);
        }

        return members.toArray(new Member[0]);
    }

    /**
     * Checks whether the sender of a message is already in the member list (same host bytes and same port).
     * An unknown sender is registered through {@code updateMember(member, true)} with a unique ID derived
     * from the textual form of its address, using the same rendering as {@link #fetchMembers()} so that both
     * code paths produce the same ID for the same address.
     *
     * @param msg    the received message (not inspected)
     * @param sender the member that sent the message
     *
     * @return always {@code false}
     */
    @Override
    public boolean accept(Serializable msg, Member sender) {
        byte[] senderHost = sender.getHost();
        int senderPort = sender.getPort();

        // Check if the sender is in the member list.
        Member[] members = membership.getMembers();
        if (members != null) {
            for (Member member : members) {
                if (Arrays.equals(senderHost, member.getHost()) && senderPort == member.getPort()) {
                    return false;
                }
            }
        }

        MemberImpl member = new MemberImpl();
        member.setHost(senderHost);
        member.setPort(senderPort);
        member.setUniqueId(md5.digest(toHostAddress(senderHost).getBytes(StandardCharsets.UTF_8)));
        member.setMemberAliveTime(-1);
        updateMember(member, true);
        return false;
    }

    /**
     * Renders raw address bytes as text. 4 and 16 byte arrays are formatted by {@link InetAddress}, which
     * matches the {@code getHostAddress()} form used in {@link #fetchMembers()} for both IPv4 and IPv6.
     * Any other length falls back to the unsigned byte values joined by dots.
     *
     * @param host the raw address bytes
     *
     * @return the textual form of the address
     */
    private static String toHostAddress(byte[] host) {
        try {
            return InetAddress.getByAddress(host).getHostAddress();
        } catch (UnknownHostException e) {
            // Not a 4 or 16 byte address: keep the dotted rendering of the unsigned byte values
            StringBuilder buf = new StringBuilder();
            for (int i = 0; i < host.length; i++) {
                if (i > 0) {
                    buf.append('.');
                }
                buf.append(host[i] & 0xff);
            }
            return buf.toString();
        }
    }
}