package reactor.rx.action;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.processor.CancelException;
import reactor.core.queue.CompletableLinkedQueue;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Exceptions;
import reactor.core.support.NonBlocking;
import reactor.core.support.Recyclable;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.rx.Stream;
import reactor.rx.StreamUtils;
import reactor.rx.action.combination.FanInAction;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.DropSubscription;
import reactor.rx.subscription.FanOutSubscription;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

/* loaded from: input_file:BOOT-INF/lib/reactor-stream-2.0.8.RELEASE.jar:reactor/rx/action/Action.class */
public abstract class Action<I, O> extends Stream<O> implements Processor<I, O>, Consumer<I>, Recyclable, Control {
    public static final int RESERVED_SLOTS = 4;
    public static final int NO_CAPACITY = -1;
    protected PushSubscription<I> upstreamSubscription;
    protected PushSubscription<O> downstreamSubscription;
    protected long capacity;

    /**
     * Validates a demand amount before it is forwarded upstream.
     *
     * @param j the number of elements being requested
     * @throws RuntimeException the exception built by
     *         {@code SpecificationExceptions.spec_3_09_exception(j)} when {@code j}
     *         is zero or negative (exact type is declared elsewhere)
     */
    public static void checkRequest(long j) {
        if (j > 0) {
            return;
        }
        throw SpecificationExceptions.spec_3_09_exception(j);
    }

    /**
     * Derives a usable capacity from a raw backlog size.
     *
     * <p>{@code Long.MAX_VALUE} is treated as "unbounded" and returned untouched.
     * Any other value has 4 slots subtracted (the same value as
     * {@code RESERVED_SLOTS}) and is never allowed to drop below 4.
     *
     * @param j the raw size to evaluate
     * @return {@code Long.MAX_VALUE} when {@code j} is {@code Long.MAX_VALUE},
     *         otherwise the larger of {@code 4} and {@code j - 4}
     */
    public static long evaluateCapacity(long j) {
        if (j == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long usable = j - 4;
        return usable < 4L ? 4L : usable;
    }

    /**
     * Creates an action whose capacity is {@code Long.MAX_VALUE}.
     */
    public Action() {
        this(Long.MAX_VALUE);
    }

    /**
     * Creates an action with the given capacity.
     *
     * @param j the initial value of the {@code capacity} field; stored as-is,
     *          without the dispatcher-based clamping done by {@code capacity(long)}
     */
    public Action(long j) {
        this.capacity = j;
    }

    /**
     * Attaches a downstream subscriber to this action.
     *
     * <p>A subscription is created for the subscriber and registered through
     * {@code subscribeWithSubscription}. When the subscriber is a {@code NonBlocking}
     * that reports reactive-pull for this action's dispatcher and capacity, the new
     * subscription's max capacity is set to the subscriber's own capacity. A
     * subscriber that is not {@code NonBlocking} is always treated as reactive-pull.
     *
     * <p>Any non-fatal failure raised during this process is delivered to
     * {@code subscriber.onError}.
     *
     * @param subscriber the downstream subscriber to attach
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super O> subscriber) {
        try {
            NonBlocking asyncSubscriber = null;
            if (NonBlocking.class.isAssignableFrom(subscriber.getClass())) {
                asyncSubscriber = (NonBlocking) subscriber;
            }
            boolean reactivePull = asyncSubscriber == null
                    || asyncSubscriber.isReactivePull(getDispatcher(), this.capacity);
            PushSubscription<O> subscription = createSubscription(subscriber, reactivePull);
            if (subscription != null) {
                if (asyncSubscriber != null && reactivePull) {
                    subscription.maxCapacity(asyncSubscriber.getCapacity());
                }
                subscribeWithSubscription(subscriber, subscription);
            }
        } catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            subscriber.onError(failure);
        }
    }

    /**
     * Receives the upstream subscription.
     *
     * <p>If an upstream subscription is already held, the incoming one is cancelled
     * and ignored. Otherwise it is converted with {@code createTrackingSubscription},
     * stored, given this action's capacity as max capacity, and then the
     * {@code doOnSubscribe} hook and {@code doStart} are run. Non-fatal failures from
     * those two calls are routed to {@code doError}.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Spec 2.13: Subscription cannot be null");
        }
        if (this.upstreamSubscription != null) {
            // Already subscribed: reject the additional subscription.
            subscription.cancel();
            return;
        }
        PushSubscription<I> tracked = createTrackingSubscription(subscription);
        this.upstreamSubscription = tracked;
        tracked.maxCapacity(getCapacity());
        try {
            doOnSubscribe(subscription);
            doStart();
        } catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            doError(failure);
        }
    }

    /**
     * Calls {@code start()} on the downstream subscription, if one is attached.
     */
    protected final void doStart() {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream == null) {
            return;
        }
        downstream.start();
    }

    /**
     * {@code Consumer} entry point; simply delegates to {@link #onNext(Object)}.
     *
     * @param i the value to signal
     */
    public final void accept(I i) {
        onNext(i);
    }

    /**
     * Receives the next upstream value and hands it to {@code doNext}.
     *
     * <p>A {@code CancelException} raised by {@code doNext} is rethrown to the
     * caller; any other throwable is decorated with the offending value via
     * {@code Exceptions.addValueAsLastCause} and passed to {@code doError}.
     *
     * @param i the value signalled by upstream
     * @throws NullPointerException if {@code i} is {@code null}
     * @throws CancelException if this action has neither an upstream nor a
     *         downstream subscription, or if {@code doNext} throws one
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(I i) {
        if (i == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        boolean detached = this.upstreamSubscription == null && this.downstreamSubscription == null;
        if (detached) {
            throw CancelException.get();
        }
        try {
            doNext(i);
        } catch (CancelException cancelled) {
            throw cancelled;
        } catch (Throwable failure) {
            doError(Exceptions.addValueAsLastCause(failure, i));
        }
    }

    /**
     * Receives the upstream completion signal: runs {@code doComplete} followed by
     * {@code doShutdown}. Anything thrown by either call is passed to
     * {@code doError}.
     */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        try {
            doComplete();
            doShutdown();
        } catch (Throwable failure) {
            // NOTE(review): unlike onSubscribe, no Exceptions.throwIfFatal here.
            doError(failure);
        }
    }

    /**
     * Receives an upstream error.
     *
     * <p>If an upstream subscription is held, {@code updatePendingRequests(0L)} is
     * invoked on it first; then the error goes to {@code doError} and
     * {@code doShutdown} is run.
     *
     * @param th the error signalled by upstream
     * @throws NullPointerException if {@code th} is {@code null}
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream != null) {
            upstream.updatePendingRequests(0L);
        }
        doError(th);
        doShutdown();
    }

    /**
     * Updates this action's capacity and propagates it to the upstream
     * subscription, if any.
     *
     * <p>When the dispatcher is {@code SynchronousDispatcher.INSTANCE} or is exactly
     * a {@code TailRecurseDispatcher}, the requested value is used unchanged.
     * For any other dispatcher the value is capped at
     * {@code evaluateCapacity(dispatcher.backlogSize())}.
     *
     * @param j the requested capacity
     * @return this action
     */
    @Override // reactor.rx.Stream
    public Action<I, O> capacity(long j) {
        // Declared with the general dispatcher type: the value is compared against
        // TailRecurseDispatcher below, so it cannot be narrowed to SynchronousDispatcher.
        reactor.core.Dispatcher dispatcher = getDispatcher();
        if (dispatcher == SynchronousDispatcher.INSTANCE || dispatcher.getClass() == TailRecurseDispatcher.class) {
            this.capacity = j;
        } else {
            long ceiling = evaluateCapacity(dispatcher.backlogSize());
            this.capacity = Math.min(j, ceiling);
        }
        if (this.upstreamSubscription != null) {
            this.upstreamSubscription.maxCapacity(this.capacity);
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Pushes a value to the downstream subscription.
     *
     * <p>A {@code CancelException} from downstream is rethrown; any other throwable
     * is decorated with the value via {@code Exceptions.addValueAsLastCause} and
     * passed to {@code doError}.
     *
     * @param o the value to emit downstream
     * @throws CancelException if there is no downstream subscription, or if the
     *         downstream {@code onNext} throws one
     */
    public void broadcastNext(O o) {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null) {
            try {
                downstream.onNext(o);
            } catch (CancelException cancelled) {
                throw cancelled;
            } catch (Throwable failure) {
                doError(Exceptions.addValueAsLastCause(failure, o));
            }
            return;
        }
        throw CancelException.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delivers an error to the downstream subscription, or, when none is attached
     * and an {@code Environment} is alive, routes it through
     * {@code Environment.get().routeError}. With neither available the error is
     * dropped.
     *
     * @param th the error to broadcast
     */
    public void broadcastError(Throwable th) {
        if (this.downstreamSubscription == null) {
            if (Environment.alive()) {
                Environment.get().routeError(th);
            }
            return;
        }
        this.downstreamSubscription.onError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Signals completion to the downstream subscription when one is attached.
     * Anything thrown by the downstream {@code onComplete} is passed to
     * {@code doError}.
     */
    public void broadcastComplete() {
        if (this.downstreamSubscription != null) {
            try {
                this.downstreamSubscription.onComplete();
            } catch (Throwable failure) {
                doError(failure);
            }
        }
    }

    /**
     * Reports whether this action still has a live upstream.
     *
     * @return {@code true} when an upstream subscription is held and its
     *         {@code isComplete()} returns {@code false}
     */
    @Override // reactor.rx.action.Control
    public boolean isPublishing() {
        PushSubscription<I> upstream = this.upstreamSubscription;
        return upstream != null && !upstream.isComplete();
    }

    /**
     * Detaches from upstream: clears the stored upstream subscription and then
     * cancels it. Does nothing when no upstream subscription is held.
     */
    @Override // reactor.rx.action.Control
    public void cancel() {
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream == null) {
            return;
        }
        // Clear the field before cancelling so re-entrant calls see no upstream.
        this.upstreamSubscription = null;
        upstream.cancel();
    }

    /**
     * Requests {@code Long.MAX_VALUE} elements from upstream, but only when no
     * downstream subscription is attached; otherwise this is a no-op.
     */
    @Override // reactor.rx.action.Control
    public void requestAll() {
        if (this.downstreamSubscription != null) {
            return;
        }
        requestMore(Long.MAX_VALUE);
    }

    /**
     * Builds a visitor over the stream graph, starting from the result of
     * {@code findOldestUpstream(Action.class)}.
     *
     * @return the visitor returned by {@code StreamUtils.browse}
     */
    @Override // reactor.rx.action.Control
    public StreamUtils.StreamVisitor debug() {
        return StreamUtils.browse((Stream) findOldestUpstream(Action.class));
    }

    /**
     * Lets an external stream drive this action: every value emitted by
     * {@code stream} is handed to {@code consumer} paired with this action.
     *
     * @param stream   the controlling stream to consume
     * @param consumer callback receiving a tuple of (this action, controlling value)
     * @param <E>      the type of the controlling values
     * @return this action
     */
    public final <E> Action<I, O> control(Stream<E> stream, final Consumer<Tuple2<Action<I, O>, ? super E>> consumer) {
        stream.consume((Consumer<? super E>) new Consumer<E>() { // from class: reactor.rx.action.Action.1
            @Override
            public void accept(E e) {
                // Must be Action.this: a bare `this` here would be the anonymous
                // Consumer, not the action the callback is supposed to receive.
                consumer.accept(Tuple.of(Action.this, e));
            }
        });
        return this;
    }

    /**
     * Lifts this action into a {@code Broadcaster} with the same environment,
     * dispatcher and capacity, choosing the overflow strategy from {@code supplier}.
     *
     * <p>With a {@code null} supplier the broadcaster is linked through a
     * {@code DropSubscription} whose {@code request} also forwards demand via
     * {@code requestUpstream}. Otherwise it is linked through a subscription
     * created around the queue obtained from {@code supplier.get()}.
     *
     * @param supplier source of the overflow queue, or {@code null} to use a
     *                 {@code DropSubscription}
     * @return the lifted stream
     */
    @Override // reactor.rx.Stream
    public final Stream<O> onOverflowBuffer(final Supplier<? extends CompletableQueue<O>> supplier) {
        return (Stream<O>) lift(new Supplier<Action<O, O>>() { // from class: reactor.rx.action.Action.2
            // Must be named get() so that it actually implements Supplier.
            @Override
            public Action<O, O> get() {
                // Explicit <O> witness: without it the chained call would not be
                // typed as Broadcaster<O>.
                Broadcaster<O> broadcaster = Broadcaster.<O>create(Action.this.getEnvironment(), Action.this.getDispatcher()).capacity(Action.this.capacity);
                if (supplier == null) {
                    Action.this.subscribeWithSubscription(broadcaster, new DropSubscription<O>(Action.this, broadcaster) { // from class: reactor.rx.action.Action.2.1
                        @Override // reactor.rx.subscription.DropSubscription, reactor.rx.subscription.PushSubscription, org.reactivestreams.Subscription
                        public void request(long j) {
                            super.request(j);
                            Action.this.requestUpstream(this.capacity, isComplete(), j);
                        }
                    });
                } else {
                    Action.this.subscribeWithSubscription(broadcaster, Action.this.createSubscription(broadcaster, (CompletableQueue) supplier.get()));
                }
                return broadcaster;
            }
        });
    }

    /**
     * Wraps the chain ending at this action into a single {@code CompositeAction}.
     *
     * <p>The head of the chain is looked up with
     * {@code findOldestUpstream(Action.class)}; its upstream subscription is
     * cleared before it is paired with this action.
     *
     * @param <E> the input type of the resulting composite
     * @return a composite built from the located head action and this action
     */
    @Override // reactor.rx.Stream
    public final <E> CompositeAction<E, O> combine() {
        Action oldest = (Action) findOldestUpstream(Action.class);
        oldest.upstreamSubscription = null;
        return new CompositeAction<>(oldest, this);
    }

    /**
     * Adapts {@code broadcastComplete} to a {@code Consumer}.
     *
     * @return a consumer that ignores its argument and calls
     *         {@code broadcastComplete()} on this action
     */
    public final Consumer<?> toBroadcastCompleteConsumer() {
        Consumer<Object> completer = new Consumer<Object>() { // from class: reactor.rx.action.Action.3
            public void accept(Object ignored) {
                Action.this.broadcastComplete();
            }
        };
        return completer;
    }

    /**
     * Adapts {@code broadcastNext} to a {@code Consumer}.
     *
     * @return a consumer that forwards each accepted value to
     *         {@code broadcastNext} on this action
     */
    public final Consumer<O> toBroadcastNextConsumer() {
        Consumer<O> emitter = new Consumer<O>() { // from class: reactor.rx.action.Action.4
            public void accept(O value) {
                Action.this.broadcastNext(value);
            }
        };
        return emitter;
    }

    /**
     * Adapts {@code broadcastError} to a {@code Consumer}.
     *
     * @return a consumer that forwards each accepted throwable to
     *         {@code broadcastError} on this action
     */
    public final Consumer<Throwable> toBroadcastErrorConsumer() {
        Consumer<Throwable> failer = new Consumer<Throwable>() { // from class: reactor.rx.action.Action.5
            public void accept(Throwable error) {
                Action.this.broadcastError(error);
            }
        };
        return failer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Walks up the chain of upstream publishers for as long as they are
     * {@code Action}s, and returns the publisher found at the top.
     *
     * <p>When the walk stops at an action whose own upstream publisher is an
     * instance of {@code cls}, that publisher is returned; otherwise the last
     * action reached is returned.
     *
     * @param cls the publisher type accepted as the final hop
     * @param <P> the expected publisher type
     * @return the oldest upstream found (unchecked with respect to {@code P})
     */
    public <P extends Publisher<?>> P findOldestUpstream(Class<P> cls) {
        Action action = this;
        // Keep climbing while the upstream subscription exposes an Action publisher.
        while (inspectPublisher(action, Action.class)) {
            action = (Action) action.upstreamSubscription.getPublisher();
            // For a fan-in action, continue from its dynamic merge action when it has one.
            if (action != null && FanInAction.class.isAssignableFrom(action.getClass())) {
                action = ((FanInAction) action).dynamicMergeAction() != null ? ((FanInAction) action).dynamicMergeAction() : action;
            }
        }
        // One extra hop if the non-Action publisher above matches the requested type.
        return inspectPublisher(action, cls) ? action.upstreamSubscription.getPublisher() : action;
    }

    /**
     * @return the current value of the {@code capacity} field
     */
    @Override // reactor.rx.Stream
    public final long getCapacity() {
        return this.capacity;
    }

    /**
     * @return the upstream subscription currently held, or {@code null} if none
     */
    public PushSubscription<I> getSubscription() {
        return this.upstreamSubscription;
    }

    /**
     * @return the downstream subscription currently held, or {@code null} if none
     */
    @Override // reactor.rx.Stream
    public final PushSubscription<O> downstreamSubscription() {
        return this.downstreamSubscription;
    }

    /**
     * Removes a downstream subscription and cancels upstream once no downstream
     * is left.
     *
     * <ul>
     *   <li>If the given subscription is the one currently held, it is cleared
     *       and {@code cancel()} is called.</li>
     *   <li>If the held subscription is a {@code FanOutSubscription}, the given one
     *       is removed from it; {@code cancel()} is called only when the removal
     *       succeeded and the fan-out is now empty.</li>
     * </ul>
     *
     * @param pushSubscription the downstream subscription to remove
     * @return {@code true} if upstream was cancelled as a result, {@code false}
     *         otherwise
     */
    @Override // reactor.rx.Stream
    public boolean cancelSubscription(PushSubscription<O> pushSubscription) {
        PushSubscription<O> current = this.downstreamSubscription;
        if (current == null) {
            return false;
        }
        if (pushSubscription == current) {
            this.downstreamSubscription = null;
            cancel();
            return true;
        }
        if (FanOutSubscription.class.isAssignableFrom(current.getClass())) {
            FanOutSubscription fanOut = (FanOutSubscription) current;
            if (fanOut.remove(pushSubscription) && fanOut.isEmpty()) {
                cancel();
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a downstream subscription, optionally backed by a fresh
     * {@code CompletableLinkedQueue}.
     *
     * @param subscriber the downstream subscriber
     * @param z          {@code true} to back the subscription with a new queue,
     *                   {@code false} to create it without one
     * @return the subscription built by the queue-accepting overload
     */
    public PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean z) {
        CompletableQueue queue = null;
        if (z) {
            queue = new CompletableLinkedQueue();
        }
        return createSubscription((Subscriber) subscriber, queue);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the downstream subscription for a subscriber.
     *
     * <p>With a queue, a {@code ReactiveSubscription} is returned whose
     * {@code onRequest} forwards demand using this action's capacity and the
     * buffer's completion state. Without one, a plain {@code PushSubscription} is
     * returned whose {@code onRequest} forwards demand with {@code -1} as capacity
     * and its own completion state.
     *
     * @param subscriber       the downstream subscriber
     * @param completableQueue the buffering queue, or {@code null} for none
     * @return the new subscription
     */
    public PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, CompletableQueue<O> completableQueue) {
        if (completableQueue == null) {
            return new PushSubscription<O>(this, subscriber) { // from class: reactor.rx.action.Action.7
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // reactor.rx.subscription.PushSubscription
                public void onRequest(long j) {
                    Action.this.requestUpstream(-1L, isComplete(), j);
                }
            };
        }
        return new ReactiveSubscription<O>(this, subscriber, completableQueue) { // from class: reactor.rx.action.Action.6
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // reactor.rx.subscription.PushSubscription
            public void onRequest(long j) {
                Action.this.requestUpstream(Action.this.capacity, this.buffer.isComplete(), j);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards downstream demand.
     *
     * <p>When an upstream subscription exists and {@code z} is {@code false}, the
     * demand is requested from upstream via {@code requestMore}. Otherwise it is
     * recorded on the downstream subscription, if any, through
     * {@code updatePendingRequests}.
     *
     * @param j  capacity hint supplied by the caller; not read by this implementation
     * @param z  whether the caller considers itself complete
     * @param j2 the number of elements demanded
     */
    public void requestUpstream(long j, boolean z, long j2) {
        if (this.upstreamSubscription == null || z) {
            PushSubscription<O> downstream = this.downstreamSubscription;
            if (downstream != null) {
                downstream.updatePendingRequests(j2);
            }
            return;
        }
        requestMore(j2);
    }

    /**
     * Converts an arbitrary upstream {@code Subscription} into a
     * {@code PushSubscription}.
     *
     * @param subscription the subscription received from upstream
     * @return {@code subscription} itself when it already is a
     *         {@code PushSubscription}, otherwise the result of
     *         {@code PushSubscription.wrap(subscription, this)}
     */
    protected PushSubscription<I> createTrackingSubscription(Subscription subscription) {
        if (PushSubscription.class.isAssignableFrom(subscription.getClass())) {
            return (PushSubscription) subscription;
        }
        return PushSubscription.wrap(subscription, this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked from {@code onSubscribe} after the upstream subscription has
     * been stored. The default implementation does nothing.
     *
     * @param subscription the subscription received from upstream
     */
    public void doOnSubscribe(Subscription subscription) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Completion hook invoked from {@code onComplete}. The default implementation
     * forwards completion downstream via {@code broadcastComplete}.
     */
    public void doComplete() {
        broadcastComplete();
    }

    /**
     * Processes one upstream value. Called from {@code onNext} after the null and
     * detached checks have passed.
     *
     * @param i the non-null value received from upstream
     */
    protected abstract void doNext(I i);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles an error raised by or delivered to this action.
     *
     * <p>The error is first offered to the downstream subscription. If there is no
     * downstream, or if the downstream {@code onError} itself throws, the error is
     * routed through {@code Environment.get().routeError} provided an environment
     * is alive. A failure thrown by the downstream is routed the same way, before
     * the original error.
     *
     * @param th the error to handle
     */
    public void doError(Throwable th) {
        if (this.downstreamSubscription != null) {
            try {
                this.downstreamSubscription.onError(th);
                return;
            } catch (Throwable th2) {
                // Guard the lookup: without a live Environment, Environment.get()
                // would throw from inside the error handler and mask both errors.
                if (Environment.alive()) {
                    Environment.get().routeError(th2);
                }
            }
        }
        if (Environment.alive()) {
            Environment.get().routeError(th);
        }
    }

    /**
     * Requests more elements from upstream.
     *
     * <p>The amount is validated with {@code checkRequest} even when there is no
     * upstream subscription to forward it to.
     *
     * @param j the number of elements to request; must be strictly positive
     */
    @Override // reactor.rx.action.Control
    public void requestMore(long j) {
        checkRequest(j);
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream == null) {
            return;
        }
        upstream.request(j);
    }

    /**
     * Registers a prepared downstream subscription with this action.
     *
     * <p>On successful registration the subscription is marked as deferred-start
     * and, when an upstream subscription is already present, started right away.
     * If registration is refused, the subscriber receives an
     * {@code IllegalStateException}. Any {@code Exception} raised along the way is
     * passed through {@code Exceptions.throwIfFatal} and then delivered to
     * {@code subscriber.onError}.
     *
     * @param subscriber       the downstream subscriber
     * @param pushSubscription the subscription created for that subscriber
     */
    protected void subscribeWithSubscription(Subscriber<? super O> subscriber, PushSubscription<O> pushSubscription) {
        try {
            if (!addSubscription(pushSubscription)) {
                subscriber.onError(new IllegalStateException("The subscription cannot be linked to this Stream"));
                return;
            }
            pushSubscription.markAsDeferredStart();
            if (this.upstreamSubscription != null) {
                pushSubscription.start();
            }
        } catch (Exception failure) {
            Exceptions.throwIfFatal(failure);
            subscriber.onError(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a downstream subscription to this action.
     *
     * <ul>
     *   <li>No downstream yet: the subscription becomes the downstream.</li>
     *   <li>Equal to the current downstream, or already contained in the current
     *       fan-out: it is signalled {@code SpecificationExceptions.spec_2_12_exception()}
     *       and rejected.</li>
     *   <li>Current downstream is a single subscription: both are combined into
     *       a new {@code FanOutSubscription}.</li>
     *   <li>Current downstream is already a fan-out: the subscription is added
     *       to it.</li>
     * </ul>
     *
     * @param pushSubscription the subscription to add
     * @return {@code true} if the subscription is now attached, {@code false} if
     *         it was rejected
     */
    public boolean addSubscription(PushSubscription<O> pushSubscription) {
        PushSubscription<O> current = this.downstreamSubscription;
        if (current == null) {
            this.downstreamSubscription = pushSubscription;
            return true;
        }
        if (current.equals(pushSubscription)) {
            pushSubscription.onError(SpecificationExceptions.spec_2_12_exception());
            return false;
        }
        if (FanOutSubscription.class.isAssignableFrom(current.getClass())) {
            FanOutSubscription fanOut = (FanOutSubscription) current;
            if (fanOut.contains(pushSubscription)) {
                pushSubscription.onError(SpecificationExceptions.spec_2_12_exception());
                return false;
            }
            return fanOut.add(pushSubscription);
        }
        // Second distinct subscriber: promote the single downstream to a fan-out.
        this.downstreamSubscription = new FanOutSubscription(this, current, pushSubscription);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked at the end of {@code onComplete} and {@code onError}. The default
     * implementation does nothing.
     */
    public void doShutdown() {
    }

    /**
     * Tests whether an action's upstream publisher is of a given type.
     *
     * @param action the action whose upstream subscription is examined
     * @param cls    the type to test the upstream publisher against
     * @return {@code true} only when the action has an upstream subscription, that
     *         subscription exposes a non-null publisher, and the publisher's class
     *         is assignable to {@code cls}
     */
    private boolean inspectPublisher(Action<?, ?> action, Class<?> cls) {
        if (action.upstreamSubscription == null) {
            return false;
        }
        if (action.upstreamSubscription.getPublisher() == null) {
            return false;
        }
        return cls.isAssignableFrom(action.upstreamSubscription.getPublisher().getClass());
    }

    /**
     * Clears both the downstream and upstream subscription references. Neither
     * subscription is cancelled or otherwise signalled here.
     */
    public void recycle() {
        this.downstreamSubscription = null;
        this.upstreamSubscription = null;
    }

    /**
     * Renders a brace-delimited description of this action.
     *
     * <p>The dispatcher / max-capacity section is included unless the capacity is
     * {@code Long.MAX_VALUE} and an upstream subscription is present. The
     * dispatcher's remaining slots are shown only for dispatchers that are not
     * {@code SynchronousDispatcher}s. The upstream subscription, when present, is
     * appended last.
     *
     * @return the textual description
     */
    @Override // reactor.rx.Stream
    public String toString() {
        StringBuilder text = new StringBuilder().append("{");
        boolean describeDispatcher = this.capacity != Long.MAX_VALUE || this.upstreamSubscription == null;
        if (describeDispatcher) {
            text.append("{dispatcher=").append(getDispatcher());
            if (!SynchronousDispatcher.class.isAssignableFrom(getDispatcher().getClass())) {
                text.append(":").append(getDispatcher().remainingSlots());
            }
            text.append(", max-capacity=");
            if (this.capacity == Long.MAX_VALUE) {
                text.append("infinite");
            } else {
                text.append(this.capacity);
            }
            text.append("}");
        }
        if (this.upstreamSubscription != null) {
            text.append(this.upstreamSubscription);
        }
        return text.append('}').toString();
    }
}
