package reactor.rx.action.transformation;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.queue.CompletableQueue;
import reactor.core.reactivestreams.SerializedSubscriber;
import reactor.core.support.Assert;
import reactor.fn.Function;
import reactor.rx.action.Action;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.stream.GroupedStream;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

/* loaded from: input_file:BOOT-INF/lib/reactor-stream-2.0.8.RELEASE.jar:reactor/rx/action/transformation/GroupByAction.class */
public class GroupByAction<T, K> extends Action<T, GroupedStream<K, T>> {
    private final Function<? super T, ? extends K> fn;
    private final Environment environment;
    private final Dispatcher dispatcher;
    private final Map<K, ReactiveSubscription<T>> groupByMap = new ConcurrentHashMap();
    private final SerializedSubscriber<Long> serialized = SerializedSubscriber.create(new DefaultSubscriber<Long>() { // from class: reactor.rx.action.transformation.GroupByAction.1
        @Override // reactor.rx.action.support.DefaultSubscriber, org.reactivestreams.Subscriber
        public void onNext(Long l) {
            Action.checkRequest(l.longValue());
            if (GroupByAction.this.upstreamSubscription != null) {
                GroupByAction.this.upstreamSubscription.request(l.longValue());
            }
        }
    });

    public GroupByAction(Environment environment, Function<? super T, ? extends K> function, Dispatcher dispatcher) {
        Assert.notNull(function, "Key mapping function cannot be null.");
        this.dispatcher = dispatcher;
        this.fn = function;
        this.environment = environment;
    }

    public Map<K, ReactiveSubscription<T>> groupByMap() {
        return this.groupByMap;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.rx.action.Action
    protected void doNext(T t) {
        final Object apply = this.fn.apply(t);
        ReactiveSubscription<T> reactiveSubscription = this.groupByMap.get(apply);
        if (reactiveSubscription != null) {
            reactiveSubscription.onNext(t);
            return;
        }
        ReactiveSubscription reactiveSubscription2 = new ReactiveSubscription(null, null);
        reactiveSubscription2.getBuffer().add(t);
        this.groupByMap.put(apply, reactiveSubscription2);
        final CompletableQueue buffer = reactiveSubscription2.getBuffer();
        broadcastNext(new GroupedStream<K, T>(apply) { // from class: reactor.rx.action.transformation.GroupByAction.2
            @Override // reactor.rx.Stream
            public long getCapacity() {
                return GroupByAction.this.getCapacity();
            }

            @Override // reactor.rx.Stream
            public Dispatcher getDispatcher() {
                return GroupByAction.this.dispatcher;
            }

            @Override // reactor.rx.Stream
            public Environment getEnvironment() {
                return GroupByAction.this.environment;
            }

            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.reactivestreams.Publisher
            public void subscribe(Subscriber<? super T> subscriber) {
                final AtomicBoolean atomicBoolean = new AtomicBoolean();
                ReactiveSubscription<T> reactiveSubscription3 = new ReactiveSubscription<T>(this, subscriber, buffer) { // from class: reactor.rx.action.transformation.GroupByAction.2.1
                    @Override // reactor.rx.subscription.PushSubscription, org.reactivestreams.Subscription
                    public void cancel() {
                        super.cancel();
                        if (atomicBoolean.compareAndSet(false, true)) {
                            GroupByAction.this.removeGroupedStream(apply);
                        }
                    }

                    @Override // reactor.rx.subscription.ReactiveSubscription, reactor.rx.subscription.PushSubscription
                    public void onComplete() {
                        super.onComplete();
                        if (atomicBoolean.compareAndSet(false, true)) {
                            GroupByAction.this.removeGroupedStream(apply);
                        }
                    }

                    /* JADX INFO: Access modifiers changed from: protected */
                    @Override // reactor.rx.subscription.PushSubscription
                    public void onRequest(long j) {
                        GroupByAction.this.serialized.onNext(Long.valueOf(j));
                    }
                };
                GroupByAction.this.groupByMap.put(apply, reactiveSubscription3);
                subscriber.onSubscribe(reactiveSubscription3);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeGroupedStream(K k) {
        PushSubscription<I> pushSubscription = this.upstreamSubscription;
        ReactiveSubscription<T> remove = this.groupByMap.remove(k);
        if (remove == null || !this.groupByMap.isEmpty()) {
            return;
        }
        if (pushSubscription == 0 || pushSubscription.isComplete()) {
            PushSubscription<O> pushSubscription2 = this.downstreamSubscription;
            if (pushSubscription2 == 0 || pushSubscription2.isComplete()) {
                cancel();
            }
            if (remove.getBufferSize() == 0) {
                broadcastComplete();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doComplete() {
        Iterator<ReactiveSubscription<T>> it = this.groupByMap.values().iterator();
        while (it.hasNext()) {
            it.next().onComplete();
        }
        super.doComplete();
    }

    @Override // reactor.rx.action.Action, reactor.rx.action.Control
    public void requestMore(long j) {
        this.serialized.onNext(Long.valueOf(j));
    }

    @Override // reactor.rx.Stream
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override // reactor.rx.Stream
    public final Environment getEnvironment() {
        return this.environment;
    }
}
