import {Component, OnInit, OnDestroy, Input, Inject} from '@angular/core';
import {IMqttMessage, MqttService} from "ngx-mqtt";
import {timer as observableTimer} from "rxjs/internal/observable/timer";
import mixpanel from 'mixpanel-browser/src/loader-module';
import {kerberosConfig} from "../../../../../../environments/environment";
import {Subscription} from "rxjs/Subscription";

@Component({
  selector: "MqttStream",
  templateUrl: './mqttstream.component.html',
  styleUrls: ['./mqttstream.component.scss'],
})
export class MqttStream implements OnInit, OnDestroy  {

  @Input() deviceKey: string;
  @Input() cloudKey: string;
  @Input() changeStatus: any;

  public status: string = 'pending';
  public livestream;
  public liveStreamObservable;
  public liveLegacyStreamObservable;
  public pollingObservables: Array<any> = [];
  public livestreamStarted: boolean = false;
  public timer: any;
  public timerSubscription: any = null;

  constructor(
    @Inject('mqttNew') private _mqttService: MqttService,
    @Inject('mqttLegacy') private _mqttLegacyService: MqttService) {}

  ngOnInit(){
    this.updateStatus('pending');
    const cloudKey = this.cloudKey;
    const deviceKey = this.deviceKey;

    this.publishRequestMQTTLiveStream(cloudKey, deviceKey);
    this.subscribeToMQTTLiveStream(cloudKey, deviceKey);
  }

  /*ngOnChanges(changes){
    if(changes && changes.deviceKey) {
      const { currentValue } = changes.deviceKey;

      // Close previous thread
      if(this.liveStreamObservable) {
        this.liveStreamObservable.unsubscribe();
      }
      if(this.liveLegacyStreamObservable) {
        this.liveLegacyStreamObservable.unsubscribe();
      }

      // Open for new device/camera id
      this.deviceKey = currentValue;
      const cloudKey = this.cloudKey;
      const deviceKey = this.deviceKey;
      this.publishRequestMQTTLiveStream(cloudKey, deviceKey);
      this.subscribeToMQTTLiveStream(cloudKey, deviceKey);
    }
  }*/

  ngOnDestroy(){
    clearInterval(this.timerSubscription)
    if(this.liveStreamObservable) {
      this.liveStreamObservable.unsubscribe();
    }
    if(this.liveLegacyStreamObservable) {
      this.liveLegacyStreamObservable.unsubscribe();
    }
  }

  updateStatus(status) {
    this.status = status;
    if(this.changeStatus){
      this.changeStatus(status);
    }
  }

  subscribeToMQTTLiveStream(cloudKey, deviceKey) {
    // Legacy
    if(kerberosConfig.mqttLegacyServer != "") {
      this.liveLegacyStreamObservable = this._mqttLegacyService.observe(`kerberos/${cloudKey}/device/${deviceKey}/live`).subscribe((message: IMqttMessage) => {
        const topic = message.topic.split("/")[3];
        if(topic===deviceKey) {
          this.livestream = message.payload.toString();
        }
        this.updateStatus('started');
      });
    }

    // New MQTT
    this.liveStreamObservable = this._mqttService.observe(`kerberos/${cloudKey}/device/${deviceKey}/live`).subscribe((message: IMqttMessage) => {
      const topic = message.topic.split("/")[3];
      if(topic===deviceKey) {
        this.livestream = message.payload.toString();
      }
      this.updateStatus('started');
    });
  }

  publishRequestMQTTLiveStream(cloudKey, deviceKey) {
    const interval = 2000;
    this.timerSubscription = setInterval(() => {
      // Legacy
      if(kerberosConfig.mqttLegacyServer != "") {
        this._mqttLegacyService.unsafePublish('kerberos/' + cloudKey + '/device/' + deviceKey + '/request-live', Math.random().toString(), {qos: 0});
      }
      // New MQTT
      this._mqttService.unsafePublish('kerberos/' + cloudKey + '/device/' + deviceKey + '/request-live', Math.random().toString(), {qos: 0});
    }, interval);
  }
}
