import { inject, Injectable } from '@angular/core';
import { ChatMessageComposer, ChatMessageComposerData, ChatMessageComposerExtras, EmbeddedForwardHandler, EmbeddedForwardUploadFactory, EmbeddedMediaHandler, EmbeddedMediaUploaderFactory, EmbedContext, EmbedContextTypes, MediaUseCases, isPollEmbed, EmbedPollCreator } from '@portal/wen-backend-api';
import { concurrentlyInterruptable, executeInChunks, StreamAbortedError } from '@portal/wen-common';
import { BehaviorSubject, catchError, defaultIfEmpty, expand, first, forkJoin, map, mergeMap, of, ReplaySubject, shareReplay, switchMap, takeLast, takeWhile } from 'rxjs';
import { ChatMessageTransactionPreviewGenerator, MessagePreviewGeneratorFactory } from './services/message-preview-generator';
import { ChatRoomPrepareService, ChatRoomPrepareServiceFactory, ValidRoomResult } from './services/prepare-chat-room-service';
import { ChatMessageBatchLoader } from './types/chat-message-batch-loader';
import { ChatMessageErrorSendResult, ChatMessageSuccesfulSendResult, ChatMessageTransactionResult, ChatMessageTransactionState, TransactionResultState } from './types/chat-message-send-result';

const MAX_PARALLEL_CHAT_ROOM_OPERATIONS = 30;

export class ChatMessageComposerTransaction {

  private result: ChatMessageTransactionState;

  private isAborted = new BehaviorSubject<boolean>(false);

  private isPaused = new BehaviorSubject<boolean>(false);

  private embedMediasUploaded = new ReplaySubject<unknown>(1);
  public embedMediasUploaded$ = this.embedMediasUploaded.pipe(first());

  private embedForwardsUploaded = new ReplaySubject<unknown>(1);
  public embedForwardsUploaded$ = this.embedForwardsUploaded.pipe(first());

  private embedPollsUploaded = new ReplaySubject<unknown>(1);
  public embedPollsUploaded$ = this.embedPollsUploaded.pipe(first());

  private allMessageSendFinished = new ReplaySubject<ChatMessageTransactionResult>(1);
  public allMessageSendFinished$ = this.allMessageSendFinished.pipe(first());

  private singleMessageSendFinished = new ReplaySubject<ChatMessageSuccesfulSendResult>(1);
  public singleMessageSendFinished$ = this.singleMessageSendFinished.asObservable();

  private singleMessageSendError = new ReplaySubject<ChatMessageErrorSendResult>(1);
  public singleMessageSendError$ = this.singleMessageSendError.asObservable();

  constructor(
    private embedMediaHandler: EmbeddedMediaHandler,
    private embedForwardHandler: EmbeddedForwardHandler,
    private embedPollCreator: EmbedPollCreator,
    private messagePreviewGenerator: ChatMessageTransactionPreviewGenerator,
    private chatRoomPrepareService: ChatRoomPrepareService,
    private chatMessageComposer: ChatMessageComposer,
    private extras: ChatMessageComposerExtras,
    private dataStream: ChatMessageBatchLoader,
  ) { }

  execute() {
    if (this.result) {
      throw new Error('This chat message transaction is already started!');
    }
    this.result = new ChatMessageTransactionState();
    const messageBatchSent$ = this.next().pipe(
      expand(() => this.next()),
      takeWhile(() => this.dataStream.hasMore(), true),
      shareReplay(1)
    );
    messageBatchSent$.pipe(
      takeLast(1),
      map(() => TransactionResultState.SUCCESSFUL),
      catchError((error) => {
        if (error instanceof StreamAbortedError) {
          return of(TransactionResultState.ABORTED);
        }
        return of(TransactionResultState.ERROR);
      })
    ).subscribe((resultState) => {
      this.result.setResultState(resultState);
      const sendMessageResults = this.result.getSuccessfulMessageSendResults();
      this.persistEmbeds(sendMessageResults);
      this.allMessageSendFinished.next(this.result);
      this.allMessageSendFinished.complete();
    });
  }

  pause() {
    this.isPaused.next(true);
  }

  resume() {
    this.isPaused.next(false);
  }

  abort() {
    this.isAborted.next(true);
  }

  getUploadableMediaEmbeds() {
    return this.embedMediaHandler.getUploads();
  }

  private next() {
    return this.dataStream.next().pipe(
      switchMap((pageRequest) => {
        const { data = [] } = pageRequest;
        this.result.addDatas(data);
        const chunkedOps$ = executeInChunks((dataSlice) => {
          const sendMessageOp$ = this.chatRoomPrepareService.prepareRooms(dataSlice).pipe(
            mergeMap(({ validRoomRequests, invalidRoomRequests }) => {
              invalidRoomRequests.forEach(invalidRoomRequest => {
                const errorResult = new ChatMessageErrorSendResult(invalidRoomRequest);
                this.result.addResult(errorResult);
                this.singleMessageSendError.next(errorResult);
              });
              const createSendMessage$ = (prepareResult: ValidRoomResult) => {
                const transactionDataRequest = this.messagePreviewGenerator.createPreview(prepareResult.transactionDataRequest);
                const { roomMemberData } = prepareResult;
                const { roomId } = roomMemberData;
                const { roomMessage, originalEventId, scheduledFor, relation } = transactionDataRequest;
                const composerData: ChatMessageComposerData = {
                  roomId, roomMessage, originalEventId, scheduledFor, relation
                };
                return this.chatMessageComposer.sendMessage(composerData, this.extras).pipe(
                  map(({ eventId }) => {
                    const result = new ChatMessageSuccesfulSendResult(
                      eventId, roomMemberData, transactionDataRequest
                    );
                    this.result.addResult(result);
                    this.singleMessageSendFinished.next(result);
                    return { eventId };
                  })
                );
              };
              return of(validRoomRequests).pipe(
                concurrentlyInterruptable(createSendMessage$, this.isAborted, this.isPaused),
                map(() => {
                  return null as unknown;
                })
              );
            })
          );
          return sendMessageOp$;
        }, data, MAX_PARALLEL_CHAT_ROOM_OPERATIONS);
        return chunkedOps$;
      }),
    );
  }

  private persistEmbeds(sendMessages: ChatMessageSuccesfulSendResult[]) {
    this.persistPolls(sendMessages);
    this.persistMedia(sendMessages);
  }

  persistPolls(sendMessages: ChatMessageSuccesfulSendResult[]) {
    const pollCreations$ = sendMessages
      .filter(result => result.transactionData.roomMessage?.message?.embeds?.filter(e => isPollEmbed(e))?.length)
      .map(
        result => {
          const context = this.contextFromSuccessfullSendResult(result);
          const pollEmbed = result.transactionData.roomMessage?.message?.embeds?.filter(e => isPollEmbed(e))[0];
          return this.embedPollCreator.createPoll(pollEmbed.poll, context);
        }
      );

      forkJoin(pollCreations$).pipe(
        defaultIfEmpty([])
      ).subscribe(() => this.embedPollsUploaded.next(null));
  }

  private persistMedia(sendMessages: ChatMessageSuccesfulSendResult[]) {
    const mediaContexts = sendMessages.map(result => this.contextFromSuccessfullSendResult(result));
    this.embedMediaHandler.persist(mediaContexts, MediaUseCases.MESSAGE).subscribe(() => {
      this.embedMediasUploaded.next(null);
    });
    this.embedForwardHandler.persist(mediaContexts).subscribe(() => {
      this.embedForwardsUploaded.next(null);
    });
  }

  contextFromSuccessfullSendResult(result: ChatMessageSuccesfulSendResult): EmbedContext {
    const { eventId, roomMemberData, transactionData } = result;
    const contextId = transactionData?.originalEventId ?? eventId;
    const { roomId } = roomMemberData;
    const context: EmbedContext = {
      type: EmbedContextTypes.CHAT_MESSAGE,
      id: contextId,
      parent: { type: EmbedContextTypes.CHAT, id: roomId }
    };

    return context;
  }
}

@Injectable()
export class ChatMessageComposerTransactionFactory {

  private mediaHandlers = inject(EmbeddedMediaUploaderFactory);
  private forwardHandlers = inject(EmbeddedForwardUploadFactory);
  private messagePreviewGeneratorFactory = inject(MessagePreviewGeneratorFactory);
  private chatRoomPrepareServiceFactory = inject(ChatRoomPrepareServiceFactory);
  private chatMessageComposer = inject(ChatMessageComposer);
  private embedPollCreator = inject(EmbedPollCreator);

  create(messageLoader: ChatMessageBatchLoader, extras: ChatMessageComposerExtras) {
    const embedMediaHandler = this.mediaHandlers.get();
    const embedForwardHandler = this.forwardHandlers.get();
    const chatRoomPrepareService = this.chatRoomPrepareServiceFactory.create(extras.enableEncryption);
    const messagePreviewGenerator = this.messagePreviewGeneratorFactory.create(
      embedMediaHandler, embedForwardHandler
    );

    return new ChatMessageComposerTransaction(
      embedMediaHandler,
      embedForwardHandler,
      this.embedPollCreator,
      messagePreviewGenerator,
      chatRoomPrepareService,
      this.chatMessageComposer,
      extras, messageLoader,
    );
  }

}
