import { ReadableStream } from 'web-streams-polyfill/ponyfill'

import { config } from 'config'
import {
  StreamOnOpenError,
  StreamTimeoutError,
  StreamUnexpectedCloseError,
} from 'modules/ai/stream/errors'
import { StreamMessageTransformer } from 'modules/ai/stream/StreamMessageTransformer'
import { JsxGenerationInput, StreamMessageEvent } from 'modules/ai/stream/types'
import { ChatCompletionInput } from 'modules/api'
import {
  EventSourceMessage,
  EventStreamContentType,
  fetchEventSource,
  FetchEventSourceInit,
} from 'utils/fetchEventSource'

import { JsxPromptKey } from '../../api/generated/jsx-prompt-types'

export class AiStreamRetryableError extends Error {
  constructor(public timeout: number) {
    super()
  }
}

export type CancellableSseStream = {
  cancel: () => void
  stream: ReadableStream<StreamMessageEvent>
}

export function createJsxGenerationStream<K extends JsxPromptKey>(
  input: JsxGenerationInput<K>,
  opts: {
    streamId?: string
    // NOTE(jordan): the server endpoint for jsx generation
    // doesn't support stream resuming at the moment. leaving
    // these options in here until it does.
    streamResumingEnabled?: boolean
    timeout?: number | null
    retries?: number
    backoff?: number
  } = {}
): {
  cancel: () => void
  stream: ReadableStream<StreamMessageEvent>
} {
  const url = config.API_HOST + '/ai/v2/generation'
  const headers = {
    'Content-Type': 'application/json',
  }

  if (opts.streamId) {
    headers['gamma-chat-completion-stream-id'] = opts.streamId
  }

  const fetchOpts = {
    method: 'POST',
    headers,
    body: JSON.stringify(input),
  }

  return createSseStream(url, fetchOpts, opts)
}

export const createChatCompletionStream = (
  input: ChatCompletionInput,
  opts: {
    streamId?: string
    streamResumingEnabled?: boolean
    timeout?: number | null
    retries?: number
    backoff?: number
  } = {}
): {
  cancel: () => void
  stream: ReadableStream<StreamMessageEvent>
} => {
  const url = config.API_HOST + '/ai/chatCompletion'
  const headers = {
    'Content-Type': 'application/json',
  }
  if (opts.streamId) {
    headers['gamma-chat-completion-stream-id'] = opts.streamId
  }

  const fetchOpts = {
    method: 'POST',
    headers,
    body: JSON.stringify(input),
  }

  return createSseStream(url, fetchOpts, opts)
}

export const resumeChatCompletionStream = (opts: {
  streamId: string
  lastEventId: string
  interactionId: string
  timeout?: number | null
  retries?: number
  backoff?: number
}): {
  cancel: () => void
  stream: ReadableStream<StreamMessageEvent>
} => {
  const { streamId, lastEventId, interactionId, ...restOpts } = opts

  const url = new URL(config.API_HOST + `/ai/resumeChatCompletion`)
  url.searchParams.set('interactionId', interactionId)

  const fetchOpts = {
    method: 'GET',
    headers: {
      'gamma-chat-completion-stream-id': streamId,
      'last-event-id': lastEventId,
    },
  }

  return createSseStream(url.toString(), fetchOpts, {
    streamResumingEnabled: true,
    ...restOpts,
  })
}

export const createSseStream = (
  url: string,
  fetchOptions: FetchEventSourceInit,
  opts: {
    streamResumingEnabled?: boolean
    timeout?: number | null
    retries?: number
    backoff?: number
  }
): CancellableSseStream => {
  const retries = opts.retries || 0
  const backoff = opts.backoff || 1000
  const timeout = opts.timeout || null
  const streamResumingEnabled = opts.streamResumingEnabled ?? false

  let tryCount: number = 0
  const abortController = new AbortController()
  let hasRecievedData = false
  let hasSentCancelEvent = false
  let isCancelled = false

  const cancel = () => {
    if (!isCancelled) {
      isCancelled = true
    }
  }

  let timeoutId: ReturnType<typeof setTimeout> | null = null
  let hasTimedOut = false
  const getRetryDelay = () =>
    backoff *
    Math.pow(
      2,
      Math.max(
        // use tryCount - 1, first retry should just be backoff, ie backoff x 2^0
        tryCount - 1,
        0
      )
    )

  const resumeStreamHeaders = {}
  if (streamResumingEnabled) {
    resumeStreamHeaders['gamma-stream-resuming-enabled'] = 'true'
  }

  const requestStream = new ReadableStream<EventSourceMessage>({
    async start(controller) {
      if (timeout != null) {
        timeoutId = setTimeout(() => {
          hasTimedOut = true
          abortController.abort()
          controller.error(
            new StreamTimeoutError(`Stream timed out after ${timeout}ms`)
          )
        }, timeout)
      }

      const { headers, ...restFetchOptions } = fetchOptions
      fetchEventSource(url, {
        openWhenHidden: true,
        credentials: 'include',
        ...restFetchOptions,
        headers: {
          ...headers,
          ...resumeStreamHeaders,
        },
        signal: abortController.signal,
        onopen: async (response) => {
          if (
            response.ok &&
            response.headers.get('content-type') === EventStreamContentType
          ) {
            return // everything's good
          } else if (
            response.status >= 400 &&
            response.status < 500 &&
            response.status !== 429
          ) {
            // client-side errors are usually non-retriable:
            // these errors dont make it to `onerror` because the abortController
            // is cancelled
            abortController.abort()
            controller.error(
              new StreamOnOpenError('Unable to complete request')
            )
          } else if (tryCount++ < retries && !hasRecievedData) {
            // should retry
            throw new AiStreamRetryableError(getRetryDelay())
          } else {
            abortController.abort()
            controller.error(
              new StreamOnOpenError('Unable to complete request')
            )
          }
        },
        onmessage(ev) {
          if (isCancelled) {
            // wait until the next message, then send a cancel message
            // and abort the connection

            if (!hasSentCancelEvent) {
              hasSentCancelEvent = true
              console.log('[AIStream] enqueue cancel event', {
                ...ev,
                event: 'cancel',
                data: '',
              })
              controller.enqueue({
                ...ev,
                event: 'cancel',
                data: '',
              })
              controller.close()
              abortController.abort()
            }
            return
          }

          if (hasTimedOut) {
            return
          }
          controller.enqueue(ev)
          hasRecievedData = true
        },
        onclose() {
          if (timeoutId) {
            clearTimeout(timeoutId)
            timeoutId = null
          }
          controller.close()
        },
        onerror(err) {
          if (err instanceof AiStreamRetryableError) {
            // do nothing to automatically retry. You can also
            // return a specific retry interval here.
            if (isCancelled) {
              console.error('Stream cancelled - not retrying')
              throw new Error('Stream cancelled')
            }
            return err.timeout
          } else if (err instanceof Error) {
            console.error('Uncaught stream error', err)
            // all errors here should be from onmessage, check if the we
            // have retries available and retry

            if (!streamResumingEnabled) {
              // resume from stream is not enabled, dont try
              throw err
            }

            if (tryCount++ < retries) {
              // adhere to standard retry limits, resume from stream
              return getRetryDelay()
            } else {
              // otherwise we've exhausted our retries, throw the error
              throw err
            }
          } else if (hasRecievedData) {
            const msg = 'Stream closed unexpectedly'
            console.error('Uncaught stream error', msg)
            throw new StreamUnexpectedCloseError(msg)
          }
          return
        },
      }).catch((e) => {
        if (timeoutId) {
          clearTimeout(timeoutId)
          timeoutId = null
        }
        if (!isCancelled) {
          controller.error(e)
        }
      })
    },
  })
  return {
    stream: requestStream.pipeThrough(new StreamMessageTransformer()),
    cancel,
  }
}
