/** @format */

import * as Sentry from '@sentry/browser';
import Session from './Session';
import ReconnectingWebSocket from 'reconnecting-websocket';
import { toCamelCaseAll, toSnakeCaseAll, toCamelCase } from './camelSnake';
import { v4 } from 'uuid';
import download from 'downloadjs'; // used to save stuff as files
import config from '../config';

// Timing knobs, all read from the shared config in one go.
const {
  // how long to wait for the websocket to connect before we assume it won't
  websocketGracePeriod: WEBSOCKET_GRACE_PERIOD,
  // base constant the reconnecting-websocket timings are derived from
  // (we have to be aggressive since the backend is :simoji:)
  websocketConnectionConst: WEBSOCKET_CONNECTION_CONST,
  // how often to check for heartbeat
  heartbeatInterval: HEARTBEAT_INTERVAL,
  // how much to debounce a forced interval request if asked
  forcedIntervalDebounce: FORCED_INTERVAL_DEBOUNCE,
  // minimum reconnection delay in ms (this also delays the initial connection for some reason)
  minReconnectDelay: MIN_RECONNECT_DELAY,
} = config.timings;

// Registry of live websocket / interval streams, keyed by stream uuid; used for monitoring purposes.
let _DATASTREAMS = {};

/**
 * Generic client for server-side data sources. A DataStream wraps a single URL and talks to it
 * in one of three modes: a one-off HTTP request, a repeating (interval) HTTP request, or a
 * websocket (via the `reconnecting-websocket` JS implementation, not the native one).
 * Results and errors are delivered by calling functions on the `actions` object that the owning
 * container passes in.
 */
export default class DataStream {
  // Defaults; the constructor overlays the caller supplied params on top of these.
  params = {
    method: 'GET',
    ws: false,
    interval: 0,
    batch: 0,
    message: 'data',
    errorMessage: 'dataError',
    wsConnectionFailureMessage: '',
    wsConnectionRecoveryMessage: '',
    wsConnectionSuccessMessage: '',
    wsHeartbeatMissedMessage: '',
    wsHeartbeatRecoveryMessage: '',
    body: {},
    rawBody: false,
    extras: {},
    actions: null,
    _actions: null,
    json: true,
    raw: false,
    downloadAs: '',
    noLog: false,
    sentryWarnings: false,
    startsPaused: false,
    caseConvert: true,
    ignoreErrors: false,
    wrapInArray: false,
    recordBandwidth: process.env.NODE_ENV === 'development',
    recordLatency: true,
    recordSentMessages: false, //this is dangerous, don't
    uuid: '',
    retries: 0,
    monitorHeartBeats: false,
    heartBeatMonitorInterval: HEARTBEAT_INTERVAL,
    numberOfHeartBeatsItIsSafeToMiss: 4,
    dcRecoveryTolerance: 15 * 1000,
    hbRecoveryTolerance: 30 * 1000,
    minReconnectDelay: MIN_RECONNECT_DELAY,
  };

  /**
   * @param {string} url - where to get stuff from; a path starting with '/' is resolved against the current origin
   * @param {Object} [params] - extra parameters
   * @param {boolean} [params.ws=false] - is this a websocket
   * @param {number} [params.interval=0] - how long between interval poll cycles; 0 = disabled
   * @param {number} [params.batch=0] - how many ms worth of websocket events to batch; 0 = disabled
   * @param {string} [params.method='GET'] - method to use on URL
   * @param {Object} [params.headers] - extra request headers for http requests (string values only)
   * @param {Object} [params.body={}] - body of the request or websocket message
   * @param {boolean} [params.rawBody=false] - submit the body as is, with no processing
   * @param {boolean} [params.json=true] - expect response to be json (forced to false when params.raw is set)
   * @param {boolean} [params.raw=false] - return the raw response instead of processed
   * @param {string} [params.downloadAs=''] - download contents as this name instead of doing the regular stuff
   * @param {boolean} [params.noLog=false] - ability to disable logging in some instances
   * @param {boolean} [params.sentryWarnings=false] - send warnings about failed requests to sentry ... spammy
   * @param {boolean} [params.startsPaused=false] - if you want to create it but start it manually later
   * @param {boolean} [params.caseConvert=true] - convert everything to camelCase from snake_case (obviously works with json only) when receiving and in reverse when sending
   * @param {string} [params.message='data'] - action to call with the response; a value starting with '.' is looked up inside the payload, i.e. '.messageType' will use payload.messageType as the action name (json only)
   * @param {boolean} [params.wrapInArray=false] - wrap single message emits into an array (pricefeed needs this because reasons ...)
   * @param {string} [params.errorMessage='dataError'] - action to call in the case of an error; becomes 'wsDataError' for websockets unless overridden
   * @param {string} [params.wsConnectionFailureMessage=''] - action to call when the websocket fails to connect within the grace period
   * @param {string} [params.wsConnectionSuccessMessage=''] - action to call the first time the websocket opens
   * @param {string} [params.wsConnectionRecoveryMessage=''] - action to call when the websocket re-opens after a disconnect
   * @param {string} [params.wsHeartbeatMissedMessage=''] - action to call once too many heartbeats have been missed
   * @param {string} [params.wsHeartbeatRecoveryMessage=''] - action to call when heartbeats resume after a warning
   * @param {Object} [params.extras={}] - something you want added to the response data
   * @param {boolean} [params.ignoreErrors=false] - ignore stream errors
   * @param {boolean} [params.recordBandwidth] - record up/down bandwidth; defaults to true only when NODE_ENV is 'development'
   * @param {boolean} [params.recordLatency=true] - record ping, for websockets this only works when heartbeats are monitored
   * @param {boolean} [params.recordSentMessages=false] - console.warn every websocket message that is sent
   * @param {string} [params.uuid=''] - for http requests, add a uuid-v4 id in the specified field; example: {uuid: 'request_id'}
   * @param {number} [params.retries=0] - for http requests, how many times to retry before producing error
   * @param {boolean} [params.monitorHeartBeats=false] - send ping/pong requests
   * @param {number} [params.heartBeatMonitorInterval] - how often to ping/pong; defaults to config.timings.heartbeatInterval
   * @param {number} [params.numberOfHeartBeatsItIsSafeToMiss=4] - how many ping/pongs in a row you can miss before there is a problem
   * @param {number} [params.dcRecoveryTolerance=15000] - tolerance (ms) used to compute the `recover` flag sent with connection recovery messages
   * @param {number} [params.hbRecoveryTolerance=30000] - tolerance (ms) used to compute the `recover` flag sent with heartbeat recovery messages
   * @param {number} [params.minReconnectDelay] - minimum reconnection delay (this also delays the initial connection for some reason); defaults to config.timings.minReconnectDelay
   * @param {Object} [actions] - the actions of the container you are using
   * @throws {Error} when no url is given
   */
  constructor(url, params, actions = null) {
    //sensible defaults
    Object.assign(this.params, params);
    if (this.params.ws && this.params.errorMessage === 'dataError') {
      this.params.errorMessage = 'wsDataError';
    }

    // raw responses are never parsed
    if (this.params.raw) {
      this.params.json = false;
    }

    // `_actions` keeps the original reference so start() can restore it after stop(true)
    this.actions = actions;
    this._actions = actions;

    this._bandwidth = {
      up: 0,
      down: 0,
    };

    // some internals
    this._uuid = v4();
    this._ws = null;
    this._request = null;
    this._interval = null;
    this._queuedForcedInterval = null;
    this._lastResponse = null; // debug
    this._lastRequestMade = null;
    this._lastRequestMadeHB = '';
    this._lastResponseReceived = null;
    this._lastStatus = 'unknown';
    this._timeWsCreated = null;
    this._timeWsOpen = null;
    this._warningDispatched = false;

    this._lastHbRecovery = null;
    this._lastDcRecovery = null;

    this._shortName = '';
    this._queueExecutor = null; // interval id of the batch flusher (see start/stop)
    this._graceTimeout = null; // timeout id of the websocket connect grace period
    this._queued = {
      ts: +new Date(),
      data: [],
    };

    this._hasSuccessfullyConnectedBefore = false;
    this._recentlyMissedHeartBeats = 0;
    this._heartBeatIntervalId = null;
    this._sentHeartbeats = [];

    this._flags = {
      paused: true,
      isClosing: false,
    };
    this._isWs = this.params.ws;

    if (!url) {
      throw new Error('No url specified to DataStream');
    }

    // Merge the various urls together: the redirect host (if any) wins over the url's own host,
    // and the protocol is rewritten to match the transport (ws/http) and security (s).
    const HOST_REDIRECT = DataStream.getHostRedirect();
    const RedirectUrl = HOST_REDIRECT ? new URL(HOST_REDIRECT) : null;
    const WindowUrl = new URL(window.location.href);
    const Url = url.startsWith('/') ? new URL(`${WindowUrl.origin}${url}`) : new URL(url);
    const securitySource = RedirectUrl || Url;
    const IS_SECURE = securitySource.protocol === 'https:' || securitySource.protocol === 'wss:';
    if (RedirectUrl) {
      Url.host = RedirectUrl.host;
      Url.port = RedirectUrl.port;
    }
    Url.protocol = `${this._isWs ? 'ws' : 'http'}${IS_SECURE ? 's' : ''}:`;

    // set class properties
    this.url = Url.toString();
    this._shortName = Url.pathname; // for sentry

    // let loose the goose
    if (!this.params.startsPaused) {
      this.start();
    }
  }

  /**
   * this method returns an approximate value for the ping: the time between the last recorded
   * request and the last recorded response
   * @return {number} - round trip time in ms, never negative; 0 when nothing has been recorded yet
   */
  lastRTT() {
    if (!this._lastRequestMade || !this._lastResponseReceived) {
      return 0;
    }

    const elapsed = this._lastResponseReceived.getTime() - this._lastRequestMade.getTime();
    return Math.max(Math.round(elapsed), 0);
  }

  /**
   * this method returns how long the websocket took to open for the first time, with the
   * configured minimum reconnect delay subtracted
   * @return {number} - connection time in ms, never negative; 0 when the websocket has not opened yet
   */
  wsConnectTime() {
    if (!this._timeWsOpen || !this._timeWsCreated) {
      return 0;
    }

    //factor minReconnectDelay in here for the sake of accuracy
    const elapsed = this._timeWsOpen.getTime() - this._timeWsCreated.getTime();
    return Math.max(elapsed - this.params.minReconnectDelay, 0);
  }

  /**
   * this method starts the stream (i.e. makes the request or connects to the websocket)
   * @return {Promise | WebSocket | number} - a Promise for a one-off request, the webSocket for websocket type requests, a number representing the interval for repeating requests
   */
  start() {
    this._flags.paused = false;
    this.actions = this._actions;

    this._lastHbRecovery = null;
    this._lastDcRecovery = null;

    this._warningDispatched = false;
    this._sentHeartbeats = [];
    this._lastRequestMadeHB = '';

    // only long lived streams are tracked in the monitoring registry
    if (this._isWs || this.params.interval) {
      _DATASTREAMS[this._uuid] = this;
    }

    if (!this._isWs) {
      // if it's just request based
      return this.send();
    }

    if (this.params.batch) {
      // never stack two flushers if start() is called again without a stop()
      if (this._queueExecutor) {
        clearInterval(this._queueExecutor);
      }
      this._queueExecutor = setInterval(() => {
        if (this._queued.data?.length) {
          this._dispatch(this._queued);
        }
        this._queued = { data: [] };
      }, this.params.batch);
    }

    // connect websocket
    return this._connectWS();
  }

  /**
   * dispatch warning action (happens when too many heartbeats have been missed); only fires once
   * until dispatchSafeHeartbeat re-arms it
   */
  dispatchHeartbeatWarning() {
    if (this._warningDispatched) {
      return;
    }

    const warningAction = this.params.wsHeartbeatMissedMessage;
    if (warningAction && this.actions && this.actions[warningAction]) {
      this.actions[warningAction]({
        actions: this.actions,
        heartbeat: this._recentlyMissedHeartBeats,
      });
    }

    this._warningDispatched = true; //so we don't keep dispatching warnings
  }

  /**
   * dispatch recovery action (happens when heartbeat has been restored) and re-arm the warning
   */
  dispatchSafeHeartbeat() {
    const recoveryAction = this.params.wsHeartbeatRecoveryMessage;
    if (recoveryAction && this.actions && this.actions[recoveryAction]) {
      const now = new Date();
      // NOTE(review): _lastHbRecovery is written here but never read in this class; the flag is
      // derived from _lastResponseReceived instead - confirm that is the intended debounce.
      const recover =
        !this._lastResponseReceived ||
        this._lastResponseReceived.getTime() + this.params.hbRecoveryTolerance < now.getTime();
      this._lastHbRecovery = now;
      this.actions[recoveryAction]({
        actions: this.actions,
        recover,
      });
    }

    //reset this
    this._warningDispatched = false; //so we can dispatch warnings again
  }

  /**
   * sends one ['ping', timestamp] heartbeat over the websocket and remembers the timestamp so
   * the matching 'pong' can be recognised in _emit
   */
  _sendHeartbeat() {
    const stamp = `${+new Date()}`;
    this.send(['ping', stamp]);
    this._sentHeartbeats.push(stamp);

    if (this.params.recordLatency) {
      this._lastRequestMade = new Date();
      this._lastRequestMadeHB = stamp;
    }
  }

  /**
   * this method creates the reconnecting websocket; error, close and message listeners are
   * bound the first time the socket opens
   * @return {WebSocket} - the (reconnecting) websocket
   */
  _connectWS() {
    // Every call creates a brand new socket without any listeners, so the first `open` of this
    // socket has to be treated as a first connection. Without this reset a stop() + start()
    // cycle would take the "recovered" branch below and never bind the message listener.
    this._hasSuccessfullyConnectedBefore = false;
    this._recentlyMissedHeartBeats = 0;

    // drop timers that may be left over from a previous socket
    if (this._graceTimeout) {
      clearTimeout(this._graceTimeout);
      this._graceTimeout = null;
    }
    if (this._heartBeatIntervalId) {
      clearInterval(this._heartBeatIntervalId);
      this._heartBeatIntervalId = null;
    }

    //the websocket should just start sending stuff immediately after open
    this._ws = new ReconnectingWebSocket(this.url, [], {
      maxReconnectionDelay: WEBSOCKET_CONNECTION_CONST * 4,
      // randomised so that clients do not all reconnect at the same moment
      minReconnectionDelay:
        this.params.minReconnectDelay +
        Math.floor(Math.random() * 4 * this.params.minReconnectDelay),
      reconnectionDelayGrowFactor: 2 + Math.floor(Math.random() * 4),
      minUptime: WEBSOCKET_CONNECTION_CONST / 2,
      connectionTimeout: WEBSOCKET_CONNECTION_CONST,
      debug: false, //DONT ENABLE THIS!!!
    });

    this._timeWsCreated = new Date();
    this._flags.isClosing = false;

    //interface should be identical to a normal websocket
    //so you can in theory just use the built-in, but you won't get the recovery mechanism

    // if the socket is not open (readyState 1) once the grace period is over, report it
    this._graceTimeout = setTimeout(() => {
      if (this._ws && this._ws.readyState === 1) {
        return;
      }

      if (!this.params.ignoreErrors && this.actions && this.actions[this.params.errorMessage]) {
        this.actions[this.params.errorMessage]({
          status: 'error',
          data: `websocket ${this._shortName} very slow or could not connect`,
          responseMessge: `websocket ${this._shortName} very slow or could not connect`, //for lognoice purposes
          actions: this.actions,
          extras: this.params.extras,
        });
      }

      const failureAction = this.params.wsConnectionFailureMessage;
      if (failureAction && this.actions && this.actions[failureAction]) {
        this.actions[failureAction]({
          actions: this.actions,
        });
      }

      //record to sentry (kinda useless)
      if (this.params.sentryWarnings) {
        Sentry.captureMessage(`Websocket ${this._shortName} unable to connect ?`, {
          level: 'error',
          logger: 'DataStream',
          extra: {
            url: this.url,
            retries: this._ws ? this._ws.retryCount : 0,
          },
        });
      }
    }, WEBSOCKET_GRACE_PERIOD);

    // bind stuff when you open
    this._ws.addEventListener('open', () => {
      if (!this.params.noLog) {
        console.log(`Websocket ${this._shortName} oppened`);
      }

      //this may happen if we change the page before it's done opening
      if (!this._ws) {
        return;
      }

      if (this._hasSuccessfullyConnectedBefore) {
        // A re-open of the same socket: the listeners bound on the first open are expected to
        // still be attached, so only the recovery is announced.
        if (!this.params.noLog) {
          console.log(
            `Websocket ${this._shortName} recovered, retries: [${
              this._ws ? this._ws.retryCount : 0
            }]`
          );
        }

        const recoveryAction = this.params.wsConnectionRecoveryMessage;
        if (recoveryAction && this.actions && this.actions[recoveryAction]) {
          const now = new Date();
          // NOTE(review): _lastDcRecovery is written but never read in this class - confirm the
          // flag is meant to be based on _lastResponseReceived.
          const recover =
            !this._lastResponseReceived ||
            this._lastResponseReceived.getTime() + this.params.dcRecoveryTolerance < now.getTime();
          this._lastDcRecovery = now;
          this.actions[recoveryAction]({
            actions: this.actions,
            recover,
          });
        }

        return;
      }

      this._hasSuccessfullyConnectedBefore = true;
      this._timeWsOpen = new Date();

      const successAction = this.params.wsConnectionSuccessMessage;
      if (successAction && this.actions && this.actions[successAction]) {
        this.actions[successAction]({
          actions: this.actions,
        });
      }

      // API stream is the only one which can handle echo
      if (this.params.monitorHeartBeats) {
        this._heartBeatIntervalId = setInterval(() => {
          if (this._flags.paused) {
            return; //don't do heartbeat logic
          }

          // assume the beat is missed until the pong resets the counter in _emit
          this._recentlyMissedHeartBeats += 1;
          if (this._recentlyMissedHeartBeats > this.params.numberOfHeartBeatsItIsSafeToMiss) {
            this.dispatchHeartbeatWarning();
          }

          this._sendHeartbeat();
        }, this.params.heartBeatMonitorInterval);

        //do the first one for latency purposes
        this._sendHeartbeat();
      }

      this._ws.addEventListener('error', (data) => {
        if (!this.params.noLog) {
          console.warn(`Websocket ${this._shortName} produced an error [${data.code}]`);
        }

        //circumvent all of the things
        if (
          !this.params.ignoreErrors &&
          this.actions &&
          this.actions[this.params.errorMessage] &&
          !this._flags.isClosing
        ) {
          this.actions[this.params.errorMessage]({
            status: 'error',
            code: data.code,
            data,
            actions: this.actions,
            extras: this.params.extras,
          });
        }
      });

      this._ws.addEventListener('close', (data) => {
        if (!this.params.noLog) {
          console.log(`Websocket ${this._shortName} closed [${data.code}]`);
        }

        //1000 = normal termination, 1001 = client leaving, 1006 = browser closed it
        const isAbnormal =
          data &&
          data.code !== 1000 &&
          data.code !== 1001 &&
          data.code !== 1006 &&
          !data.wasClean;

        if (isAbnormal && this.params.sentryWarnings) {
          Sentry.captureMessage(`Websocket ${this._shortName} Abnormal Closure`, {
            level: 'error',
            logger: 'DataStream',
            extra: {
              //this is aparently a special CloseEvent https://www.w3.org/TR/websockets/#closeevent
              wasClean: data.wasClean,
              code: data.code,
              reason: data.reason,
              retries: this._ws ? this._ws.retryCount : 0,
            },
          });
        }
      });

      this._ws.addEventListener('message', (data) => {
        if (!this._flags.isClosing) {
          this._emit(data); // emit messages to container via actions
        }
      });
    });

    return this._ws;
  }

  /**
   * this method stops the stream: clears the polling interval or closes the websocket and all
   * of its timers, and removes the stream from the monitoring registry
   * @param {boolean} [noPause] - when truthy the stream is also marked as paused and its actions are dropped, so responses that are still in flight are ignored (start() restores the actions)
   */
  stop(noPause = false) {
    if (noPause) {
      this._flags.paused = true;
      this.actions = null; //this may leak ?
    }

    delete _DATASTREAMS[this._uuid];

    if (!this._isWs) {
      if (this._interval) {
        clearInterval(this._interval);
        this._interval = null;
      }

      if (this._queuedForcedInterval) {
        clearTimeout(this._queuedForcedInterval);
        this._queuedForcedInterval = null;
      }
      return;
    }

    if (this._ws) {
      this._flags.isClosing = true;
      this._ws.close(); // close
      this._ws = null; // dereference
    }

    // timers are cleared whether or not a socket was still referenced
    if (this._heartBeatIntervalId) {
      clearInterval(this._heartBeatIntervalId);
      this._heartBeatIntervalId = null;
    }

    if (this._graceTimeout) {
      clearTimeout(this._graceTimeout);
      this._graceTimeout = null;
    }

    if (this._queueExecutor) {
      clearInterval(this._queueExecutor);
      this._queueExecutor = null;
    }
  }

  /**
   * builds the headers for one http request from params.headers plus the session, cookie and
   * content-type headers; params.headers itself is left untouched so that caller supplied
   * headers survive repeated sends
   * @return {Headers | Object} - a Headers instance when the browser supports it, a plain object otherwise
   */
  _buildRequestHeaders() {
    const headerMap = {};
    const supplied = this.params.headers;
    if (supplied && typeof supplied.forEach === 'function') {
      // already a Headers (or Map like) instance
      supplied.forEach((value, name) => {
        headerMap[name] = value;
      });
    } else if (supplied) {
      for (const name in supplied) {
        headerMap[name] = supplied[name];
      }
    }

    if (Session.get('sessionId')) {
      headerMap['session'] = Session.get('sessionId', '');
      headerMap['authorization'] = Session.get('sessionId', '');
    }

    if (document.cookie) {
      headerMap['cookie'] = document.cookie;
    }

    if (!this.params.rawBody) {
      headerMap['content-type'] = 'application/json';
    }

    //bs fallback for SAFARIU ?
    if (!window.Headers) {
      return headerMap;
    }

    const headers = new Headers();
    for (const name in headerMap) {
      const value = headerMap[name];
      //something is sending it a bad headers object
      if (typeof value !== 'string') {
        continue;
      }

      try {
        headers.set(name, value);
      } catch (err) {
        //what can we even do about this ?
        if (this.params.sentryWarnings) {
          Sentry.captureMessage(`Possible header ${name}=${value} failure`, {
            level: 'warning',
            logger: 'DataStream',
          });
        }
      }
    }

    return headers;
  }

  /**
   * this method pushes a message to the stream, in the case of a websocket it sends the message, in the case of a request it updates the payload and performs the request (restarting the polling interval for repeating requests)
   * @param {Object} [payload] - what to send, will replace params.body
   * @return {Promise | WebSocket | number | null} - a Promise for a one-off request without actions (null when actions handle the response), the webSocket for websocket type requests (null when it is closed), a number representing the interval for repeating requests
   */
  send(payload) {
    if (payload) {
      this.params.body = payload;
    }

    if (this.params.caseConvert && !this.params.rawBody) {
      this.params.body = toSnakeCaseAll(this.params.body);
    }

    // is it a websocket?
    if (this._isWs) {
      //it is!
      if (!this._ws) {
        console.error(`Websocket ${this._shortName} closed so we can't send data`);
        return null;
      }

      if (this.params.body) {
        let toSend = this.params.body;
        if (this.params.json) {
          toSend = JSON.stringify(this.params.body);
        }
        if (this.params.recordBandwidth) {
          this._bandwidth.up += JSON.stringify(this.params.body).length;
        }

        if (this.params.recordSentMessages) {
          console.warn(`Websocket ${this._shortName} was sent: ${toSend}`);
        }
        this._ws.send(toSend);
      }
      return this._ws;
    }

    let _payload;
    let _url = this.url;
    const headers = this._buildRequestHeaders();

    if (this.params.uuid && !this.params.rawBody) {
      this.params.body[this.params.uuid] = v4();
    }

    const hasRequestBody = ['POST', 'PUT', 'PATCH', 'DELETE'].includes(this.params.method); //maybe moar
    if (hasRequestBody) {
      _payload = this.params.rawBody ? this.params.body : JSON.stringify(this.params.body);
    } else if (this.params.body && Object.keys(this.params.body).length) {
      // everything else carries the body as query parameters
      // NOTE(review): keys and values are appended without URL encoding - confirm callers never
      // pass values containing '&', '#', '+' or spaces.
      if (_url.indexOf('?') === -1) {
        _url += '?';
      }
      for (let key in this.params.body) {
        if (typeof this.params.body[key] !== 'object') {
          _url += `&${key}=${this.params.body[key]}`; //leading & of DOOM
        } else {
          // objects/arrays are flattened into repeated `key=value` pairs
          for (let bkey in this.params.body[key]) {
            _url += `&${key}=${this.params.body[key][bkey]}`;
          }
        }
      }
    }

    // `doRequest` is assigned further down; badRequest only calls it when a retry is needed
    let doRequest = null;

    const badRequest = (response) => {
      this._lastStatus = 'error';
      if (this.params.retries > 0) {
        if (this.params.sentryWarnings) {
          Sentry.captureMessage(`Backend ${this._shortName} Error`, {
            level: 'warning',
            logger: 'DataStream',
            extra: {
              message: response.message || '?',
              code: response.status || '?',
              retries: this.params.retries,
              url: this.url,
            },
          });
        }

        // NOTE(review): the retry budget is shared by every request of this stream and is never
        // refilled - confirm that is intended for interval streams.
        this.params.retries--;
        doRequest();
        return;
      }

      if (!this.params.ignoreErrors && this.actions && this.actions[this.params.errorMessage]) {
        this.actions[this.params.errorMessage]({
          status: 'error',
          data: response.message || `bad request ${_url}`,
          response: response.response,
          responseMessge: response.message || '?', //for lognoice purposes
          code: response.status || 500,
          actions: this.actions,
          extras: this.params.extras,
        });
      }

      //these two are basically just connectivity and not a real error (Chrome/FF)
      const isConnectivityError =
        response.message === 'Failed to fetch' ||
        response.message === 'NetworkError when attempting to fetch resource.';
      if (!isConnectivityError && this.params.sentryWarnings) {
        Sentry.captureMessage(`Backend ${this._shortName} Error`, {
          level: 'error',
          logger: 'DataStream',
          extra: {
            message: response.message || '?',
            code: response.status || '?',
            url: this.url,
          },
        });
      }
    };

    const downloadBadRequest = (response) => {
      badRequest(response);
      this._emit('{"status": "error"}');
    };

    doRequest = () => {
      if (this.params.recordLatency) {
        this._lastRequestMade = new Date();
      }
      if (this.params.recordBandwidth) {
        this._bandwidth.up += _url.length;
        if (_payload && !this.params.rawBody) {
          this._bandwidth.up += _payload.length;
        }
      }

      this._request = new Request(
        _url,
        Object.assign({}, this.params, {
          headers,
          body: _payload,
          credentials: 'include',
          cache: 'no-store',
        })
      );

      if (this.params.raw) {
        if (!this.actions) {
          return fetch(this._request);
        }

        fetch(this._request)
          .then((response) => {
            this._emit(response);
          })
          .catch(badRequest);
        return undefined;
      }

      const pr = fetch(this._request);
      if (!this.actions) {
        return pr;
      }

      if (this.params.downloadAs) {
        pr.then((response) => response.blob())
          .then((blob) => {
            download(blob, this.params.downloadAs);
            this._emit('{"status": "ok"}'); //emit a dummy string i guess?
          })
          .catch(downloadBadRequest);
      } else {
        // A single catch at the end: a failed request goes to badRequest only and is not
        // followed by an emit of `undefined`.
        pr.then((response) => {
          const serialisedResponseMeta = {
            ok: response?.ok,
            redirected: response?.redirected,
            status: response?.status,
            statusText: response?.statusText,
            headers: Object.fromEntries(response?.headers?.entries() || []),
          };
          return response.json().then((json) => ({ response: serialisedResponseMeta, ...json }));
        })
          .then((response) => {
            this._emit(response);
          })
          .catch(badRequest);
      }

      return null;
    };

    if (this.params.interval && !isNaN(this.params.interval)) {
      // a repeated send() replaces the running poll instead of stacking a second one
      if (this._interval) {
        clearInterval(this._interval);
      }
      this._doRequest = doRequest;
      this._interval = setInterval(this._doRequest, parseInt(this.params.interval, 10));
      this._doRequest();
      return this._interval;
    }

    return doRequest();
  }

  /**
   * this method parses the response, handles heartbeat replies and batching and then forwards
   * the data via _dispatch
   * @param {Object | string} response - the response from the URL, or the MessageEvent from the WS
   */
  _emit(response) {
    this._lastStatus = 'healthy';

    if (this._isWs) {
      //because WS responses are of type MessageEvent
      response = response.data;
    }

    if (typeof response === 'string') {
      try {
        //le python...
        //https://docs.python.org/3.4/library/json.html#basic-usage
        response = JSON.parse(response);
      } catch (err) {
        if (!this.params.noLog) {
          console.error(`Response from ${this._shortName} is not JSON: ${response}`);
        }

        if (!this.params.ignoreErrors && this.actions && this.actions[this.params.errorMessage]) {
          this.actions[this.params.errorMessage]({
            status: 'error',
            data: `response from ${this._shortName} is not JSON`,
            responseMessge: response || '?',
            actions: this.actions,
            extras: this.params.extras,
          });
        }

        if (this.params.sentryWarnings) {
          Sentry.captureMessage(`Backend ${this._shortName} bad JSON`, {
            level: 'error',
            logger: 'DataStream',
            extra: {
              url: this.url,
              response,
            },
          });
        }

        //try to recover by reconnecting ?
        if (this._isWs && this._ws) {
          this._ws.reconnect();
        }

        this._lastStatus = 'error';
        return;
      }
    }

    // catch Heart Beat Responses: [['pong', <timestamp we sent>], ...]
    if (this.params.monitorHeartBeats && response && response[0]) {
      let isHb = false;
      let hbVal = '';

      if (response[0][0] === 'pong') {
        hbVal = response[0][1];
        const idx = this._sentHeartbeats.indexOf(hbVal);
        if (idx !== -1) {
          isHb = true;
          this._sentHeartbeats.splice(idx, 1);
        }
      }

      if (isHb) {
        // latency is only measured against the most recent ping
        if (this.params.recordLatency && hbVal === this._lastRequestMadeHB) {
          this._lastResponseReceived = new Date();
        }

        if (this._recentlyMissedHeartBeats > this.params.numberOfHeartBeatsItIsSafeToMiss) {
          this.dispatchSafeHeartbeat();
        }
        this._recentlyMissedHeartBeats = 0;
      }
    }

    if (!this.params.batch) {
      this._dispatch(this.params.wrapInArray ? [response] : response);
      return;
    }

    // nothing to queue for an empty payload (e.g. a literal `null` message)
    if (response === null || response === undefined) {
      return;
    }

    if (!Array.isArray(this._queued.data)) {
      this._queued.data = [];
    }

    if (response.ts) {
      for (const key in response.data) {
        this._queued.data.push(response.data[key]);
      }
    } else {
      //this may not be entirely correct
      this._queued.data = this._queued.data.concat(response);
    }
    //TODO: some message cancelling logic? or is processing fast enough to not matter
  }

  /**
   * this method triggers the relevant actions on the container
   * @param {Object} response - the response from the URL or WS
   */
  _dispatch(response) {
    if (this.params.recordLatency && !this._ws) {
      this._lastResponseReceived = new Date();
      //in theory lognoice should pick this up
      console.log(`[RTT ${this.lastRTT()}ms][${this.params.method}] ${this._shortName}`);
    }

    if (this.params.recordBandwidth) {
      // JSON.stringify returns undefined (not a string) for undefined input
      const serialised = JSON.stringify(response);
      if (typeof serialised === 'string') {
        this._bandwidth.down += serialised.length;
      }
    }

    if (!response || this._flags.paused) {
      return;
    }

    if (!this.params.json) {
      if (this.actions && this.actions[this.params.message]) {
        this.actions[this.params.message].call(null, {
          data: response,
          actions: this.actions,
          extras: this.params.extras,
        });
      }
      return;
    }

    if (this.params.caseConvert) {
      response = toCamelCaseAll(response, true);
    }

    this._lastResponse = response;

    // a message starting with '.' is a path into the payload that holds the action name
    let rd = this.params.message;
    if (this.params.message.startsWith('.')) {
      rd = response;
      for (const crumb of this.params.message.split('.')) {
        if (crumb) {
          rd = rd[crumb];
        }
      }
    }

    rd = toCamelCase(rd);

    if (this.actions && this.actions[rd]) {
      // arrays are wrapped so that actions and extras can be attached
      if (Array.isArray(response)) {
        response = {
          data: response,
        };
      }
      this.actions[rd].call(
        null,
        Object.assign(response, {
          actions: this.actions,
          extras: this.params.extras,
        })
      );
    } else if (!this.params.noLog) {
      console.warn(`No action ${rd} defined for response from ${this._shortName}`);
      Sentry.captureMessage(`No action ${rd} defined`, {
        level: 'warning',
        logger: 'DataStream',
        extra: {
          url: this.url,
          action: rd,
        },
      });
    }
  }

  /**
   * this method forcefully triggers the interval request early
   * @param {number | boolean} debouncedBy - how much to debounce by, or true = FORCED_INTERVAL_DEBOUNCE
   * @return {Promise | null | number} - the result of _doRequest if not debounced; a number representing the timeout if debounced; null if this is not a repeating DataStream
   */
  forceIntervalRequest(debouncedBy) {
    if (!this._interval || !this._doRequest) {
      return null;
    }

    if (!debouncedBy) {
      return this._doRequest();
    }

    // only the latest debounced request survives
    if (this._queuedForcedInterval) {
      clearTimeout(this._queuedForcedInterval);
    }
    this._queuedForcedInterval = setTimeout(
      this._doRequest,
      typeof debouncedBy === 'number' ? debouncedBy : FORCED_INTERVAL_DEBOUNCE
    );
    return this._queuedForcedInterval;
  }

  // name of the cookie that stores the host redirect
  static HOST_REDIRECT_COOKIE = '__HOST_REDIRECT__';

  /**
   * gets the host that all urls are being redirected to from the cookie; only origins containing
   * '.skaffold' are ever redirected
   * @return {string} - the host url, '' when there is no redirect
   */
  static getHostRedirect() {
    if (!window.location.origin.includes('.skaffold')) {
      return '';
    }

    return (
      document.cookie
        ?.split(';')
        ?.find((cookie) => cookie.includes(`${DataStream.HOST_REDIRECT_COOKIE}=`))
        ?.replace(`${DataStream.HOST_REDIRECT_COOKIE}=`, '')
        ?.trim() || ''
    );
  }

  /**
   * stores the host that all urls should be redirected to in the cookie and reloads the page
   * @param {string} host - the host url including the protocol; an empty value clears the redirect
   * @throws {Error} when the host has no protocol or when the origin does not contain '.skaffold'
   */
  static setHostRedirect(host) {
    if (host && !host.includes('://')) {
      throw new Error('You must include the protocol in the host redirect...');
    }

    if (!window.location.origin.includes('.skaffold')) {
      throw new Error('Tried to set host redirect in non-skaffold environment, this is wrong...');
    }

    document.cookie = `${DataStream.HOST_REDIRECT_COOKIE}=${host}`;
    window.location.reload();
  }
}

// Expose the registry of tracked streams on window (it is used for monitoring purposes).
window._DATASTREAMS = _DATASTREAMS;
