import { Observable, of, concat, Subject, throwError, TimeoutError } from 'rxjs';
import { catchError, delay, mergeMap, retryWhen, take, timeout } from 'rxjs/operators';

import { getWsRequestId } from '@app/util/utils';
import { ApiError, IError, NetError } from '../../error/types';

import { ApiResponseObserver, IApiNotificationObserver, IApiResponseObserver, IRequest, IResponse, ISubscription } from './api/types';
import { Logger } from '@app/core/net/ws/services/log/logger';
import { MessageRouter } from './message-router';
import { AbstractTransport, TransportState } from '@app/core/net/ws/ws-models';

/**
 * Интерфейс модели по хранению и управлению "задержанными на время" запросами.
 */
interface IDeferredRequest {
	/**
	 * Наблюдаемое поле с результатом выполнения запроса.
	 */
	observer: ApiResponseObserver;
	/**
	 * Отложенный запрос.
	 */
	request: IRequest;
}

/**
 * Типы сервисов использующие маршрутизируемый транспорт для передачи сообщений:
 * - {@link Storage} - сервис для локального хранения данных
 * - {@link Printer} - сервис для работы с принтером
 * - {@link Scanner} - сервис для работы со сканероим
 * - {@link Communication} - сервис для работы с коммуникационными устройствами
 * - {@link IPC} - сервис для организации взаимодействия между различными приложениями
 */
export enum RoutedTransportType {
	Storage			= 'Storage',
	Printer			= 'Printer',
	Scanner			= 'Scanner',
	Communication	= 'Communication',
	IPC				= 'IPC'
}

/**
 * Модель маршрутизируемого сетевого транспорта посредством протокола Websocket (механизма для взаимодействия приложения и WS сервисов).
 * Под "маршрутизируемый" подразумевается что отправитель запроса ожидает (наблюдает за результатом запроса)
 * ответ который доставляется ему на основании идентификатора запроса.
 */
export class RoutedTransport extends AbstractTransport {

	// -----------------------------
	//  Private properties
	// -----------------------------
	/**
	 * Отложенные запросы.
	 * @private
	 */
	private readonly deferredRequests: Map<string, IDeferredRequest> = new Map();

	/**
	 * Маршрутизатор сообщений.
	 * @private
	 */
	private readonly router: MessageRouter = new MessageRouter();

	/**
	 * Задержка перед повторной отправкой запроса по-умолчанию.
	 * @private
	 */
	private readonly defaultRetryDelay = 1000;

	// -----------------------------
	//  Public functions
	// -----------------------------

	/**
	 * Конструктор транспорта.
	 *
	 * @param {RoutedTransportType} type Тип транспорта.
	 */
	constructor(type: RoutedTransportType) {
		super(type);

		this.stateMonitor({
			ready: () => {
				this.sendDeferred();
				// this.heartBeat();
			},
			error: () => {
				Logger.Log.e(this.Tag, `transport in error state`)
					.console();
				// if (this.heartbeat) {
				//   this.heartbeat.unsubscribe();
				// }
			}
		});
	}

	/**
	 * Подписка на события "нотификаций" со стороны сервисов.
	 *
	 * @param {string} event Тип нотификации.
	 * @param {IApiNotificationObserver} observer Наблюдатель за событиями "нотификаций".
	 * @returns {ISubscription} Подписка.
	 */
	listenNotify(event: string, observer: IApiNotificationObserver): ISubscription {
		return this.router.addNotification(event, observer);
	}

	/**
	 * Отправка сообщения сервисам согласно спецификации обмена данными между приложением и сервисами.
	 *
	 * @param {IRequest} apiRequest Тело сообщения
	 * @param {IApiResponseObserver} apiObserver Наблюдатель за ходом выполнения запроса.
	 * @param {number} apiTimeout Таймаут на выполнения запроса (в мс).
	 * @param {number} apiRetryCount Количество повторов запроса до момента генерации ошибки.
	 */
	sendApiRequest(
		apiRequest: IRequest,
		apiObserver?: IApiResponseObserver,
		apiTimeout?: number,
		apiRetryCount?: number
	): void {
		apiRequest.requestId = getWsRequestId();
		if (apiObserver) {
			let observable: Observable<IResponse> = new Observable(subscriber => {
				const observer = (subscriber as ApiResponseObserver);
				observer.timestamp = new Date().getTime();
				this.sendRequest(apiRequest, observer);
			});

			if (apiTimeout) {
				observable = observable.pipe(timeout(apiTimeout));
				if (apiRetryCount && apiRetryCount - 1 > 0) {
					observable = observable.pipe(
						retryWhen(errors => errors.pipe(
							mergeMap(error => {
								if (error instanceof ApiError) {
									Logger.Log.d(this.Tag, 'api error, code: %s, desc: %s', error.code, error.message)
										.console();

									return throwError(error);
								} else if (error instanceof NetError) {
									Logger.Log.d(this.Tag, 'net error, desc: %s', error.message)
										.console();

									return throwError(error);
								} else if (error instanceof TimeoutError) {
									Logger.Log.d(this.Tag, 'timeout error')
										.console();
								} else {
									Logger.Log.e(this.Tag, `undefined error!, message: ${(error as IError).message}`)
										.console();
								}
								this.cleanUpRequest(apiRequest.requestId);

								return of(error);
							}),
							delay(this.defaultRetryDelay),
							take(apiRetryCount - 1),
							ob => concat(ob, throwError(new NetError('timeout after all retries', undefined, undefined)))
						)));
				} else {
					observable = observable.pipe(
						catchError(error => {
								if (error instanceof TimeoutError) {
									Logger.Log.e(this.Tag, 'timeout error')
										.console();

									return throwError(new NetError('timeout', undefined, undefined));
								}

								if (error instanceof ApiError) {
									Logger.Log.d(this.Tag, 'api error, code: %s, desc: %s', error.code, error.message)
										.console();
								} else if (error instanceof NetError) {
									Logger.Log.d(this.Tag, 'net error, desc: %s', error.message)
										.console();
								} else {
									Logger.Log.e(this.Tag, `undefined error!, message: ${(error as IError).message}`)
										.console();
								}

								return throwError(error);
							}
						)
					);
				}
			}

			observable.subscribe({
				next: value => {
					this.cleanUpRequest(apiRequest.requestId);
					apiObserver.onResult(value);
				},
				error: err => {
					this.cleanUpRequest(apiRequest.requestId);
					apiObserver.onError(err);
				},
				complete: () => {
					this.cleanUpRequest(apiRequest.requestId);
				}
			});
		} else {
			this.sendRequest(apiRequest, null);
		}
	}

	// -----------------------------
	//  AbstractTransport
	// -----------------------------
	/**
	 * Функция создания соединения
	 * @param url - адрес сервера
	 * @param retryTimeout - время ожидания перед повторным подключением
	 * @protected
	 */
	protected create(url: string, retryTimeout: number): void {
		this.url = url;
		this.retryTimeout = retryTimeout * 1000;

		this.stateMonitor(this.router);
		this.open();
	}

	/**
	 * Функция открытия соединения
	 * @protected
	 */
	protected open(): void {
		Logger.Log.i(this.Tag, `try to open new connection: ${this.url}`)
			.console();

		this.wsObserver$$ = new Subject();
		this.wsObserver$$.subscribe({
			next: value => this.router.routeMessage(value.data),
			error: err => {
				if (this.isStateChange) {
					this.isStateChange = false;
					this.eventsListeners.next({type: TransportState.ERROR, event: err});
				}
			},
			complete: () => {
				if (this.isStateChange) {
					this.isStateChange = false;
					this.eventsListeners.next({type: TransportState.ERROR, event: new Event('close')});
				}
			}
		});

		this.ws = new WebSocket(this.url);
		this.ws.onmessage = (e: MessageEvent) => this.onWSMessageHandler(e);
		this.ws.onerror = (e: Event) => this.onWSErrorHandler(e);
		this.ws.onclose = (e: CloseEvent) => this.onWSCloseHandler(e);
		this.ws.onopen = (e: Event) => this.onWSOpenHandler(e);
	}

	// -----------------------------
	//  Private functions
	// -----------------------------

	/**
	 * Процедура фактической (сетевого взаимодействия) отправления запроса.
	 *
	 * @param {IRequest} request тело запроса
	 * @param {ApiResponseObserver} observer наблюдатель за ходом выполнения
	 */
	private sendRequest(request: IRequest, observer?: ApiResponseObserver): void {
		if (this.ws && this.ws.readyState === WebSocket.OPEN) {
			const apiMessage = {request};
			let message: string;
			try {
				message = JSON.stringify(apiMessage);
			} catch (e) {
				return observer.error(new NetError((e as Error).message, undefined, undefined));
			}

			this.ws.send(message);

			if (observer) {
				this.router.addRequest(request, observer);
			}
		} else {
			this.deferredRequests.set(request.requestId, {
				request,
				observer
			});
		}
	}

	/**
	 * Отправить все отложенные запросы
	 * @private
	 */
	private sendDeferred(): void {
		this.deferredRequests.forEach((deferredRequest: IDeferredRequest) => {
			Logger.Log.d(this.Tag, 'send deferred')
				.console();
			this.sendRequest(deferredRequest.request, deferredRequest.observer);
		});
		this.deferredRequests.clear();
	}

	/**
	 * Удалить запрос из списка отложенных
	 * @param requestId Идентификатор запроса
	 * @private
	 */
	private cleanUpRequest(requestId: string): void {
		this.deferredRequests.delete(requestId);
		this.router.delApiResponseObserver(requestId);
	}
}
