import { Injectable, OnDestroy } from '@angular/core';
import { SSE } from '@core/models';
import { environment } from '@environments';
import { UntilDestroy } from '@ngneat/until-destroy';
import { EventSourcePolyfill } from 'event-source-polyfill';
import { EMPTY, fromEvent, of, Subject } from 'rxjs';
import { catchError, filter, share, takeUntil } from 'rxjs/operators';
import { isNotNil } from 'src/app/utils';
import { API_URL } from './constants';

/** Connection states reported by `SSEService.status`. */
type SSEStatus =
  // An `open` or `message` event has been received from the stream.
  | 'connected'
  // Initial state, and the state after the connection has been closed.
  | 'not-connected'
  // A new event source was just created and its listeners are being attached.
  | 'reconnecting'
  // The event source emitted an `error` event.
  | 'error';

/**
 * Number of `error` events, counted since the last `open` event, after which
 * the service closes the connection.
 */
const RETRY_THRESHOLD = 15;

/**
 * Root-level client for the user server-sent events (SSE) endpoint.
 *
 * Opens an authenticated event source, parses each message as JSON and
 * republishes it on {@link SSEService.sseEvent$}. Tracks a coarse connection
 * status and closes the connection after `RETRY_THRESHOLD` `error` events
 * that were not interrupted by an `open` event.
 */
@UntilDestroy()
@Injectable({
  providedIn: 'root',
})
export class SSEService implements OnDestroy {
  /** Every successfully parsed message payload, before filtering. */
  private readonly event$ = new Subject<SSE.EventData | undefined>();

  /**
   * Parsed events with nil payloads and `heartbeat` events removed.
   * Shared, so all subscribers receive events from one upstream subscription.
   */
  readonly sseEvent$ = this.event$.pipe(
    filter(isNotNil),
    filter((event) => event?.type !== 'heartbeat'),
    share()
  );

  /** Emits whenever the current connection is closed; tears down its listeners. */
  private readonly close$ = new Subject<void>();
  private readonly url = `${API_URL}/${environment.portalApiUrl}/users/sse`;

  /** `error` events seen since the last `open` event (or since the last close). */
  private retryCounter = 0;
  /** The active event source, or `undefined` when no connection is open. */
  private sse: EventSource | undefined;
  private statusValue: SSEStatus = 'not-connected';

  /** Current connection status. */
  get status(): SSEStatus {
    return this.statusValue;
  }

  /** Closes the connection when the service is destroyed. */
  ngOnDestroy(): void {
    this.close();
  }

  /**
   * Closes the active connection, detaches its listeners and resets the
   * service to the `not-connected` state. Does nothing when no connection
   * is open, so it is safe to call repeatedly.
   */
  close(): void {
    if (!this.sse) {
      return;
    }

    this.sse.close();
    // Drop the reference so a repeated close() is a no-op instead of
    // re-closing a dead source and re-emitting close$.
    this.sse = undefined;
    this.close$.next();
    // A later connection must start counting errors from zero.
    this.retryCounter = 0;
    this.statusValue = 'not-connected';
  }

  /**
   * Opens a new SSE connection authenticated with the given bearer token.
   * Any connection that is already open is closed first.
   *
   * @param token Bearer token sent in the `Authorization` header. When it is
   *   `null` or empty the call is ignored and the current connection, if any,
   *   is left untouched.
   */
  createSSE(token: string | null): void {
    if (!token) {
      return;
    }

    // Without this, a second call would orphan the previous event source:
    // it would stay open with its listeners attached, so every message would
    // be emitted twice and both sources would bump the retry counter.
    this.close();

    this.sse = new EventSourcePolyfill(this.url, {
      headers: {
        Authorization: `Bearer ${token}`,
      },
    });

    this.subscribeSSE();
  }

  /**
   * Parses the `data` field of a message as JSON and publishes the result.
   * Messages without data are ignored. A payload that fails to parse is
   * dropped so one malformed message cannot break the stream.
   */
  private handleMessage(event: SSE.Event): void {
    try {
      if (event?.data) {
        const data = JSON.parse(event.data) as SSE.EventData;

        if (environment.logSSE) {
          // eslint-disable-next-line
          console.log(data);
        }

        this.event$.next(data);
      }
    } catch (error) {
      // Best effort: the message is still dropped, but the failure is made
      // visible when SSE logging is enabled instead of vanishing silently.
      if (environment.logSSE) {
        // eslint-disable-next-line
        console.warn('SSE: failed to handle message', error);
      }
    }
  }

  /**
   * Attaches `message`, `error` and `open` listeners to the current event
   * source. All three are detached when {@link SSEService.close} emits on
   * `close$`.
   */
  private subscribeSSE(): void {
    // Capture the instance so every listener binds to the source that was
    // checked here, even if the field is reassigned later.
    const source = this.sse;

    if (!source) {
      return;
    }

    this.statusValue = 'reconnecting';

    fromEvent(source, 'message')
      .pipe(
        takeUntil(this.close$),
        catchError(() => EMPTY),
        filter(isNotNil)
      )
      .subscribe({
        next: (event) => {
          // Receiving data implies the connection is up.
          this.statusValue = 'connected';
          this.handleMessage(event as unknown as SSE.Event);
        },
      });

    fromEvent(source, 'error')
      .pipe(
        takeUntil(this.close$),
        catchError((event) => of(event))
      )
      .subscribe({
        next: () => {
          this.retryCounter += 1;
          this.statusValue = 'error';

          if (this.retryCounter >= RETRY_THRESHOLD) {
            // Give up: close() also resets the counter and moves the status
            // to 'not-connected'.
            this.close();
          }
        },
      });

    fromEvent(source, 'open')
      .pipe(
        takeUntil(this.close$),
        catchError(() => EMPTY),
        filter(isNotNil)
      )
      .subscribe({
        next: () => {
          this.statusValue = 'connected';
          // A successful (re)connection clears the error streak.
          this.retryCounter = 0;
        },
      });
  }
}
