import { InjectionToken } from '@angular/core';
import { isAuthenticated, isNotAuthenticated, IWebAuthService } from '@cp/common-services/web-auth';
import { Replace } from '@cp/common/protocol/Common';
import {
  SubscriptionDetails,
  UpdateAccessTokenRequest,
  WebSocketAccountStateInterface,
  WebSocketApplicationType,
  WebSocketsAcknowledgeMessageRequest,
  WebSocketsMessage,
  WebSocketsRequest,
  WebSocketsRpcAction,
  WebSocketsSubscribeRequest,
  WebSocketsUnsubscribeRequest
} from '@cp/common/protocol/WebSocket';
import { truthy } from '@cp/common/utils/Assert';
import { getOrSetDefault } from '@cp/common/utils/MiscUtils';
import { isEmpty } from '@cp/common/utils/ValidationUtils';
import {
  BehaviorSubject,
  bufferTime,
  distinctUntilChanged,
  finalize,
  firstValueFrom,
  Observable,
  of,
  skip,
  Subject,
  switchMap,
  take,
  takeUntil,
  tap,
  timer
} from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';

export class WebSocketService {
  private webSocket?: WebSocket;
  private readonly webSocketObserver = new Subject<WebSocketsMessage>();
  private readonly subscriptions = new Map<SubscriptionDetails, number>();
  private readonly uniqueSubscriptions = new Map<string, SubscriptionDetails>();
  private clientId = uuidv4();
  private readonly connectionReady = new BehaviorSubject(false);
  private readonly seenMessages = new Set<string>();
  private retrying = false;
  private readonly clearSubscriptionsSubject = new Subject<boolean>();
  private readonly subscriptionBufferedTimeSubject = new Subject<SubscriptionAction>();
  private accessToken = '';

  constructor(
    private readonly accountStateService: WebSocketAccountStateInterface,
    private readonly websocketsEndpoint: string,
    private readonly authService: IWebAuthService,
    private readonly webSocketCtor: WebsocketCtor,
    private readonly application: WebSocketApplicationType = 'control-plane'
  ) {
    console.debug('Web socket service constructed.');

    // Re-connect on every access token value update.
    this.authService
      .observeAuthEvents()
      .pipe(
        filter(isAuthenticated),
        map((e) => e.accessToken),
        distinctUntilChanged()
      )
      .subscribe((accessToken) => {
        this.accessToken = accessToken;
        return this.connect(accessToken).then().catch();
      });

    // Disconnect on every isAuthenticated === false event.
    this.authService
      .observeAuthEvents()
      .pipe(filter(isNotAuthenticated))
      .subscribe(() => this.disconnect());

    // Acknowledge messages after they're received.
    this.webSocketObserver
      .pipe(
        tap((notification) => {
          if (!Object.keys(notification).length) return;
          console.debug('WebSocketService: acknowledgeMessage', notification?.subscription?.type, notification);
          this.sendMessage<WebSocketsAcknowledgeMessageRequest>('acknowledgeMessage', {
            clientId: this.clientId,
            messageId: notification.messageId
          });
        })
      )
      .subscribe();

    // Resubscribe after reconnecting
    this.connectionReady
      .pipe(
        filter(Boolean),
        // Skip the first time since each call to observeNotification will subscribe by itself and this code needs to
        // trigger only after reconnecting.
        skip(1)
      )
      .subscribe(() => {
        console.log('Resubscribing after reconnect, subscriptions: ', this.subscriptions.size);
        if (!this.subscriptions.size) {
          return;
        }

        this.sendMessage<WebSocketsSubscribeRequest>('subscribe', {
          clientId: this.clientId,
          subscriptions: [...this.subscriptions.keys()],
          accessToken: this.accessToken
        });
      });

    this.subscriptionBufferedTimeSubject
      .pipe(
        bufferTime(500),
        filter((ar) => !!ar.length)
      )
      .subscribe((subscriptionActionAr) => {
        console.debug('Subscription actions', subscriptionActionAr);
        const subscribeMap = new Map<string, SubscriptionDetails>();
        const unsubscribeMap = new Map<string, SubscriptionDetails>();
        for (const { details, action } of subscriptionActionAr) {
          const subscriptionDetailsKey = this.getSubscriptionMapKey(details);
          if (action === 'subscribe') {
            unsubscribeMap.delete(subscriptionDetailsKey);
            subscribeMap.set(subscriptionDetailsKey, details);
          } else {
            const deleted = subscribeMap.delete(subscriptionDetailsKey);
            if (!deleted) {
              unsubscribeMap.set(subscriptionDetailsKey, details);
            }
          }
        }

        const subscribeArray = [...subscribeMap.values()];
        if (subscribeArray.length) {
          console.debug('Sending subscribe message to the server: ', subscribeArray);
          this.sendMessage<WebSocketsSubscribeRequest>('subscribe', {
            clientId: this.clientId,
            subscriptions: subscribeArray,
            accessToken: this.accessToken
          });
        }

        const unsubscribeArray = [...unsubscribeMap.values()];
        if (unsubscribeArray.length) {
          this.sendMessage<WebSocketsUnsubscribeRequest>('unsubscribe', {
            clientId: this.clientId,
            subscriptions: unsubscribeArray
          });
        }
      });
  }

  private sendMessage<T>(action: WebSocketsRpcAction, request: T): void {
    this.connectionReady.pipe(filter(Boolean), take(1)).subscribe(() => {
      console.debug('WebSocketService: sending message to the server: ', action, request);
      // Websocket 'action' must be synchronized with 'websocket.route' in serverless.yml.
      // See description of 'messageHandler' in serverless.yml.
      // We use a common 'websocketMessage' action for all WS actions except connect/disconnect,
      // because connect/disconnect events use different routes in AWS Gateway API.
      const wsMessage: WebSocketsRequest<T> = {
        action: 'websocketMessage',
        rpcAction: action,
        request: {
          ...request,
          application: this.application
        }
      };
      truthy(this.webSocket).send(JSON.stringify(wsMessage));
    });
  }

  sendSubscriptionAction<T>(subscriptionAction: SubscriptionAction) {
    this.connectionReady.pipe(filter(Boolean), take(1)).subscribe(() => {
      this.subscriptionBufferedTimeSubject.next(subscriptionAction);
    });
  }

  async connect(accessToken: string): Promise<void> {
    if (this.webSocket) {
      this.updateAccessToken(accessToken);
      return;
    }

    console.log('Connecting to websockets');
    this.webSocket = new this.webSocketCtor(
      `${this.websocketsEndpoint}?at=${accessToken}&ci=${this.clientId}&ap=${this.application}`
    );

    this.webSocket.addEventListener('open', () => {
      console.log('Connected to websockets');
      this.connectionReady.next(true);
    });

    this.webSocket.addEventListener('message', (message) => {
      if (isEmpty(message.data)) return;
      const notification = JSON.parse(message.data);
      console.debug('WebSocketService: got notification', notification?.subscription?.type, notification);
      if (!Object.keys(notification).length) {
        return;
      }
      if (this.seenMessages.has(notification.messageId)) {
        return;
      }
      this.seenMessages.add(notification.messageId);
      this.webSocketObserver.next(notification);
    });

    this.webSocket.addEventListener('close', (e) => {
      console.log('Got a websockets close event', e);
      if (!this.webSocket || e.target !== this.webSocket) return;
      this.disconnect(false);
      this.onError(e);
    });

    this.webSocket.addEventListener('error', (e) => {
      console.log('Got a websockets error event', e);
      if (!this.webSocket || e.target !== this.webSocket) return;
      this.disconnect(false);
      this.onError(e);
    });
  }

  disconnect(clearSubscriptions = true) {
    console.log('Disconnecting from websockets: clearSubscriptions: ', clearSubscriptions);
    if (this.webSocket) {
      try {
        const webSocket = this.webSocket;
        this.webSocket = undefined;
        // The websocket needs to be cleared before calling close() because the close handler checks for the websocket
        // existence.
        webSocket.close();
      } catch (e) {
        console.error('Error while disconnecting from websocket', e);
      }
      this.connectionReady.next(false);
    }

    if (clearSubscriptions) {
      this.clientId = uuidv4();
      this.clearSubscriptions();
    }
  }

  clearSubscriptions() {
    this.subscriptions.clear();
    this.uniqueSubscriptions.clear();
    this.clearSubscriptionsSubject.next(true);
  }

  observeNotification<PayloadType>(
    subscriptionDetails: SubscriptionDetails
  ): Observable<Replace<WebSocketsMessage, 'payload', PayloadType>> {
    if (!this.accountStateService.getIsAuthenticated()) {
      console.debug('Cannot observe notifications when not authenticated.');
      throw new Error('Cannot observe notifications when not authenticated.');
    }
    // The this.subscribe call needs to be triggered as soon as the returned observable is subscribed.
    // This is why the top observable is created with of() and then switch to this.webSocketObserver which will trigger
    // only once the server receives the subscription request and starts sending messages.
    return of('ignored').pipe(
      tap(() => {
        console.debug('Subscribing to details: ', subscriptionDetails);
        this.subscribe(subscriptionDetails);
      }),
      switchMap(() => {
        return this.webSocketObserver.pipe(
          finalize(() => {
            console.debug('Unsubscribing from details: ', subscriptionDetails);
            this.unsubscribe(subscriptionDetails);
          }),
          takeUntil(this.clearSubscriptionsSubject),
          filter(
            (notification) =>
              notification.subscription?.type === subscriptionDetails.type &&
              notification.subscription?.objId === subscriptionDetails.objId
          )
        ) as Observable<Replace<WebSocketsMessage, 'payload', PayloadType>>;
      })
    );
  }

  private updateAccessToken(accessToken: string): void {
    if (!this.webSocket) {
      console.debug('not connected to WebSocket during access token update.');
      throw new Error('Not connected to websockets');
    }
    console.debug('Updating access token');

    this.sendMessage<UpdateAccessTokenRequest>('updateAccessToken', {
      accessToken,
      clientId: this.clientId
    });
  }

  private subscribe(subscriptionDetails: SubscriptionDetails): void {
    const subscriptionMapKey = this.getSubscriptionMapKey(subscriptionDetails);
    const uniqueSubscriptionDetails = this.getUniqueSubscriptionDetails(subscriptionMapKey, subscriptionDetails);

    const numberOfSubscriptions = (this.subscriptions.get(uniqueSubscriptionDetails) || 0) + 1;
    if (numberOfSubscriptions === 1) {
      this.sendSubscriptionAction({
        details: uniqueSubscriptionDetails,
        action: 'subscribe'
      });
    }
    this.subscriptions.set(uniqueSubscriptionDetails, numberOfSubscriptions);
  }

  private unsubscribe(subscriptionDetails: SubscriptionDetails): void {
    const subscriptionMapKey = this.getSubscriptionMapKey(subscriptionDetails);
    const uniqueSubscriptionDetails = this.getUniqueSubscriptionDetails(subscriptionMapKey, subscriptionDetails);
    let numOfSubscriptions = this.subscriptions.get(uniqueSubscriptionDetails);
    if (!numOfSubscriptions) {
      return;
    }
    numOfSubscriptions--;
    if (!numOfSubscriptions) {
      this.sendSubscriptionAction({
        details: uniqueSubscriptionDetails,
        action: 'unsubscribe'
      });
      this.subscriptions.delete(uniqueSubscriptionDetails);
      this.uniqueSubscriptions.delete(subscriptionMapKey);
    } else {
      this.subscriptions.set(uniqueSubscriptionDetails, numOfSubscriptions);
    }
  }

  private getSubscriptionMapKey(subscriptionDetails: SubscriptionDetails): string {
    return subscriptionDetails.type + (subscriptionDetails.objId ? `_${subscriptionDetails.objId}` : '');
  }

  private getUniqueSubscriptionDetails(
    subscriptionMapKey: string,
    subscriptionDetails: SubscriptionDetails
  ): SubscriptionDetails {
    return getOrSetDefault(this.uniqueSubscriptions, subscriptionMapKey, subscriptionDetails);
  }

  private async onError(error: any): Promise<void> {
    console.warn('WebSockets error: ', error);
    if (this.retrying) {
      console.debug('on error already retrying');
      return;
    }
    let isAuthenticated = await this.accountStateService.isAuthenticated();
    if (!isAuthenticated) {
      console.debug('on error not authenticated - 1');
      return;
    }

    this.retrying = true;
    try {
      while (!this.webSocket) {
        if (this.retrying) {
          // Sleep for 5 seconds
          await firstValueFrom(timer(5000));
        }
        // In case we got signed out during retries
        isAuthenticated = await this.accountStateService.isAuthenticated();
        if (!isAuthenticated) {
          console.debug('on error not authenticated - 2');
          break;
        }
        console.log('Retrying websockets connection....');
        // Waiting for a timer with 0 to finish in order to prevent an infinite loop.
        await firstValueFrom(timer(0));
        if (!this.webSocket) {
          const accessToken = await this.authService.getAccessToken();
          if (!accessToken) {
            console.debug('on error not authenticated - 3');
            break;
          }
          await this.connect(accessToken);
        }
      }
    } finally {
      this.retrying = false;
    }
  }
}

export type WebsocketCtor = new (...args: any[]) => WebSocket;
export const WEB_SOCKET_CONSTRUCTOR = new InjectionToken<WebsocketCtor>('webSocketCtor');

interface SubscriptionAction {
  details: SubscriptionDetails;
  action: 'subscribe' | 'unsubscribe';
}
