import React from "react";
import { Observable, Subject, asyncScheduler } from "rxjs";
import { debounceTime, tap, throttleTime } from "rxjs/operators";

/* useOnload wraps React.useEffect(callback, []): the effect is registered
   with an empty dependency list, so it is tied to the component mount
   rather than to any changing value. It exists as a thin abstraction so
   that data loading is not triggered from a raw useEffect 'directly',
   which makes the function responsible for loading easy to locate.

   Whatever the callback returns (an optional cleanup function) is handed
   back to React unchanged.
 */

export function useOnload(callback: React.EffectCallback) {
  const runOnMount: React.EffectCallback = () => callback();
  // eslint-disable-next-line react-hooks/exhaustive-deps
  React.useEffect(runOnMount, []);
}

/**
 * Subscribe to a `window` event for the lifetime of the component.
 *
 * The listener is attached once, from an effect with an empty dependency
 * list, and detached by that effect's cleanup. `event` and `options` are
 * therefore captured from the first render; later changes to them are
 * ignored. `callback`, in contrast, is read through a ref on every dispatch,
 * so the most recently rendered callback is always the one invoked.
 *
 * @param event Name of the window event to listen for.
 * @param callback Handler invoked with the dispatched event; may be omitted.
 * @param options Forwarded to both `addEventListener` and
 *   `removeEventListener`.
 */
export function useListenWindowEvent(
  event: string,
  callback?: React.EventHandler<React.SyntheticEvent>,
  options?: boolean | AddEventListenerOptions
) {
  const callbackRef = React.useRef(callback);
  callbackRef.current = callback;

  React.useEffect(() => {
    const listener: EventListener = (e: any) => {
      callbackRef.current?.(e);
    };

    window.addEventListener(event, listener, options);

    return () => {
      // removeEventListener only matches a listener registered with the same
      // capture flag, so the options used for registration must be passed
      // again; otherwise a capturing listener would never be removed.
      window.removeEventListener(event, listener, options);
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);
}

/* useAbortable manages an AbortController instance held in a ref.

   - `abort()` aborts the current controller, installs a fresh one and
     returns `{ newSignal }`, the signal of that fresh controller, ready
     to be used for the next request.
   - `signal` is the signal of the controller that was current when the
     hook rendered.
   - When the component unmounts, `abort()` is called automatically.

   Usage:
   - build an abortable fetch function by passing `signal` (or the
     `newSignal` returned from `abort()`) to the request.
 */

export function useAbortable() {
  const controllerRef = React.useRef<AbortController>(new AbortController());

  const abort = React.useCallback(() => {
    // Cancel whatever is in flight, then swap in a controller for the next call.
    controllerRef.current.abort();
    const replacement = new AbortController();
    controllerRef.current = replacement;
    return { newSignal: replacement.signal };
  }, []);

  React.useEffect(
    () => () => {
      abort();
    },
    // eslint-disable-next-line react-hooks/exhaustive-deps
    []
  );

  const { signal } = controllerRef.current;
  return { abort, signal };
}

/** One queued call travelling through the backpressure pipeline. */
type BackpressureEvent = {
  /** Sequence number used to tell queued events apart. */
  seqId: number;
  /** Arguments to forward to the wrapped async function. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  args: any[];
  /** Settles the caller's promise with the wrapped function's result. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  resolve: (value: any) => void;
  /** Rejects the caller's promise (with the function's error or an abort). */
  reject: (reason?: unknown) => void;
};

/**
 * Error used to reject calls that were dropped before they ran.
 * Its `name` is "AbortError", which is what `isAbortError` checks for.
 */
class AbortError extends Error {
  constructor(message?: string) {
    // Fall back to the default text when no message is supplied.
    const text = message ?? "The operation was aborted.";
    super(text);
    this.name = "AbortError";
  }
}

/**
 * Tell whether a caught value is an abort error, judged solely by its
 * `name` property being "AbortError".
 *
 * @param e Any caught value, including `null`/`undefined`.
 * @returns `true` when `e.name === "AbortError"`; `false` otherwise, and
 *   also `false` when reading `name` throws.
 */
export function isAbortError(e: any): boolean {
  let matches = false;
  try {
    matches = e.name === "AbortError";
  } catch {
    // Reading `name` failed (for example e is null or undefined).
    matches = false;
  }
  return matches;
}

/**
 * Wrap an async function so that calls to it pass through a backpressure
 * pipeline (for example debounce or throttle) before being executed.
 *
 * Each call to the returned `run` function yields a promise:
 * - when the call is let through by `backpressureMode`, the promise settles
 *   with the outcome of `func`;
 * - when a later call is let through first, or the component unmounts while
 *   the call is still queued, the promise rejects with an `AbortError`
 *   (detectable with `isAbortError`).
 *
 * @param func The async function invoked at the end of the pipeline. The
 *   function passed on the most recent render is the one that gets called.
 * @param backpressureMode Operator applied to the stream of calls. It is
 *   read only when the pipeline is created, so changing it afterwards does
 *   not affect an existing pipeline.
 * @returns A referentially stable `run(...args)` function returning a
 *   promise of the result of `func`.
 */
export function useBackpressure<T>(
  func: (...args: any[]) => Promise<T>,
  backpressureMode: (
    source: Observable<BackpressureEvent>
  ) => Observable<BackpressureEvent>
) {
  // Stays null while no pipeline exists, so both the mount effect and `run`
  // can tell whether one has to be built. It must not be pre-filled during
  // render: a subject nobody subscribed to would swallow events and leave
  // their promises pending forever.
  const subjectRef = React.useRef<Subject<BackpressureEvent> | null>(null);
  const unsubscribeRef = React.useRef<(() => void) | null>(null);
  const counterRef = React.useRef(0);
  const funcRef = React.useRef(func);
  funcRef.current = func;
  const backpressureModeRef = React.useRef(backpressureMode);
  backpressureModeRef.current = backpressureMode;

  // Events that entered the pipeline and have not been let through yet.
  const queue = React.useRef<BackpressureEvent[]>([]);

  const createPipeline = React.useCallback(() => {
    const subject = new Subject<BackpressureEvent>();
    subjectRef.current = subject;
    queue.current = [];
    const enqueue = subject.pipe(
      tap(value => {
        queue.current.push(value);
      })
    );
    const pipe = backpressureModeRef.current(enqueue);

    const subscription = pipe.subscribe({
      next: async (pending: BackpressureEvent) => {
        const index = queue.current.findIndex(
          queued => queued.seqId === pending.seqId
        );
        if (index < 0) {
          // Not queued any more, so its promise was already settled
          // (for example aborted); do not run the function for it.
          return;
        }

        // Everything queued before the emitted event was superseded by it.
        const superseded = queue.current.splice(0, index);
        queue.current.shift(); // drop the emitted event itself
        for (const event of superseded) {
          event.reject(new AbortError());
        }

        try {
          pending.resolve(await funcRef.current(...pending.args));
        } catch (e) {
          pending.reject(e);
        }
      },
    });
    unsubscribeRef.current = () => subscription.unsubscribe();
  }, []);

  const onUnmount = React.useCallback(() => {
    // Unsubscribe before completing the subject: operators such as
    // debounceTime flush their buffered value when the source completes,
    // which would otherwise invoke the function after unmount.
    unsubscribeRef.current?.();
    unsubscribeRef.current = null;
    subjectRef.current?.complete();

    // Reset the subjectRef to null so that the pipeline can be recreated
    // (for example during an HMR reload).
    subjectRef.current = null;

    // Abort all pending events on unmount automatically.
    const abandoned = queue.current;
    queue.current = [];
    for (const event of abandoned) {
      event.reject(new AbortError());
    }
  }, []);

  useOnload(() => {
    // `run` may already have built the pipeline lazily; rebuilding it here
    // would reset the queue and orphan the events it holds.
    if (subjectRef.current == null) {
      createPipeline();
    }
    return onUnmount;
  });

  const run = React.useCallback(
    (...args: any[]) => {
      return new Promise<T>((resolve, reject) => {
        if (subjectRef.current == null) {
          // Sometimes an HMR reload may not trigger the useOnload in this
          // function; re-create the pipeline if that happens.
          createPipeline();
        }
        subjectRef.current?.next({
          seqId: counterRef.current++,
          args,
          resolve,
          reject,
        });
      });
    },
    [createPipeline]
  );

  return run;
}

/**
 * Factory methods producing the stream operators accepted as the
 * `backpressureMode` argument of `useBackpressure`.
 */
export class BackpressureMode {
  /** bypass - Let every event through unchanged.
   *
   * The source stream is returned as is, so no delay or filtering is
   * applied and every event is processed as it arrives.
   *
   * It is designed for debugging purposes.
   */
  static bypass() {
    return (source: Observable<BackpressureEvent>) => source;
  }

  /** debounce - Debounce the events by `milliseconds`.
   *
   * When several events arrive during that period, only the last one
   * is run.
   *
   * Examples
   * - Simulate slow network response by increasing the debounce time.
   */
  static debounce(milliseconds: number) {
    return (source: Observable<BackpressureEvent>) =>
      source.pipe(debounceTime(milliseconds));
  }

  /** throttleLatest - Run the first event immediately, then hold back
   * the following events for the next `milliseconds`.
   * The latest of those events is run once the `milliseconds` elapse.
   */
  static throttleLatest(milliseconds: number) {
    return (source: Observable<BackpressureEvent>) => {
      // Emit on both edges of the throttle window.
      const edges = { leading: true, trailing: true };
      const throttled = throttleTime<BackpressureEvent>(
        milliseconds,
        asyncScheduler,
        edges
      );
      return source.pipe(throttled);
    };
  }
}
