/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

@InterfaceAudience.Private
public class ReplicationPeersZKImpl
extends ReplicationStateZKBase
implements ReplicationPeers {
    private Map<String, ReplicationPeerZKImpl> peerClusters;
    private final String tableCFsNodeName;
    private final ReplicationQueuesClient queuesClient;
    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);

    public ReplicationPeersZKImpl(ZooKeeperWatcher zk, Configuration conf, ReplicationQueuesClient queuesClient, Abortable abortable) {
        super(zk, conf, abortable);
        this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
        this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
        this.queuesClient = queuesClient;
    }

    @Override
    public void init() throws ReplicationException {
        try {
            if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
                ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
            }
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not initialize replication peers", e);
        }
        this.addExistingPeers();
    }

    @Override
    public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs) throws ReplicationException {
        try {
            if (this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot add a peer with id=" + id + " because that id already exists.");
            }
            if (id.contains("-") || id.equals("lock")) {
                throw new IllegalArgumentException("Found invalid peer name:" + id);
            }
            this.checkQueuesDeleted(id);
            ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
            ArrayList<ZKUtil.ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
            ZKUtil.ZKUtilOp op1 = ZKUtil.ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), ReplicationPeersZKImpl.toByteArray(peerConfig));
            ZKUtil.ZKUtilOp op2 = ZKUtil.ZKUtilOp.createAndFailSilent(this.getPeerStateNode(id), ENABLED_ZNODE_BYTES);
            String tableCFsStr = tableCFs == null ? "" : tableCFs;
            ZKUtil.ZKUtilOp op3 = ZKUtil.ZKUtilOp.createAndFailSilent(this.getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
            listOfOps.add(op1);
            listOfOps.add(op2);
            listOfOps.add(op3);
            ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>" + peerConfig, e);
        }
    }

    @Override
    public void removePeer(String id) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot remove peer with id=" + id + " because that id does not exist.");
            }
            ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not remove peer with id=" + id, e);
        }
    }

    @Override
    public void enablePeer(String id) throws ReplicationException {
        this.changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
        LOG.info((Object)("peer " + id + " is enabled"));
    }

    @Override
    public void disablePeer(String id) throws ReplicationException {
        this.changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
        LOG.info((Object)("peer " + id + " is disabled"));
    }

    @Override
    public String getPeerTableCFsConfig(String id) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("peer " + id + " doesn't exist");
            }
            try {
                return Bytes.toString(ZKUtil.getData(this.zookeeper, this.getTableCFsNode(id)));
            }
            catch (Exception e) {
                throw new ReplicationException(e);
            }
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
        }
    }

    @Override
    public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + " does not exist.");
            }
            String tableCFsZKNode = this.getTableCFsNode(id);
            byte[] tableCFs = Bytes.toBytes(tableCFsStr);
            if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
                ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
            } else {
                ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
            }
            LOG.info((Object)("Peer tableCFs with id= " + id + " is now " + tableCFsStr));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
        }
    }

    @Override
    public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
        ReplicationPeer replicationPeer = this.peerClusters.get(id);
        if (replicationPeer == null) {
            throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
        }
        return replicationPeer.getTableCFs();
    }

    @Override
    public boolean getStatusOfPeer(String id) {
        ReplicationPeer replicationPeer = this.peerClusters.get(id);
        if (replicationPeer == null) {
            throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
        }
        return replicationPeer.getPeerState() == ReplicationPeer.PeerState.ENABLED;
    }

    @Override
    public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
        try {
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("peer " + id + " doesn't exist");
            }
            String peerStateZNode = this.getPeerStateNode(id);
            try {
                return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
            }
            catch (KeeperException e) {
                throw new ReplicationException(e);
            }
            catch (DeserializationException e) {
                throw new ReplicationException(e);
            }
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to get status of the peer with id=" + id + " from backing store", e);
        }
        catch (InterruptedException e) {
            throw new ReplicationException(e);
        }
    }

    @Override
    public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
        TreeMap<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
        List<String> ids = null;
        try {
            ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
            for (String id : ids) {
                ReplicationPeerConfig peerConfig = this.getReplicationPeerConfig(id);
                if (peerConfig == null) {
                    LOG.warn((Object)("Failed to get replication peer configuration of clusterid=" + id + " znode content, continuing."));
                    continue;
                }
                peers.put(id, peerConfig);
            }
        }
        catch (KeeperException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
        }
        catch (ReplicationException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
        }
        return peers;
    }

    @Override
    public ReplicationPeer getPeer(String peerId) {
        return this.peerClusters.get(peerId);
    }

    @Override
    public Set<String> getPeerIds() {
        return this.peerClusters.keySet();
    }

    @Override
    public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException {
        String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
        byte[] data = null;
        try {
            data = ZKUtil.getData(this.zookeeper, znode);
        }
        catch (InterruptedException e) {
            LOG.warn((Object)("Could not get configuration for peer because the thread was interrupted. peerId=" + peerId));
            Thread.currentThread().interrupt();
            return null;
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
        }
        if (data == null) {
            LOG.error((Object)("Could not get configuration for peer because it doesn't exist. peerId=" + peerId));
            return null;
        }
        try {
            return ReplicationPeersZKImpl.parsePeerFrom(data);
        }
        catch (DeserializationException e) {
            LOG.warn((Object)("Failed to parse cluster key from peerId=" + peerId + ", specifically the content from the following znode: " + znode));
            return null;
        }
    }

    @Override
    public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException {
        Configuration otherConf;
        ReplicationPeerConfig peerConfig = this.getReplicationPeerConfig(peerId);
        if (peerConfig == null) {
            return null;
        }
        try {
            otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
        }
        catch (IOException e) {
            LOG.error((Object)("Can't get peer configuration for peerId=" + peerId + " because:"), (Throwable)e);
            return null;
        }
        if (!peerConfig.getConfiguration().isEmpty()) {
            CompoundConfiguration compound = new CompoundConfiguration();
            compound.add(otherConf);
            compound.addStringMap(peerConfig.getConfiguration());
            return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
        }
        return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
    }

    @Override
    public List<String> getAllPeerIds() {
        List<String> ids = null;
        try {
            ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
        }
        catch (KeeperException e) {
            this.abortable.abort("Cannot get the list of peers ", e);
        }
        return ids;
    }

    private void addExistingPeers() throws ReplicationException {
        List<String> znodes = null;
        try {
            znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error getting the list of peer clusters.", e);
        }
        if (znodes != null) {
            for (String z : znodes) {
                this.createAndAddPeer(z);
            }
        }
    }

    @Override
    public boolean peerAdded(String peerId) throws ReplicationException {
        return this.createAndAddPeer(peerId);
    }

    @Override
    public void peerRemoved(String peerId) {
        ReplicationPeer rp = this.peerClusters.get(peerId);
        if (rp != null) {
            ((ConcurrentMap)this.peerClusters).remove(peerId, rp);
        }
    }

    public boolean createAndAddPeer(String peerId) throws ReplicationException {
        if (this.peerClusters == null) {
            return false;
        }
        if (this.peerClusters.containsKey(peerId)) {
            return false;
        }
        ReplicationPeerZKImpl peer = null;
        try {
            peer = this.createPeer(peerId);
        }
        catch (Exception e) {
            throw new ReplicationException("Error adding peer with id=" + peerId, e);
        }
        if (peer == null) {
            return false;
        }
        ReplicationPeerZKImpl previous = ((ConcurrentMap)this.peerClusters).putIfAbsent(peerId, peer);
        if (previous == null) {
            LOG.info((Object)("Added new peer cluster=" + peer.getPeerConfig().getClusterKey()));
        } else {
            LOG.info((Object)("Peer already present, " + previous.getPeerConfig().getClusterKey() + ", new cluster=" + peer.getPeerConfig().getClusterKey()));
        }
        return true;
    }

    private String getTableCFsNode(String id) {
        return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
    }

    private String getPeerStateNode(String id) {
        return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
    }

    private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state) throws ReplicationException {
        try {
            byte[] stateBytes;
            if (!this.peerExists(id)) {
                throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id + " does not exist.");
            }
            String peerStateZNode = this.getPeerStateNode(id);
            byte[] byArray = stateBytes = state == ZooKeeperProtos.ReplicationState.State.ENABLED ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
            if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
                ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
            } else {
                ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
            }
            LOG.info((Object)("Peer with id= " + id + " is now " + state.name()));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
        }
    }

    private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
        Pair<ReplicationPeerConfig, Configuration> pair = this.getPeerConf(peerId);
        if (pair == null) {
            return null;
        }
        Configuration peerConf = pair.getSecond();
        ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
        try {
            peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId, e);
        }
        try {
            peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
        }
        catch (KeeperException e) {
            throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + peerId, e);
        }
        return peer;
    }

    private static ReplicationPeerConfig parsePeerFrom(byte[] bytes) throws DeserializationException {
        if (ProtobufUtil.isPBMagicPrefix(bytes)) {
            ZooKeeperProtos.ReplicationPeer peer;
            int pblen = ProtobufUtil.lengthOfPBMagic();
            ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
            try {
                ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
                peer = builder.build();
            }
            catch (IOException e) {
                throw new DeserializationException(e);
            }
            return ReplicationPeersZKImpl.convert(peer);
        }
        if (bytes.length > 0) {
            return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
        }
        return new ReplicationPeerConfig().setClusterKey("");
    }

    private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
        if (peer.hasClusterkey()) {
            peerConfig.setClusterKey(peer.getClusterkey());
        }
        if (peer.hasReplicationEndpointImpl()) {
            peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
        }
        for (HBaseProtos.BytesBytesPair bytesBytesPair : peer.getDataList()) {
            peerConfig.getPeerData().put(bytesBytesPair.getFirst().toByteArray(), bytesBytesPair.getSecond().toByteArray());
        }
        for (HBaseProtos.NameStringPair nameStringPair : peer.getConfigurationList()) {
            peerConfig.getConfiguration().put(nameStringPair.getName(), nameStringPair.getValue());
        }
        return peerConfig;
    }

    private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
        ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
        if (peerConfig.getClusterKey() != null) {
            builder.setClusterkey(peerConfig.getClusterKey());
        }
        if (peerConfig.getReplicationEndpointImpl() != null) {
            builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
        }
        for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
            builder.addData(HBaseProtos.BytesBytesPair.newBuilder().setFirst(ByteString.copyFrom(entry.getKey())).setSecond(ByteString.copyFrom(entry.getValue())).build());
        }
        for (Map.Entry<Object, Object> entry : peerConfig.getConfiguration().entrySet()) {
            builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName((String)entry.getKey()).setValue((String)entry.getValue()).build());
        }
        return builder.build();
    }

    private static byte[] toByteArray(ReplicationPeerConfig peerConfig) {
        byte[] bytes = ReplicationPeersZKImpl.convert(peerConfig).toByteArray();
        return ProtobufUtil.prependPBMagic(bytes);
    }

    private void checkQueuesDeleted(String peerId) throws ReplicationException {
        if (this.queuesClient == null) {
            return;
        }
        try {
            List<String> replicators = this.queuesClient.getListOfReplicators();
            for (String replicator : replicators) {
                List<String> queueIds = this.queuesClient.getAllQueues(replicator);
                for (String queueId : queueIds) {
                    ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
                    if (!queueInfo.getPeerId().equals(peerId)) continue;
                    throw new ReplicationException("undeleted queue for peerId: " + peerId + ", replicator: " + replicator + ", queueId: " + queueId);
                }
            }
            if (-1 != ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) && this.queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
                throw new ReplicationException("Undeleted queue for peerId: " + peerId + ", found in hfile-refs node path " + this.hfileRefsZNode);
            }
        }
        catch (KeeperException e) {
            throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
        }
    }
}

