package org.springframework.data.mongodb.core.messaging;

import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.netflix.eureka.ServerRequestAuthFilter;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-3.3.5.jar:org/springframework/data/mongodb/core/messaging/ChangeStreamTask.class */
public class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
    private final Set<String> denylist;
    private final QueryMapper queryMapper;
    private final MongoConverter mongoConverter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-mongodb-3.3.5.jar:org/springframework/data/mongodb/core/messaging/ChangeStreamTask$ChangeStreamEventMessage.class */
    public static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument<Document>, T> {
        private final ChangeStreamEvent<T> delegate;
        private final Message.MessageProperties messageProperties;

        ChangeStreamEventMessage(ChangeStreamEvent<T> changeStreamEvent, Message.MessageProperties messageProperties) {
            this.delegate = changeStreamEvent;
            this.messageProperties = messageProperties;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.springframework.data.mongodb.core.messaging.Message
        @Nullable
        public ChangeStreamDocument<Document> getRaw() {
            return this.delegate.getRaw();
        }

        @Override // org.springframework.data.mongodb.core.messaging.Message
        @Nullable
        public T getBody() {
            return this.delegate.getBody();
        }

        @Override // org.springframework.data.mongodb.core.messaging.Message
        public Message.MessageProperties getProperties() {
            return this.messageProperties;
        }

        @Nullable
        BsonValue getResumeToken() {
            return this.delegate.getResumeToken();
        }

        @Nullable
        Instant getTimestamp() {
            return this.delegate.getTimestamp();
        }

        ChangeStreamEvent<T> getChangeStreamEvent() {
            return this.delegate;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangeStreamTask(MongoTemplate mongoTemplate, ChangeStreamRequest<?> changeStreamRequest, Class<?> cls, ErrorHandler errorHandler) {
        super(mongoTemplate, changeStreamRequest, cls, errorHandler);
        this.denylist = new HashSet(Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));
        this.queryMapper = new QueryMapper(mongoTemplate.getConverter());
        this.mongoConverter = mongoTemplate.getConverter();
    }

    @Override // org.springframework.data.mongodb.core.messaging.CursorReadingTask
    protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate mongoTemplate, SubscriptionRequest.RequestOptions requestOptions, Class<?> cls) {
        ChangeStreamIterable watch;
        List<Document> emptyList = Collections.emptyList();
        BsonDocument bsonDocument = new BsonDocument();
        Collation collation = null;
        FullDocument fullDocument = ClassUtils.isAssignable(Document.class, cls) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP;
        BsonTimestamp bsonTimestamp = null;
        boolean z = true;
        if (requestOptions instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
            ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequest.ChangeStreamRequestOptions) requestOptions).getChangeStreamOptions();
            emptyList = prepareFilter(mongoTemplate, changeStreamOptions);
            if (changeStreamOptions.getFilter().isPresent()) {
                Object obj = changeStreamOptions.getFilter().get();
                if (obj instanceof Aggregation) {
                    collation = (Collation) ((Aggregation) obj).getOptions().getCollation().map((v0) -> {
                        return v0.toMongoCollation();
                    }).orElse(null);
                }
            }
            if (changeStreamOptions.getResumeToken().isPresent()) {
                bsonDocument = changeStreamOptions.getResumeToken().get().asDocument();
                z = changeStreamOptions.isResumeAfter();
            }
            fullDocument = changeStreamOptions.getFullDocumentLookup().orElseGet(() -> {
                return ClassUtils.isAssignable(Document.class, cls) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP;
            });
            bsonTimestamp = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
        }
        MongoDatabase mongoDatabase = StringUtils.hasText(requestOptions.getDatabaseName()) ? mongoTemplate.getMongoDbFactory().getMongoDatabase(requestOptions.getDatabaseName()) : mongoTemplate.getDb();
        if (StringUtils.hasText(requestOptions.getCollectionName())) {
            watch = emptyList.isEmpty() ? mongoDatabase.getCollection(requestOptions.getCollectionName()).watch(Document.class) : mongoDatabase.getCollection(requestOptions.getCollectionName()).watch(emptyList, Document.class);
        } else {
            watch = emptyList.isEmpty() ? mongoDatabase.watch(Document.class) : mongoDatabase.watch(emptyList, Document.class);
        }
        if (!requestOptions.maxAwaitTime().isZero()) {
            watch = watch.maxAwaitTime(requestOptions.maxAwaitTime().toMillis(), TimeUnit.MILLISECONDS);
        }
        if (!bsonDocument.isEmpty()) {
            watch = z ? watch.resumeAfter(bsonDocument) : watch.startAfter(bsonDocument);
        }
        if (bsonTimestamp != null) {
            watch.startAtOperationTime(bsonTimestamp);
        }
        if (collation != null) {
            watch = watch.collation(collation);
        }
        return watch.fullDocument(fullDocument).iterator();
    }

    List<Document> prepareFilter(MongoTemplate mongoTemplate, ChangeStreamOptions changeStreamOptions) {
        if (!changeStreamOptions.getFilter().isPresent()) {
            return Collections.emptyList();
        }
        Object orElse = changeStreamOptions.getFilter().orElse(null);
        if (orElse instanceof Aggregation) {
            Aggregation aggregation = (Aggregation) orElse;
            return aggregation.toPipeline(new PrefixingDelegatingAggregationOperationContext(aggregation instanceof TypedAggregation ? new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mongoTemplate.getConverter().getMappingContext(), this.queryMapper) : Aggregation.DEFAULT_CONTEXT, "fullDocument", this.denylist));
        }
        if (orElse instanceof List) {
            return (List) orElse;
        }
        throw new IllegalArgumentException("ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.data.mongodb.core.messaging.CursorReadingTask
    public Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> changeStreamDocument, Class<Object> cls, SubscriptionRequest.RequestOptions requestOptions) {
        MongoNamespace namespace = changeStreamDocument.getNamespace() != null ? changeStreamDocument.getNamespace() : createNamespaceFromOptions(requestOptions);
        return new ChangeStreamEventMessage(new ChangeStreamEvent(changeStreamDocument, cls, this.mongoConverter), Message.MessageProperties.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
    }

    MongoNamespace createNamespaceFromOptions(SubscriptionRequest.RequestOptions requestOptions) {
        return new MongoNamespace(StringUtils.hasText(requestOptions.getDatabaseName()) ? requestOptions.getDatabaseName() : ServerRequestAuthFilter.UNKNOWN, StringUtils.hasText(requestOptions.getCollectionName()) ? requestOptions.getCollectionName() : ServerRequestAuthFilter.UNKNOWN);
    }
}
