KubernetesMembershipProvider.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.membership.cloud;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.json.JSONParser;
/**
* A {@link org.apache.catalina.tribes.MembershipProvider} that uses Kubernetes API to retrieve the members of a
* cluster.<br>
*/
public class KubernetesMembershipProvider extends CloudMembershipProvider {
private static final Log log = LogFactory.getLog(KubernetesMembershipProvider.class);
@Override
public void start(int level) throws Exception {
if ((level & MembershipService.MBR_RX) == 0) {
return;
}
super.start(level);
// Set up Kubernetes API parameters
String namespace = getNamespace();
if (log.isDebugEnabled()) {
log.debug(sm.getString("cloudMembershipProvider.start", namespace));
}
String protocol = getEnv(CUSTOM_ENV_PREFIX + "MASTER_PROTOCOL", "KUBERNETES_MASTER_PROTOCOL");
String masterHost = getEnv(CUSTOM_ENV_PREFIX + "MASTER_HOST", "KUBERNETES_SERVICE_HOST");
String masterPort = getEnv(CUSTOM_ENV_PREFIX + "MASTER_PORT", "KUBERNETES_SERVICE_PORT");
String clientCertificateFile =
getEnv(CUSTOM_ENV_PREFIX + "CLIENT_CERT_FILE", "KUBERNETES_CLIENT_CERTIFICATE_FILE");
String caCertFile = getEnv(CUSTOM_ENV_PREFIX + "CA_CERT_FILE", "KUBERNETES_CA_CERTIFICATE_FILE");
if (caCertFile == null) {
caCertFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
}
if (clientCertificateFile == null) {
if (protocol == null) {
protocol = "https";
}
String saTokenFile = getEnv(CUSTOM_ENV_PREFIX + "SA_TOKEN_FILE", "SA_TOKEN_FILE");
if (saTokenFile == null) {
saTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token";
}
try {
byte[] bytes = Files.readAllBytes(FileSystems.getDefault().getPath(saTokenFile));
streamProvider = new TokenStreamProvider(new String(bytes, StandardCharsets.US_ASCII), caCertFile);
} catch (IOException e) {
log.error(sm.getString("kubernetesMembershipProvider.streamError"), e);
}
} else {
if (protocol == null) {
protocol = "http";
}
String clientKeyFile = getEnv("KUBERNETES_CLIENT_KEY_FILE");
if (clientKeyFile == null) {
log.error(sm.getString("kubernetesMembershipProvider.noKey"));
return;
}
String clientKeyPassword = getEnv("KUBERNETES_CLIENT_KEY_PASSWORD");
String clientKeyAlgo = getEnv("KUBERNETES_CLIENT_KEY_ALGO");
if (clientKeyAlgo == null) {
clientKeyAlgo = "RSA";
}
streamProvider = new CertificateStreamProvider(clientCertificateFile, clientKeyFile, clientKeyPassword,
clientKeyAlgo, caCertFile);
}
String ver = getEnv(CUSTOM_ENV_PREFIX + "API_VERSION", "KUBERNETES_API_VERSION");
if (ver == null) {
ver = "v1";
}
String labels = getEnv(CUSTOM_ENV_PREFIX + "LABELS", "KUBERNETES_LABELS");
namespace = URLEncoder.encode(namespace, "UTF-8");
labels = labels == null ? null : URLEncoder.encode(labels, "UTF-8");
url = String.format("%s://%s:%s/api/%s/namespaces/%s/pods", protocol, masterHost, masterPort, ver, namespace);
if (labels != null && labels.length() > 0) {
url = url + "?labelSelector=" + labels;
}
// Fetch initial members
heartbeat();
}
@Override
public boolean stop(int level) throws Exception {
try {
return super.stop(level);
} finally {
streamProvider = null;
}
}
@Override
protected Member[] fetchMembers() {
if (streamProvider == null) {
return new Member[0];
}
List<MemberImpl> members = new ArrayList<>();
try (InputStream stream = streamProvider.openStream(url, headers, connectionTimeout, readTimeout);
InputStreamReader reader = new InputStreamReader(stream, "UTF-8")) {
parsePods(reader, members);
} catch (IOException e) {
log.error(sm.getString("kubernetesMembershipProvider.streamError"), e);
}
return members.toArray(new Member[0]);
}
@SuppressWarnings("unchecked")
protected void parsePods(Reader reader, List<MemberImpl> members) {
JSONParser parser = new JSONParser(reader);
try {
LinkedHashMap<String,Object> json = parser.object();
Object itemsObject = json.get("items");
if (!(itemsObject instanceof List<?>)) {
log.error(sm.getString("kubernetesMembershipProvider.invalidPodsList", "no items"));
return;
}
List<Object> items = (List<Object>) itemsObject;
for (Object podObject : items) {
if (!(podObject instanceof LinkedHashMap<?,?>)) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "item"));
continue;
}
LinkedHashMap<String,Object> pod = (LinkedHashMap<String,Object>) podObject;
// If there is a "kind", check it is "Pod"
Object podKindObject = pod.get("kind");
if (podKindObject != null && !"Pod".equals(podKindObject)) {
continue;
}
// "metadata" contains "name", "uid" and "creationTimestamp"
Object metadataObject = pod.get("metadata");
if (!(metadataObject instanceof LinkedHashMap<?,?>)) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "metadata"));
continue;
}
LinkedHashMap<String,Object> metadata = (LinkedHashMap<String,Object>) metadataObject;
Object nameObject = metadata.get("name");
if (nameObject == null) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "name"));
continue;
}
Object objectUid = metadata.get("uid");
Object creationTimestampObject = metadata.get("creationTimestamp");
if (creationTimestampObject == null) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "uid"));
continue;
}
// "status" contains "phase" (which must be "Running") and "podIP"
Object statusObject = pod.get("status");
if (!(statusObject instanceof LinkedHashMap<?,?>)) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "status"));
continue;
}
LinkedHashMap<String,Object> status = (LinkedHashMap<String,Object>) statusObject;
if (!"Running".equals(status.get("phase"))) {
continue;
}
Object podIPObject = status.get("podIP");
if (podIPObject == null) {
log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "podIP"));
continue;
}
String podIP = podIPObject.toString();
String uid = (objectUid == null) ? podIP : objectUid.toString();
// We found ourselves, ignore
if (podIP.equals(localIp)) {
// Update the UID on initial lookup
Member localMember = service.getLocalMember(false);
if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID &&
localMember instanceof MemberImpl) {
byte[] id = md5.digest(uid.getBytes(StandardCharsets.US_ASCII));
((MemberImpl) localMember).setUniqueId(id);
}
continue;
}
long aliveTime =
Duration.between(Instant.parse(creationTimestampObject.toString()), startTime).toMillis();
MemberImpl member = null;
try {
member = new MemberImpl(podIP, port, aliveTime);
} catch (IOException e) {
// Shouldn't happen:
// an exception is thrown if hostname can't be resolved to IP, but we already provide an IP
log.error(sm.getString("kubernetesMembershipProvider.memberError"), e);
continue;
}
byte[] id = md5.digest(uid.getBytes(StandardCharsets.US_ASCII));
member.setUniqueId(id);
members.add(member);
}
} catch (Exception e) {
log.error(sm.getString("kubernetesMembershipProvider.jsonError"), e);
}
}
}