import {
  Subject,
  Observable,
  combineLatest,
  Subscription,
  tap,
  ReplaySubject,
  share,
  from,
  mergeMap,
  shareReplay,
  skip,
  take
} from 'rxjs';
import { blobToJson, Mutex } from 'src/utils';
import Debug from 'debug';
import { tableBroker } from './TableBroker';

// Logger for this module, created under the 'pdq:services:DataBroker' debug namespace.
const log = Debug('pdq:services:DataBroker');
// Sentinel used as the initial `lastValue` of a data source created by `_setupDataSource`.
// `lastValue` is only overwritten from a `tap` on the shared observable, so the sentinel
// means either "no value was ever pushed" or "nobody has subscribed to the shared observable yet".
const noValueYet = Symbol('NO_VALUE_OR_NO_SUBSCRIPTION_YET');
/**
 * Registry of named data sources and of derivations computed from them.
 *
 * Every data source is backed by a subject plus a shared, replaying observable
 * (`shareReplay(1)`), and remembers the last value that flowed through that
 * shared observable. A derivation combines the latest values of several data
 * sources, runs them through a user function and publishes the result as a
 * data source under its own key.
 *
 * All methods that create, replace or remove entries first acquire `_mutex`
 * and release it in a `finally` block.
 */
export class DataBroker {
  private readonly _mutex = new Mutex('DataBroker');
  private readonly _derivations = new Map<string, DerivationMeta>();
  private readonly _dataSources = new Map<string, { subject: Subject<any>, sharedObservable: Observable<any>, lastValue: any }>();
  private readonly _dataSourcesChanged = new Subject<string[]>();
  private readonly _dataSourceAdded = new Subject<string>();
  private readonly _dataSourceChanged = new Subject<string>();
  private readonly _dataSourceRemoved = new Subject<string>();

  /** Emits the complete list of data source names whenever an entry is created or removed. */
  dataSourcesChanged = this._dataSourcesChanged.pipe(share());
  /** Emits the name of a data source created by `setDataSource` or `upsertDerivation`. */
  dataSourceAdded = this._dataSourceAdded.pipe(share());
  /** Emits the name of a data source after a value was pushed or its derivation was (re)wired. */
  dataSourceChanged = this._dataSourceChanged.pipe(share());
  /** Emits the name of a data source that was removed. */
  dataSourceRemoved = this._dataSourceRemoved.pipe(share());

  /** Hooks a debug logger onto each of the four notification streams. */
  constructor() {
    this.dataSourcesChanged.subscribe(i => log(`dataSourcesChanged:${i}`))
    this.dataSourceAdded.subscribe(i => log(`dataSourceAdded:${i}`))
    this.dataSourceChanged.subscribe(i => log(`dataSourceChanged:${i}`))
    this.dataSourceRemoved.subscribe(i => log(`dataSourceRemoved:${i}`))

    log('DataBroker created')
  }

  /**
   * Synchronously reads the last known value of a data source.
   *
   * @param name Name of the data source.
   * @returns `undefined` when no such data source exists, otherwise its `lastValue`.
   *
   * NOTE(review): for a data source that has never carried a value this returns the
   * module-private `noValueYet` symbol rather than `undefined` — confirm callers expect that.
   */
  getDataSource(name: string) {
    const ds = this._dataSources.get(name);
    if (!ds)
      return undefined;
    // `lastValue` is only refreshed by the `tap` inside the shared observable, so take one
    // (possibly replayed) value to make sure that `tap` has had a chance to run.
    ds.sharedObservable.pipe(take(1)).subscribe()
    return ds.lastValue;
  }

  /**
   * Waits for the next value of a data source, creating a placeholder if it does not exist.
   *
   * When the data source already has a recorded `lastValue`, the replayed current value is
   * skipped and the following emission is awaited; otherwise the first emission is awaited.
   *
   * @param name Name of the data source.
   * @returns A promise resolving with the awaited value.
   * @throws Rejects when the data source errors or completes (e.g. because it was removed)
   *         before delivering the awaited value. Previously the promise stayed pending forever.
   */
  async getNextDataSourceValue(name: string) {
    const source = await this.getObservableDataSource(name);
    const hasRecordedValue = this._dataSources.get(name)?.lastValue !== noValueYet;
    const awaited = hasRecordedValue
      ? source.pipe(skip(1), take(1))
      : source.pipe(take(1));

    return new Promise((resolve, reject) => {
      let delivered = false;
      awaited.subscribe({
        next: value => {
          delivered = true;
          resolve(value);
        },
        error: err => reject(err),
        complete: () => {
          // `take(1)` also completes right after a successful delivery; only an empty
          // completion means the value will never arrive.
          if (!delivered)
            reject(new Error(`data source ${name} completed before emitting a value`));
        }
      });
    });
  }

  /**
   * Returns the shared observable of a data source, creating an empty placeholder
   * (and announcing it on `dataSourcesChanged`) when the name is unknown.
   *
   * NOTE(review): a placeholder created here is not announced on `dataSourceAdded`, and a later
   * `setDataSource` for the same name will not announce it either — confirm this is intended.
   *
   * @param name Name of the data source.
   */
  async getObservableDataSource(name: string) : Promise<Observable<any>> {
    const unlock = await this._mutex.lock();
    try {
      log(`getObservableDataSource: ${name}`)
      let ds = this._dataSources.get(name)
      if (!ds) {
        log(`getObservableDataSource: ${name} - new datasource`)
        ds = this._setupDataSource(name);
        this._dataSourcesChanged.next([...this._dataSources.keys()]);
      }
      return ds.sharedObservable
    } finally {
      unlock()
    }
  }

  /**
   * Pushes a value into a data source, creating the data source first when needed.
   * Errors raised while doing so are logged and not propagated.
   *
   * @param name Name of the data source.
   * @param data Value to publish.
   */
  async setDataSource(name: string, data: any) {
    log('setDataSource', name, data)
    const unlock = await this._mutex.lock();
    try {
      let ds = this._dataSources.get(name);
      if (!ds) {
        ds = this._setupDataSource(name);
        this._dataSourceAdded.next(name);
        this._dataSourcesChanged.next([...this._dataSources.keys()]);
      }
      ds.subject.next(data);
      this._dataSourceChanged.next(name);
    } catch (e) {
      log(`error setting datasource ${name}: ${e}`)
    } finally {
      unlock()
    }
  }

  /**
   * Completes and removes a data source. Does nothing when the name is unknown.
   *
   * If the name belongs to a derivation, the derivation is torn down as well: its pipeline
   * subscription would otherwise keep pushing into the subject closed below, and a later
   * `upsertDerivation` for the same key would reuse that dead stream.
   *
   * @param name Name of the data source.
   */
  async removeDataSource(name:string) {
    log(`removeDataSource: ${name}`);
    const unlock = await this._mutex.lock();
    try {
      const ds = this._dataSources.get(name)
      if (ds) {
        this._detachDerivation(name);
        ds.subject.complete()
        ds.subject.unsubscribe()
        this._dataSources.delete(name);
        this._dataSourceRemoved.next(name);
        this._dataSourcesChanged.next([...this._dataSources.keys()]);
      }
    } finally {
      unlock()
    }
  }

  /**
   * Creates a derivation, or re-wires an existing one to new sources / a new function.
   *
   * The latest values of all `sources` are combined into an array and handed to `fn`; the
   * result of `fn` is passed to RxJS `from`, so it must be something `from` accepts, and
   * whatever that emits is published under `key`. Unknown sources are created as placeholders.
   *
   * @param key Name under which the derived values are published.
   * @param sources Names of the data sources feeding the derivation.
   * @param fn Receives the latest value of every source, in `sources` order.
   * @returns The shared observable of the derived data source.
   */
  async upsertDerivation(key: string, sources: string[], fn: (data: any[]) => any) {
    log('upsertDerivation', key, sources, fn);
    // Resolved before taking the lock because getObservableDataSource acquires it itself.
    const sourcesObs = await Promise.all(sources.map(s => this.getObservableDataSource(s)));
    log(`upsertDerivation: ${key} sources: ${sources}`)
    const unlock = await this._mutex.lock();
    try {
      const existing = this._derivations.get(key);
      const isNew = !existing;
      // Data source entry registered for a new derivation; subject/sharedObservable are filled in below.
      // NOTE(review): starts at `undefined`, not `noValueYet`, so getNextDataSourceValue always
      // skips one emission for derivations — confirm this is intended.
      const entry = {lastValue: undefined} as any as {lastValue: any, sharedObservable: Observable<any>, subject: Subject<any>};
      let meta: DerivationMeta;
      if (existing) {
        // Re-wiring: drop the old pipeline but keep the downstream subject and its subscribers.
        existing.sourceSubscription.unsubscribe()
        meta = existing;
      } else {
        // Reuse the subject of a placeholder that may already exist under this key.
        const downStream = this._dataSources.get(key)?.subject || new ReplaySubject(1);
        meta = {
          downStream,
          downSteamSharedObservable: downStream.pipe(tap(i => {
            log(`>>>>>>>>>>>> ${i}`);
            entry.lastValue = i;
            meta.lastValue = i;
          }), shareReplay(1)),
          lastValue: undefined
        } as any as DerivationMeta
      }

      const pipeline = combineLatest(sourcesObs).pipe(mergeMap(i => from(fn(i))));
      meta.sourceSubscription = pipeline.subscribe(v => meta.downStream.next(v));
      meta.source = pipeline;

      if (isNew) {
        this._derivations.set(key, meta);
        entry.subject = meta.downStream
        entry.sharedObservable = meta.downSteamSharedObservable
        this._dataSources.set(key, entry);
        log(`upsertDerivation: ${key} - new datasource`)
        this._dataSourceAdded.next(key);
        this._dataSourcesChanged.next([...this._dataSources.keys()]);
      }
      this._dataSourceChanged.next(key);
      return meta.downSteamSharedObservable
    } finally {
      unlock()
    }
  }

  /**
   * Tears down a derivation and removes the data source published under its key.
   * Does nothing when no derivation is registered for `key`.
   *
   * @param key Name of the derivation.
   */
  async removeDerivation(key: string) {
    log(`removeDerivation: ${key}`);
    const unlock = await this._mutex.lock();
    try {
      const derivation = this._detachDerivation(key);
      if (derivation) {
        derivation.downStream.complete()
        derivation.downStream.unsubscribe()

        this._dataSources.delete(key);
        this._dataSourceRemoved.next(key);
        this._dataSourcesChanged.next([...this._dataSources.keys()]);
      }
    } finally {
      unlock()
    }
  }

  /** Prints the name and last recorded value of every data source with `console.table`. */
  async info() {
    const rows = [...this._dataSources].map(([id, entry]) => ({
      id,
      data: entry.lastValue
    }));
    console.table(rows)
  }

  /**
   * Stores a blob as a data source. Blobs whose type is exactly 'application/json' are first
   * converted with `blobToJson`; a failure there is logged and nothing is stored. Any other
   * blob is stored unchanged.
   *
   * @param id Name of the data source.
   * @param file Blob to store.
   */
  async addBlobToData(id: string, file: Blob) {
    switch(file.type) {
      case 'application/json':
        try
        {
          const json = await blobToJson(file);
          log(`adding json blob with ${id}`)
          await this.setDataSource(id, json);
        }
        catch (ex)
        {
          log(`error adding json blob ${id} ${ex}`)
        }
        break;
      default:
        log(`not transforming blob ${id} of type ${file.type} keeping it as blob.`)
        await this.setDataSource(id, file);
    }
  }

  /**
   * Stops the pipeline feeding a derivation and forgets the derivation.
   * The downstream subject and the data source entry are left for the caller to handle.
   *
   * @returns The detached derivation, or `undefined` when `key` is not a derivation.
   */
  private _detachDerivation(key: string): DerivationMeta | undefined {
    const derivation = this._derivations.get(key);
    if (derivation) {
      derivation.sourceSubscription.unsubscribe();
      this._derivations.delete(key);
    }
    return derivation;
  }

  /**
   * Returns the entry registered under `name`, creating and registering an empty one
   * (a `ReplaySubject(1)` with `lastValue` set to the `noValueYet` sentinel) when missing.
   */
  private _setupDataSource(name: string) {
    const known = this._dataSources.get(name)
    if (known)
      return known
    log(`Creating data placeholder ${name}`);

    const subject = new ReplaySubject<any>(1);
    const entry = {
      subject,
      // The tap only runs once somebody subscribes to the shared observable.
      sharedObservable: subject.pipe(tap(i => {
        log('setDataSource', name, i)
        entry.lastValue = i;
      }), shareReplay(1)),
      lastValue: noValueYet
    }
    this._dataSources.set(name, entry);
    return entry
  }
}
/**
 * Bookkeeping kept by `DataBroker` for one derivation.
 */
interface DerivationMeta {
  /** Pipeline that combines the source observables and applies the derivation function. */
  source: Observable<any>
  /** Subscription that forwards `source` emissions into `downStream`; replaced on every upsert. */
  sourceSubscription: Subscription
  /** Last value seen by the `tap` on `downSteamSharedObservable`; `undefined` until then. */
  lastValue: any
  /** Subject the derived values are pushed into. */
  downStream: ReplaySubject<any>
  /** `downStream` piped through a `tap` and `shareReplay(1)`; this is what consumers receive. (Name is spelled "downSteam" throughout the file.) */
  downSteamSharedObservable: Observable<any>
}
/** Module-level `DataBroker` instance, created when this module is first loaded. */
export const dataBroker = new DataBroker();
