import { Injectable, Inject } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import * as immutable from 'immutable';
import { TrainDefinition } from './train-definition';
import { HttpClient } from '@angular/common/http';
import { IRemoteControllerConfiguration, REMOTECONTROLLER_CONFIGURATION,
  ITrainService, ISystemService, IManualCommandsService } from './service-configuration';
import { ManualCommandsService } from './manual-commands-service';
import { webSocket } from 'rxjs/webSocket';
import { switchMap, shareReplay, startWith, scan, map, distinctUntilChanged, bufferTime, share, filter } from 'rxjs/operators';
import { cacheUntilSubscription } from '@cwi/rx';
import { InternalTrainDefinition, mapTrain,  updateSystemsTopic } from './service-updates';
import { SystemService } from './system-service';

/**
 * Exposes the list of trains and keeps it up to date.
 *
 * The initial train list is fetched over HTTP from `${baseUrl}/trains` and is
 * then folded together with batched messages received on the
 * `${websocket}/trains/updates` socket into an immutable, ordered map keyed by
 * train name.
 */
@Injectable()
export class TrainService implements ITrainService {

  /** Websocket frames grouped into non-empty batches, shared between subscribers. */
  private readonly websocket: Observable<unknown[]>;

  /** Latest known state of every train, keyed by train name (replays the last value). */
  private readonly trainsMap$: Observable<immutable.OrderedMap<string, InternalTrainDefinition>>;

  /** Live view of all trains, in the order of the underlying map. */
  public readonly trains$: Observable<Iterable<TrainDefinition>>;

  /**
   * Plain HTTP request for the train list, without live updates.
   * HttpClient observables are cold, so each subscription issues its own request.
   */
  public readonly trainDefinitions$: Observable<TrainDefinition[]>;

  /** Emits whenever `reload()` is called; drives a re-fetch of the train list. */
  private readonly retrySubject = new Subject<void>();

  /**
   * @param $http Angular HTTP client used for the initial train list request.
   * @param configuration Provides `baseUrl` (HTTP) and `websocket` (socket) endpoints.
   */
  constructor(
    private readonly $http: HttpClient,
    @Inject(REMOTECONTROLLER_CONFIGURATION)
    private readonly configuration: IRemoteControllerConfiguration
  ) {
    const fetchTrains$ = $http.get<TrainDefinition[]>(`${configuration.baseUrl}/trains`);
    this.trainDefinitions$ = fetchTrains$;

    this.websocket = webSocket(`${configuration.websocket}/trains/updates`).pipe(
      // NOTE(review): helper from '@cwi/rx'; presumably holds frames that arrive
      // before the first subscriber - confirm against that library.
      cacheUntilSubscription(),
      // Collect frames in 100 ms windows so a burst of updates yields one new map.
      bufferTime(100),
      // bufferTime emits an empty array for idle windows; drop those.
      filter(updates => updates.length !== 0),
      share()
    );

    this.trainsMap$ = this.retrySubject.pipe(
      // Fetch once up-front, then again on every reload() call.
      startWith(undefined),
      // switchMap abandons a still-pending request when a newer reload arrives.
      switchMap(() => fetchTrains$),
      switchMap(model => {
        const initial = immutable.OrderedMap<string, immutable.RecordOf<InternalTrainDefinition>>(
          model.map(train => [
            train.name,
            mapTrain(train)
          ])
        );

        return this.websocket.pipe(
          scan((current, changes) => {
            return current.withMutations(mutable => {
              for (const message of changes) {
                // Each frame is read as `[topic, payload]`; anything that is not
                // an array cannot carry a topic, so skip it instead of throwing.
                if (!Array.isArray(message)) {
                  continue;
                }
                const topic = message[0];
                const change = message[1];
                // Only the 'system' topic is handled; other topics are ignored.
                if (topic === 'system') {
                  mutable = updateSystemsTopic(mutable, change as [string, ...any[]]);
                }
              }
              return mutable;
            });
          }, initial),
          // Emit the fetched state immediately, before any websocket batch arrives.
          startWith(initial)
        );
      }),
      // Suppress emissions when the map reference did not change.
      distinctUntilChanged(),
      shareReplay(1)
    );

    this.trains$ = this.trainsMap$.pipe(map(trains => trains.valueSeq()));
  }

  /**
   * Requests a fresh train list from the server and rebuilds the live train map
   * from it. Has no effect on `trainDefinitions$`.
   */
  public reload(): void {
    this.retrySubject.next();
  }

  /**
   * Observes a single train.
   *
   * @param trainName Key of the train in the train map.
   * @returns Stream of the train's current state; emits `undefined` while the
   *          map holds no entry for `trainName`. Consecutive identical
   *          references are suppressed.
   */
  getTrain(trainName: string): Observable<TrainDefinition> {
    return this.trainsMap$.pipe(
      map(trains => trains.get(trainName)),
      distinctUntilChanged()
    );
  }

  /**
   * Creates a system service bound to one train's `systemsMap`.
   *
   * @param trainName Key of the train in the train map.
   * @returns A new `SystemService` fed with the train's `systemsMap`, or with
   *          `undefined` while the map holds no entry for `trainName`.
   */
  getSystemService(trainName: string): ISystemService {
    return new SystemService(
      this.trainsMap$.pipe(
        map(trains => {
          const train = trains.get(trainName);
          return train ? train.systemsMap : undefined;
        }),
        distinctUntilChanged()
      )
    );
  }

  /**
   * Creates a manual-commands service for one system of one train.
   *
   * @param trainName Name of the train the commands target.
   * @param systemName Name of the system on that train.
   * @returns A new `ManualCommandsService` using this service's HTTP client and configuration.
   */
  getCommandsService(trainName: string, systemName: string): IManualCommandsService {
    return new ManualCommandsService(this.$http, this.configuration, trainName, systemName);
  }
}
