import retry from "async-retry";

import { useAtomValue, useSetAtom, useStore } from "jotai";
import { A, F } from "@mobily/ts-belt";
import { useEffect } from "react";
import { GPTStreamActivity } from "../actives";

import {
  StreamState,
  chatAtomFor,
  messageAtomLastActiveFor,
  streamAtomLastActiveFor,
  streamMapAtom,
  useSupabaseAtom,
} from "../atoms";
import { usePatchAtomFamily } from "../atoms/use-patch-atom-family";
import { selectChatHistoryById } from "../db";
import { TRefId, shortIdFor } from "../ids";
import { GPTStreamAbortEvent, UserJoinedBroadcast } from "../inngest/gpt-stream-types";
import { SetupAction } from "../chat-reducer";

// Master switch for the prefixed debug logger built by `createLog`;
// when false, `createLog` hands back a no-op.
const LOG_STREAM = true;
// When true, `useActivityStream` additionally registers a catch-all
// broadcast listener that logs the name of every raw event it receives.
const LOG_RAW_STREAM = false;

/**
 * Builds a `console.log`-compatible logger that prefixes every line with
 * `name` and the shortened form of `id`.
 *
 * @param name - Label printed first on every log line.
 * @param id - Identifier that is shortened via `shortIdFor` on each call.
 * @returns The prefixed logger, or a no-op when `LOG_STREAM` is disabled.
 */
function createLog(name: string, id: string) {
  if (LOG_STREAM) {
    return (...parts: Parameters<typeof console.log>) => {
      console.log(name, shortIdFor(id), ...parts);
    };
  }

  const noop = () => {};
  return noop as typeof console.log;
}

/**
 * Subscribes the calling component to the GPT stream activity channel of a
 * chat and mirrors the received stream events into `streamMapAtom`.
 *
 * The subscription only exists while a Supabase client is available and the
 * envelope read from `messageAtomLastActiveFor(chatId)` carries a message in
 * the `"started"` mode. It is torn down (channel unsubscribed) when either
 * condition stops holding, when `chatId` changes, or when the component
 * unmounts.
 *
 * @param chatId - Id of the chat whose stream channel should be observed.
 */
export function useActivityStream(chatId: string) {
  const ChatAtom = () => chatAtomFor(chatId);
  const setStreamMapAtom = useSetAtom(streamMapAtom);
  // TODO: we formerly required an anonymous client for streaming,
  //       but this works now … if it breaks revert
  const supabase = useSupabaseAtom("authenticated");

  // TODO: consider if we want to observe our given chatId …
  const streamAtom = usePatchAtomFamily(streamAtomLastActiveFor, chatId);
  const messageAtom = usePatchAtomFamily(messageAtomLastActiveFor, chatId);
  const activeEnvelope = useAtomValue(messageAtom);
  const shouldListen = activeEnvelope?.message?.mode === "started";

  const store = useStore();
  useEffect(() => {
    if (!supabase || !shouldListen) return;

    // Flipped to false by the cleanup so late async work can detect unmount.
    const ref = { mounted: true };
    // The single in-flight chat-history reload, shared by every caller of
    // `ensureEventPresent` until it settles.
    let pendingHistoryFetch: Promise<void> | undefined;

    // FIXME: must establish anon channel before auth, bug with supabase
    const $gptstream = GPTStreamActivity.willReceive(supabase, { aggregateId: chatId });

    const log$gptstream = createLog("$gptstream", chatId);
    if (LOG_RAW_STREAM) {
      $gptstream.__on("broadcast", { event: "*" }, ({ name }) => {
        log$gptstream(name, "event received");
      });
    }

    /**
     * Applies `update` to the stream stored under `messageId`.
     * The map reference is left untouched when `update` returns the very same
     * state object, so no atom update is triggered for a no-op.
     */
    function setStream(
      messageId: TRefId,
      update: (stream?: StreamState) => StreamState | undefined,
    ) {
      setStreamMapAtom((streams) => {
        const prev = streams[messageId];
        const next = update(prev);
        return prev === next ? streams : { ...streams, [messageId]: next };
      });
    }

    // we can kill the start signal when we migrate to activity driven stream/message component
    // "started": replace whatever is stored for the reply with a fresh, empty, open stream.
    $gptstream.on("started", (message) => {
      log$gptstream(".on(started)", "received", message);
      if (!message.meta || !message.payload) {
        log$gptstream(".on(started) received invalid message", message);
        return;
      }
      const { meta } = message;
      // request messages if they're not already loaded
      void ensureEventPresent(meta.eventId);
      setStream(meta.replyId, (_stream) => ({
        messageId: meta.replyId,
        promptId: meta.eventId,
        streamId: meta.streamId,
        source: "system",
        status: "open",
        parts: [],
        stop,
      }));
    });

    // "joined": reopen the stream and rebuild its parts from the replayed
    // history, keeping one part per index.
    $gptstream.on("joined", ({ payload, meta }) => {
      log$gptstream(".on(joined)", "received", payload, meta);
      setStream(meta.replyId, (state) => {
        log$gptstream("joined and reseting state");
        return {
          // TODO: fix
          ...state,
          status: "open",
          messageId: meta.replyId,
          promptId: meta.eventId,
          streamId: meta.streamId,
          source: state?.source!,
          parts: A.uniqBy(
            // TODO: see if there's an elegant reduction to avoid sending event envelopes
            payload.history.map((e) => ({ index: e.payload.index, content: e.payload.part })),
            (v) => v.index,
          ),
          stop,
        };
      });
    });

    // "streaming": append the received part and keep parts ordered by index.
    $gptstream.on("streaming", ({ payload, meta }) => {
      log$gptstream(".on(streaming)", "received");
      void ensureEventPresent(meta.eventId);
      setStream(meta.replyId, (state) => {
        const parts = state?.parts ?? [];
        const { part, index, ...stream } = payload;

        return {
          // TODO: fix
          status: state?.status!,
          messageId: state?.messageId!,
          promptId: state?.promptId!,
          streamId: state?.streamId!,
          source: state?.source!,
          ...state,
          ...stream,
          parts: A.sortBy([...parts, { content: part, index }], (v) => v.index),
          stop,
        };
      });
    });

    // "ended": close the stream, but only if it is the one that ended.
    $gptstream.on("ended", ({ payload, meta }) => {
      log$gptstream(".on(ended)", "received", payload, meta);
      // TODO: this may fuck things up
      // ensureEventPresent(payload.recordId);
      setStream(meta.replyId, (state) => {
        return state != null && state.streamId === meta.streamId
          ? { ...state, status: "closed" }
          : state;
      });
    });

    // subscribe to the channel, and send join message
    // (at most once per effect run, even if SUBSCRIBED is reported repeatedly)
    const sendUserJoinedOnce = F.once(sendUserJoined);
    $gptstream.subscribe((status) => {
      log$gptstream(`subscribe(status: ${status})`);
      if (status === "SUBSCRIBED") {
        log$gptstream(
          "calling sendUserJoined()",
          "with status",
          status,
          "and with channel state",
          $gptstream.__channel.state,
        );
        sendUserJoinedOnce();
      }
    });

    /**
     * Broadcasts `user:joined` on the channel, retrying until the send
     * reports "ok". Stops retrying as soon as the effect has been cleaned up.
     */
    function sendUserJoined() {
      log$gptstream("once(retry(send(user:joined)) with channel state", $gptstream.__channel.state);
      retry(
        async (bail) => {
          log$gptstream("send(user:joined) with channel state", $gptstream.__channel.state);
          if (!ref.mounted) {
            log$gptstream("send(user:joined) bailing, not mounted", $gptstream.__channel.state);
            bail(new Error("send(user:joined) aborted: stream handler not mounted"));
            return;
          }
          log$gptstream("send(user:joined) retrying", $gptstream.__channel.state);
          const result = await $gptstream.__send({
            type: "broadcast",
            event: "user:joined",
            payload: {
              aggregate_id: chatId,
              user_id: supabase?.__user?.id ?? "__UKNONWN__",
              ts: Date.now(),
            } satisfies UserJoinedBroadcast,
          });
          if (result !== "ok") {
            log$gptstream(
              "send(user:joined) failed with status:",
              result,
              "— with channel state",
              $gptstream.__channel.state,
            );
            // Throwing makes `retry` schedule another attempt.
            throw new Error(`send(user:joined) failed with status: ${result}`);
          }
        },
        { minTimeout: 1000 },
      )
        .then(() => {
          log$gptstream("joined");
        })
        .catch((error: unknown) => {
          // Best effort: joining is not fatal, but leave a trace of why it stopped.
          log$gptstream("send(user:joined) gave up", error);
        });
    }

    // TODO: isolate from state — isolate $gptstream as
    /**
     * Asks the backend to abort the currently active stream by broadcasting
     * `gpt:stream:abort`. Does nothing (besides logging) when the effect has
     * been cleaned up or when there is no open stream.
     */
    function stop() {
      if (!ref.mounted) return console.log("Stream handler not mounted");
      // TODO: verify atom does not disappear due to component unmount with patched atom family
      const stream = store.get(streamAtom);
      if (!stream || stream.status === "closed")
        return console.warn("Cannot abort stream, none active for", stream);

      return $gptstream
        .__send({
          type: "broadcast",
          event: "gpt:stream:abort",
          payload: {
            promptId: stream.promptId,
            streamId: stream.streamId,
            source: "user",
          } satisfies GPTStreamAbortEvent,
        })
        .then((result) => {
          console.log("gptstream broadcast aborted", result);
        });
    }

    /**
     * Makes sure the chat log contains `eventId`; if it does not, the whole
     * chat history is reloaded. Never rejects, so it is safe to call without
     * awaiting. Concurrent calls share one in-flight reload instead of
     * issuing a request per streamed chunk.
     */
    async function ensureEventPresent(eventId: string | number) {
      const { log } = store.get(ChatAtom());
      if (log.some((event) => event.id === eventId)) return;

      if (!pendingHistoryFetch) {
        console.warn("fetching missing event", eventId);
        pendingHistoryFetch = reloadChatHistory().finally(() => {
          pendingHistoryFetch = undefined;
        });
      }
      return pendingHistoryFetch;
    }

    /** Reloads the chat history into the chat atom, logging instead of throwing on failure. */
    async function reloadChatHistory() {
      try {
        const events = await selectChatHistoryById(supabase!, chatId);
        if (events.error) throw events.error;
        store.set(ChatAtom(), SetupAction(chatId, events.data));
      } catch (error) {
        console.error("failed to fetch chat history for", chatId, error);
      }
    }

    return () => {
      ref.mounted = false;
      $gptstream.unsubscribe();
    };
  }, [chatId, supabase, shouldListen]);
}

// // TODO: our effect should write all stream updates
// //       we no longer need to filter
// //       will have to reorganize to support this
// export const $gptStreamMapEffectFamily = atomFamily((chatId: string) => {
//   const ChatAtom = () => chatStateAtomFamily(chatId);

//   const $gptChatActivityStreamEffect = atomEffect((get, set) => {
//     console.log("MOUNTING GPT STREAM MAP EFFEECT ATOM");
//     console.log("MOUNTING GPT STREAM MAP EFFEECT ATOM");
//     console.log("MOUNTING GPT STREAM MAP EFFEECT ATOM");
//     console.log("MOUNTING GPT STREAM MAP EFFEECT ATOM");
//     console.log("MOUNTING GPT STREAM MAP EFFEECT ATOM");
//     function setStream(
//       messageId: TRefId,
//       update: (stream?: StreamState) => StreamState | undefined,
//     ) {
//       setStreamMapAtom( (streams) => {
//         let prev = streams[messageId];
//         let next = update(prev);
//         return prev === next ? streams : { ...streams, [messageId]: next };
//       });
//     }

//     const supabase = get(supabaseAtom("authenticated"));
//     const supabaseAnon = get(supabaseAtom("anonymous"))!;
//     const ref = { mounted: true };

//     // FIXME: must establish anon channel before auth, bug with supabase
//     const $gptstream = GPTStreamActivity.willReceive(supabaseAnon, { aggregateId: chatId });

//     const log$gptstream = createLog("$gptstream", chatId);
//     if (LOG_RAW_STREAM) {
//       $gptstream.__on("broadcast", { event: "*" }, ({ event, name, payload }) => {
//         log$gptstream(name, "event received");
//       });
//     }

//     // we can kill the start signal when we migrate to activity driven stream/message component
//     $gptstream.on("started", (message) => {
//       log$gptstream(".on(started)", "received", message);
//       if (!message.meta || !message.payload) {
//         log$gptstream(".on(started) received invalid message", message);
//         return;
//       }
//       const { payload, meta } = message;
//       // request messages if they're not already loaded
//       ensureEventPresent(meta.eventId);
//       setStream(meta.replyId, (_stream) => ({
//         messageId: meta.replyId,
//         promptId: meta.eventId,
//         streamId: meta.streamId,
//         source: "system",
//         status: "open",
//         parts: [],
//         stop,
//       }));
//     });

//     $gptstream.on("joined", ({ payload, meta }) => {
//       log$gptstream(".on(joined)", "received", payload, meta);
//       setStream(meta.replyId, (state) => {
//         log$gptstream("joined and reseting state");
//         return {
//           // TODO: fix
//           ...state,
//           status: "open",
//           messageId: meta.replyId,
//           promptId: meta.eventId,
//           streamId: meta.streamId,
//           source: state?.source!,
//           parts: uniqBy(
//             // TODO: see if there's an elegant reduction to avoid sending event envelopes
//             payload.history.map((e) => ({ index: e.payload.index, content: e.payload.part })),
//             "index",
//           ),
//           stop,
//         };
//       });
//     });

//     $gptstream.on("streaming", ({ payload, meta }) => {
//       log$gptstream(".on(streaming)", "received");
//       ensureEventPresent(meta.eventId);
//       setStream(meta.replyId, (state) => {
//         const parts = state?.parts ?? [];
//         const { part, index, ...stream } = payload;

//         return {
//           // TODO: fix
//           status: state?.status!,
//           messageId: state?.messageId!,
//           promptId: state?.promptId!,
//           streamId: state?.streamId!,
//           source: state?.source!,
//           ...state,
//           ...stream,
//           parts: sortBy([...parts, { content: part, index }], "index"),
//           stop,
//         };
//       });
//     });

//     $gptstream.on("ended", ({ payload, meta }) => {
//       log$gptstream(".on(ended)", "received", payload, meta);
//       // TODO: this may fuck things up
//       // ensureEventPresent(payload.recordId);
//       setStream(meta.replyId, (state) => {
//         return state != null && state.streamId === meta.streamId
//           ? { ...state, status: "closed" }
//           : state;
//       });
//     });

//     // subscribe to the channel, and send join message
//     const sendUserJoinedOnce = once(sendUserJoined);
//     $gptstream.subscribe((status) => {
//       log$gptstream(`subscribe(status: ${status})`);
//       if (status === "SUBSCRIBED") {
//         log$gptstream(
//           "calling sendUserJoined()",
//           "with status",
//           status,
//           "and with channel state",
//           $gptstream.__channel.state,
//         );
//         sendUserJoinedOnce();
//       }
//     });

//     return () => {
//       console.log("unmounting $gptstream");
//       console.log("unmounting $gptstream");
//       console.log("unmounting $gptstream");
//       console.log("unmounting $gptstream");
//       console.log("unmounting $gptstream");
//       ref.mounted = false;
//       $gptstream.unsubscribe();
//     };

//     function sendUserJoined() {
//       log$gptstream("once(retry(send(user:joined)) with channel state", $gptstream.__channel.state);
//       retry(
//         async (bail) => {
//           log$gptstream("send(user:joined) with channel state", $gptstream.__channel.state);
//           if (!ref.mounted) {
//             log$gptstream("send(user:joined) bailing, not mounted", $gptstream.__channel.state);
//             return bail(undefined!);
//           }
//           log$gptstream("send(user:joined) retrying", $gptstream.__channel.state);
//           const result = await $gptstream.__send({
//             type: "broadcast",
//             event: "user:joined",
//             payload: {
//               aggregate_id: chatId,
//               user_id: supabase?.__user?.id ?? "__UKNONWN__",
//               ts: Date.now(),
//             } satisfies UserJoinedBroadcast,
//           });
//           if (result !== "ok") {
//             log$gptstream(
//               "send(user:joined) failed with status:",
//               result,
//               "— with channel state",
//               $gptstream.__channel.state,
//             );
//             throw result;
//           }
//         },
//         { minTimeout: 1000 },
//       )
//         .then(() => {
//           log$gptstream("joined");
//         })
//         .catch(() => {});
//     }

//     // TODO: isolate from state — isolate $gptstream as
//     function stop() {
//       if (!ref.mounted) return console.log("Stream handler not mounted");

//       const stream = get.peek(currentChatLatestStreamAtom);
//       if (!stream || stream.status === "closed")
//         return console.warn("Cannot abort stream, none active for", stream);

//       return $gptstream
//         .__send({
//           type: "broadcast",
//           event: "gpt:stream:abort",
//           payload: {
//             promptId: stream.promptId,
//             streamId: stream.streamId,
//             source: "user",
//           } satisfies GPTStreamAbortEvent,
//         })
//         .then((result) => {
//           console.log("gptstream broadcast aborted", result);
//         });
//     }

//     async function ensureEventPresent(eventId: string | number) {
//       const { log } = get.peek(ChatAtom());
//       if (!log.find((event) => event.id === eventId)) {
//         console.warn("fetching missing event", eventId);
//         const events = await selectChatHistoryById(supabase!, chatId);
//         if (events.error) throw events.error;
//         else set(ChatAtom(), SetupAction(chatId, events.data));
//       }
//     }
//   });

//   return $gptChatActivityStreamEffect;
// });
