import {
    all,
    put,
    takeLatest,
    takeEvery,
    take,
    select,
    call,
    PutEffect,
    CallEffect,
    AllEffect,
    ForkEffect,
} from "redux-saga/effects";
import { eventChannel, END } from "redux-saga";
import * as actions from "./actions";
import * as selectors from "./selectors";
import { messaging } from "@ai360/core";
import {
    AnySubscription,
    ActionSubscription,
    GeneratorSubscription,
    ActionAccumulateSubscription,
    GeneratorAccumulateSubscription,
    isActionSubscription,
    isGeneratorSubscription,
    isActionAccumulateSubscription,
    isGeneratorAccumulateSubscription,
} from "./subscription";

import { device as Device } from "aws-iot-device-sdk";

import { v4 as uuid } from "uuid";

/**
 * Everything registered for a single event name: the subscriptions, grouped
 * by kind, plus the batch of models currently being collected for that event.
 */
interface StoredSubscriptions {
    /** Subscriptions whose `action` is invoked once per model. */
    actions: ActionSubscription[];
    /** Subscriptions whose `generator` is invoked once per model. */
    generators: GeneratorSubscription[];
    /** Subscriptions whose `actionAccumulate` is invoked once with the whole model array. */
    actionAccumulates: ActionAccumulateSubscription[];
    /** Subscriptions whose `generatorAccumulate` is invoked once with the whole model array. */
    generatorAccumulates: GeneratorAccumulateSubscription[];
    /** Models gathered for the batch in progress; `null` while no batch is open. */
    models: any[] | null;
}

/**
 * Exposes a device's "message" events as a saga event channel.
 *
 * Each incoming byte array is decoded with `messaging.inputFromByteArray` and
 * the decoded input is emitted on the channel. Closing the channel detaches
 * the listener from the device.
 *
 * @param device Device whose "message" events are forwarded.
 * @returns The event channel carrying the decoded inputs.
 */
const createMessageChannel = (device: Device) => {
    const MESSAGE_EVENT = "message";

    return eventChannel((emit) => {
        // The first listener argument is not needed; only the payload is decoded.
        const forwardMessage = (_, payload: Uint8Array) => {
            emit(messaging.inputFromByteArray(payload));
        };
        device.addListener(MESSAGE_EVENT, forwardMessage);

        const unsubscribe = () => {
            device.removeListener(MESSAGE_EVENT, forwardMessage);
        };
        return unsubscribe;
    });
};

/**
 * Creates an event channel that emits a single empty object after the given
 * delay and then ends itself. Closing the channel early clears the pending
 * timer.
 *
 * @param batchMilliseconds Delay, in milliseconds, before the channel fires.
 * @returns The one-shot event channel.
 */
const createTimeoutChannel = (batchMilliseconds: number) =>
    eventChannel((emit) => {
        const fire = () => {
            emit({});
            // Ending right after the tick makes the channel one-shot.
            emit(END);
        };
        const handle = setTimeout(fire, batchMilliseconds);

        return () => {
            clearTimeout(handle);
        };
    });

/**
 * Hands an incoming model to every receiver selected by
 * `selectors.getReceivers`, invoking them together through a single `all`
 * effect.
 *
 * @param eventName Name of the event the model belongs to.
 * @param model Payload delivered with the event.
 */
const pushToReceivers = function* (eventName: string, model: any) {
    const registered = yield select(selectors.getReceivers);
    const invoke = (receiver: any) => call(receiver, eventName, model);
    yield all(registered.map(invoke));
};

/**
 * Runs all subscriptions stored for one event against a batch of models.
 *
 * Action results are produced first (per-model creators for every model, then
 * accumulate creators once with the whole batch); null/undefined results are
 * dropped and the rest are dispatched with `put`, in order. Only after that
 * are the generator callbacks invoked, and each of their results is yielded.
 *
 * @param models Models received for the event.
 * @param subscriptions Subscriptions registered for the event.
 */
const execute = function* (models: any[], subscriptions: StoredSubscriptions) {
    const perModelActions = subscriptions.actions.flatMap(({ action }) =>
        models.map((model): any => action(model))
    );
    const batchActions = subscriptions.actionAccumulates.map(({ actionAccumulate }): any =>
        actionAccumulate(models)
    );
    const dispatchable = [...perModelActions, ...batchActions].filter(
        (candidate) => candidate != null
    );
    for (const action of dispatchable) {
        yield put(action);
    }

    // Generator callbacks are deliberately invoked only after every action was dispatched.
    const perModelGenerators = subscriptions.generators.flatMap(({ generator }) =>
        models.map((model): any => generator(model))
    );
    const batchGenerators = subscriptions.generatorAccumulates.map(
        ({ generatorAccumulate }): any => generatorAccumulate(models)
    );
    for (const generator of [...perModelGenerators, ...batchGenerators]) {
        yield generator;
    }
};

/**
 * Groups subscriptions by event name.
 *
 * A single subscription passed instead of an array is treated as a
 * one-element list. Each event name gets one `StoredSubscriptions` bucket
 * (created on first use, with no batch open) and every subscription is filed
 * under the list matching its kind; a subscription matching no kind still
 * creates the bucket but is not stored.
 *
 * @param subscriptions Subscriptions to index.
 * @returns Map from event name to the subscriptions registered for it.
 */
const toMap = (subscriptions: AnySubscription[]) => {
    const byEvent = new Map<string, StoredSubscriptions>();
    const entries = subscriptions instanceof Array ? subscriptions : [subscriptions];

    for (const subscription of entries) {
        let bucket = byEvent.get(subscription.eventName);
        if (bucket == null) {
            bucket = {
                actions: [],
                generators: [],
                actionAccumulates: [],
                generatorAccumulates: [],
                models: null,
            };
            byEvent.set(subscription.eventName, bucket);
        }

        if (isActionSubscription(subscription)) {
            bucket.actions.push(subscription);
        } else if (isGeneratorSubscription(subscription)) {
            bucket.generators.push(subscription);
        } else if (isActionAccumulateSubscription(subscription)) {
            bucket.actionAccumulates.push(subscription);
        } else if (isGeneratorAccumulateSubscription(subscription)) {
            bucket.generatorAccumulates.push(subscription);
        }
    }

    return byEvent;
};

/**
 * Builds the receiver for a set of subscriptions and registers it by
 * dispatching `actions.newReceiver`.
 *
 * With `batchMilliseconds <= 0` the receiver executes the subscriptions for
 * each model immediately. Otherwise the first model for an event opens a
 * batch, models arriving during the following `batchMilliseconds` join it,
 * and the whole batch is executed once the timeout fires.
 *
 * The batch is closed *before* it is executed, so a model arriving while the
 * subscriptions are still running opens a fresh batch instead of being
 * appended to (and then discarded with) the one already being processed. The
 * batch is also closed when the wait is cancelled or fails, so the event can
 * never get stuck collecting models that will not be executed.
 *
 * @param batchMilliseconds Batching window in milliseconds; `<= 0` disables batching.
 * @param eventToSubscriptions Subscriptions indexed by event name.
 */
const linkToNewModels = function* (
    batchMilliseconds: number,
    eventToSubscriptions: Map<string, StoredSubscriptions>
) {
    let receiver = null;
    if (batchMilliseconds <= 0) {
        receiver = function* (eventName: string, model: any) {
            const subscriptions = eventToSubscriptions.get(eventName);
            if (subscriptions != null) {
                yield execute([model], subscriptions);
            }
        };
    } else {
        receiver = function* (eventName: string, model: any) {
            const subscriptions = eventToSubscriptions.get(eventName);
            if (subscriptions == null) {
                return;
            }

            if (subscriptions.models != null) {
                // A batch is already open; its owner will execute this model.
                subscriptions.models.push(model);
                return;
            }

            // Later arrivals push onto this same array, so `batch` sees them all.
            const batch = [model];
            subscriptions.models = batch;
            try {
                yield take(createTimeoutChannel(batchMilliseconds));
            } finally {
                // Runs on completion, error and cancellation alike.
                subscriptions.models = null;
            }
            yield execute(batch, subscriptions);
        };
    }

    yield put(actions.newReceiver(receiver));
};

/**
 * Handles the start action: creates a messaging device identified by a
 * freshly generated UUID, subscribes it to the user guid and to every
 * org-level guid (when provided), stores the device via `actions.setDevice`,
 * and then forwards each message received from the device to the registered
 * receivers.
 *
 * Inputs flagged as invalid are logged and dropped; inputs that do not
 * convert to a message are dropped silently.
 *
 * @param action Start action carrying host, credentials and the guids to subscribe to.
 */
export const start = function* (
    action: ReturnType<typeof actions.start>
): Generator<CallEffect | ForkEffect | PutEffect<any>, void, unknown> {
    const { host, credentials, userGuid, orgLevelGuids } = action.payload;
    const configuration: messaging.Configuration = { host, credentials };
    const device = messaging.createDevice(uuid(), configuration);

    device.subscribe(userGuid.toString());
    if (orgLevelGuids) {
        for (const orgLevelGuid of orgLevelGuids) {
            device.subscribe(orgLevelGuid.toString());
        }
    }

    // Worker run for every input emitted by the device's message channel.
    const handleInput = function* (input: messaging.Input) {
        if (input.type === messaging.InputType.Invalid) {
            console.error("Invalid input received.", input.value);
            return;
        }

        const message = messaging.toMessage(input);
        if (message == null) {
            return;
        }
        yield pushToReceivers(message.eventName, message.model);
    };

    yield put(actions.setDevice(device));

    yield takeEvery(createMessageChannel(device), handleInput);
};

/**
 * Handles the subscribe action: indexes the requested subscriptions by event
 * name and registers a receiver for them, batched according to the action's
 * `batchMilliseconds`.
 *
 * @param action Subscribe action carrying the subscriptions and the batching window.
 */
export const subscribe = function* (
    action: ReturnType<typeof actions.subscribe>
): Generator<Generator<PutEffect<any>, void, unknown>, void, unknown> {
    const { payload } = action;
    const subscriptionsByEvent = toMap(payload.subscriptions);
    yield linkToNewModels(payload.batchMilliseconds, subscriptionsByEvent);
};

/**
 * Root saga of the messaging module: keeps only the latest `START` handler
 * running and handles every `SUBSCRIBE` action.
 */
export const messagingSaga = function* (): Generator<AllEffect, void, unknown> {
    const watchStart = takeLatest(actions.START, start);
    const watchSubscribe = takeEvery(actions.SUBSCRIBE, subscribe);
    yield all([watchStart, watchSubscribe]);
};
