package org.springframework.amqp.rabbit.core;

import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
import org.springframework.amqp.rabbit.core.support.MessageBatch;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.Lifecycle;
import org.springframework.scheduling.TaskScheduler;

/* loaded from: input_file:BOOT-INF/lib/spring-rabbit-2.0.8.RELEASE.jar:org/springframework/amqp/rabbit/core/BatchingRabbitTemplate.class */
/**
 * A {@link RabbitTemplate} that routes outgoing messages through a
 * {@link BatchingStrategy} rather than publishing each one immediately.
 *
 * <p>Messages sent without {@link CorrelationData} are offered to the strategy;
 * whenever the strategy hands back a {@link MessageBatch}, that batch is published
 * through the parent template. After each such send, a release of any remaining
 * batches is scheduled on the supplied {@link TaskScheduler} for the time the
 * strategy reports via {@code nextRelease()}.
 *
 * <p>{@link #send} and the internal batch release are both {@code synchronized}
 * on this instance.
 */
public class BatchingRabbitTemplate extends RabbitTemplate implements Lifecycle {

    /** Accumulates messages and decides when a batch is handed back for publishing. */
    private final BatchingStrategy batchingStrategy;

    /** Used to schedule the deferred release of batches. */
    private final TaskScheduler scheduler;

    /** Most recently scheduled release task; {@code null} until one has been scheduled. */
    private volatile ScheduledFuture<?> scheduledTask;

    /**
     * Creates a template that batches with the given strategy.
     *
     * @param batchingStrategy the strategy that accumulates messages into batches
     * @param taskScheduler the scheduler on which deferred batch releases are run
     */
    public BatchingRabbitTemplate(BatchingStrategy batchingStrategy, TaskScheduler taskScheduler) {
        this.batchingStrategy = batchingStrategy;
        this.scheduler = taskScheduler;
    }

    /**
     * Sends a message, batching it unless correlation data is supplied.
     *
     * <p>With non-null {@code correlationData} the message bypasses batching and is
     * delegated directly to the parent template. Otherwise any previously scheduled
     * release is cancelled, the message is offered to the batching strategy, a batch
     * returned by the strategy (if any) is published, and a new release is scheduled
     * when the strategy reports a next release time.
     *
     * @param str first routing argument, forwarded to the strategy / parent template
     *            (the exchange, judging by {@code MessageBatch.getExchange()})
     * @param str2 second routing argument, forwarded likewise
     *            (the routing key, judging by {@code MessageBatch.getRoutingKey()})
     * @param message the message to send or add to the current batch
     * @param correlationData correlation data; when non-null, batching is skipped
     * @throws AmqpException if the underlying send fails
     */
    @Override
    public synchronized void send(String str, String str2, Message message, CorrelationData correlationData) throws AmqpException {
        if (correlationData == null) {
            // A new message is joining the batch, so the previously planned release is
            // superseded; cancel(false) does not interrupt a release already running.
            ScheduledFuture<?> pendingRelease = this.scheduledTask;
            if (pendingRelease != null) {
                pendingRelease.cancel(false);
            }

            MessageBatch readyBatch = this.batchingStrategy.addToBatch(str, str2, message);
            if (readyBatch != null) {
                dispatchBatch(readyBatch);
            }

            Date releaseTime = this.batchingStrategy.nextRelease();
            if (releaseTime != null) {
                this.scheduledTask = this.scheduler.schedule(this::releaseBatches, releaseTime);
            }
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Cannot use batching with correlation data");
            }
            super.send(str, str2, message, correlationData);
        }
    }

    /**
     * Publishes every batch the strategy currently releases, without waiting for
     * the scheduled release.
     */
    public void flush() {
        releaseBatches();
    }

    /** Asks the strategy for all releasable batches and publishes each of them. */
    private synchronized void releaseBatches() {
        for (MessageBatch released : this.batchingStrategy.releaseBatches()) {
            dispatchBatch(released);
        }
    }

    /** Publishes one batch through the parent template, with no correlation data. */
    private void dispatchBatch(MessageBatch batch) {
        super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
    }

    /** Start hook; intentionally does nothing. */
    @Override
    public void doStart() {
    }

    /** Stop hook; flushes the batches so they are published on stop. */
    @Override
    public void doStop() {
        flush();
    }

    /**
     * Always reports this template as running.
     *
     * @return {@code true}, unconditionally
     */
    @Override
    public boolean isRunning() {
        return true;
    }
}
