import { createListenerMiddleware } from "@reduxjs/toolkit";
import { Observable } from "rxjs";
import SockJS from "sockjs-client";
import Stomp, { Client, Subscription } from "webstomp-client";
import { webSocketActions } from "../actions/webSocketActions";
import { setWidgetData } from "../slices/widgetDataSlice";

let stompClient: Client | null = null;
let subscriber: Subscription | null = null;
let connection: Promise<any>;
let connectedPromise: any = null;
let listener: Observable<any>;
let listenerObserver: any;
let alreadyConnectedOnce = false;
let connectionAttempts = 1;

const createConnection = (): Promise<any> =>
  new Promise((resolve) => (connectedPromise = resolve));

const createListener = (): Observable<any> =>
  new Observable((observer) => {
    listenerObserver = observer;
  });

const subscribe = (widgetId: string, dispatch: any) => {
  connection.then(() => {
    if (stompClient !== null) {
      subscriber = stompClient.subscribe(
        "/live-usage/widget/" + widgetId,
        (data) => {
          const parsedData = JSON.parse(data.body);

          if (listenerObserver && typeof listenerObserver.next === "function") {
            listenerObserver.next(parsedData);
          }

          dispatch(setWidgetData({ widgetId, data: parsedData }));
        }
      );
    }
  });
};

const connect = (widgetId: string) => {
  if (connectedPromise !== null || alreadyConnectedOnce) {
    return;
  }
  connection = createConnection();
  listener = createListener();

  const headers = { widgetId };
  let url = process.env.REACT_APP_BE_HOST + "ws";
  const authToken = localStorage.getItem("token");

  if (authToken) {
    url += "?access_token=" + authToken;
  }

  const socket = new SockJS(url);
  stompClient = Stomp.over(socket, { protocols: ["v12.stomp"] });

  connectionAttempts++;

  stompClient.connect(
    headers,
    () => {
      console.log("ws connected");
      connectedPromise("success");
      connectedPromise = null;
      alreadyConnectedOnce = true;
    },
    (error) => {
      console.log(error);
      connectedPromise = null;
      alreadyConnectedOnce = false;
      setTimeout(connect, 1000 * connectionAttempts);
    }
  );
};

const disconnect = () => {
  if (stompClient !== null) {
    if (stompClient.connected) {
      stompClient.disconnect();
    }
    stompClient = null;
  }
  alreadyConnectedOnce = false;
};

const receive = () => listener;

const unsubscribe = () => {
  if (subscriber !== null) {
    subscriber.unsubscribe();
  }
  listener = createListener();
};

const listenerMiddleware = createListenerMiddleware();

listenerMiddleware.startListening({
  actionCreator: webSocketActions,
  effect: async (action, listenerApi) => {
    listenerApi.cancelActiveListeners();

    const { widgetId } = action.payload;

    connect(widgetId);

    subscribe(widgetId, listenerApi.dispatch);
  },
});

export { connect, disconnect, receive, subscribe, unsubscribe };

export default listenerMiddleware;
