import { BehaviorSubject, fromEvent, Observable, Subject, timer } from 'rxjs';
import { distinctUntilChanged, filter, mergeMap, retryWhen, share, take, takeUntil, tap, timeout } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { getBackoffDelay } from '@pm/lib/utils';

export interface IRxWebSocketConfig {
  initialTimeout?: number;
  maxTimeout?: number;
  reconnectOnBadClose?: boolean;
  resultSelector?: (evt: MessageEvent) => any;
  serializer?: (v: any) => string;
}

export interface IConnectionStatus {
  status: ConnectionStatusType;
  event?: any;
  closeCode?: number;
  error?: any;
}

export interface IReconnectionStatus {
  count: number;
}

export enum ConnectionStatusType {
  WAITING = 'Waiting',
  OPEN = 'Open',
  CLOSED = 'Closed',
  CLOSING = 'Closing',
  CONNECTING = 'Connecting'
}

export class RxWebSocketSubject<T> extends Subject<T> {
  readonly normalCloseCode = 1000;

  private reconnectCount = 0;

  private readonly status$: BehaviorSubject<IConnectionStatus>;
  private readonly reconnecting$: Subject<IReconnectionStatus>;

  private readonly ngUnsubscribe$: Subject<void> = new Subject();

  private socket$: WebSocketSubject<any>;
  private config: IRxWebSocketConfig;

  constructor(
    private readonly url: string,
    private readonly _window: any,
    config?: IRxWebSocketConfig
  ) {
    super();

    this.config = {
      initialTimeout: 1000,
      maxTimeout: 58000,
      reconnectOnBadClose: true,
      serializer: ((data) => data),
      ...config,
    };

    this.status$ = new BehaviorSubject({ status: ConnectionStatusType.WAITING });
    this.reconnecting$ = new Subject<IReconnectionStatus>();
  }

  connect(authToken: string | null): void {
    if (this.socket$) {
      this.disconnect();
    }

    this.status$
      .pipe(
        filter((status) => status.status === ConnectionStatusType.OPEN),
        tap(() => {
          this.reconnectCount = 0;
        }),
        takeUntil(this.ngUnsubscribe$),
      )
      .subscribe();

    this.socket$ = new WebSocketSubject(this.getWsSubjectConfig(this.url, this.status$, authToken));

    this.status$.next({ status: ConnectionStatusType.CONNECTING });

    this.socket$
      .pipe(
        retryWhen((errors) => {
          return errors
            .pipe(
              mergeMap((error) => {
                console.warn(error);

                const isOnline = this._window.hasOwnProperty('navigator') && this._window.navigator.onLine;

                if (!this._window.hasOwnProperty('navigator') || isOnline) {
                  this.reconnectCount++;
                  this.reconnecting$.next({ count: this.reconnectCount });

                  const delay = getBackoffDelay(this.reconnectCount, this.config.initialTimeout, this.config.maxTimeout);

                  console.warn(`Retrying in ${delay}ms.`);
                  return timer(delay);
                } else {
                  return fromEvent(this._window, 'online').pipe(take(1));
                }
              })
            );
        }),
        takeUntil(this.ngUnsubscribe$),
        timeout(this.config.maxTimeout + this.config.initialTimeout),
      )
      .subscribe(
        (m) => this.next(m),
        (error) => {
          console.warn(error);
          this.disconnectOnTimeout();
          this.complete();
        },
        () => {
          this.ngUnsubscribe$.next();
          this.complete();
        });
  }

  disconnectOnTimeout() {
    if (this.socket$) {
      this.ngUnsubscribe$.next();
      this.reconnecting$.next({ count: 0 });
      this.status$.next({ status: ConnectionStatusType.CLOSED, closeCode: this.normalCloseCode });
      this.socket$ = null;
    }
  }

  disconnect() {
    if (this.socket$) {
      this.ngUnsubscribe$.next();
      this.reconnecting$.next({ count: 0 });
      this.status$.next({ status: ConnectionStatusType.CLOSED, closeCode: this.normalCloseCode });
      this.status$.next({ status: ConnectionStatusType.WAITING });
      this.socket$ = null;
    }
  }

  send(data: any): void {
    this.socket$.next(this.config.serializer(data));
  }

  get statusObservable(): Observable<IConnectionStatus> {
    return this.status$
      .pipe(
        distinctUntilChanged(),
        share()
      );
  }

  get reconnectingObservable(): Observable<IReconnectionStatus> {
    return this.reconnecting$.pipe(share());
  }

  private getWsSubjectConfig(
    url: string,
    subject$: BehaviorSubject<IConnectionStatus>,
    authToken: string | null
  ): WebSocketSubjectConfig<any> {
    return {
      url,
      ...authToken && { protocol: authToken },
      closeObserver: {
        next: (evt: CloseEvent) => subject$.next({
          status: ConnectionStatusType.CLOSED,
          event: evt,
          closeCode: evt.code
        })
      },
      openObserver: {
        next: (evt: Event) => subject$.next({ status: ConnectionStatusType.OPEN, event: evt })
      },
      closingObserver: {
        next: () => subject$.next({ status: ConnectionStatusType.CLOSING })
      },

    };
  }
}
