package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import rx.Observable;
import rx.functions.Func1;

@InterfaceAudience.Public
@InterfaceStability.Experimental
/* loaded from: input_file:core-io-1.2.0.jar:com/couchbase/client/core/dcp/BucketStreamAggregator.class */
public class BucketStreamAggregator {
    private final ClusterFacade core;
    private final String bucket;

    public BucketStreamAggregator(ClusterFacade clusterFacade, String str) {
        this.core = clusterFacade;
        this.bucket = str;
    }

    public Observable<DCPRequest> feed() {
        return feed("jvmCore", BucketStreamAggregatorState.BLANK);
    }

    public Observable<DCPRequest> feed(String str, BucketStreamAggregatorState bucketStreamAggregatorState) {
        return open(str, bucketStreamAggregatorState).flatMap(new Func1<StreamRequestResponse, Observable<DCPRequest>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1
            public Observable<DCPRequest> call(StreamRequestResponse streamRequestResponse) {
                return streamRequestResponse.stream();
            }
        });
    }

    public Observable<StreamRequestResponse> open(String str, final BucketStreamAggregatorState bucketStreamAggregatorState) {
        return this.core.send(new OpenConnectionRequest(str, this.bucket)).flatMap(new Func1<OpenConnectionResponse, Observable<Integer>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.3
            public Observable<Integer> call(OpenConnectionResponse openConnectionResponse) {
                return BucketStreamAggregator.this.partitionSize();
            }
        }).flatMap(new Func1<Integer, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2
            public Observable<StreamRequestResponse> call(Integer num) {
                return Observable.range(0, num.intValue()).flatMap(new Func1<Integer, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.1
                    public Observable<StreamRequestResponse> call(Integer num2) {
                        BucketStreamState bucketStreamState = bucketStreamAggregatorState.get(num2.intValue());
                        return BucketStreamAggregator.this.core.send(new StreamRequestRequest(num2.shortValue(), bucketStreamState.vbucketUUID(), bucketStreamState.startSequenceNumber(), bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.4
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(((CouchbaseBucketConfig) getClusterConfigResponse.config().bucketConfig(BucketStreamAggregator.this.bucket)).numberOfPartitions());
            }
        });
    }
}
