package org.springframework.messaging.tcp.reactor;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.0.5.RELEASE.jar:org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.class */
/**
 * {@link TcpConnection} implementation that delegates to a Reactor Netty
 * {@link NettyInbound} / {@link NettyOutbound} pair and uses a
 * {@link ReactorNettyCodec} to encode outgoing messages.
 *
 * @param <P> the payload type of the messages sent over this connection
 */
public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
    private final NettyInbound inbound;
    private final NettyOutbound outbound;
    private final ReactorNettyCodec<P> codec;
    private final DirectProcessor<Void> closeProcessor;

    /**
     * Create a connection wrapper around the given Reactor Netty handles.
     *
     * @param nettyInbound the inbound side, used to register read-idle callbacks
     * @param nettyOutbound the outbound side, used to allocate buffers, send data
     * and register write-idle callbacks
     * @param reactorNettyCodec the codec used to encode messages into byte buffers
     * @param directProcessor the processor that is completed when {@link #close()} is called
     */
    public ReactorNettyTcpConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound, ReactorNettyCodec<P> reactorNettyCodec, DirectProcessor<Void> directProcessor) {
        this.inbound = nettyInbound;
        this.outbound = nettyOutbound;
        this.codec = reactorNettyCodec;
        this.closeProcessor = directProcessor;
    }

    /**
     * Encode the given message into a freshly allocated buffer and send it
     * through the outbound.
     *
     * <p>If encoding fails, the buffer is released before the exception is
     * propagated, since it has not yet been handed over to the outbound.
     *
     * @param message the message to encode and send
     * @return a future adapted from the completion of the outbound send
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public ListenableFuture<Void> send(Message<P> message) {
        ByteBuf buffer = this.outbound.alloc().buffer();
        try {
            this.codec.encode(message, buffer);
        }
        catch (RuntimeException | Error ex) {
            // The buffer never reached the outbound, so nobody else will release it.
            buffer.release();
            throw ex;
        }
        Mono<Void> sendCompletion = this.outbound.send(Mono.just(buffer)).then();
        return new MonoToListenableFutureAdapter<>(sendCompletion);
    }

    /**
     * Register a callback to run when no data has been read for the given duration.
     *
     * @param runnable the callback to invoke on read inactivity
     * @param j the inactivity duration, passed through unchanged to
     * {@code NettyInbound.onReadIdle} (presumably milliseconds; confirm against
     * the {@code TcpConnection} contract)
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onReadInactivity(Runnable runnable, long j) {
        ChannelPipeline pipeline = this.inbound.context().channel().pipeline();
        String handlerName = NettyPipeline.OnChannelReadIdle;
        // Drop any handler already registered under the read-idle name before
        // registering again; presumably this avoids a clash with an earlier
        // registration (verify against reactor-netty behavior).
        if (pipeline.context(handlerName) != null) {
            pipeline.remove(handlerName);
        }
        this.inbound.onReadIdle(j, runnable);
    }

    /**
     * Register a callback to run when no data has been written for the given duration.
     *
     * @param runnable the callback to invoke on write inactivity
     * @param j the inactivity duration, passed through unchanged to
     * {@code NettyOutbound.onWriteIdle} (presumably milliseconds; confirm against
     * the {@code TcpConnection} contract)
     */
    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onWriteInactivity(Runnable runnable, long j) {
        this.outbound.onWriteIdle(j, runnable);
    }

    /**
     * Signal completion on the close processor supplied at construction time.
     * NOTE(review): whoever subscribes to that processor is expected to tear
     * down the underlying channel; that wiring is not visible in this class.
     */
    @Override // org.springframework.messaging.tcp.TcpConnection, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closeProcessor.onComplete();
    }
}
