njs使用方法 - Thu, Mar 14, 2019
使用nginx和njs进行负载均衡。
1. 物联网模型(用户-产品-设备)
物联网模型中进行了用户、产品和设备的建模。每个用户包含很多产品,每个产品包含很多设备。每个产品都会有相应的服务。 这样可以保证不同用户的产品和设备互不影响,并能对服务负载进行分担。
2. 问题点:
用户-产品-设备模型将一个服务要做的事情分成了很多服务,但是对外使用时不能暴露不同的端口和域名进行访问。 所以需要有另一个服务进行调度和负载均衡。下面就需要使用nginx和njs进行解决。
3. TCP协议(mqtt)
mqtt是TCP协议。
下面是nginx官方博客介绍怎样对mqtt进行调度: NGINX Plus for the IoT: Load Balancing MQTT
由于mqtt属于长连接形式的TCP连接,所以只需要在第一次连接时能够进行调度就可以了。
njs日志:
njs中s.log(string)和s.error(string)打出的log都在error_log中,如果需要查看不同等级的log,需要配置error_log的参数。
error_log /var/log/nginx/mqtt_error.log debug;
nginx配置
js_include /etc/nginx/stream_conf.d/steam.js;
js_set $mqtt_server getMqttServer;
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
'$bytes_sent $upstream_addr $mqtt_server';
server {
listen 1883;
preread_buffer_size 1k; # Big enough to read CONNECT packet header
js_preread parserMqttProductID;
proxy_pass $mqtt_server;
proxy_connect_timeout 1s;
proxy_timeout 1m;
access_log /var/log/nginx/mqtt_access.log mqtt;
error_log /var/log/nginx/mqtt_error.log;
}
njs文件steam.js
///////////////////////////////mqtt///////////////////////////////////////
var client_messages = 1;
var mqttProductID = "";
function getStringLen(s, offset) {
var len_msb = s.buffer.charCodeAt(offset).toString(16);
var len_lsb = s.buffer.charCodeAt(offset + 1).toString(16);
if ( len_lsb.length < 2 ) len_lsb = "0" + len_lsb;
return parseInt(len_msb + len_lsb, 16);
}
function parserMqttProductID(s) {
if ( !s.fromUpstream ) {
if ( s.buffer.toString().length == 0 ) { // Initial calls may contain no data, so
return s.AGAIN; // ask that we get called again
} else if ( client_messages == 1 ) { // CONNECT is first packet from the client
// CONNECT packet is 1, using upper 4 bits (00010000 to 00011111)
var packet_type_flags_byte = s.buffer.charCodeAt(0);
if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
// Calculate remaining length with variable encoding scheme
var multiplier = 1;
var remaining_len_val = 0;
var remaining_len_byte;
for (var remaining_len_pos = 1; remaining_len_pos < 5; remaining_len_pos++ ) {
remaining_len_byte = s.buffer.charCodeAt(remaining_len_pos);
if ( remaining_len_byte == 0 ) break; // Stop decoding on 0
remaining_len_val += (remaining_len_byte & 127) * multiplier;
multiplier *= 128;
}
var connect_flags = s.buffer.charCodeAt(remaining_len_pos + 7);
if((connect_flags & 0x40) == 0 || (connect_flags & 0x80) == 0) {
s.log("miss username or password");
return s.AGAIN;
}
// Extract ClientId based on length defined by 2-byte encoding
var payload_offset = remaining_len_pos + 10; // Skip fixed header
payload_offset += 2 + getStringLen(s, payload_offset);
if(connect_flags & 0x04) {
payload_offset += 2 + getStringLen(s, payload_offset);
payload_offset += 2 + getStringLen(s, payload_offset);
}
var username_len_int = getStringLen(s, payload_offset);
var username_str = s.buffer.substr(payload_offset + 2, username_len_int);
var str_arr = username_str.split("&");
if (str_arr.length > 1) {
mqttProductID = str_arr[1];
} else {
s.log("Wrong username format")
return s.AGAIN;
}
} else {
s.log("Received unexpected MQTT packet type + flags: " + packet_type_flags_byte.toString());
return s.AGAIN;
}
}
client_messages++;
}
return s.OK;
}
function getMqttServer() {
return "broker-" + mqttProductID + "." + "default" + ".svc.cluster.local:1883";
}
4. UDP协议(coap)
coap是UDP协议,是http协议的变种。
由于coap协议是短连接协议,所以需要在每次连接时都需要调度。
njs日志:
njs中s.log(string)和s.error(string)打出的log都在error_log中,如果需要查看不同等级的log,需要配置error_log的参数。
error_log /var/log/nginx/mqtt_error.log debug;
nginx配置
js_include /etc/nginx/stream_conf.d/steam.js;
js_set $coap_server getCoapServer;
log_format coap '$remote_addr [$time_local] $protocol $status $bytes_received '
'$bytes_sent $upstream_addr $coap_server';
server {
listen 5683 udp;
preread_buffer_size 1k; # Big enough to read CONNECT packet header
js_preread parserCoapProductID;
proxy_pass $coap_server;
proxy_connect_timeout 1s;
proxy_timeout 1m;
access_log /var/log/nginx/coap_access.log coap;
error_log /var/log/nginx/coap_error.log;
}
njs文件steam.js
///////////////////////////////coap///////////////////////////////////////
var coapProductID = "";
function parseChar(c) {
var hex = '0x' + c.toString('hex');
return parseInt(hex);
}
function parserCoapProductID(s) {
s.log("buffer:" + s.buffer.toString('hex'));
s.log("remoteAddress:" + s.remoteAddress.toString());
s.log("fromUpstream:" + s.fromUpstream.toString());
if (s.fromUpstream) {
return s.OK;
}
var data = s.buffer;
s.log('dataLen:' + data.length.toString());
if (data.length < 4) {
s.error("truncated");
return s.ERROR;
}
var data0 = parseChar(data[0]);
if (data0 >> 6 != 1) {
s.error("version is error, version need to be 1, but " + (data0 >> 6).toString(10));
return s.ERROR;
}
var tokenLen = data0 & 0x0f;
s.log("tokenLen:" + tokenLen);
if (tokenLen > 8) {
s.error("token length error" + tokenLen.toString());
return s.ERROR;
}
if (data.length < (4 + tokenLen)) {
s.error("truncated");
return s.ERROR;
}
var b = data.slice(4 + tokenLen, data.length);
var prev = 0
var parseExtOpt = function(opt) {
switch (opt) {
case 13: // extoptByteCode
if (b.length < 1) {
return -1;
}
opt = parseChar(b[0]) + 13;
b = b.slice(1, b.length);
break;
case 14: // extoptWordCode
if (b.length < 2) {
return -1;
}
var b0 = parseChar(b[0])
var b1 = parseChar(b[1])
opt = (b1 | (b0 << 8)) + 269;
b = b.slice(2, b.length);
break;
}
return opt
}
while (b.length > 0) {
var b0 = parseChar(b[0]);
if (b0 == 0xff) {
b = b.slice(1, b.length);
break;
}
var delta = b0 >> 4; // b[0] >> 4
var length = b0 & 0x0f;
// 0xF0 || 0x0F
if ((delta == 15) || (length == 15)) {
return s.ERROR;
}
b = b.slice(1, b.length);
delta = parseExtOpt(delta);
if (delta == -1) {
return s.ERROR;
}
length = parseExtOpt(length);
if (length == -1) {
return s.ERROR;
}
if (b.length < length) {
s.error("truncated");
return s.ERROR;
}
var oid = prev + delta;
var valueBuf = b.slice(0, length);
s.log("valueBuf:" + valueBuf.toString('hex'));
s.log("oid:" + oid.toString())
if (oid != 11) {
b = b.slice(length, b.length);
prev = oid;
continue;
}
if ((valueBuf.length < 0) || (valueBuf.length > 255)) {
s.error("valueBuf.length:" + valueBuf.length.toString());
return s.ERROR;
}
coapProductID = valueBuf.toString()
s.log("coapProductID:" + coapProductID);
return s.OK;
}
return s.OK;
}
function getCoapServer() {
return "broker-" + coapProductID + "." + "default" + ".svc.cluster.local:5683";
}