import { Inject, Injectable } from '@angular/core';
import { BehaviorSubject, Observable, retryWhen, Subject, Subscription, timer } from 'rxjs';
import { WebSocketSubject } from 'rxjs/internal/observable/dom/WebSocketSubject';
import { delay, retry, tap } from 'rxjs/operators';
import { APP_ENV_CONFIG } from '../index';

@Injectable({
	providedIn: 'root'
})
export class SocketService {
	socketDataSource = new BehaviorSubject<any>(null);
	public socket$: WebSocketSubject<any>;
	isConnected = new Subject<boolean>();
	socketBaseUrl: string = this.env.socketBaseUrl;
	sessionId: string;
	subscription: Subscription[] = [];

	constructor(@Inject(APP_ENV_CONFIG) private env: any) {
	}

	connectAsync(sessionId: string) {
		this.sessionId = sessionId;
		this.connect();
		return new Promise<any>((resolve) => {
			this.isConnected.subscribe(val => {
				console.log('Socket is connected!');
				// this.isConnected.next(true);
				resolve(val);
			}, error => {
				resolve(false);
			});
		});
	}

	createWebSocket(url: string) {
		return new Observable(observer => {
			try {
				this.socket$ = new WebSocketSubject({
					url,
					openObserver: {
						next: value => {
							this.isConnected.next(true);
						}
					}
				});
				this.subscription[this.sessionId] = this.socket$.asObservable()
					.subscribe(message => {
							observer.next(message);
							if (message?.action) {
								this.socketDataSource.next(message);
							}
						}, error => console.log(error),
						() => observer.complete());

				return () => {
					if (!this.subscription[this.sessionId].closed) {
						this.subscription[this.sessionId].unsubscribe();
					}
				};
			} catch (error) {
				return observer.error(error);
			}
		});
	}

	connect() {
		this.createWebSocket(`${this.socketBaseUrl}?sessionId=${this.sessionId}`)
			.pipe(retry({
				count: 10,
				delay: () => timer(5000) // Delay 5 seconds before retrying
			}))
			.subscribe({
				next: (data) => {
					// this.startHeartbeat();
				}, error: (err) => {
					console.error(err);
				}
			});

	}

	/**
	 * Method used for unsubscribe socket with specific socket id
	 */
	unsubscribe(sessionId: string) {
		if (this.subscription && this.subscription[sessionId] && !this.subscription[sessionId].closed) {
			this.subscription[sessionId].unsubscribe();
		}
	}
}
