import RSocketWebSocketClient from 'rsocket-websocket-client'
import {
	BufferEncoders,
	MESSAGE_RSOCKET_COMPOSITE_METADATA,
	MESSAGE_RSOCKET_ROUTING,
	encodeAndAddWellKnownMetadata,
	encodeAndAddCustomMetadata,
	RSocketClient,
	MAX_STREAM_ID,
	encodeCompositeMetadata,
	RSocketResumableTransport,
} from 'rsocket-core'
import { config } from './config'
import { displayError } from '../helpers/helpers'
import { ToastID } from '../store/types'
import { uid as secUID } from 'uid/secure'
import { Flowable } from 'rsocket-flowable'
import log from 'loglevel'

const bufferSize = 128
const sessionDurationSeconds = 60
const reconnectIntervalMillis = 5000
const keepAlive = 5000
const lifetime = 20000

export default class MessageService {
	constructor(uid) {
		if (!uid || typeof uid !== 'string' || uid?.length < 32) {
			uid = secUID(32)
		}

		log.info('Resume UID: ', uid)
		const resumeToken = Buffer.from(uid)

		this.decoder = new TextDecoder('utf-8')

		const resumableTransport = new RSocketResumableTransport(
			() =>
				new RSocketWebSocketClient(
					{
						url:
							document.location.origin
								.replace('http://', `ws://`)
								.replace('https://', `wss://`) + config.endpoints.api,
					},
					BufferEncoders,
				),
			{
				bufferSize,
				resumeToken: resumeToken,
				sessionDurationSeconds,
			},
		)

		this._client = new RSocketClient({
			setup: {
				keepAlive: keepAlive,
				lifetime: lifetime,
				dataMimeType: 'application/json',
				metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
			},
			transport: resumableTransport,
		})

		// subscribe connection changes
		let start = true
		let firstConnect = true
		resumableTransport.connectionStatus().subscribe({
			onNext: (status) => {
				log.info('CONNECTION STATUS: ', status.kind)
				switch (status.kind) {
					case 'NOT_CONNECTED':
						if (!firstConnect) {
							displayError(ToastID.NETWORK_OFFLINE)
						} else {
							firstConnect = false
						}

						if (!start) {
							log.info('Retrying connection...')
							setTimeout(() => {
								try {
									resumableTransport.connect()
								} catch (err) {
									displayError(ToastID.SERVER_ERROR, err)
								}
							}, reconnectIntervalMillis)
						} else {
							start = false
						}
						break
					case 'ERROR':
					case 'CLOSED':
						displayError(ToastID.SERVER_ERROR)
						break
					default:
						break
				}
			},
			onSubscribe: (subscription) => {
				subscription.request(Number.MAX_SAFE_INTEGER)
			},
			onError: (err) => {
				displayError(ToastID.SERVER_ERROR, err)
			},
		})
	}

	requestResponse(route, data) {
		return this._socket
			.requestResponse({
				data: Buffer.from(JSON.stringify(data)),
				metadata: this.buildMetadata(route),
			})
			.map((itm) => {
				try {
					return JSON.parse(this.decoder.decode(itm.data))
				} catch (err) {
					displayError(ToastID.SERVER_ERROR, err)
					return null
				}
			})
	}

	requestStream(route, data, onNextCallback, onErrorCallback) {
		this._socket
			.requestStream({
				data: Buffer.from(JSON.stringify(data)),
				metadata: this.buildMetadata(route),
			})
			.subscribe({
				onComplete: () => {
					log.info('stream completed: route - ', route)
				},
				onError: (err) => {
					if (typeof onErrorCallback === 'function') {
						onErrorCallback(err)
					} else {
						displayError(ToastID.SERVER_ERROR, err)
					}
				},
				onNext: (value) => {
					onNextCallback(JSON.parse(this.decoder.decode(value.data)))
				},
				onSubscribe: (subscription) => subscription.request(MAX_STREAM_ID),
			})
	}

	requestChannel(route, onNextCallback, onErrorCallback) {
		let _subscriber

		const flowable = new Flowable(
			(subscriber) => (_subscriber = subscriber),
		).map((data) => {
			// log.info('--- DATA', data)
			return {
				data: Buffer.from(JSON.stringify(data)),
				metadata: this.buildMetadata(route),
			}
		})

		this._socket.requestChannel(flowable).subscribe({
			onComplete: () => {
				log.info('stream completed: route - ', route)
			},
			onError: (err) => {
				if (typeof onErrorCallback === 'function') {
					onErrorCallback(err)
				} else {
					displayError(ToastID.SERVER_ERROR, err)
				}
			},
			onNext: (value) => {
				// log.info('ON-NEXT DATA: ', JSON.parse(this.decoder.decode(value.data)))
				onNextCallback(JSON.parse(this.decoder.decode(value.data)))
			},
			onSubscribe: (subscription) => {
				subscription.request(MAX_STREAM_ID)
			},
		})
		return _subscriber
	}

	connect(token, jwt) {
		this._token = token
		this._jwt = jwt
		return this._client.connect().map((socket) => (this._socket = socket))
	}

	buildMetadata(route) {
		if (this._token) {
			return encodeAndAddWellKnownMetadata(
				encodeAndAddCustomMetadata(
					Buffer.alloc(0),
					'message/x.rsocket.authentication.bearer.v0',
					this._token,
				),
				MESSAGE_RSOCKET_ROUTING,
				this.encodeRoute(route),
			)
		}

		return encodeCompositeMetadata([
			[MESSAGE_RSOCKET_ROUTING, this.encodeRoute(route)],
		])
	}

	encodeRoute(route) {
		const length = Buffer.byteLength(route, 'utf8')
		const buffer = Buffer.alloc(1)
		buffer.writeInt8(length)
		return Buffer.concat([buffer, Buffer.from(route, 'utf8')])
	}

	placeBet(betRequest, callback) {
		let req = {
			editionId: betRequest.editionId,
			sessionId: this._jwt['ses'],
			userId: this._jwt['sub'],
			betItems: betRequest.bets.map((value) => {
				return {
					clientId: value['cId'],
					autoCashOut: value['cashOutAt'],
					bet: value['bet'],
					currencyType: value['currencyType'],
				}
			}),
		}
		this.requestResponse('volt.bet.place', req).then(
			(s) => {
				callback(s)
			},
			(err) => displayError(ToastID.SERVER_ERROR, err),
		)
	}

	cashout(cReq, callback) {
		this.requestResponse('volt.bet.cashout', cReq).then(
			(s) => {
				if (typeof callback === 'function') {
					callback(s)
				}
			},
			(err) => displayError(ToastID.SERVER_ERROR, err),
		)
	}

	cashoutAll(editionId, callback) {
		this.requestResponse('volt.bet.cashoutAll', editionId).then(
			(s) => {
				callback(s)
			},
			(err) => displayError(ToastID.SERVER_ERROR, err),
		)
	}

	closeConnection() {
		try {
			this._client.close()
		} catch (err) {
			log.info('Error on connection close: ', err)
		}
	}
}
