import Rx from 'rxjs/Rx';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { ReplaySubject } from 'rxjs/ReplaySubject';

import PQueue from 'p-queue';
import result from 'lodash.result';
import MuxClient, { toObservable } from './client';
import DCBackend from './backends';

/**
 * Base class shared by the sync and async session flavours.
 *
 * It owns the multiplexer client, the response backend, the session state
 * (`status` plus whatever details the last status change carried), a busy
 * counter and the set of state subscribers. Every multiplexer request is
 * exposed as a `*$` method returning an observable.
 */
class Session {
  /**
   * @param {Object} userInfos - Forwarded to the client when a session is
   *   created; `userInfos.jwt` is also sent with every input request.
   * @param {Object} [options] - Session options. `language` is mandatory.
   *   `dcBackend` and `client` can be injected; otherwise they are built from
   *   `language` and from `multiplexerUrl` / `retryOptions` /
   *   `multiplexerClientOptions` respectively.
   * @throws {Error} When `options.language` is missing.
   */
  constructor(userInfos, options = {}) {
    if (!options.language) throw new Error('No language specified in options');
    const { language } = options;
    const defaultOptions = {
      maxRetry: 0,
      debug: false,
      maxRestart: 1,
      startOptions: {},
      flattenOutputs: true,
    };
    this.userInfos = userInfos;
    this.options = { ...defaultOptions, ...options };
    this.dcBackend = options.dcBackend || DCBackend(language);
    this.client =
      options.client ||
      new MuxClient(
        options.multiplexerUrl,
        options.retryOptions,
        options.multiplexerClientOptions
      );
    this.state = { status: options.status || Session.STATUS.NONE };
    this.busyCounter = 0;
    this.inputSubject = new ReplaySubject();
    this.output$ = new Subject();
    // Creates the remote session, then (when an init command is configured)
    // sends it and waits for its first formatted answer before completing.
    this.start$ = overrideOptions => {
      return this.newSession$({
        language: this.options.language,
        ...this.options.startOptions,
        ...overrideOptions,
      }).flatMap(() => {
        if (this.options.initCommand) {
          this.increment(this.options.initCommand);
          return Observable.concat(
            this.input$(this.options.initCommand),
            this.poll$()
          )
            .formatAndFilter(this.dcBackend)
            .take(1)
            .do(this.decrement.bind(this));
        }
        this.setStatus(Session.STATUS.READY);
        return Observable.empty();
      });
    };
    this.subscribers = new Set();
  }

  /* ========= MUX CALLS ========= */

  /**
   * Extract the details stored in the state when a request fails.
   *
   * @param {Object} error - Error produced by a client request.
   * @returns {{statusCode: *, message: *, error: Object}} `res.statusCode`
   *   and `res.text` of the error (when present) plus the error itself.
   */
  static getInfoFromErrorResponse(error) {
    return {
      statusCode: result(error, 'res.statusCode'),
      message: result(error, 'res.text'),
      error,
    };
  }

  /**
   * Attach the shared failure handling to a request observable: on error the
   * session is flagged BROKEN with the details of the error response.
   *
   * @param {Observable} request$ - Observable wrapping a client request.
   * @param {string} [context] - When given, the failure is also logged as
   *   `Status: <err.status> on <context>`.
   * @returns {Observable} The same stream with the side effect attached.
   */
  markBrokenOnError$(request$, context) {
    return request$.do(
      () => {},
      err => {
        if (context) {
          this.log(`Status: ${err.status} on ${context}`);
        }
        this.setStatus(
          Session.STATUS.BROKEN,
          this.constructor.getInfoFromErrorResponse(err)
        );
      }
    );
  }

  /**
   * Request a new remote session. Sending the request resets the busy counter
   * and switches the status to BUSY.
   *
   * @param {Object} options - Start options sent to the client; the backend is
   *   built from `options.language` unless one was injected.
   * @returns {Observable}
   */
  newSession$(options) {
    const promiseCreator = signal => {
      this.log('Sending new request');
      // An injected backend must survive a (re)start, exactly like an injected
      // client does; only build one from the language when none was provided.
      this.dcBackend = this.options.dcBackend || DCBackend(options.language);
      this.busyCounter = 0;
      this.setStatus(Session.STATUS.BUSY);
      return this.client.requests(signal).new(options, this.userInfos);
    };
    return this.markBrokenOnError$(toObservable(promiseCreator));
  }

  /**
   * Send a command to the session.
   *
   * @param {Object} command - Command formatted by the backend before sending.
   * @returns {Observable}
   */
  input$(command) {
    // Formatted once, when the observable is built, not on each subscription.
    const formattedCommand = this.dcBackend.formatRequestPayload(command);
    const promiseCreator = signal => {
      this.log(`Sending input request: ${formattedCommand}`);
      return this.client
        .requests(signal)
        .input({ command: formattedCommand, jwt: this.userInfos.jwt });
    };
    return this.markBrokenOnError$(
      toObservable(promiseCreator),
      `input ${formattedCommand}`
    );
  }

  /**
   * Issue a single read request.
   *
   * @returns {Observable}
   */
  read$() {
    const promiseCreator = signal => {
      this.log('Sending read request');
      return this.client.requests(signal).read();
    };
    return this.markBrokenOnError$(toObservable(promiseCreator), 'read');
  }

  /**
   * Issue a flush request.
   *
   * @returns {Observable}
   */
  flush$() {
    const promiseCreator = signal => {
      this.log('Sending flush request');
      return this.client.requests(signal).flush();
    };
    return this.markBrokenOnError$(toObservable(promiseCreator), 'flush');
  }

  /**
   * Read over and over: the read request is re-issued each time it completes.
   *
   * @returns {Observable}
   */
  poll$() {
    return this.read$().repeat();
  }

  /**
   * Delete the remote session. The status goes back to NONE whether the
   * request succeeds or fails.
   *
   * @returns {Observable}
   */
  deleteSession$() {
    const promiseCreator = signal => {
      this.log('Sending delete request');
      return this.client.requests(signal).delete();
    };
    return toObservable(promiseCreator).do(
      () => this.setStatus(Session.STATUS.NONE),
      () => this.setStatus(Session.STATUS.NONE)
    );
  }

  // ====== UTILS =======

  /**
   * Register a command that is about to be sent: bumps the busy counter and
   * flags the session BUSY, unless the command is listed in `noIncrement`.
   *
   * @param {{command: string}} command
   */
  increment(command) {
    if (!Session.noIncrement.includes(command.command)) {
      this.busyCounter = this.busyCounter + 1;
      this.setStatus(Session.STATUS.BUSY);
    }
  }

  /**
   * Register a batch of outputs. The busy counter goes down (never below 0)
   * when the batch is empty or holds at least one output whose type is not in
   * `noDecrement`; the session becomes READY once the counter reaches 0.
   *
   * @param {Array<{type: string}>} outputs
   */
  decrement(outputs) {
    const answersACommand =
      outputs.some(o => !Session.noDecrement.includes(o.type)) ||
      outputs.length === 0;
    if (!answersACommand) return;

    this.busyCounter = Math.max(this.busyCounter - 1, 0);
    if (this.busyCounter === 0) {
      this.setStatus(Session.STATUS.READY);
    }
  }

  /**
   * Change the session status and notify subscribers. Nothing happens (and
   * `info` is dropped) when `status` equals the current status.
   *
   * @param {string} status - One of `Session.STATUS`.
   * @param {Object} [info] - Extra fields merged into the state.
   */
  setStatus(status, info = {}) {
    const state = { status, ...info };
    if (status !== this.status) {
      this.log(`Session status changed from ${this.status} to ${status}`);
      this.setState(state);
    }
  }

  /** Current status, read from the state. */
  get status() {
    return this.state.status;
  }

  /**
   * Print a message together with the client session id, only when the
   * `debug` option is on.
   *
   * @param {*} message
   */
  log(message) {
    if (this.options.debug) {
      console.log(message, this.client.sid); // eslint-disable-line no-console
    }
  }

  /**
   * Shallow-merge `newState` into the state and hand the resulting state to
   * every subscriber.
   *
   * @param {Object} newState
   */
  setState(newState) {
    this.state = { ...this.state, ...newState };
    this.subscribers.forEach(subscriber => subscriber(this.state));
  }

  /**
   * Listen to state changes. The listener is called right away with the
   * current state, then after each `setState`.
   *
   * @param {function(Object): void} fn
   * @returns {function(): boolean} Call it to stop listening.
   */
  subscribe(fn) {
    fn(this.state);
    this.subscribers.add(fn);
    return () => this.subscribers.delete(fn);
  }
}

// noIncrement: command names (compared with `command.command`) that never
// raise the busy counter.
// noDecrement: output types (compared with `output.type`) that never lower it.
Object.assign(Session, {
  noIncrement: ['code_completion', 'view', 'auto_submit', 'logs'],
  noDecrement: ['code-completion'],
});

/**
 * Operator: turn each raw response into backend-formatted outputs and drop
 * the responses for which the backend returned `null`.
 *
 * @param {Object} dcBackend - Backend exposing `formatResponseData(text)`.
 * @returns {Observable} Stream of non-null formatted outputs.
 */
Observable.prototype.formatAndFilter = function formatAndFilter(dcBackend) {
  const toOutputs = response => dcBackend.formatResponseData(response.text);
  const hasOutputs = outputs => outputs !== null;

  const formatted$ = this.map(toOutputs);
  return formatted$.filter(hasOutputs);
};

/**
 * Operator: first retry the source `options.maxRetry` times, then fall back
 * to restarting through the `restart` operator.
 *
 * @param {Object} options - Read for `maxRetry`; passed on to `restart`.
 * @param {function(): Observable} restarter - Passed on to `restart`.
 * @returns {Observable}
 */
Observable.prototype.retryAndRestart = function retryAndRestart(
  options,
  restarter
) {
  const withRetries$ = this.retry(options.maxRetry);
  return withRetries$.restart(options, restarter);
};

/**
 * Operator: on each error of the source, run `restarter$()` and resubscribe
 * to the source once it has emitted. When `options.maxRestart` is >= 0 and
 * more errors than that have been seen, the stream fails with a
 * `Session.TooManyRestart` carrying the last error instead.
 *
 * @param {Object} options - Read for `maxRestart`.
 * @param {function(): Observable} restarter$ - Builds the restart observable.
 * @returns {Observable}
 */
Observable.prototype.restart = function restart(options, restarter$) {
  // Running count of failures, keeping hold of the most recent error.
  const countFailure = (acc, error) => ({ nbTry: acc.nbTry + 1, error });

  const restartOrGiveUp = ({ nbTry, error }) => {
    const limitExceeded =
      options.maxRestart >= 0 && nbTry > options.maxRestart;
    if (!limitExceeded) {
      // The first value emitted by the restarter is what triggers the retry.
      return restarter$().take(1);
    }
    return Observable.throw(
      new Session.TooManyRestart({
        message: `Reached ${options.maxRestart} retries, stopping.`,
        lastError: error,
      })
    );
  };

  return this.retryWhen(errors$ =>
    errors$.scan(countFailure, { nbTry: 0 }).concatMap(restartOrGiveUp)
  );
};

/**
 * Promise-based session. Every request is pushed onto a queue created with
 * `concurrency: 1`, and each public method returns the promise of its own
 * queued task. With the `keepAlive` option the session keeps polling while
 * the queue is idle.
 */
class SyncSession extends Session {
  /**
   * @param {Object} userInfos - See `Session`.
   * @param {Object} [options] - See `Session`; `keepAlive` enables idle polling.
   */
  constructor(userInfos, options = {}) {
    super(userInfos, options);
    this.queue = new PQueue({ concurrency: 1 });
    // Carries the observable the keep-alive subscription currently follows.
    this.pollSubject = new Subject();
  }

  /**
   * Start polling again once every queued task is done.
   */
  resumePollingWhenIdle() {
    this.queue.onIdle().then(() => {
      this.pollSubject.next(this.poll$());
    });
  }

  // ======== PUBLIC API =========

  /**
   * Start (or restart) the session.
   *
   * @param {Object} [options] - Replaces the stored start options when given.
   * @param {Object} [initCommand] - Replaces the stored init command when given.
   * @returns {Promise} Settles with the queued start request.
   */
  async start(options, initCommand) {
    if (options) this.options.startOptions = options;
    if (initCommand) this.options.initCommand = initCommand;

    // Add the start to the queue so inputs will queue up after the start
    const startPromise = this.queue.add(() => this.start$().toPromise());

    if (this.options.keepAlive) {
      /* Keep Alive logic */
      this.resumePollingWhenIdle();
      // A previous start() may have left a keep-alive subscription behind.
      // Release it first: otherwise it leaks, every poll is subscribed once
      // per start() call, and stop() can only cancel the latest one.
      if (this.keepAliveSubscription) {
        this.keepAliveSubscription.unsubscribe();
      }
      this.keepAliveSubscription = this.pollSubject
        .switch()
        .subscribe(() => {});
    }

    return startPromise;
  }

  /**
   * Re-send the init command, if one is configured.
   *
   * @returns {Promise} Settles with the queued task; resolves right away when
   *   there is no init command.
   */
  async reset() {
    return this.queue.add(() => {
      if (this.options.initCommand) {
        return this.input$(this.options.initCommand)
          .formatAndFilter(this.dcBackend)
          .retryAndRestart(this.options, () => {
            // Avoid restarting session when we killed it manually
            if (this.status === Session.STATUS.NONE) {
              return Observable.throw(
                new Error('No session running, please start one with start()')
              );
            }
            return this.start$({ force_new: true }).catch(() =>
              Observable.of(1)
            );
          })
          .toPromise();
      }
      return Promise.resolve();
    });
  }

  /**
   * Queue a command and wait for its first formatted answer.
   *
   * @param {Object} command - Command to send.
   * @param {Object} [options] - Overrides of the session options for this
   *   call (the retry/restart logic reads `maxRetry` and `maxRestart`).
   * @returns {Promise} Resolves with the outputs; rejects when no session is
   *   running or when retries and restarts are exhausted.
   */
  async input(command, options) {
    const extendedOptions = { ...this.options, ...options };

    const inputPromiseFactory = () => {
      if (this.status === Session.STATUS.NONE) {
        return Promise.reject(
          new Error('No session running, please start one with start()')
        );
      }
      this.increment(command);
      return Observable.concat(this.input$(command), this.poll$())
        .formatAndFilter(this.dcBackend)
        .take(1)
        .do(this.decrement.bind(this))
        .retryAndRestart(extendedOptions, () => {
          // Avoid restarting session when we killed it manually
          if (this.status === Session.STATUS.NONE) {
            return Observable.throw(
              new Error('No session running, please start one with start()')
            );
          }
          // start$ may complete without emitting; the trailing value makes
          // sure the restart always emits, which is what triggers the retry.
          return this.start$({ force_new: true })
            .concat(Observable.of(1))
            .catch(() => Observable.of(1));
        })
        .do(output => {
          this.log(`Got output: ${JSON.stringify(output)}`);
        })
        .toPromise();
    };

    const prePendingCount = this.queue.pending;
    const inputPromise = this.queue.add(inputPromiseFactory);

    // If there was things already pending on the queue,
    // it means there is already a onIdle listener that will trigger the polling
    if (this.options.keepAlive && prePendingCount === 0) {
      this.resumePollingWhenIdle();
      // Stop the keepAlive polling
      // This will cancel any in-flight /read requests
      // However, notice how this is done *after* adding the input command to the queue
      // That means those requests will only be cancelled right *after* sending the /input request.
      // This may technically lead to problems, but we haven't seen any so far, so we're leaving this for now.
      // See: https://datacamp.atlassian.net/browse/MPE-4992
      this.pollSubject.next(Observable.never());
    }
    // Queue up the input
    return Promise.resolve(inputPromise);
  }

  /**
   * Queue a flush request.
   *
   * @returns {Promise}
   */
  async flush() {
    return this.queue.add(() => this.flush$().toPromise());
  }

  /**
   * Stop the keep-alive polling and, unless told otherwise, delete the remote
   * session.
   *
   * @param {Object} [param0]
   * @param {boolean} [param0.stopRemoteSession=true] - Send the delete request.
   * @returns {Promise}
   */
  async stop({ stopRemoteSession = true } = {}) {
    if (this.keepAliveSubscription) {
      this.keepAliveSubscription.unsubscribe();
    }

    if (stopRemoteSession) {
      return this.deleteSession$().toPromise();
    }
    return Promise.resolve();
  }
}

/**
 * Stream-based session. Inputs are pushed into a subject and outputs are
 * delivered to listeners registered with `subscribeToOutputs`. The output
 * pipeline is built once, as a published (connectable) observable, and runs
 * only between `connect()` and `disconnect()`.
 */
class AsyncSession extends Session {
  /**
   * @param {Object} userInfos - See `Session`.
   * @param {Object} [options] - See `Session`.
   */
  constructor(userInfos, options = {}) {
    super(userInfos, options);
    this.subscriptions = [];
    this.startAsyncSubscription();
  }

  // ========= ASYNC =========

  /**
   * Build `output$`: start the session, then merge polling with the requests
   * generated by incoming inputs, format the responses and emit the outputs
   * (one by one when `flattenOutputs` is set, as arrays otherwise). Errors are
   * logged and the stream then stays silent instead of failing.
   *
   * NOTE(review): `this.dcBackend` is read here, when the pipeline is built,
   * while `newSession$` may assign a new backend later — confirm that the
   * backend captured at construction is the intended one.
   */
  startAsyncSubscription() {
    this.output$ = Observable.defer(() => this.start$())
      .concat(
        this.poll$()
          .merge(
            // Deferred because disconnect() swaps in a fresh inputSubject.
            Observable.defer(() => this.inputSubject).mergeMap(input => {
              if (
                this.status === Session.STATUS.NONE ||
                this.status === Session.STATUS.BROKEN
              ) {
                return Observable.throw(
                  new Error(`Can't send input on a ${this.status} session`)
                );
              }
              this.increment(input);
              return this.input$(input);
            })
          )
          .formatAndFilter(this.dcBackend)
          .do(this.decrement.bind(this))
          .retry(this.options.maxRetry)
      )
      .mergeMap(outputs => {
        if (this.options.flattenOutputs) return Observable.from(outputs);
        return Observable.of(outputs);
      })
      .do(output => {
        this.log(`Got output: ${JSON.stringify(output)}`);
      })
      .catch(err => {
        this.log(err);
        return Observable.never();
      })
      .publish();
  }

  /**
   * Register a listener for the outputs accepted by `predicate`.
   *
   * With `flattenOutputs`, each output is tested on its own. Without it, each
   * emitted array is filtered with the predicate and delivered when at least
   * one element remains.
   *
   * @param {function(*): boolean} predicate
   * @param {function(*): void} listener
   * @returns {Subscription} Also tracked so `unsubscribeAll` can release it.
   */
  subscribeToOutputs(predicate, listener) {
    const subscription = this.output$
      .flatMap(output => {
        const filteredOutput = !this.options.flattenOutputs
          ? output.filter(outEl => predicate(outEl))
          : output;
        if (!this.options.flattenOutputs && filteredOutput.length > 0) {
          return Observable.of(filteredOutput);
        }
        // NOTE(review): in non-flatten mode this line is reached with an empty
        // array, so the predicate is called with `[]` and a predicate that
        // accepts it gets `[]` delivered. Kept as is — confirm it is intended.
        if (predicate(filteredOutput)) {
          return Observable.of(filteredOutput);
        }
        return Observable.empty();
      })
      .subscribe({
        next: listener,
      });
    this.subscriptions.push(subscription);
    return subscription;
  }

  /** Connect the published output stream, which starts the pipeline. */
  connect() {
    this.connection = this.output$.connect();
  }

  /**
   * Tear down the running pipeline: status goes back to NONE and inputs get a
   * fresh subject. Does nothing when the session is not connected.
   */
  disconnect() {
    if (this.connection) {
      this.log('Disconnecting');
      this.connection.unsubscribe();
      // Forget the handle. Otherwise every later disconnect() — including the
      // one start() performs — runs the teardown again and discards the
      // inputs buffered since the session was disconnected.
      this.connection = null;
      this.setStatus(Session.STATUS.NONE);
      this.inputSubject = new ReplaySubject();
    }
  }

  /** Release every listener registered through `subscribeToOutputs`. */
  unsubscribeAll() {
    this.subscriptions.forEach(s => {
      s.unsubscribe();
    });
    this.subscriptions = [];
  }

  // ======== PUBLIC API =========

  /**
   * Start (or restart) the session: disconnect if connected, then connect.
   *
   * @param {Object} [options] - Replaces the stored start options when given.
   * @param {Object} [initCommand] - Replaces the stored init command when given.
   */
  start(options, initCommand) {
    if (options) this.options.startOptions = options;
    if (initCommand) this.options.initCommand = initCommand;
    this.disconnect();
    this.connect();
  }

  /** Push the init command, if one is configured, as a new input. */
  reset() {
    if (this.options.initCommand) {
      this.inputSubject.next(this.options.initCommand);
    }
  }

  /**
   * Push a command onto the input subject.
   *
   * @param {Object} input
   */
  input(input) {
    this.inputSubject.next(input);
  }

  /** Disconnect the pipeline without deleting the remote session. */
  stopPolling() {
    this.disconnect();
  }

  /**
   * Disconnect, release all output listeners and delete the remote session.
   *
   * @returns {Promise}
   */
  stop() {
    this.disconnect();
    this.unsubscribeAll();
    return this.deleteSession$().toPromise();
  }
}

// Copy the own enumerable static properties Session has at this point onto
// both concrete session classes.
// NOTE(review): Session.TooManyRestart and Session.STATUS are assigned after
// these statements, so they are not copied here; the subclasses only reach
// them through static inheritance from `extends Session`.
[AsyncSession, SyncSession].forEach(SessionClass => {
  Object.assign(SessionClass, Session);
});

/**
 * Error raised by the `restart` operator when the allowed number of session
 * restarts has been exceeded.
 */
class TooManyRestart extends Error {
  /**
   * @param {Object} [details]
   * @param {string} [details.message] - Human readable description.
   * @param {*} [details.lastError] - The error that triggered the last attempt.
   */
  constructor({ message, lastError } = {}) {
    super(message);
    // Without an explicit name the error reports itself as a plain "Error".
    this.name = 'TooManyRestart';
    this.lastError = lastError;
  }
}
// Static members of Session: the restart-limit error class and the set of
// status values a session can be in.
Object.assign(Session, {
  TooManyRestart,
  STATUS: {
    BROKEN: 'broken',
    READY: 'ready',
    NONE: 'none',
    BUSY: 'busy',
  },
});

// Session is the default export; SyncSession and AsyncSession are named exports.
export { Session as default, SyncSession, AsyncSession };
