You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 lines
4.3 KiB
JavaScript

/**
 * MQTT transport for the hub. Connects to a broker over secure WebSocket
 * (`wss://<host>:<port>/mqtt`) through the `mqtt` client library in scope.
 *
 * Incoming topics handled (see `#onMessage`):
 *   - `<prefix>/hub`                 packet handed straight to the hub parser
 *   - `<prefix>/hub/<clientId>/<id>` packet fragments, buffered per `<id>`
 *   - `<prefix>/hub/<id>/get/<name>` single value update for a known device
 */
class MQTTConnection extends Connection {
  static priority = 500;
  static name = 'MQTT';

  /** Active mqtt client; `undefined` while disconnected. */
  #client;
  /** Prefixes whose `<prefix>/hub` topics have been subscribed. */
  #preflist;
  /** Packet buffers keyed by the 4th topic segment of fragment topics. */
  #buffers;

  /**
   * @param hub Owning hub; forwarded to the base `Connection` constructor.
   */
  constructor(hub) {
    super(hub);
    this.options.enabled = false;
    this.options.host = 'test.mosquitto.org';
    this.options.port = '8081';
    this.options.login = '';
    this.options.password = '';
    this.#preflist = [];
    this.#buffers = new Map();
    this.hub.addEventListener('deviceadded', (ev) => {
      // The event dispatcher ignores the returned promise, so a failed
      // subscription has nobody to report to. Handle the rejection here so it
      // does not surface as an unhandled rejection; a failed prefix is rolled
      // back in #upd_prefix and can be retried by a later call.
      this.#sub_device(ev.device.info.prefix, ev.device.info.id).catch(() => {});
    });
  }

  /**
   * @returns {boolean} `true` only when a client exists and reports `connected`.
   */
  isConnected() {
    return Boolean(this.#client?.connected);
  }

  /**
   * Publishes the hub client id to `<prefix>/<id>` for every device the hub
   * knows. Does nothing while a discovery is running or while disconnected.
   */
  async discover() {
    if (this.isDiscovering() || !this.isConnected()) return;
    for (const id of this.hub.getDeviceIds()) {
      const dev = this.hub.dev(id);
      this._discoverTimer();
      await this.send(`${dev.info.prefix}/${dev.info.id}=${this.hub.clientId}`);
    }
  }

  /**
   * Subscribes to the hub prefix (if not yet subscribed) and publishes the hub
   * client id to the `<prefix>` topic. Does nothing while a discovery is
   * running or while disconnected.
   */
  async search() {
    if (this.isDiscovering() || !this.isConnected()) return;
    await this.#upd_prefix(this.hub.prefix);
    this._discoverTimer();
    await this.send(`${this.hub.prefix}=${this.hub.clientId}`);
  }

  /**
   * Drops any current client, opens a new one and subscribes to the hub prefix
   * and to every known device. If the initial connection attempt fails, the
   * state is set to DISCONNECTED and the method returns without throwing.
   * Subscription errors after a successful connection propagate to the caller.
   */
  async connect() {
    await this.disconnect();
    this._setState(ConnectionState.CONNECTING);

    const url = `wss://${this.options.host}:${this.options.port}/mqtt`;
    const options = {
      keepalive: 60,
      clientId: this.hub.clientId,
      username: this.options.login,
      password: this.options.password,
      protocolId: 'MQTT',
      protocolVersion: 4,
      clean: true,
      reconnectPeriod: 3000,
      connectTimeout: 10 * 1000,
    };

    let client;
    try {
      client = await mqtt.connectAsync(url, options);
    } catch {
      this._setState(ConnectionState.DISCONNECTED);
      return;
    }
    this.#client = client;

    client.on('connect', () => {
      this._setState(ConnectionState.CONNECTED);
    });
    client.on('reconnect', () => {
      this._setState(ConnectionState.CONNECTING);
    });
    client.on('close', () => {
      this._setState(ConnectionState.DISCONNECTED);
    });
    client.on('error', () => {
      // Fire-and-forget on purpose: disconnect() already absorbs client errors.
      void this.disconnect();
    });
    client.on('message', (topic, text) => {
      this.#onMessage(topic.toString(), text.toString());
    });

    this.#preflist = [];
    await this.#upd_prefix(this.hub.prefix);
    for (const id of this.hub.getDeviceIds()) {
      const dev = this.hub.dev(id);
      await this.#sub_device(dev.info.prefix, dev.info.id);
    }

    // The link may have dropped, or disconnect() may have run, while the
    // subscriptions above were awaited; only report CONNECTED if still true.
    if (this.isConnected()) {
      this._setState(ConnectionState.CONNECTED);
    }
  }

  /**
   * Ends the current client (if any) and sets the state to DISCONNECTED.
   * Errors raised while ending the client are ignored (best-effort shutdown).
   */
  async disconnect() {
    // Detach the client before awaiting so an overlapping disconnect() (for
    // example from the 'error' handler) does not end the same client twice.
    const client = this.#client;
    this.#client = undefined;
    if (client) {
      try {
        await client.endAsync();
      } catch {
        // Best-effort: the client is being discarded regardless.
      }
    }
    this._setState(ConnectionState.DISCONNECTED);
  }

  /**
   * Publishes a message. The argument has the form `topic=message`; everything
   * after the first `=` is the payload. Without `=` the payload is empty.
   * Nothing is published while disconnected.
   *
   * @param {string} topic Topic, optionally followed by `=` and the payload.
   */
  async send(topic) {
    const sep = topic.indexOf('=');
    const name = sep === -1 ? topic : topic.substring(0, sep);
    const msg = sep === -1 ? '' : topic.substring(sep + 1);
    if (this.isConnected()) {
      await this.#client.publishAsync(name, msg);
    }
  }

  /**
   * Public entry point for subscribing to a device's topics.
   *
   * @param {string} prefix Device prefix (first topic segment).
   * @param {string} id Device id.
   */
  async sub_device(prefix, id) {
    await this.#sub_device(prefix, id);
  }

  /**
   * Subscribes to `<prefix>/hub/<id>/get/#` and makes sure the prefix topics
   * are subscribed too. No-op while disconnected.
   */
  async #sub_device(prefix, id) {
    if (!this.isConnected()) return;
    await this.#client.subscribeAsync(`${prefix}/hub/${id}/get/#`);
    await this.#upd_prefix(prefix);
  }

  /**
   * Subscribes to `<prefix>/hub` and `<prefix>/hub/<clientId>/#` once per
   * prefix. No-op while disconnected or when the prefix is already registered.
   * If a subscription fails the prefix is unregistered again, so a later call
   * can retry, and the error is rethrown.
   */
  async #upd_prefix(prefix) {
    if (!this.isConnected() || this.#preflist.includes(prefix)) return;

    // Capture both: #client and #preflist can be replaced while awaiting.
    const client = this.#client;
    const registered = this.#preflist;

    // Register before awaiting so overlapping calls do not subscribe twice.
    registered.push(prefix);
    try {
      await client.subscribeAsync(`${prefix}/hub`);
      await client.subscribeAsync(`${prefix}/hub/${this.hub.clientId}/#`);
    } catch (err) {
      const at = registered.indexOf(prefix);
      if (at !== -1) registered.splice(at, 1);
      throw err;
    }
  }

  /**
   * Routes one incoming message by topic shape. Topics whose second segment is
   * not `hub`, or whose prefix is not registered, are ignored.
   *
   * @param {string} topic Full topic name.
   * @param {string} text Payload as text.
   */
  #onMessage(topic, text) {
    const parts = topic.split('/');
    if (parts.length < 2 || parts[1] !== 'hub' || !this.#preflist.includes(parts[0])) {
      return;
    }

    if (parts.length === 2) {
      // prefix/hub
      this.hub._parsePacket(this, text);
    } else if (parts.length === 4 && parts[2] === String(this.hub.clientId)) {
      // prefix/hub/client_id/id
      let buffer = this.#buffers.get(parts[3]);
      if (!buffer) {
        buffer = new PacketBufferScanFirst((data) => {
          this.hub._parsePacket(this, data);
        }, 1500);
        this.#buffers.set(parts[3], buffer);
      }
      buffer.push(text);
    } else if (parts.length === 5 && parts[3] === 'get') {
      // prefix/hub/id/get/name
      const dev = this.hub.dev(parts[2]);
      if (dev) {
        // Computed key: a broker-supplied name such as "__proto__" becomes an
        // ordinary own property instead of replacing the object's prototype.
        dev._checkUpdates({ [parts[4]]: { value: text } });
      }
    }
  }
}