import { Observable, Subject } from 'rxjs';
import { environment } from '../../environments/environment';
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { shareReplay, take, takeUntil, publishReplay, refCount, share } from 'rxjs/operators';
import { AbstractService } from '../abstract.service';
import { Page } from '../domain/page';
import { PortfolioItem } from "../domain/portfolio-item";
import { RequestArguments } from '../domain/request-arguments';

import { Security } from '../domain/security';
import { Broker } from '../domain/broker';

const API_URL = environment.apiUrl;
const CACHE_SIZE = 1;

export interface PortfolioResponse {
  portfolio: Array<Security>;
}

@Injectable()
export class PortfolioService extends AbstractService {
  private prevPortfolioArgs: RequestArguments<PortfolioItem>;
  private portfolioCache: Observable<Page<PortfolioItem>>;
  public portfolioCacheRefresh = new Subject<void>();  // Used to force reload of portfolioCache from server
  private prevSecuritiesArgs: RequestArguments<Security>;
  private securitiesCache: Observable<Page<Security>>;
  private securitiesCacheRefresh = new Subject<void>();
  private brokersCache: Observable<Array<Broker>>;
  public portfolioChangedNotifier = new Subject<void>();

  constructor(private http: HttpClient) {
    super();
  }

  public getPortfolio(requestArgs: RequestArguments<PortfolioItem>): Observable<Page<PortfolioItem>> {
    if (!requestArgs.argumentsEquals(this.prevPortfolioArgs)) {
      this.prevPortfolioArgs = requestArgs;
      this.clearPortfolioCache();
    }
    if (!this.portfolioCache) {  // GET from API if portfolioCache is empty or args have changed
      this.portfolioCache = this.requestPage<PortfolioItem>(`/portfolio${requestArgs.getQueryParams('?')}`)
        .pipe(
          takeUntil(this.portfolioCacheRefresh),
          shareReplay(CACHE_SIZE),
          take(1)
        );
    }
    return this.portfolioCache;
  }

  public clearPortfolioCache(): void {
    this.portfolioCacheRefresh.next();  // Causes current cache instance to complete (due to takeUntil)
    this.portfolioCache = null;  // Clears the cache
  }

  public getSecurities(requestArgs: RequestArguments<Security>): Observable<Page<Security>> {
    if (!requestArgs.argumentsEquals(this.prevSecuritiesArgs)) {
      this.prevSecuritiesArgs = requestArgs;
      this.clearSecuritiesCache();
    }
    if (!this.securitiesCache) {  // GET from API if securitiesCache is empty or args have changed
      this.securitiesCache = this.requestPage<Security>(`/securities${requestArgs.getQueryParams('?')}`)
        .pipe(
          takeUntil(this.securitiesCacheRefresh),
          shareReplay(CACHE_SIZE),
          take(1)
        );
    }
    return this.securitiesCache;
  }

  public clearSecuritiesCache(): void {
    this.securitiesCacheRefresh.next();  // Causes current cache instance to complete (due to takeUntil)
    this.securitiesCache = null;  // Clears the cache
  }

  private requestPage<T>(requestPath: string): Observable<Page<T>> {
    return this.http.get<Page<T>>(API_URL + requestPath);
  }

  public getBrokers(): Observable<Array<Broker>> {
    if (!this.brokersCache) {
      this.brokersCache = this.http.get<Array<Broker>>(API_URL + '/brokers')
        .pipe(
          take(1),
          publishReplay(1),
          refCount()
        );
    }
    return this.brokersCache;
  }

  public savePortfolioItem(portfolioItem: PortfolioItem): Observable<PortfolioItem> {
    return this.http.put<PortfolioItem>(API_URL + '/portfolio', this.objectToFormData(portfolioItem)).pipe(share());
  }
}
