package com.ebaiyihui.data.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:BOOT-INF/classes/com/ebaiyihui/data/canal/CanalUtils.class */
public class CanalUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) CanalUtils.class);
    private static List<IChangeHandler> changeHandlers;
    private static volatile CanalUtils instance;

    public CanalUtils attachChangeHandler(List<IChangeHandler> list) {
        changeHandlers = list;
        return this;
    }

    public static CanalUtils getInstance() {
        if (instance == null) {
            synchronized (CanalUtils.class) {
                if (instance == null) {
                    instance = new CanalUtils();
                }
            }
        }
        return instance;
    }

    public void connServer(String str, String str2, String str3) {
        CanalConnector newSingleConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(str, 22222), str3, "", "");
        try {
            try {
                newSingleConnector.connect();
                newSingleConnector.subscribe(str2);
                newSingleConnector.rollback();
                while (true) {
                    Long l = null;
                    try {
                        Message withoutAck = newSingleConnector.getWithoutAck(1000);
                        l = Long.valueOf(withoutAck.getId());
                        if (l.longValue() != -1 && withoutAck.getEntries().size() != 0) {
                            doSync(withoutAck.getEntries());
                        }
                        newSingleConnector.ack(l.longValue());
                    } catch (Exception e) {
                        newSingleConnector.ack(l.longValue());
                        log.info("处理数据发送异常", (Throwable) e);
                    }
                }
            } catch (Exception e2) {
                log.info("连接canal发送异常：", (Throwable) e2);
                newSingleConnector.disconnect();
            }
        } catch (Throwable th) {
            newSingleConnector.disconnect();
            throw th;
        }
    }

    private void doSync(@NotNull List<CanalEntry.Entry> list) {
        if (CollectionUtils.isEmpty(changeHandlers)) {
            return;
        }
        Iterator<IChangeHandler> it = changeHandlers.iterator();
        while (it.hasNext()) {
            it.next().handleChange(list);
        }
    }
}
