import fetch from 'cross-fetch';
import throttle from 'lodash/throttle';
import qs from 'qs';
import io from 'socket.io-client';

import type {
  JsonRpcMessage,
  JsonRpcNotification,
  JsonRpcRequest,
  JsonRpcResponse,
  JsonRpcSuccessResponse,
} from './jsonRpc';
import {
  createNotificationMessage,
  createRequestMessage,
  onMessage,
} from './jsonRpc';
import type {
  IMultiplexerUser,
  IRetryOptions,
  ISessionOptions,
  ISessionStatus,
  IStartOptions,
  NotificationDecoders,
  RemoteNotification,
  RemoteSession,
  RemoteSessionCommand,
  RemoteSessionOutput,
  ResponseDecoders,
} from './types';

const sleep = (ms: number) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });

const getSession = async ({
  abortSignal,
  multiplexerServicesUrl,
  retryOptions,
  startOptions,
  user,
}: {
  abortSignal: AbortSignal;
  multiplexerServicesUrl: string;
  retryOptions?: IRetryOptions;
  startOptions?: IStartOptions;
  user: IMultiplexerUser;
}): Promise<string> => {
  const requestSession = async () => {
    const response = await fetch(
      `${multiplexerServicesUrl}/new?${qs.stringify(startOptions)}`,
      {
        body: JSON.stringify(user),
        credentials: 'include',
        headers: {
          Accept: 'application/json',
        },
        method: 'POST',
        signal: abortSignal,
      },
    );
    if (!response.ok) {
      const message = await response.text();
      if (response.status === 403 && message === 'Blocked') {
        throw new Error('Blocked');
      }
      throw new Error('Request was not ok');
    }

    const { id: sessionId } = await response.json();
    return sessionId;
  };

  let nbRetries = 0;
  const requestSessionWithRetry = async (): Promise<string> => {
    try {
      return await requestSession();
    } catch (error) {
      if (
        abortSignal.aborted ||
        (error instanceof Error && error.message === 'Blocked')
      ) {
        throw error;
      }

      if (nbRetries < (retryOptions?.nbOfRetry ?? 0)) {
        nbRetries++;
        await sleep(retryOptions?.delay ?? 0);
        return requestSessionWithRetry();
      }

      throw error;
    }
  };

  return requestSessionWithRetry();
};

// eslint-disable-next-line @datacamp/workspace/smart-todo
// TODO add return type once the socket.io library has been upgraded
// See https://socket.io/docs/v4/typescript/ and the multiplexer codebase (sidecar)
const createMultiplexerIoSocket = (
  multiplexerSessionsUrl: string,
  sessionId: string,
) => {
  return io(multiplexerSessionsUrl, {
    autoConnect: true,
    query: {
      multiplexerSessionId: sessionId,
    },
  });
};

type ResponseCallback = (response: JsonRpcResponse) => void;

type NotificationCallback = (notification: JsonRpcNotification) => void;

type RpcSocket = ReturnType<typeof attachToMultiplexerRpcSocket>;

const attachToMultiplexerRpcSocket = (
  multiplexerSessionsUrl: string,
  sessionId: string,
  broadcastStatusChange: (status: ISessionStatus) => void,
  onResponse: ResponseCallback,
  onNotification: NotificationCallback,
  responseDecoders: ResponseDecoders,
  notificationDecoders: NotificationDecoders,
) => {
  let status: ISessionStatus = { status: 'none' };
  const setStatus = (newStatus: ISessionStatus) => {
    if (status.status === 'broken') {
      return;
    }

    status = newStatus;
    broadcastStatusChange(newStatus);
  };

  const responseCallbacks: Record<string, ResponseCallback> = {};

  /* eslint-disable @typescript-eslint/no-empty-function */
  const onMessageFromServer = onMessage({
    // Empty because we don't yet support batch requests
    onBatchMessage: (_message) => {},
    // Empty because we don't yet support notifications/requests from course-container to frontend
    onNotification: (notification) => {
      // @ts-expect-error notificationDecoders is indexable
      const decoder = notificationDecoders[notification.method];

      if (decoder != null) {
        const decodedNotification = {
          ...notification,
          params: decoder(notification.params),
        };

        onNotification(decodedNotification);
        return;
      }

      onNotification(notification);
    },
    onRequest: (_request) => {},
    onResponse: (response) => {
      onResponse(response);

      const callback = responseCallbacks[response.id];
      delete responseCallbacks[response.id];

      if (callback != null) {
        callback(response);
      }
    },
  });
  /* eslint-enable @typescript-eslint/no-empty-function */

  const socket = createMultiplexerIoSocket(multiplexerSessionsUrl, sessionId);
  const close = () => {
    socket.disconnect();
  };

  socket.on('json-rpc', onMessageFromServer);
  const sendMessage = (message: JsonRpcMessage) => {
    socket.emit('json-rpc', message);
  };

  socket.on('status', (reason: string) => {
    setStatus({ message: reason, status: 'broken' });
    close();
  });

  const request = (
    req: JsonRpcRequest,
    onResponseToRequest: ResponseCallback,
  ) => {
    responseCallbacks[req.id] = onResponseToRequest;
    sendMessage(req);
  };

  return {
    close,
    notify: (method: string, params: JsonRpcRequest['params']) => {
      sendMessage(createNotificationMessage(method, params));
    },
    request: async (method: string, params: JsonRpcRequest['params']) => {
      setStatus({ status: 'busy' });

      const result = await new Promise<JsonRpcSuccessResponse>(
        (resolve, reject) => {
          request(createRequestMessage(method, params), (response) => {
            if ('error' in response) {
              reject(response);
            } else {
              resolve(response);
            }
          });
        },
      );

      // @ts-expect-error responseDecoders is indexable
      const decoder = responseDecoders[method];

      setStatus({ status: 'ready' });

      if (decoder != null) {
        return {
          ...result,
          result: decoder(result.result),
        };
      }
      return result;
    },
    stopSession: () => {
      socket.emit('stop');
      close();
    },
  };
};

const proxyFromRpcSocket = (
  socket: ReturnType<typeof attachToMultiplexerRpcSocket>,
): RemoteSession => {
  return new Proxy(
    {},
    {
      get: (_target, method, _receiver) => {
        if (typeof method !== 'string') {
          throw new Error('The method is not a string');
        }

        return (params: JsonRpcRequest['params']) =>
          socket.request(method, params).then((response) => response.result);
      },
    },
  );
};

type Option<T> = { type: 'none' } | { type: 'some'; value: T };
type Subscriber<T> = (value: T) => void;
class Emitter<T> {
  private subscribers = new Set<Subscriber<T>>();

  public emit(state: T) {
    this.subscribers.forEach((subscriber) => subscriber(state));
  }

  public subscribe(subscriber: Subscriber<T>) {
    this.subscribers.add(subscriber);
  }

  public unsubscribe(subscriber: Subscriber<T>) {
    this.subscribers.delete(subscriber);
  }

  public unsubscribeAll() {
    this.subscribers.clear();
  }
}
class ReplayEmitter<T> extends Emitter<T> {
  private latestValue: Option<T> = { type: 'none' };

  public override emit(value: T) {
    super.emit(value);
    this.latestValue = { type: 'some', value };
  }

  public override subscribe(subscriber: Subscriber<T>) {
    super.subscribe(subscriber);
    if (this.latestValue.type === 'some') {
      subscriber(this.latestValue.value);
    }
  }

  public get() {
    if (this.latestValue.type === 'none') {
      return null;
    }

    return this.latestValue.value;
  }
}

const ONE_SECOND = 1000;
const ONE_MINUTE = 60 * ONE_SECOND;
export class JsonRpcSession {
  public readonly status;

  public readonly outputs;

  public readonly notifications;

  public remoteSession: RemoteSession | null = null;

  private finalizers = new Emitter<void>();

  private rpcSocket: RpcSocket | null = null;

  private responseDecoders: ResponseDecoders;

  private notificationDecoders: NotificationDecoders;

  constructor(private readonly options: ISessionOptions) {
    this.status = new ReplayEmitter<ISessionStatus>();
    this.finalizers.subscribe(() => {
      this.status.emit({ status: 'none' });
      this.status.unsubscribeAll();
    });

    this.outputs = new Emitter<JsonRpcResponse<RemoteSessionOutput>>();
    this.finalizers.subscribe(() => {
      this.outputs.unsubscribeAll();
    });

    this.notifications = new Emitter<JsonRpcNotification<RemoteNotification>>();
    this.finalizers.subscribe(() => {
      this.notifications.unsubscribeAll();
    });

    this.responseDecoders = options.responseDecoders;
    this.notificationDecoders = options.notificationDecoders;
  }

  async start(startOptions?: IStartOptions) {
    this.status.emit({ status: 'none' });

    // https://developer.mozilla.org/en-US/docs/Web/API/AbortController
    const abortController = new AbortController();
    this.finalizers.subscribe(() => {
      abortController.abort();
    });

    const baseStartOptions = this.options.startOptions;
    const mergedStartOptions =
      baseStartOptions == null && startOptions == null
        ? undefined
        : ({
            ...baseStartOptions,
            ...startOptions,
          } as IStartOptions);

    let sessionId;
    try {
      sessionId = await getSession({
        ...this.options,
        abortSignal: abortController.signal,
        startOptions: mergedStartOptions,
      });
    } catch (error) {
      if (abortController.signal.aborted) {
        this.status.emit({
          message: 'abortedRequestedToGetSession',
          status: 'broken',
        });
        return null;
      }

      if (
        error instanceof Error &&
        error.message === 'Network request failed'
      ) {
        this.status.emit({
          message: 'networkFailureToGetSession',
          status: 'broken',
        });
        return null;
      }

      if (error instanceof Error && error.message === 'Blocked') {
        this.status.emit({
          message: 'blockedFromGetSession',
          status: 'broken',
        });
        return null;
      }

      this.status.emit({ message: 'failedToGetSession', status: 'broken' });
      throw error;
    }

    this.rpcSocket = attachToMultiplexerRpcSocket(
      this.options.multiplexerSessionsUrl,
      sessionId,
      (newStatus) => this.status.emit(newStatus),
      (response) =>
        this.outputs.emit(response as JsonRpcResponse<RemoteSessionOutput>),
      (notification) =>
        this.notifications.emit(
          notification as JsonRpcNotification<RemoteNotification>,
        ),
      this.responseDecoders,
      this.notificationDecoders,
    );
    const remoteSession = proxyFromRpcSocket(this.rpcSocket);
    this.remoteSession = remoteSession;
    this.finalizers.subscribe(() => {
      this.rpcSocket?.close();
      this.rpcSocket = null;

      this.remoteSession = null;
    });

    if (this.options.initCommand != null) {
      const { method: initMethod, params: initParams } =
        this.options.initCommand;
      void this.rpcSocket.request(initMethod, initParams);
    }

    if (this.options.sendFakeRequestsToKeepAlive ?? false) {
      const keepAliveIntervalId = setInterval(() => {
        this.sendFakeKeepAliveCommand();
      }, 5 * ONE_MINUTE);

      this.finalizers.subscribe(() => {
        clearInterval(keepAliveIntervalId);
      });
    }

    // Returning a Proxy that overwrites 'get' from an async function gets a bit
    // messy, as internally '.then' is called on the return value, which is
    // implemented because every method is implemented on the Proxy. Work around
    // is to wrap it in an object.
    return { remoteSession, sessionId };
  }

  private sendFakeKeepAliveCommandImmediate = () => {
    this.rpcSocket?.notify('keep_alive', undefined);
  };

  private sendFakeKeepAliveCommandThrottled = throttle(
    this.sendFakeKeepAliveCommandImmediate,
    ONE_MINUTE,
  );

  sendFakeKeepAliveCommand = ({ force = false }: { force?: boolean } = {}) => {
    if (force) {
      this.sendFakeKeepAliveCommandImmediate();
    } else {
      this.sendFakeKeepAliveCommandThrottled();
    }
  };

  input(command: RemoteSessionCommand) {
    // @ts-expect-error There are no methods declared in RemoteSession so this doesn't type-check
    return this.rpcSocket?.request(command.method, command.params);
  }

  notify(command: RemoteSessionCommand) {
    // @ts-expect-error There are no methods declared in RemoteSession so this doesn't type-check
    return this.rpcSocket?.notify(command.method, command.params);
  }

  closeConnection() {
    this.finalizers.emit();
    this.finalizers.unsubscribeAll();
  }

  stop() {
    this.rpcSocket?.stopSession();

    this.closeConnection();
  }
}
