import { RealtimeChannelSendResponse } from "@supabase/supabase-js";
import retry from "async-retry";

import { SupabaseAdminClient } from "~/lib/supabase/admin";
import {
  TActivityStreamEvent,
  TActivityStreamMeta,
  TEndedEvent,
  TJoinedEvent,
  TStartedEvent,
} from "./active-event-types";

/**
 * Creates a broadcast stream on the Supabase realtime channel
 * `honest-input/${meta.aggregateId}`.
 *
 * Every message is sent as a `${topic}:${name}` broadcast event. Events sent
 * through the returned `broadcast` method are also kept in memory so they can
 * be replayed as `history` to users that announce themselves with a
 * `user:joined` broadcast.
 *
 * @param supabase - Client used to open, and later remove, the channel.
 * @param topic - Prefix of every outgoing event name and of the
 *   `${topic}:abort` event this stream listens for.
 * @param meta - Stream metadata. Supplies `replyId`, `streamId`, `eventId` and
 *   `aggregateId`, and is attached to every message that is sent.
 * @returns A handle exposing `id`, `status`, `aborted`, `start`, `refresh`,
 *   `broadcast` and `end`.
 */
export function createStream<
  Topic extends string,
  ActiveEvent extends TActivityStreamEvent = TActivityStreamEvent,
  Meta extends TActivityStreamMeta = TActivityStreamMeta,
>(supabase: SupabaseAdminClient, topic: Topic, meta: Meta) {
  // History of events sent through the public `broadcast`, replayed on refresh.
  const events: ActiveEvent[] = [];

  const ref = {
    messageId: meta.replyId,
    streamId: meta.streamId,
    eventId: meta.eventId,
    // TODO: should this be idle when not open or closed? Should this be an activity with idle, active, ended?
    status: "open",
  };

  // TODO: we collected user_ids to support channel mechanics
  // Waitlist of users that joined and still need the event history.
  const refreshed = [] as { user_id: string }[];
  let subscribed = false;
  let aborted = false;

  const channel = supabase
    .channel(`honest-input/${meta.aggregateId}`, {
      config: { broadcast: { ack: true } },
    })
    .on("broadcast", { event: "user:joined" }, ({ payload }) => {
      console.log("user:joined", payload);
      refreshed.push(payload);
    })
    .on("broadcast", { event: `${topic}:abort` }, () => {
      aborted = true;
    });

  /**
   * Sends one `${topic}:${name}` broadcast, retrying through `async-retry`
   * whenever the channel answers with anything other than `"ok"`.
   *
   * @throws Error when the channel has not been subscribed yet.
   * @throws The non-"ok" send response once the retries are exhausted.
   */
  function broadcast<
    // TODO: invert to "name" and Extract<EventUnion, { name: Name }>?
    T extends
      | TStartedEvent
      | TEndedEvent
      | TJoinedEvent<ActiveEvent>
      | ActiveEvent extends TActivityStreamEvent
      ? ActiveEvent
      : never,
  >(name: T["name"], payload: T["payload"]) {
    if (!subscribed) throw new Error("Channel not subscribed");

    // TODO: type checking
    return retry(
      async (_) => {
        const response = await channel.send({
          // the received event
          name,
          payload,
          meta,
          // treated as supabase internals
          type: "broadcast",
          event: `${topic}:${name}`,
        });
        // Throwing the response is what makes async-retry schedule another attempt.
        if (response !== "ok") {
          throw response;
        } else return response;
      },
      { minTimeout: 47, maxTimeout: 127 },
    );
  }

  return {
    /** Stream id taken from `meta.streamId`. */
    get id() {
      return ref.streamId;
    },

    /** Current status: `"open"`, or `"closed"` after a failed start or an end. */
    get status() {
      return ref.status;
    },

    /** True once a `${topic}:abort` broadcast has been received. */
    get aborted() {
      return aborted;
    },

    /**
     * Subscribes to the channel (only if not subscribed yet) and broadcasts
     * the `started` event.
     *
     * @throws `{ status, err }` when the first subscription status is not
     *   `"SUBSCRIBED"`.
     * @throws Whatever the `started` broadcast throws; the status is set to
     *   `"closed"` in that case.
     */
    async start() {
      if (!subscribed) {
        subscribed = await new Promise<boolean>((resolve, reject) =>
          // TODO this callback is continuously called, not just once
          // Only the first status settles the promise; later ones are just logged.
          channel.subscribe((status, err) => {
            console.info(`${status} ${topic} ${ref.eventId}`);
            if (status === "SUBSCRIBED") resolve(true);
            else reject({ status, err });
          }),
        );
      }

      try {
        ref.status = "open";
        await broadcast("started", {
          eventId: ref.eventId,
          streamId: ref.streamId,
        });
      } catch (error) {
        ref.status = "closed";
        console.error("Failed to open stream", error);
        throw error;
      }
    },

    /**
     * Broadcasts a `joined` event carrying the event history once for every
     * user waiting on the waitlist.
     *
     * A user whose broadcast fails is put back on the waitlist so a later call
     * can try again.
     *
     * @returns `"ok"` when every broadcast succeeded (or nobody was waiting),
     *   otherwise the last failing send response.
     * @throws Any non-string error raised while broadcasting; the users not
     *   yet served are put back on the waitlist first.
     */
    async refresh() {
      if (refreshed.length === 0) return "ok" as const;

      console.log("refreshing", refreshed.length, "users");
      // remove from waitlist
      const targets = refreshed.splice(0, refreshed.length);

      let result: RealtimeChannelSendResponse = "ok";
      // TODO: we should be able to make this a single call once we verify ACK on frontend
      for (const [index, target] of targets.entries()) {
        try {
          // Resolves only with "ok"; every other response arrives as a throw.
          await broadcast("joined", {
            eventId: ref.eventId,
            streamId: ref.streamId,
            history: events,
          });
        } catch (error) {
          console.warn("Failed to refresh stream", error);
          // re-add to waitlist
          refreshed.push(target);
          console.log(refreshed.length, "users remaining");
          // throw for unknown errors
          if (typeof error !== "string") {
            // Keep the users we never got to, otherwise they would be dropped.
            refreshed.push(...targets.slice(index + 1));
            throw error;
          }
          // A string is a send response ("timed out" / "error"): report it.
          result = error as RealtimeChannelSendResponse;
        }
      }

      return result;
    },

    /**
     * Records the event in the replay history, then broadcasts it.
     *
     * The event is recorded before sending, so it stays in the history even
     * when the send fails.
     */
    broadcast<T extends ActiveEvent>(name: T["name"], payload: T["payload"]) {
      events.push({ name, payload } as ActiveEvent);
      return broadcast(name, payload);
    },

    /**
     * Broadcasts the `ended` event, then unsubscribes from and removes the
     * channel. Does nothing when the stream is not subscribed.
     *
     * @throws Whatever the `ended` broadcast throws; the channel is torn down
     *   regardless.
     */
    async end() {
      if (!subscribed) return;

      ref.status = "closed";
      try {
        await broadcast("ended", {
          eventId: ref.eventId,
          streamId: ref.streamId,
        });
      } finally {
        // Always release the channel, even when the final broadcast failed.
        subscribed = false;
        await channel.unsubscribe();
        await supabase.removeChannel(channel);
      }
    },
  };
}
