import {
  DynamoDBDocumentClient,
  GetCommand,
  GetCommandOutput,
  paginateQuery,
  QueryCommandInput,
  TransactWriteCommand,
  TransactWriteCommandInput,
  TransactWriteCommandOutput,
  UpdateCommand,
  UpdateCommandInput,
  UpdateCommandOutput,
} from '@aws-sdk/lib-dynamodb';

import {
  PublishCommand,
  PublishCommandInput,
  PublishCommandOutput,
  SNSClient,
} from '@aws-sdk/client-sns';
import { SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn';
import { v4 as uuid } from 'uuid';
import { NotRetryableError, retry } from 'ts-retry-promise';
import { LogRecordFn, logRecordFor } from './logger';
import { EventType } from '../../api/interface/events';
import { SNSMessage, SQSEvent, SQSRecord } from 'aws-lambda';
import {
  InternalServerError,
  RetryingGetItemError,
  RetryingTransactionWriteError,
  RetryingUpdateItemError,
} from './error';
import { UpdateItemInput } from '@aws-sdk/client-dynamodb';
import { groupBy } from 'lodash';
import { Readable } from 'stream';
import consumers from 'stream/consumers';
import {
  GetObjectCommand,
  S3Client,
  GetObjectCommandInput,
  GetObjectCommandOutput,
} from '@aws-sdk/client-s3';
import { GetCommandInput } from '@aws-sdk/lib-dynamodb/dist-types/commands/GetCommand';

// Module-level log functions obtained from logRecordFor with the name 'aws-util'
// and the 'debug' / 'error' level arguments. In this file they are called as
// fn(message, details) by the event-bus publisher and the step function executor.
const debug = logRecordFor('aws-util', 'debug');
const error = logRecordFor('aws-util', 'error');

/**
 * Creates a query function that walks every page of a DynamoDB query and
 * returns the mapped items of all pages as one flat array.
 *
 * All pages are read first; the mapper is applied only afterwards, in the
 * order the items were returned.
 *
 * @param dynamodb document client handed to the query paginator
 * @returns async function `(params, mapper?, _limit?) => Promise<R[]>`; the
 *          default mapper returns `item?.data`, `_limit` is currently ignored
 */
export function makePagedQueryHandler({
  dynamodb,
}: {
  dynamodb: DynamoDBDocumentClient;
}) {
  return async function <R>(
    params: QueryCommandInput,
    mapper: (item: Record<string, any>) => R = item => item?.data,
    // TODO: support limit
    _limit?: number
  ): Promise<R[]> {
    const collected: Record<string, any>[] = [];

    for await (const { Items } of paginateQuery({ client: dynamodb }, params)) {
      // Pages without an Items array contribute nothing
      if (!Items) {
        continue;
      }
      for (const entry of Items) {
        collected.push(entry);
      }
    }

    return collected.map(entry => mapper(entry));
  };
}

/**
 * Creates a transaction-write function that retries failed writes using
 * ts-retry-promise with LINEAR backoff. The number of attempts is left at the
 * retry library's default.
 *
 * @param dynamodb document client used to send the TransactWriteCommand
 * @param errorLogger optional logger, called for un-retryable errors and for the final failure
 * @param initialDelayInMilliseconds `delay` passed to the retry library (default 100)
 * @param totalTimeoutInMilliseconds `timeout` passed to the retry library (default 120000)
 * @returns async function resolving to the `[input, output]` pair of the successful attempt
 * @throws the `lastError` of the failed retry sequence (a RetryingTransactionWriteError,
 *         or a NotRetryableError when `isRetryable` returned false); a rejection that
 *         carries no `lastError` is re-thrown unchanged
 */
export function makeConfigurableRetryingTransactionWriteHandler({
  dynamodb,
  errorLogger,
  initialDelayInMilliseconds = 100,
  totalTimeoutInMilliseconds = 120000,
}: {
  dynamodb: DynamoDBDocumentClient;
  errorLogger?: LogRecordFn;
  initialDelayInMilliseconds?: number;
  totalTimeoutInMilliseconds?: number;
}) {
  return async function ({
    onGetParams,
    isRetryable,
  }: {
    /**
     * Will be called at the start of retry -sequence and every time transaction write is retried
     *
     * This allows us to modify the request based on the error received during write
     */
    onGetParams: () => TransactWriteCommandInput;
    /**
     * Will be called when error has occurred during transaction write
     *
     * By returning 'true', a new retry is made (unless total timeout is achieved)
     *
     * @param err error received from dynamodb document client
     */
    isRetryable: (err: any) => boolean;
  }): Promise<[TransactWriteCommandInput, TransactWriteCommandOutput]> {
    return retry<[TransactWriteCommandInput, TransactWriteCommandOutput]>(
      async () => {
        // Ask for fresh params on every attempt so the caller can adapt the request
        const input: TransactWriteCommandInput = onGetParams();

        const output = await dynamodb
          .send(new TransactWriteCommand(input))
          .catch(err => {
            if (!isRetryable(err)) {
              if (errorLogger) {
                errorLogger('Un-retryable error during transactWrite', {
                  params: input,
                  err,
                });
              }
              // Signals the retry library to stop immediately
              throw new NotRetryableError();
            }
            throw new RetryingTransactionWriteError(
              'TransactWriteCommand failed',
              input,
              err
            );
          });

        return [input, output];
      },
      {
        backoff: 'LINEAR',
        delay: initialDelayInMilliseconds,
        timeout: totalTimeoutInMilliseconds,
      }
    ).catch(err => {
      if (errorLogger) {
        errorLogger('TransactWriteCommand failed', { err });
      }
      // A rejection without `lastError` (e.g. the overall timeout) must not
      // surface to the caller as a thrown `undefined`
      throw err?.lastError ?? err;
    });
  };
}

/**
 * Creates an update-item function that retries failed updates using
 * ts-retry-promise with LINEAR backoff. The number of attempts is left at the
 * retry library's default.
 *
 * @param dynamodb document client used to send the UpdateCommand
 * @param errorLogger optional logger, called for un-retryable errors and for the final failure
 * @param initialDelayInMilliseconds `delay` passed to the retry library (default 100)
 * @param totalTimeoutInMilliseconds `timeout` passed to the retry library (default 120000)
 * @returns async function resolving to the output of the successful UpdateCommand
 * @throws the `lastError` of the failed retry sequence (a RetryingUpdateItemError,
 *         or a NotRetryableError when `isRetryable` returned false); a rejection that
 *         carries no `lastError` is re-thrown unchanged
 */
export function makeConfigurableRetryingUpdateHandler({
  dynamodb,
  errorLogger,
  initialDelayInMilliseconds = 100,
  totalTimeoutInMilliseconds = 120000,
}: {
  dynamodb: DynamoDBDocumentClient;
  errorLogger?: LogRecordFn;
  initialDelayInMilliseconds?: number;
  totalTimeoutInMilliseconds?: number;
}) {
  return async function ({
    onGetParams,
    isRetryable,
  }: {
    /**
     * Will be called at the start of retry -sequence and every time update is retried
     *
     * This allows us to modify the request based on the error received during update
     */
    onGetParams: () => UpdateCommandInput;
    /**
     * Will be called when error has occurred during update item
     *
     * By returning 'true', a new retry is made (unless total timeout is achieved)
     *
     * @param err error received from dynamodb document client
     */
    isRetryable: (err: any) => boolean;
  }): Promise<UpdateCommandOutput> {
    return retry<UpdateCommandOutput>(
      () => {
        // Ask for fresh params on every attempt so the caller can adapt the request
        const params: UpdateItemInput = onGetParams();

        return dynamodb.send(new UpdateCommand(params)).catch(err => {
          if (!isRetryable(err)) {
            if (errorLogger) {
              errorLogger('Un-retryable error during update item', {
                params,
                err,
              });
            }
            // Signals the retry library to stop immediately
            throw new NotRetryableError();
          }
          throw new RetryingUpdateItemError('UpdateItem failed', params, err);
        });
      },
      {
        backoff: 'LINEAR',
        delay: initialDelayInMilliseconds,
        timeout: totalTimeoutInMilliseconds,
      }
    ).catch(err => {
      if (errorLogger) {
        errorLogger('UpdateItem failed', { err });
      }
      // A rejection without `lastError` (e.g. the overall timeout) must not
      // surface to the caller as a thrown `undefined`
      throw err?.lastError ?? err;
    });
  };
}

/**
 * Creates a get-item function that keeps retrying (retries: 'INFINITELY',
 * bounded only by the total timeout) until the `until` predicate accepts the
 * result or an un-retryable error occurs.
 *
 * @param dynamodb document client used to send the GetCommand
 * @param errorLogger optional logger, called for un-retryable errors and for the final failure
 * @param initialDelayInMilliseconds `delay` passed to the retry library (default 100)
 * @param totalTimeoutInMilliseconds `timeout` passed to the retry library (default 120000)
 * @param backoff backoff strategy passed to the retry library (default 'LINEAR')
 * @returns async function resolving to the first GetCommand output accepted by `until`
 * @throws the `lastError` of the failed retry sequence (a RetryingGetItemError,
 *         or a NotRetryableError when `isRetryable` returned false); a rejection that
 *         carries no `lastError` - e.g. when `until` was never satisfied before the
 *         timeout - is re-thrown unchanged
 */
export function makeConfigurableRetryingGetItemHandler({
  dynamodb,
  errorLogger,
  initialDelayInMilliseconds = 100,
  totalTimeoutInMilliseconds = 120000,
  backoff = 'LINEAR',
}: {
  dynamodb: DynamoDBDocumentClient;
  errorLogger?: LogRecordFn;
  initialDelayInMilliseconds?: number;
  totalTimeoutInMilliseconds?: number;
  backoff?: 'FIXED' | 'EXPONENTIAL' | 'LINEAR';
}) {
  return async function ({
    onGetParams,
    isRetryable,
    until,
  }: {
    /**
     * Will be called at the start of retry -sequence and every time get item is retried
     *
     * This allows us to modify the request based on the error received during get item
     */
    onGetParams: () => GetCommandInput;
    /**
     * Will be called when error has occurred during get item
     *
     * By returning 'true', a new retry is made (unless total timeout is achieved)
     *
     * @param err error received from dynamodb document client
     */
    isRetryable: (err: any) => boolean;
    /**
     * Will be called every time the get command returns a result
     *
     * By returning 'true', the result is accepted and the retry sequence ends.
     * By returning 'false', a new retry is made (unless total timeout is achieved)
     *
     * @param output the result received
     */
    until: (output: GetCommandOutput) => boolean;
  }): Promise<GetCommandOutput> {
    return retry<GetCommandOutput>(
      () => {
        // Ask for fresh params on every attempt so the caller can adapt the request
        const params: GetCommandInput = onGetParams();

        return dynamodb.send(new GetCommand(params)).catch(err => {
          if (!isRetryable(err)) {
            if (errorLogger) {
              errorLogger('Un-retryable error during get item', {
                params,
                err,
              });
            }
            // Signals the retry library to stop immediately
            throw new NotRetryableError();
          }
          throw new RetryingGetItemError('GetCommand failed', params, err);
        });
      },
      {
        retries: 'INFINITELY',
        backoff,
        delay: initialDelayInMilliseconds,
        timeout: totalTimeoutInMilliseconds,
        until,
      }
    ).catch(err => {
      if (errorLogger) {
        errorLogger('GetCommand failed', { err });
      }
      // A rejection without `lastError` (e.g. the overall timeout while `until`
      // kept returning false) must not surface as a thrown `undefined`
      throw err?.lastError ?? err;
    });
  };
}

/**
 * Creates a transaction-write function that re-sends the same params with
 * LINEAR backoff for as long as the received error is flagged `retryable`.
 * The number of attempts is left at the retry library's default.
 *
 * @param dynamodb document client used to send the TransactWriteCommand
 * @param errorLogger optional logger, called for un-retryable errors and for the final failure
 * @param initialDelayInMilliseconds `delay` passed to the retry library (default 100)
 * @param totalTimeoutInMilliseconds `timeout` passed to the retry library (default 120000)
 * @returns async function resolving to the output of the successful TransactWriteCommand
 * @throws the `lastError` of the failed retry sequence (the last client error, or a
 *         NotRetryableError when the error was not flagged retryable); a rejection that
 *         carries no `lastError` is re-thrown unchanged
 */
export function makeRetryingTransactionWriteHandler({
  dynamodb,
  errorLogger,
  initialDelayInMilliseconds = 100,
  totalTimeoutInMilliseconds = 120000,
}: {
  dynamodb: DynamoDBDocumentClient;
  errorLogger?: LogRecordFn;
  initialDelayInMilliseconds?: number;
  totalTimeoutInMilliseconds?: number;
}) {
  return async function (
    params: TransactWriteCommandInput
  ): Promise<TransactWriteCommandOutput> {
    return retry<TransactWriteCommandOutput>(
      () =>
        dynamodb.send(new TransactWriteCommand(params)).catch(err => {
          // NOTE(review): AWS SDK v3 exceptions appear to expose `$retryable`
          // rather than `retryable` - confirm this flag is ever set for v3 errors
          if (!err?.retryable) {
            if (errorLogger) {
              errorLogger('Un-retryable error during transactWrite', {
                params,
                err,
              });
            }
            // Signals the retry library to stop immediately
            throw new NotRetryableError();
          }
          throw err;
        }),
      {
        backoff: 'LINEAR',
        delay: initialDelayInMilliseconds,
        timeout: totalTimeoutInMilliseconds,
      }
    ).catch(err => {
      if (errorLogger) {
        errorLogger('TransactWrite failed', { err });
      }
      // A rejection without `lastError` (e.g. the overall timeout) must not
      // surface to the caller as a thrown `undefined`
      throw err?.lastError ?? err;
    });
  };
}

/**
 * Builds the SNS message attributes of an event-bus message.
 *
 * The result always starts with the `eventType` and `timestamp` (current time
 * as ISO string) attributes; every additional attribute is added as a
 * 'String' attribute and replaces a default attribute of the same name.
 *
 * @param eventType value of the `eventType` attribute
 * @param additionalMessageAttributes optional extra name/value pairs
 * @returns attribute map with `DataType: 'String'` entries
 */
const asEventBusSNSMessageAttributes = (
  eventType: string,
  additionalMessageAttributes?: Record<string, string>
) => {
  const stringAttribute = (value: string) => ({
    DataType: 'String',
    StringValue: value,
  });

  const attributes: Record<string, { DataType: string; StringValue: string }> =
    {
      eventType: stringAttribute(eventType),
      timestamp: stringAttribute(new Date().toISOString()),
    };

  if (additionalMessageAttributes) {
    for (const [name, value] of Object.entries(additionalMessageAttributes)) {
      attributes[name] = stringAttribute(value);
    }
  }

  return attributes;
};

/**
 * Description of a single message to publish, as accepted by toSNSMessage.
 */
export type PipelineSNSMessage<T> = {
  // Published as the `eventType` message attribute
  eventType: EventType;
  // Becomes `TopicArn` of the publish input
  topicArn: string;
  // Becomes `MessageGroupId` of the publish input
  messageGroupId: string;
  // Becomes `MessageDeduplicationId` of the publish input
  deDuplicationId: string;
  // Payload; JSON-stringified into the `default` entry of the message
  data: T;
  // Extra attributes, each published as a 'String' message attribute
  additionalMessageAttributes?: Record<string, string>;
};

/**
 * Converts a pipeline message description into the input of an SNS PublishCommand.
 *
 * The payload is JSON-stringified and wrapped into a `{ default: ... }` envelope,
 * which is stringified again because the message structure is 'json'.
 *
 * @param pipelineMessage message description (topic, ids, event type, payload, extra attributes)
 * @returns publish input ready to be sent with the SNS client
 */
export const toSNSMessage = <T>(
  pipelineMessage: PipelineSNSMessage<T>
): PublishCommandInput => {
  const attributes = asEventBusSNSMessageAttributes(
    pipelineMessage.eventType,
    pipelineMessage.additionalMessageAttributes
  );
  const envelope = { default: JSON.stringify(pipelineMessage.data) };

  return {
    TopicArn: pipelineMessage.topicArn,
    MessageGroupId: pipelineMessage.messageGroupId,
    MessageDeduplicationId: pipelineMessage.deDuplicationId,
    MessageAttributes: attributes,
    Message: JSON.stringify(envelope),
    MessageStructure: 'json',
  };
};

/**
 * Configuration accepted by makeEventBusPublisher.
 */
interface EventBusMessageParams {
  // Event type published as the `eventType` message attribute
  eventType: EventType;
  // ARN of the topic the messages are published to
  topicArn: string;
  // Optional client; makeEventBusPublisher falls back to `new SNSClient({})` when omitted
  snsClient?: SNSClient;
  /**
   * The message group id is the queueId and is used when targeting messages to a given queue
   *
   * Use the default value when the event is a normal event-bus event
   */
  messageGroupId?: string;
  /**
   * The de-duplication id is a combination of a queueId and timestamp, and is used when targeting messages to a given queue
   *
   * Use the default value when the event is a normal event-bus event
   */
  deDuplicationId?: string;
}

/**
 * Creates a publisher that sends events of one event type to an SNS topic.
 *
 * When no `deDuplicationId` is supplied, a fresh uuid is generated for every
 * publish call, so consecutive messages from the same publisher never share a
 * de-duplication id. An explicitly supplied `deDuplicationId` is used as-is
 * for every message published by the returned function.
 *
 * @param eventType event type published as the `eventType` message attribute
 * @param topicArn ARN of the target topic
 * @param snsClient client used for publishing (default: `new SNSClient({})`)
 * @param messageGroupId message group id (default: 'eventBus')
 * @param deDuplicationId optional fixed de-duplication id
 * @returns async function publishing `data` (plus optional extra message
 *          attributes) and resolving to the PublishCommand output
 */
export const makeEventBusPublisher = <T>({
  eventType,
  topicArn,
  snsClient = new SNSClient({}),
  messageGroupId = 'eventBus',
  deDuplicationId,
}: EventBusMessageParams): ((
  data: T,
  additionalMessageAttributes?: Record<string, string>
) => Promise<PublishCommandOutput>) => {
  return async (
    data: T,
    additionalMessageAttributes?: Record<string, string>
  ): Promise<PublishCommandOutput> => {
    const message = toSNSMessage({
      eventType,
      topicArn,
      messageGroupId,
      // Generate the default id per message, not once per publisher; a shared
      // id would make every later publish look like a duplicate of the first
      deDuplicationId: deDuplicationId ?? uuid(),
      data,
      additionalMessageAttributes,
    });

    debug('Publishing event', { message });

    const result = await snsClient.send(new PublishCommand(message));

    debug(`Published event`, { result });

    return result;
  };
};

/**
 * Configuration accepted by makeStepFunctionExecutor.
 */
interface StepFunctionExecutorParams {
  // ARN of the state machine whose executions are started
  stateMachineArn: string;
  // Optional client; makeStepFunctionExecutor falls back to `new SFNClient({})` when omitted
  sfnClient?: SFNClient;
}

/**
 * Minimal shape of the data handed to the step function executor.
 */
export interface StepFunctionEvent {
  // Used as the `name` of the started execution
  name: string;
}

/**
 * Creates a function that starts an execution of the given state machine.
 *
 * The data is passed JSON-stringified as execution input and `data.name` is
 * used as the execution name. Starting is best-effort: any failure is logged
 * with the error logger and the data is returned regardless.
 *
 * @param stateMachineArn ARN of the state machine to execute
 * @param sfnClient client used to start the execution (default: `new SFNClient({})`)
 * @returns async function that resolves to the very data it was given
 */
export const makeStepFunctionExecutor = <T extends StepFunctionEvent>({
  stateMachineArn,
  sfnClient = new SFNClient({}),
}: StepFunctionExecutorParams): ((data: T) => Promise<T>) => {
  return async (data: T): Promise<T> => {
    debug('Starting execution', { data });

    try {
      // Built inside the try so that serialization failures are logged as well
      const startCommand = new StartExecutionCommand({
        input: JSON.stringify(data),
        name: data.name,
        stateMachineArn,
      });
      const result = await sfnClient.send(startCommand);
      debug('Started execution', { result });
    } catch (e) {
      error('Execution could not be started', { e });
    }

    return data;
  };
};

/**
 * Parses the JSON body of an SQS record.
 *
 * The result is not validated; it is merely typed as `T`.
 *
 * @param record SQS record whose `body` holds a JSON document
 * @returns the parsed body
 * @throws SyntaxError when the body is not valid JSON
 */
export function parseFromSQSRecord<T>(record: SQSRecord) {
  const { body } = record;
  const parsed: T = JSON.parse(body);
  return parsed;
}

/**
 * Parses the JSON payload carried in the `Message` field of an SNS message.
 *
 * The result is not validated; it is merely typed as `T`.
 *
 * @param message SNS message whose `Message` holds a JSON document
 * @returns the parsed payload
 * @throws SyntaxError when the payload is not valid JSON
 */
export function parseFromSNSEvent<T>(message: SNSMessage): T {
  const { Message: payload } = message;
  const parsed: T = JSON.parse(payload);
  return parsed;
}

/**
 * Reads the value of a message attribute from an SNS message.
 *
 * Only own attributes are considered, so names inherited from the object
 * prototype (e.g. 'toString') are never reported as present.
 *
 * @param snsMessage SNS message to read from
 * @param attribute name of the message attribute
 * @returns the attribute's `Value`, or undefined when the message carries no
 *          attributes at all or the attribute is missing
 */
export function parseSNSMessageStringAttribute(
  snsMessage: SNSMessage,
  attribute: string
) {
  const attributes = snsMessage.MessageAttributes;

  // Messages published without any attribute may lack the field entirely
  if (attributes == null) return undefined;

  // Call through the prototype: the attribute map may have no prototype or
  // may contain an attribute that is itself named 'hasOwnProperty'
  if (!Object.prototype.hasOwnProperty.call(attributes, attribute))
    return undefined;

  return attributes[attribute]?.Value;
}

/**
 * SNS messages keyed by event type; this is the shape returned by sortMessages,
 * which uses the value of each message's `eventType` message attribute as key.
 */
export interface MessagesByEventType {
  [eventType: string]: SNSMessage[];
}

/**
 * Groups the SNS messages contained in an SQS event by their event type.
 *
 * Despite its name, this function does not order anything: every record body
 * is first parsed as an SNS message, then the messages are grouped by the
 * value of their `eventType` message attribute.
 *
 * @param sqsEvent SQS event whose record bodies are JSON-encoded SNS messages
 * @returns the messages keyed by event type
 * @throws InternalServerError when a message has no `eventType` attribute
 */
export function sortMessages(sqsEvent: SQSEvent): MessagesByEventType {
  // Parse every record up front, before any event type is inspected
  const snsMessages: SNSMessage[] = [];
  for (const sqsRecord of sqsEvent.Records) {
    snsMessages.push(parseFromSQSRecord<SNSMessage>(sqsRecord));
  }

  const eventTypeOf = (msg: SNSMessage): string => {
    const eventType = parseSNSMessageStringAttribute(msg, 'eventType');
    if (eventType !== undefined) {
      return eventType;
    }
    throw new InternalServerError('Cannot handle event - missing eventType', {
      msg,
    });
  };

  return groupBy(snsMessages, msg => eventTypeOf(msg));
}

/**
 * A GetObjectCommandOutput whose `Body` is replaced by a value of type `T`;
 * the S3 readers in this file use it to return the already consumed body.
 */
type S3GetObjectOutput<T> = Omit<GetObjectCommandOutput, 'Body'> & {
  Body: T;
};

/**
 * Creates a function that fetches an S3 object and parses its content as JSON.
 *
 * @param s3Client client used to fetch the object (default: `new S3Client({})`)
 * @returns async function resolving to the GetObject output with `Body`
 *          replaced by the parsed JSON (typed as `T`, not validated)
 * @throws InternalServerError when the response has no body or the body is
 *         not a Node.js Readable stream
 */
export function makeS3ObjectToJsonReader({
  s3Client = new S3Client({}),
}: {
  s3Client?: S3Client;
}) {
  return async function <T>(
    params: GetObjectCommandInput
  ): Promise<S3GetObjectOutput<T>> {
    const response = await s3Client.send(new GetObjectCommand(params));
    const { Body: content } = response;

    if (content === undefined) {
      throw new InternalServerError(
        `Cannot read S3 object to JSON - content not found in S3`
      );
    }

    // Only Node.js streams can be consumed here
    if (!(content instanceof Readable)) {
      throw new InternalServerError(
        `Cannot read S3 object to JSON - unsupported body format`
      );
    }

    const parsed = (await consumers.json(content)) as T;
    return { ...response, Body: parsed };
  };
}

/**
 * Creates a function that fetches an S3 object and returns its raw content.
 *
 * @param s3Client client used to fetch the object (default: `new S3Client({})`)
 * @returns async function resolving to the GetObject output with `Body`
 *          replaced by the object's bytes as an array of numbers
 * @throws InternalServerError when the response has no body or the body is
 *         not a Node.js Readable stream
 */
export function makeS3ObjectReader({
  s3Client = new S3Client({}),
}: {
  s3Client?: S3Client;
}) {
  return async function (
    params: GetObjectCommandInput
  ): Promise<S3GetObjectOutput<number[]>> {
    const result = await s3Client.send(new GetObjectCommand(params));

    // This reader does not parse JSON, so the message must not claim it does
    if (result.Body === undefined)
      throw new InternalServerError(
        `Cannot read S3 object - content not found in S3`
      );

    if (result.Body instanceof Readable) {
      // Drain the stream into a Buffer, then expose it as plain byte values
      const buffer = await consumers.buffer(result.Body);
      return {
        ...result,
        Body: Array.from(buffer),
      };
    }

    throw new InternalServerError(
      `Cannot read S3 object - unsupported body format`
    );
  };
}
