/**
 * Streams trade ('T') and quote ('Q') events for a fixed ticker list from the
 * Polygon stocks websocket and groups them into per-second, per-symbol buckets.
 *
 * Bucket layout held in `secondBuckets` (kept sorted by ascending `SecNum`):
 *   { SecNum, Syms: [ { Sym, Vol, Es: [ {MS, E:'T', P, S} | {MS, E:'Q', B, A} ],
 *                       Second, BLast, ALast, AggVol, PriceLast } ] }
 * The fields after `Es` are filled in by processCompletedSecondBuckets().
 */
class DataPull {
    /**
     * @param {Object} config - Stored as `this.config`. If it has a truthy
     *     `apiKey` property, that key is used to authenticate the websocket.
     */
    constructor(config) {
        this.config = config;
        this.ws = null;
        this.secondBuckets = [];
        this.secondLatest = 0;
        this.maxBuckets = 10000;
        this.tickers = ['AAPL','AMZN','GOOG','MSFT','META','NVDA','TSLA'];
        this.count = 0;
        // Handles of the pending timers so that stop() can cancel them.
        this.subscribeTimer = null;
        this.processTimer = null;
    }

    /**
     * Opens the websocket, then schedules the subscription (after 5 s) and the
     * first bucket-processing pass (after 6 s).
     *
     * @returns {Promise<void>} Resolves immediately; nothing is awaited.
     */
    async start() {
        this.openWebsocket();
        this.subscribeTimer = setTimeout(() => {this.subscribe();}, 5000);
        this.processTimer = setTimeout(() => {this.processCompletedSecondBuckets();}, 6000);
    }

    /**
     * Cancels the pending timers and closes the websocket, if any.
     * Safe to call more than once and before start().
     */
    stop() {
        clearTimeout(this.subscribeTimer);
        clearTimeout(this.processTimer);
        this.subscribeTimer = null;
        this.processTimer = null;
        if (this.ws != null) {
            this.ws.close();
            this.ws = null;
        }
    }

    /**
     * Creates the websocket, sends the auth message once it is open and routes
     * every incoming message to acceptSubscribedData().
     */
    openWebsocket() {
        // Alternative endpoint with a 15-minute delay: 'wss://delayed.polygon.io/stocks'
        const ws = new WebSocket('wss://socket.polygon.io/stocks'); // stocks real-time
        this.ws = ws;

        ws.onopen = () => {
            console.log('Connected!');
            // FIXME(security): a credential is committed to source. Pass
            // `config.apiKey` instead and rotate/remove this fallback; it is kept
            // only so that existing callers that pass no key keep working.
            const apiKey = (this.config && this.config.apiKey) || 'nXg2TNrbpoMlitbcI0p8iJk1ku1QtKla';
            ws.send(JSON.stringify({action: 'auth', params: apiKey}));
        };

        // Per message packet:
        ws.onmessage = (ev) => {
            this.acceptSubscribedData(ev.data);
        };

        ws.onerror = (ev) => {
            console.log("Websocket open error: " + ev.data, ev);
        };
    }

    /**
     * Sends two subscribe messages: one for the `T.<ticker>` channels and one
     * for the `Q.<ticker>` channels of every entry in `this.tickers`.
     * Does nothing (besides logging) when the websocket is not open, because
     * sending on a socket that is still connecting throws.
     */
    subscribe() {
        // readyState 1 is WebSocket.OPEN; the literal avoids requiring the
        // global when a stand-in socket object is used.
        if (this.ws == null || this.ws.readyState !== 1) {
            console.error('Websocket is not open; subscribe request skipped.');
            return;
        }
        const tradeChannels = this.tickers.map(tickerCur => 'T.' + tickerCur).join(',');
        const quoteChannels = this.tickers.map(tickerCur => 'Q.' + tickerCur).join(',');
        this.ws.send(JSON.stringify({action: 'subscribe', params: tradeChannels}));
        this.ws.send(JSON.stringify({action: 'subscribe', params: quoteChannels}));
    }

    /**
     * Parses one websocket message and files each 'T' / 'Q' event it contains
     * into the bucket for its second and symbol. Events of any other kind
     * (e.g. 'status') are ignored. Unparseable messages are logged and dropped.
     *
     * Event fields read: `ev`, `sym`, `t` (used as epoch milliseconds), and
     * `p`/`s` for trades or `bp`/`ap` for quotes (presumably price/size and
     * bid/ask price; confirm against the feed documentation).
     *
     * @param {string} data - JSON text; normally an array of event objects.
     */
    acceptSubscribedData(data) {
        this.count++;
        let parsed;
        try {
            parsed = JSON.parse(data);
        } catch (err) {
            console.error('Discarding unparseable websocket message: ' + err.message);
            return;
        }
        // Tolerate a single event object that is not wrapped in an array.
        const events = Array.isArray(parsed) ? parsed : [parsed];

        events.forEach(dataCur => {
            if (dataCur == null || (dataCur.ev !== 'T' && dataCur.ev !== 'Q')) {
                return;
            }
            // Math.trunc rather than parseInt: parseInt stringifies its argument
            // first and misreads numbers printed in exponent notation.
            const secNumCur = Math.trunc(dataCur.t / 1000);
            const msCur = Math.trunc(dataCur.t % 1000);
            if (!(secNumCur > 0)) {
                // Also rejects NaN from a missing or non-numeric timestamp.
                return;
            }
            if (secNumCur > this.secondLatest) {
                this.secondLatest = secNumCur;
            }

            let secBuckCur = this.secondBuckets.find(cur => cur.SecNum === secNumCur);
            if (secBuckCur == null) {
                secBuckCur = {
                    SecNum: secNumCur,
                    Syms: []
                };
                this.secondBuckets.push(secBuckCur);
                // Events may arrive out of order; keep buckets chronological.
                this.secondBuckets.sort((a, b) => a.SecNum - b.SecNum);
            }

            let symBuckCur = secBuckCur.Syms.find(cur => cur.Sym === dataCur.sym);
            if (symBuckCur == null) {
                symBuckCur = {
                    Sym: dataCur.sym,
                    Vol: 0,
                    Es: []
                };
                secBuckCur.Syms.push(symBuckCur);
            }

            if (dataCur.ev === 'T') {
                symBuckCur.Es.push({
                    MS: msCur,
                    E: 'T',
                    P: dataCur.p,
                    S: dataCur.s
                });
            } else {
                symBuckCur.Es.push({
                    MS: msCur,
                    E: 'Q',
                    B: dataCur.bp,
                    A: dataCur.ap
                });
            }
        });
    }

    /**
     * Recomputes the derived fields of every retained bucket, then reschedules
     * itself to run again in 300 ms.
     *
     * Steps:
     *  1. Drop the oldest buckets beyond `maxBuckets`.
     *  2. Walk buckets in order, and each symbol's events in millisecond order,
     *     tracking the latest quote mid price per symbol across buckets.
     *  3. Sign each trade against that mid: above the mid adds its size, below
     *     subtracts it, equal contributes 0. Trades seen before the symbol has
     *     any quote with a positive mid are skipped.
     *
     * Fields written per symbol bucket: `Second` (HH:MM:SS in New York time),
     * `BLast`/`ALast` (last quote), `Vol` (signed volume of this second),
     * `AggVol` (running signed volume over all retained buckets) and
     * `PriceLast` (last counted trade price).
     *
     * Because all retained buckets are re-walked on every pass, `Vol` is reset
     * before being summed; otherwise it would grow on each pass.
     */
    processCompletedSecondBuckets() {
        if (this.secondBuckets.length > this.maxBuckets) {
            this.secondBuckets.splice(0, this.secondBuckets.length - this.maxBuckets);
        }
        const tickerMidQuotes = {};
        const tickerAggVol = {};
        this.tickers.forEach(tickerCur => {
            tickerMidQuotes[tickerCur] = -1.0; // sentinel: no quote seen yet
            tickerAggVol[tickerCur] = 0;
        });

        this.secondBuckets.forEach(secBuckCur => {
            const secDate = new Date(secBuckCur.SecNum * 1000);
            // en-GB renders "DD/MM/YYYY, HH:MM:SS"; characters 12..19 are the time.
            const dateTimeString = secDate.toLocaleString('en-GB', { timeZone: 'America/New_York' });
            const timeString = dateTimeString.substring(12,20);

            secBuckCur.Syms.forEach(symBuckCur => {
                const sym = symBuckCur.Sym;
                symBuckCur.Es.sort((a, b) => a.MS - b.MS);
                symBuckCur.Second = timeString;
                symBuckCur.Vol = 0; // recomputed from scratch on every pass

                symBuckCur.Es.forEach(eCur => {
                    if (eCur.E === 'Q') {
                        tickerMidQuotes[sym] = (eCur.B + eCur.A)/2.0;
                        symBuckCur.BLast = eCur.B;
                        symBuckCur.ALast = eCur.A;
                        return;
                    }
                    const mid = tickerMidQuotes[sym];
                    if (eCur.E !== 'T' || !(mid > 0.0)) {
                        return;
                    }
                    let volNet = 0;
                    if (eCur.P > mid) {
                        volNet = eCur.S;
                    } else if (eCur.P < mid) {
                        volNet = -1 * eCur.S;
                    }
                    symBuckCur.Vol += volNet;
                    // `|| 0` covers symbols that are not listed in this.tickers.
                    tickerAggVol[sym] = (tickerAggVol[sym] || 0) + volNet;
                    symBuckCur.AggVol = tickerAggVol[sym];
                    symBuckCur.PriceLast = eCur.P;
                });
            });
        });

        // Cancel any still-pending run first so a manual call cannot start a
        // second, parallel timer chain.
        clearTimeout(this.processTimer);
        this.processTimer = setTimeout(() => {this.processCompletedSecondBuckets();}, 300);
    }

    /**
     * Placeholder with no implementation yet.
     */
    fillSecondBucketsFromBE() {
        
    }
}

export default DataPull;
