package reactor.rx.action.aggregation;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.BehaviorBroadcaster;
import reactor.rx.broadcast.Broadcaster;

/* loaded from: input_file:BOOT-INF/lib/reactor-stream-2.0.8.RELEASE.jar:reactor/rx/action/aggregation/WindowWhenAction.class */
public class WindowWhenAction<T> extends Action<T, Stream<T>> {
    private final Supplier<? extends Publisher<?>> boundarySupplier;
    private final Environment environment;
    private final Dispatcher dispatcher;
    private Broadcaster<T> windowBroadcaster;

    public WindowWhenAction(Environment environment, Dispatcher dispatcher, Supplier<? extends Publisher<?>> supplier) {
        this.boundarySupplier = supplier;
        this.environment = environment;
        this.dispatcher = dispatcher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doOnSubscribe(Subscription subscription) {
        super.doOnSubscribe(subscription);
        ((Publisher) this.boundarySupplier.get()).subscribe(new Subscriber<Object>() { // from class: reactor.rx.action.aggregation.WindowWhenAction.1
            Subscription s;

            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription2) {
                this.s = subscription2;
                subscription2.request(1L);
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(Object obj) {
                WindowWhenAction.this.flush();
                if (this.s != null) {
                    this.s.request(1L);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                if (this.s != null) {
                    this.s.cancel();
                }
                WindowWhenAction.this.onError(th);
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                if (this.s != null) {
                    this.s.cancel();
                }
                WindowWhenAction.this.onComplete();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flush() {
        Broadcaster<T> broadcaster = this.windowBroadcaster;
        if (broadcaster != null) {
            this.windowBroadcaster = null;
            broadcaster.onComplete();
        }
    }

    @Override // reactor.rx.action.Action
    protected void doNext(T t) {
        if (this.windowBroadcaster == null) {
            broadcastNext(createWindowStream(t));
        } else {
            this.windowBroadcaster.onNext(t);
        }
    }

    public Broadcaster<T> currentWindow() {
        return this.windowBroadcaster;
    }

    protected Stream<T> createWindowStream(T t) {
        Broadcaster<T> first = BehaviorBroadcaster.first(t, this.environment, this.dispatcher);
        this.windowBroadcaster = first;
        return first;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doError(Throwable th) {
        if (this.windowBroadcaster != null) {
            this.windowBroadcaster.onError(th);
            this.windowBroadcaster = null;
        }
        super.doError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doComplete() {
        if (this.windowBroadcaster != null) {
            this.windowBroadcaster.onComplete();
            this.windowBroadcaster = null;
        }
        super.doComplete();
    }

    @Override // reactor.rx.Stream
    public final Environment getEnvironment() {
        return this.environment;
    }

    @Override // reactor.rx.Stream
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }
}
