package org.redisson.rx;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.core.RedissonObjectBuilder;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.17.6.jar:org/redisson/rx/CommandRxService.class */
/**
 * {@link CommandAsyncService} variant that exposes asynchronous command results as RxJava 3
 * {@link Flowable}s.
 */
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {

    /**
     * Creates the service, registering {@code ReferenceType.RXJAVA} with the parent command service.
     *
     * @param connectionManager     connection manager handed to the parent service
     * @param redissonObjectBuilder object builder handed to the parent service
     */
    public CommandRxService(ConnectionManager connectionManager, RedissonObjectBuilder redissonObjectBuilder) {
        super(connectionManager, redissonObjectBuilder, RedissonObjectBuilder.ReferenceType.RXJAVA);
    }

    /**
     * Adapts a lazily started {@link CompletableFuture} into a {@link Flowable}.
     *
     * <p>The {@code callable} is invoked on the first downstream {@code request(n)}; nothing runs
     * before demand is signalled. The outcome is pushed into a {@link ReplayProcessor}, so it is
     * replayed to any later subscriber of the same {@code Flowable}:
     * <ul>
     *   <li>a non-null result is emitted as a single item followed by completion;</li>
     *   <li>a {@code null} result yields completion without any item;</li>
     *   <li>a failure is emitted as an error, with a {@link CompletionException} wrapper
     *       replaced by its cause when one is present;</li>
     *   <li>an exception thrown while starting the command is emitted as an error.</li>
     * </ul>
     *
     * <p>Cancelling a subscription cancels the underlying future if it has already been started.
     * Because the processor (and therefore the future) is shared by all subscribers of the returned
     * {@code Flowable}, the first cancellation cancels the command for every subscriber.
     *
     * @param callable factory that starts the command and returns its future
     * @param <R>      result type
     * @return a {@code Flowable} emitting at most one item
     */
    public <R> Flowable<R> flowable(final Callable<CompletableFuture<R>> callable) {
        final ReplayProcessor<R> processor = ReplayProcessor.create();
        // doOnRequest fires for every request(n) call and for every subscription; the flag makes
        // sure the command itself is started only once per returned Flowable.
        final AtomicBoolean started = new AtomicBoolean();
        // Holds the running future so the cancel hook (a different operator stage) can reach it.
        final AtomicReference<CompletableFuture<R>> pending = new AtomicReference<>();

        return processor
                .doOnRequest(requested -> {
                    if (!started.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        CompletableFuture<R> future = callable.call();
                        pending.set(future);
                        future.whenComplete((value, error) -> {
                            if (error != null) {
                                Throwable cause = error;
                                // Unwrap only when there is a cause: onError rejects null.
                                if (error instanceof CompletionException && error.getCause() != null) {
                                    cause = error.getCause();
                                }
                                processor.onError(cause);
                                return;
                            }
                            if (value != null) {
                                processor.onNext(value);
                            }
                            processor.onComplete();
                        });
                    } catch (Exception e) {
                        // Covers both a throwing callable and a null future returned by it.
                        processor.onError(e);
                    }
                })
                // Operators return a new Flowable, so the hook must be part of the returned chain;
                // calling doOnCancel on the processor and dropping the result registers nothing.
                .doOnCancel(() -> {
                    CompletableFuture<R> future = pending.get();
                    // A cancel that arrives before the command was started finds no future and is
                    // a no-op, which matches the previous behaviour.
                    if (future != null) {
                        future.cancel(true);
                    }
                });
    }
}
