package io.wondrous.sns.api.tmg.realtime;

import com.google.gson.Gson;
import com.meetme.util.android.Bundles;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Action;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.wondrous.sns.api.tmg.TmgApiConfig;
import io.wondrous.sns.api.tmg.exception.ConnectionRefusedException;
import io.wondrous.sns.api.tmg.exception.RetryException;
import io.wondrous.sns.api.tmg.realtime.internal.CompositeWebsocketListener;
import io.wondrous.sns.api.tmg.realtime.internal.RealtimeLoggedEvent;
import io.wondrous.sns.api.tmg.realtime.internal.SocketConnectingListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketEnvelopeMessage;
import io.wondrous.sns.api.tmg.realtime.internal.SocketFailureListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketTopicMessage;
import io.wondrous.sns.logger.SnsLogger;
import io.wondrous.sns.oauth.OAuthInterceptor;
import io.wondrous.sns.util.RetryWhen;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.reactivestreams.Publisher;

@Singleton
/* loaded from: classes.dex */
public class TmgRealtimeApi {
    /** Log tag for this class (not referenced by the code visible in this file). */
    private static final String TAG = "TmgRealtimeApi";

    /** Gson instance passed to the socket listeners and topic subscriptions created by this class. */
    private final Gson mGson;

    /** Logger used to report retry attempts of the topic streams. */
    private final SnsLogger mLogger;

    /** OAuth interceptor found on the HTTP client, or {@code null} when the client has none. */
    private final OAuthInterceptor mOAuthInterceptor;

    /** HTTP client used to open the websocket and handed to the OAuth interceptor when resolving the subject. */
    private final OkHttpClient mOkHttpClient;

    /** Replayed, ref-counted websocket connection; assigned in the constructor. */
    private final Observable<WebSocket> mSocketTask;

    /** Shared event streams keyed by topic; an entry is removed when its stream terminates. */
    final Map<String, Flowable<TopicEvent>> mTopicPublishers = new ConcurrentHashMap<>();

    /** Listener passed to the websocket; individual listeners are added to and removed from it at runtime. */
    final CompositeWebsocketListener mWebsocketListener = new CompositeWebsocketListener();

    /**
     * Shared stream of every envelope message. Each (re)subscription registers a
     * {@link WebSocketStreamCallbacks} on {@link #mWebsocketListener}; incoming messages are
     * buffered when downstream is slower than the socket.
     */
    private final Flowable<SocketEnvelopeMessage> mStreamPublisher = Flowable.create(new FlowableOnSubscribe<SocketEnvelopeMessage>() {
        @Override
        public void subscribe(FlowableEmitter<SocketEnvelopeMessage> emitter) throws Exception {
            // The delegate declares "throws Exception", so this override must declare it as well.
            TmgRealtimeApi.this.lambda$new$1$TmgRealtimeApi(emitter);
        }
    }, BackpressureStrategy.BUFFER).share();

    /**
     * Creates the realtime API and prepares (but does not yet open) the shared websocket connection.
     *
     * <p>The socket observable opens a connection on subscription, logs the user out through the
     * OAuth interceptor when the connection is refused, replays the latest socket to new
     * subscribers, and stays connected for the configured reuse timeout after the last
     * subscriber leaves.
     *
     * <p>Decompiler note: the access modifier of this constructor was changed from package-private.
     * NOTE(review): the {@code @Named} qualifier carries no value here; the original name may have
     * been lost during decompilation - confirm against the DI module.
     *
     * @param snsLogger         logger used for retry reporting
     * @param okHttpClient      client used to open the websocket; also searched for an {@link OAuthInterceptor}
     * @param tmgApiConfig      supplies the websocket URL
     * @param tmgRealtimeConfig supplies the socket reuse timeout, in seconds
     * @param gson              JSON codec passed to listeners and subscriptions
     */
    @Inject
    public TmgRealtimeApi(SnsLogger snsLogger, @Named OkHttpClient okHttpClient, final TmgApiConfig tmgApiConfig, TmgRealtimeConfig tmgRealtimeConfig, Gson gson) {
        this.mLogger = snsLogger;
        this.mOkHttpClient = okHttpClient;
        this.mOAuthInterceptor = extractOAuthInterceptor(okHttpClient);
        this.mGson = gson;
        this.mSocketTask = Observable.create(new ObservableOnSubscribe<WebSocket>() {
            @Override
            public void subscribe(ObservableEmitter<WebSocket> emitter) throws Exception {
                // The delegate declares "throws Exception", so the override has to declare it too.
                TmgRealtimeApi.this.lambda$new$4$TmgRealtimeApi(tmgApiConfig, emitter);
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable error) throws Exception {
                TmgRealtimeApi.this.lambda$new$5$TmgRealtimeApi(error);
            }
        }).replay(1).refCount(1, tmgRealtimeConfig.getSocketReuseTimeoutInSecs(), TimeUnit.SECONDS);
    }

    /**
     * Builds the (unshared) event stream for a single topic.
     *
     * <p>The stream subscribes to the topic on the socket, retries with the policy from
     * {@link #retryPolicy()} on error, and, for each subscription emitted, switches to the shared
     * envelope stream, keeping only {@link SocketTopicMessage}s whose topic equals {@code str} and
     * mapping them to their message payload.
     *
     * @param str topic to subscribe to; compared with {@code equals} against each message's topic
     * @return stream of events for that topic
     */
    private Flowable<TopicEvent> createTopicPublisher(final String str) {
        return subscribeToTopic(str)
                .toFlowable(BackpressureStrategy.LATEST)
                .retryWhen(retryPolicy().build())
                .switchMap(new Function<RealtimeSubscription, Publisher<SocketEnvelopeMessage>>() {
                    @Override
                    public Publisher<SocketEnvelopeMessage> apply(RealtimeSubscription subscription) throws Exception {
                        // The delegate declares "throws Exception" and returns a raw Publisher,
                        // which in practice is the envelope stream.
                        return TmgRealtimeApi.this.lambda$createTopicPublisher$11$TmgRealtimeApi(subscription);
                    }
                })
                .ofType(SocketTopicMessage.class)
                .filter(new Predicate<SocketTopicMessage>() {
                    @Override
                    public boolean test(SocketTopicMessage message) {
                        return str.equals(message.getTopic());
                    }
                })
                .map(new Function<SocketTopicMessage, TopicEvent>() {
                    @Override
                    public TopicEvent apply(SocketTopicMessage message) {
                        return message.getMessage();
                    }
                });
    }

    /**
     * Looks for an {@link OAuthInterceptor} among the client's application interceptors.
     *
     * @param okHttpClient client whose {@code interceptors()} list is scanned in order
     * @return the first interceptor that is an {@link OAuthInterceptor}, or {@code null} if there is none
     */
    private static OAuthInterceptor extractOAuthInterceptor(OkHttpClient okHttpClient) {
        OAuthInterceptor match = null;
        for (Interceptor candidate : okHttpClient.interceptors()) {
            if (candidate instanceof OAuthInterceptor) {
                match = (OAuthInterceptor) candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Accessor for the shared envelope-message stream.
     *
     * @return the single shared {@code mStreamPublisher} instance
     */
    private Flowable<SocketEnvelopeMessage> getMessagesStream() {
        final Flowable<SocketEnvelopeMessage> stream = mStreamPublisher;
        return stream;
    }

    /**
     * Creates the retry policy builder used by the topic streams.
     *
     * <p>Every retry is first reported through the logger, then delayed by an exponential
     * backoff configured with {@code (2, 10, SECONDS, 2.0)}.
     * NOTE(review): presumably initial delay 2 s, maximum 10 s, multiplier 2.0 - confirm against
     * {@code RetryWhen.Builder#exponentialBackoff}.
     *
     * @return a builder; callers invoke {@code build()} to obtain the retry handler
     */
    private RetryWhen.Builder retryPolicy() {
        return RetryWhen.action(new Consumer<RetryWhen.ErrorAndDuration>() {
            @Override
            public void accept(RetryWhen.ErrorAndDuration errorAndDuration) throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                TmgRealtimeApi.this.lambda$retryPolicy$13$TmgRealtimeApi(errorAndDuration);
            }
        }).exponentialBackoff(2L, 10L, TimeUnit.SECONDS, 2.0d);
    }

    /**
     * Subscribes to a topic on the current websocket.
     *
     * <p>Uses {@code switchMap}, so whenever the socket observable emits a new socket the
     * previous topic subscription is disposed and a new one is created on the new socket.
     *
     * @param str topic to subscribe to
     * @return observable emitting the {@link RealtimeSubscription} created for each socket
     */
    private Observable<RealtimeSubscription> subscribeToTopic(final String str) {
        return getSocket().switchMap(new Function<WebSocket, ObservableSource<RealtimeSubscription>>() {
            @Override
            public ObservableSource<RealtimeSubscription> apply(WebSocket webSocket) throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                return TmgRealtimeApi.this.lambda$subscribeToTopic$10$TmgRealtimeApi(str, webSocket);
            }
        });
    }

    /**
     * Returns the event stream of a user-scoped topic.
     *
     * <p>The given path is normalized to start with {@code "/"}. On each subscription the OAuth
     * subject is resolved and the stream of topic {@code "/" + subject + path} is returned; the
     * stream errors with {@link IllegalStateException} when no OAuth interceptor is available.
     *
     * @param str topic path, with or without a leading slash; must not be {@code null}
     * @return deferred stream of events for the user-scoped topic
     */
    public Flowable<TopicEvent> authenticatedEvents(final String str) {
        // A final parameter cannot be reassigned; keep the normalized value in its own local.
        final String topic = str.startsWith("/") ? str : "/" + str;
        return Flowable.defer(new Callable<Publisher<TopicEvent>>() {
            @Override
            public Publisher<TopicEvent> call() throws Exception {
                return TmgRealtimeApi.this.lambda$authenticatedEvents$8$TmgRealtimeApi(topic);
            }
        });
    }

    /**
     * Returns the shared event stream for a topic, creating and caching it on first use.
     *
     * <p>The topic is normalized to start with {@code "/"}. A cached stream is returned when one
     * exists; otherwise a new shared stream is created, stored in {@code mTopicPublishers}, and
     * removed from that map again when it terminates.
     *
     * @param str topic name, with or without a leading slash; must not be {@code null}
     * @return shared stream of events for the topic
     */
    public Flowable<TopicEvent> events(final String str) {
        // A final parameter cannot be reassigned; keep the normalized value in its own local.
        final String topic = str.startsWith("/") ? str : "/" + str;
        final Flowable<TopicEvent> cached = this.mTopicPublishers.get(topic);
        if (cached != null) {
            return cached;
        }
        final Flowable<TopicEvent> shared = createTopicPublisher(topic).doOnTerminate(new Action() {
            @Override
            public void run() throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                TmgRealtimeApi.this.lambda$events$6$TmgRealtimeApi(topic);
            }
        }).share();
        // NOTE(review): get() followed by put() is not atomic; two concurrent first callers for the
        // same topic can each create a stream, the later put() winning. Confirm whether that matters.
        this.mTopicPublishers.put(topic, shared);
        return shared;
    }

    /**
     * Accessor for the shared websocket connection observable.
     *
     * @return the {@code mSocketTask} instance built in the constructor
     */
    Observable<WebSocket> getSocket() {
        final Observable<WebSocket> socket = mSocketTask;
        return socket;
    }

    /**
     * Returns the subject reported by the OAuth interceptor.
     *
     * @return the interceptor's {@code getSub()} value, or {@code null} when the HTTP client has
     *         no OAuth interceptor
     * @deprecated marked deprecated in the source; no replacement is visible in this file.
     */
    @Deprecated
    public String getUserId() {
        return this.mOAuthInterceptor == null ? null : this.mOAuthInterceptor.getSub();
    }

    /**
     * Resolves the user-scoped topic and returns its event stream.
     *
     * @param str  topic path appended after the subject
     * @param str2 subject value emitted by the OAuth interceptor
     * @return the stream from {@link #events(String)} for {@code "/" + str2 + str}
     */
    public /* synthetic */ Publisher lambda$authenticatedEvents$7$TmgRealtimeApi(String str, String str2) throws Exception {
        final String scopedTopic = "/" + str2 + str;
        return events(scopedTopic);
    }

    /**
     * Deferred body of {@link #authenticatedEvents(String)}.
     *
     * @param str normalized topic path (appended after the subject)
     * @return a stream that errors with {@link IllegalStateException} when there is no OAuth
     *         interceptor; otherwise the event stream of the topic built from the resolved
     *         subject and {@code str}
     */
    public /* synthetic */ Publisher lambda$authenticatedEvents$8$TmgRealtimeApi(final String str) throws Exception {
        final OAuthInterceptor oAuthInterceptor = this.mOAuthInterceptor;
        if (oAuthInterceptor == null) {
            return Flowable.error(new IllegalStateException("Unable to subscribe to privileged realtime topic."));
        }
        return oAuthInterceptor.getSubSingle(this.mOkHttpClient).flatMapPublisher(new Function<String, Publisher<TopicEvent>>() {
            @Override
            public Publisher<TopicEvent> apply(String sub) throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                return TmgRealtimeApi.this.lambda$authenticatedEvents$7$TmgRealtimeApi(str, sub);
            }
        });
    }

    /**
     * Switch-map target used by {@code createTopicPublisher}: ignores the emitted subscription
     * and hands back the shared envelope stream.
     *
     * @param realtimeSubscription the topic subscription that was just created (unused)
     * @return the shared envelope-message stream
     */
    public /* synthetic */ Publisher lambda$createTopicPublisher$11$TmgRealtimeApi(RealtimeSubscription realtimeSubscription) throws Exception {
        final Flowable<SocketEnvelopeMessage> messages = getMessagesStream();
        return messages;
    }

    /**
     * Termination hook of a topic stream: drops the topic's entry from the publisher cache.
     *
     * @param str normalized topic whose cached stream is removed
     */
    public /* synthetic */ void lambda$events$6$TmgRealtimeApi(String str) throws Exception {
        final Map<String, Flowable<TopicEvent>> publishers = this.mTopicPublishers;
        publishers.remove(str);
    }

    /**
     * Cancellation hook of the envelope stream: unregisters the stream callbacks from the
     * composite websocket listener.
     *
     * @param webSocketStreamCallbacks callbacks registered when the stream was subscribed
     */
    public /* synthetic */ void lambda$new$0$TmgRealtimeApi(WebSocketStreamCallbacks webSocketStreamCallbacks) throws Exception {
        final CompositeWebsocketListener listeners = this.mWebsocketListener;
        listeners.removeListener(webSocketStreamCallbacks);
    }

    /**
     * Subscription body of the envelope stream.
     *
     * <p>Creates {@link WebSocketStreamCallbacks} bound to the emitter, registers them on the
     * composite websocket listener, and arranges for them to be unregistered when the emitter
     * is cancelled.
     *
     * @param flowableEmitter emitter of the envelope stream being subscribed
     */
    public /* synthetic */ void lambda$new$1$TmgRealtimeApi(FlowableEmitter flowableEmitter) throws Exception {
        final WebSocketStreamCallbacks callbacks = new WebSocketStreamCallbacks(flowableEmitter, this.mGson);
        this.mWebsocketListener.addListener(callbacks);
        flowableEmitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                TmgRealtimeApi.this.lambda$new$0$TmgRealtimeApi(callbacks);
            }
        });
    }

    /**
     * Cancellation hook of the connecting listener: unregisters it from the composite
     * websocket listener.
     *
     * @param socketConnectingListener listener to remove
     */
    public /* synthetic */ void lambda$new$2$TmgRealtimeApi(SocketConnectingListener socketConnectingListener) throws Exception {
        final CompositeWebsocketListener listeners = this.mWebsocketListener;
        listeners.removeListener(socketConnectingListener);
    }

    /**
     * Cancellation hook of the socket observable: unregisters the failure listener first, then
     * closes the websocket with code 1000 and reason "Client disconnected".
     *
     * @param socketFailureListener failure listener registered for this connection
     * @param webSocket             socket to close
     */
    public /* synthetic */ void lambda$new$3$TmgRealtimeApi(SocketFailureListener socketFailureListener, WebSocket webSocket) throws Exception {
        final CompositeWebsocketListener listeners = this.mWebsocketListener;
        listeners.removeListener(socketFailureListener);

        final int closeCode = 1000;
        webSocket.close(closeCode, "Client disconnected");
    }

    /**
     * Subscription body of the socket observable: opens a websocket connection.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>register a {@link SocketFailureListener} bound to the emitter;</li>
     *   <li>register a {@link SocketConnectingListener} bound to the emitter, which unregisters
     *       itself through its own cancellable;</li>
     *   <li>open the websocket at the configured URL with the composite listener;</li>
     *   <li>on emitter cancellation, unregister the failure listener and close the socket.</li>
     * </ol>
     *
     * @param tmgApiConfig      supplies the websocket URL
     * @param observableEmitter emitter of the socket observable being subscribed
     */
    public /* synthetic */ void lambda$new$4$TmgRealtimeApi(TmgApiConfig tmgApiConfig, ObservableEmitter observableEmitter) throws Exception {
        final SocketFailureListener failureListener = new SocketFailureListener(observableEmitter);
        this.mWebsocketListener.addListener(failureListener);

        final SocketConnectingListener connectingListener = new SocketConnectingListener(observableEmitter, this.mGson);
        connectingListener.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                TmgRealtimeApi.this.lambda$new$2$TmgRealtimeApi(connectingListener);
            }
        });
        this.mWebsocketListener.addListener(connectingListener);

        final Request request = new Request.Builder().url(tmgApiConfig.getWebSocketUrl()).build();
        final WebSocket socket = this.mOkHttpClient.newWebSocket(request, this.mWebsocketListener);
        observableEmitter.setCancellable(new Cancellable() {
            @Override
            public void cancel() throws Exception {
                TmgRealtimeApi.this.lambda$new$3$TmgRealtimeApi(failureListener, socket);
            }
        });
    }

    /**
     * Error hook of the socket observable: when the error is a
     * {@link ConnectionRefusedException} and an OAuth interceptor is present, logs out through
     * that interceptor. Any other error is ignored here.
     *
     * @param th error raised by the socket observable
     */
    public /* synthetic */ void lambda$new$5$TmgRealtimeApi(Throwable th) throws Exception {
        final boolean refused = th instanceof ConnectionRefusedException;
        if (refused && this.mOAuthInterceptor != null) {
            this.mOAuthInterceptor.logout();
        }
    }

    /**
     * Retry hook: reports an upcoming retry to the logger, first as a tracked
     * {@code RealtimeLoggedEvent.RETRY} event carrying the error text and the delay, then as a
     * {@link RetryException} wrapping the original error.
     *
     * @param errorAndDuration the error that triggered the retry and the delay before it, in milliseconds
     */
    public /* synthetic */ void lambda$retryPolicy$13$TmgRealtimeApi(RetryWhen.ErrorAndDuration errorAndDuration) throws Exception {
        this.mLogger.track(
                RealtimeLoggedEvent.RETRY,
                Bundles.builder()
                        .putString("error", errorAndDuration.throwable().toString())
                        .putLong("delayMs", errorAndDuration.durationMs())
                        .build());

        final String description = "Error in Stream socket. Reconnecting in " + errorAndDuration.durationMs() + " ms";
        this.mLogger.trackException(new RetryException(description, errorAndDuration.throwable()));
    }

    /**
     * Switch-map target used by {@code subscribeToTopic}: wraps the creation of a topic
     * subscription on the given socket in an observable.
     *
     * @param str       topic to subscribe to
     * @param webSocket socket on which the subscription is created
     * @return observable that creates and emits the {@link RealtimeSubscription} when subscribed
     */
    public /* synthetic */ ObservableSource lambda$subscribeToTopic$10$TmgRealtimeApi(final String str, final WebSocket webSocket) throws Exception {
        return Observable.create(new ObservableOnSubscribe<RealtimeSubscription>() {
            @Override
            public void subscribe(ObservableEmitter<RealtimeSubscription> emitter) throws Exception {
                // The delegate declares "throws Exception", so the override must declare it too.
                TmgRealtimeApi.this.lambda$subscribeToTopic$9$TmgRealtimeApi(webSocket, str, emitter);
            }
        });
    }

    /**
     * Subscription body of a topic subscription observable.
     *
     * <p>Creates a {@link RealtimeSubscription} for the socket and topic, calls its
     * {@code subscribe()} (presumably sends the subscribe request over the socket - confirm in
     * {@code RealtimeSubscription}), registers it as the emitter's disposable, and finally emits it.
     *
     * @param webSocket         socket the subscription is bound to
     * @param str               topic to subscribe to
     * @param observableEmitter emitter that receives the created subscription
     */
    public /* synthetic */ void lambda$subscribeToTopic$9$TmgRealtimeApi(WebSocket webSocket, String str, ObservableEmitter observableEmitter) throws Exception {
        final RealtimeSubscription topicSubscription = new RealtimeSubscription(webSocket, str, this.mGson);
        topicSubscription.subscribe();

        // Bind the subscription's lifetime to the emitter before handing it downstream.
        observableEmitter.setDisposable(topicSubscription);
        observableEmitter.onNext(topicSubscription);
    }
}
