import {HttpClient, HubConnection, HubConnectionBuilder, IHttpConnectionOptions} from '@microsoft/signalr';
import {BehaviorSubject, firstValueFrom, Observable, startWith, Subscriber} from 'rxjs';
import {filter, shareReplay, switchMap} from 'rxjs/operators';
import {SignalrLogger} from './signalr-logger';

export class SignalrHubConnectionBuilder {
  public constructor(
    private readonly signalrLogger: SignalrLogger,
    private readonly httpClient: HttpClient,
    private readonly accessTokenFactory: () => Promise<string>
  ) {}

  public build(url: string, connect: Observable<void>, disconnect: Observable<void>): Observable<HubConnection> {
    return new Observable<HubConnection>((subscriber: Subscriber<HubConnection>) => {
      let hubConnection: HubConnection;
      const connectedSubject = new BehaviorSubject<boolean>(false);

      const connectSubscription = connect
        .pipe(
          startWith(undefined),
          switchMap(() => {
            hubConnection = this.getHubConnection(url);
            hubConnection.onclose(() => connectedSubject.next(false));
            return this.safeStartHubConnection(hubConnection, connectedSubject);
          })
        )
        .subscribe(() => subscriber.next(hubConnection));

      const disconnectSubscription = disconnect.pipe(switchMap(() => this.safeStopHubConnection(hubConnection, connectedSubject))).subscribe(() => subscriber.next(hubConnection));

      return () => {
        this.safeStopHubConnection(hubConnection, connectedSubject);
        connectSubscription.unsubscribe();
        disconnectSubscription.unsubscribe();
      };
    }).pipe(shareReplay({bufferSize: 1, refCount: true}));
  }

  private async safeStartHubConnection(hubConnection: HubConnection, connectedUpdates: BehaviorSubject<boolean>): Promise<void> {
    await firstValueFrom(connectedUpdates.pipe(filter((connected: boolean) => !connected)));
    await hubConnection.start();
    connectedUpdates.next(true);
  }

  private async safeStopHubConnection(hubConnection: HubConnection, connectedUpdates: BehaviorSubject<boolean>): Promise<void> {
    await firstValueFrom(connectedUpdates.pipe(filter((connected: boolean) => connected)));
    await hubConnection.stop();
  }

  private getHubConnection(url: string): HubConnection {
    const connectionOptions: IHttpConnectionOptions = {
      accessTokenFactory: this.accessTokenFactory,
      httpClient: this.httpClient
    };

    return new HubConnectionBuilder().withAutomaticReconnect().configureLogging(this.signalrLogger).withUrl(url, connectionOptions).build();
  }
}
