ws = require("ws");
http = require("http");
url = require("url");
connections = new Map();
endpoint_connections = new Map();
endpoint_servers = new Map();
endpoint_queues = new Map();
endpoint_auth_creds = new Map();
queue = new Map();
queue.set("wsa", []);
queue.set("wso", []);
server = http.createServer();
server.on("upgrade", function (request, socket, head) {
const { pathname } = url.parse(request.url);
socket.setKeepAlive(true);
console.log(pathname);
var ps = pathname.split("/");
var lastname = ps.pop();
var endpoint_id = ps.pop();
var auth = request.headers["authorization"];
var parsedUrl = url.parse(request.url, true);
var query = parsedUrl.query;
console.log(`query: ${JSON.stringify(query)}`);
if (
!auth &&
request.method === "GET" &&
query &&
query.user &&
query.pass
) {
// Fallback: create Basic auth header using query parameters (Base64-encoded).
auth =
"Basic " +
Buffer.from(query.user + ":" + query.pass).toString("base64");
console.log(`fallback auth: ${auth}`);
}
var stored_auth = endpoint_auth_creds.get(endpoint_id);
if (stored_auth) {
if (!(stored_auth === auth)) {
//Wrong creds
try {
if (auth)
console.log(
"Rejected: " +
[
endpoint_id,
lastname,
auth,
atob(auth.split(" ").pop()),
],
);
else
console.log(
"Rejected (noauth): " + [endpoint_id, lastname, auth],
);
socket.write("HTTP/1.1 403 Forbidden");
//request.end('Forbidden');
socket.end();
socket.destroy();
} catch (E) {
console.log(E);
} finally {
return;
}
}
} else {
endpoint_auth_creds.set(endpoint_id, auth);
}
if (auth)
console.log([endpoint_id, lastname, auth, atob(auth.split(" ").pop())]);
else console.log(`No auth: ${endpoint_id}`);
if (!endpoint_servers.get(endpoint_id)) {
var queue = new Map();
queue.set("wsa", []);
queue.set("wso", []);
var servers = new Map();
servers.set(
"wsa",
new ws.WebSocketServer({
noServer: true,
}),
);
servers.set(
"wso",
new ws.WebSocketServer({
noServer: true,
}),
);
if (!endpoint_connections.get(endpoint_id)) {
var connections = new Map();
endpoint_connections.set(endpoint_id, connections);
}
endpoint_queues.set(endpoint_id, queue);
servers.get("wso").on("connection", function (ws) {
endpoint_queues
.get(endpoint_id)
.get("wso")
.forEach((v) => ws.send(v.toString()));
ws.on("error", console.error);
ws.on("message", function message(data) {
if (endpoint_connections.get(endpoint_id).get("wsa")) {
try {
endpoint_connections
.get(endpoint_id)
.get("wsa")
.send(data.toString());
console.log(`{endpoint_id}: wso -> wsa`);
} catch (E) {
console.error(E);
}
} else {
endpoint_queues.get(endpoint_id).get("wsa").push(data);
}
});
endpoint_connections.get(endpoint_id).set("wso", ws);
});
servers.get("wsa").on("connection", function (ws) {
endpoint_queues
.get(endpoint_id)
.get("wsa")
.forEach((v) => ws.send(v.toString()));
ws.on("error", console.error);
ws.on("message", function message(data) {
if (endpoint_connections.get(endpoint_id).get("wso")) {
try {
endpoint_connections
.get(endpoint_id)
.get("wso")
.send(data.toString());
console.log(`{endpoint_id}: wsa -> wso`);
} catch (E) {
console.error(E);
}
} else {
endpoint_queues.get(endpoint_id).get("wso").push(data);
}
});
endpoint_connections.get(endpoint_id).set("wsa", ws);
});
endpoint_servers.set(endpoint_id, servers);
}
wso = endpoint_servers.get(endpoint_id).get("wso");
wsa = endpoint_servers.get(endpoint_id).get("wsa");
if (lastname == "wso") {
wso.handleUpgrade(request, socket, head, function done(ws) {
wso.emit("connection", ws, request);
});
} else if (lastname == "wsa") {
wsa.handleUpgrade(request, socket, head, function done(ws) {
wsa.emit("connection", ws, request);
});
} else {
socket.destroy();
}
});
server.listen(9054, "localhost");
setInterval(function () {
endpoint_connections.forEach((c, k) => {
if (c != undefined) {
console.log("Active: " + k);
c.forEach((c) => c.terminate());
} else {
}
endpoint_connections.delete(k);
endpoint_queues.delete(k);
endpoint_servers.delete(k);
endpoint_auth_creds.delete(k);
});
console.log(new Date().toISOString());
console.log("Clearning the connections and the queues...");
}, 25000);