import { Observer as RxObserver, Observable, defer, race, throwError, isObservable } from "rxjs";
import { map, ignoreElements, switchMap } from 'rxjs/operators';

import * as Task from '@core/services/task';
import { CancelledError, UnsupportedOperationError } from '@core/errors/errors-core';
import { createLogger } from "@core/services/logger.service";

import { ProgressHandler, ProgressMessage, ProgressSubject } from "@archipad-js/core/progress";

// Module-level logger, created with the name 'Progress'.
const log = createLogger('Progress');


/**
 * Body of a legacy task: receives the legacy `Task.Task` object and returns
 * a promise of `T`. `LegacyHandler.oldTask` hands such a function to `Task.make`.
 */
export type TaskFn<T> = (task:Task.Task)=>Promise<T>;

/*---------------------------------------------------------------------------*/

/**
 * Runs a progress-reporting function on behalf of a legacy task.
 *
 * The function receives the observer side of a fresh `ProgressSubject`; the
 * messages it sends are applied to the legacy task (label / completed units)
 * while `promise()` is pending.
 */
class OldTaskProgress {
    private _oldTask: Task.Task;
    private _progress: ProgressSubject;
    private _task: Observable<unknown>;

    /**
     * @param oldTask legacy task to keep in sync (when falsy, `promise()` only runs `fn`)
     * @param fn work to execute; must return a promise or an observable
     */
    constructor(oldTask: Task.Task, fn: (progress: RxObserver<ProgressMessage>) => Promise<any> | Observable<any>) {
        this._oldTask = oldTask;
        this._progress = new ProgressSubject();

        // `fn` is invoked lazily, on subscription, not at construction time.
        this._task = defer(() => {
            const result = fn(this._progress.observer);
            const isSupported = Boolean(result) && (isObservable(result) || result instanceof Promise);
            if (!isSupported) {
                throw new UnsupportedOperationError('Function did not return an observable or a promise');
            }
            return result;
        });
    }

    /**
     * Starts the work and returns a promise for its outcome.
     *
     * With a legacy task attached, the returned promise also rejects with a
     * `CancelledError` once the legacy task's `cancelPromise` resolves.
     */
    promise() {
        if (!this._oldTask) {
            return this._task.toPromise();
        }

        // Side-effect-only stream: every message is applied to the legacy task
        // and then dropped by ignoreElements(), so it never emits a value.
        const applyMessages$ = this._progress.observable.pipe(
            map((message) => this._applyToOldTask(message)),
            ignoreElements(),
        );

        // Turns a resolved cancel promise into a CancelledError.
        const cancelled$ = defer(() => this._oldTask.cancelPromise).pipe(
            switchMap(() => throwError(new CancelledError())),
        );

        // Argument order is kept on purpose: progress forwarding and cancel
        // watching are subscribed before the task itself is executed.
        const outcome$ = race(
            applyMessages$,
            cancelled$,
            this._task,
        );
        return outcome$.toPromise() as Promise<unknown>;
    }

    /** Mirrors a single progress message onto the legacy task. */
    private _applyToOldTask(message: ProgressMessage): void {
        if (message.type === 'label') {
            this._oldTask.label = message.message;
        } else if (message.type === 'progress') {
            // Total units is set to 1 and the message's progress value is used as completed units.
            this._oldTask.setTotalUnits(1);
            this._oldTask.setCompletedUnits(message.progress);
        } else if (message.type === 'pause' || message.type === 'continue') {
            // Both message types produce the same warning.
            log.warn('Pause is not suppported on old tasks');
        }
    }
}

/**
 * Runs a progress-aware function on behalf of a legacy task.
 *
 * The function is given a progress observer; what it reports there is
 * applied to `oldTask` while the returned promise is pending.
 *
 * ```
 * const self = this;
 * return context.task(l('Loading ...'), function(task) {
 *     return Progress.adaptOldTask(task, function(progress) {
 *         return self.download(progress);
 *     });
 * });
 * ```
 *
 * @param oldTask legacy task object to update
 * @param fn function that receives the progress observer and returns a promise or an observable
 * @returns a promise for the outcome of `fn`
 */
export function adaptOldTask<T>(oldTask: Task.Task, fn: (progress: RxObserver<ProgressMessage>) => Promise<T> | Observable<T>): Promise<T> {
    return new OldTaskProgress(oldTask, fn).promise() as Promise<T>;
}

/**
 * Progress handler that can run legacy tasks (`Task.make`) as observables.
 */
export class LegacyHandler extends ProgressHandler {
    /**
     * Wraps a legacy task in an observable.
     *
     * When this handler has a progress observer, a sub-task entry weighted by
     * `units` is registered immediately (at call time, not on subscription).
     * The legacy task itself is created on subscription and cancelled if the
     * subscriber unsubscribes before it finishes.
     *
     * @param units weight of this sub-task
     * @param label label given to the legacy task
     * @param fn body of the legacy task
     * @returns an observable that emits the task result and completes, or errors with the task error
     */
    oldTask<T>(units: number, label: string, fn: TaskFn<T>): Observable<T> {
        // Progress is only tracked when there is somewhere to report it.
        const subTaskEntry = this._progress ? { units, progress: 0 } : null;
        if (subTaskEntry) {
            if (!this._subTasks) {
                this._subTasks = [];
            }
            this._subTasks.push(subTaskEntry);
        }

        return new Observable((subscriber) => {
            let runningTask = Task.make(label, fn);

            if (runningTask.label?.length) {
                this.label(runningTask.label);
            }

            runningTask.addListener((state) => {
                if (subTaskEntry && state === 'progressing') {
                    subTaskEntry.progress = runningTask.progress;
                    this._progressChanged();
                }

                if (!runningTask.isFinished) {
                    return;
                }

                if (runningTask.error) {
                    subscriber.error(runningTask.error);
                } else {
                    if (subTaskEntry) {
                        // Mark the sub-task as fully done before emitting the result.
                        subTaskEntry.progress = 1;
                        this._progressChanged();
                    }

                    subscriber.next(runningTask.result);
                    subscriber.complete();
                }

                // Cleared so the teardown below does not cancel a finished task.
                runningTask = null;
            });

            // Teardown: cancel the legacy task if it is still running.
            return () => {
                if (runningTask) {
                    runningTask.cancel();
                }
            };
        });
    }
}

/**
 * Creates a `LegacyHandler` for the given progress observer.
 *
 * @param progress progress observer passed to the handler, or `null`
 * @returns the new handler
 */
export function makeProgressHandler(progress: RxObserver<ProgressMessage> | null): LegacyHandler {
    const handler = new LegacyHandler(progress);
    return handler;
}
