博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
How it works(4) Tilestrata源码阅读(B) 地图负载均衡
阅读量:4303 次
发布时间:2019-05-27

本文共 11192 字,大约阅读时间需要 37 分钟。

引入

阅读Tilestrata的源码是绕不开Tilestrata-Balancer(以下简称TB)这个默认的负载均衡服务的.其特性是:

  • 支持动态增加,减少节点
  • 定时检查节点健康状况
  • 简单的请求过滤

大多数服务会采取专业的负载均衡或者反向代理,比如常见的nginx.理论上讲,nodejs的http-proxy性能是拍马也不及nginx的,但对于地图瓦片服务,绝大多数瓶颈在瓦片的生成,而非反向代理和负载均衡的效率,直接使用node实现很大的增加了灵活性,性能实际也没什么大下降.

架构

注册与注销节点

TB本质上是个反向代理服务器,处理对内和对外两方面的请求:

  • 对内的请求:接收局域网内各个Tilestrata服务的的注册与注销请求.
  • 对外的请求:接收客户端发过来的地图瓦片请求.
Balancer.prototype.listen = function(callback) {
callback = callback || function() {
}; var self = this; async.parallel([ //同时开启对外对内的端口监听 this._listenPrivate.bind(this), this._listenPublic.bind(this) ], function(err) {
if (err) {
log.error(null, err); return self.close(function() {
callback(err); }); } return callback(); });};

处理对内的请求:

Balancer.prototype._listenPrivate = function(callback) {
var self = this; this.http_private = http.createServer(function(req, res) {
//对内通过向'nodes'接口post数据,供Tilestrata服务注册节点或注销节点 if (req.method === 'POST' && req.url === '/nodes') {
self.handleNodeRegister(req, res); } else if (req.method === 'DELETE' && REGEX_NODES.test(req.url)) {
self.handleNodeUnregister(req, res); } else {
res.writeHead(404, {
'Content-Type': 'text/plain'}); res.write('Unrecognized URL'); res.end(); } }); //监听对内专用端口 var port = this.options.privatePort; var hostname = this.options.hostname; this.http_private.listen(port, hostname, function(err) {
callback(err); });};

以注册节点为例(注销节点代码类似,逻辑相反),TB是被动注册的,它需要Tilestrata主动Post一个JSON格式的实体,将自身的信息传递给负载均衡服务,使TB找到这个节点,进而完成注册.要了解Tilestrata与TB交互的细节,需要看看Tilestrata中我在上一篇中没提到的有关负载均衡的代码.

Tilestrata/TileServer.js中的代码:

//构建请求实体TileServer.prototype._buildBalancerData = function () {
var self = this; //绑定所有的图层 return {
id: this.uuid, version: this.version, listen_port: this.http_port, node_weight: this.options.balancer.node_weight || 1, layers: Object.keys(this.layers).map(function (layerName) {
var layer = self.layer(layerName); return {
name: layerName, options: layer.options, routes: Object.keys(layer.routes) }; }) };};
//绑定节点到TB上TileServer.prototype.bindBalancer = function () {
var self = this; var balancer = this.options.balancer; if (!balancer) return; if (this.balancer) return; //断线重连 var recovery = this.balancer = new Recovery({
'retries': Infinity, 'min': balancer.register_mindelay || '1000 ms', 'max': balancer.register_maxdelay || '30 seconds', 'reconnect timeout': balancer.register_timeout || '30 seconds' }); //每次断线重连都重新注册 recovery.on('reconnect', function (opts) {
log.info('balancer', 'Attempting to register with ' + balancer.host + '...'); request({
json: true, method: 'POST', url: 'http://' + balancer.host + '/nodes', timeout: balancer.register_timeout || 30000, body: self._buildBalancerData() }, function (err, res, body) {
//处理TB返回的参数 if (err) {
var level = err.code === 'ETIMEDOUT' || err.code === 'ECONNREFUSED' ? 'warn' : 'error'; log[level]('balancer', 'Registration failed (' + err.message + ')'); return recovery.reconnected(err); } else if (res.statusCode !== 201 && res.statusCode !== 200) {
log.warn('balancer', 'Registration failed (HTTP ' + res.statusCode + ' ' + HTTPStatus[res.statusCode] + ')'); return recovery.reconnected(new Error('Non 201 status (' + res.statusCode + ')')); } else if (!body || typeof body !== 'object') {
log.error('balancer', 'Registration failed (invalid body)'); return recovery.reconnected(new Error('Invalid body from balancer')); } else if (!body.token) {
log.error('balancer', 'Registration failed ("token" not provided)'); return recovery.reconnected(new Error('Invalid token')); } else if (isNaN(body.check_interval)) {
log.error('balancer', 'Registration failed ("check_interval" not a number or missing)'); return recovery.reconnected(new Error('Invalid check_interval')); } log.info('balancer', res.statusCode === 201 ? 'Successfully registered' : 'Already in pool'); //获取TB返回的token self.balancer_token = body.token; //获取TB的存活检查时间间隔 self.balancer_check_interval = body.check_interval; //TB连接保证 self.handleBalancerPing(); //主动重连 recovery.reconnected(); }); }); recovery.reconnect();};
//预防TB超时未检查,自动重连TileServer.prototype.handleBalancerPing = function () {
//每次执行健康检查时,都会执行该函数,从而重置超时计时器 //如果因为各种原因TB没有进行节点健康监测,则视为断线,重新注册给TB var self = this; clearTimeout(this.balancer_timeout); this.balancer_timeout = setTimeout(function () {
log.warn('balancer', 'Balancer not checking in, re-notifying of existence...'); self.balancer.reconnect(); }, this.balancer_check_interval * 3 + Math.random() * 500);};

Tilestrata发送的内容清楚了,TB接受到Tilestrata发送的请求,如何处理以及注册到负载均衡的呢?先看处理请求:

Balancer.prototype.handleNodeRegister = function(req, res) {
function fail(err) {
log.error(ipaddr, 'Failed to register (' + err.message + ')'); res.writeHead(500, {
'Content-Type': 'text/plain'}); res.write(err.message); return res.end(); } var self = this; var ipaddr = req.connection.remoteAddress; //格式化注册实体 this._getJSONBody(req, function(err, body) {
if (err) return fail(err); //注册节点,反馈信息给Tilestra self.nodes.register(ipaddr, body, function(err, added) {
if (err) return fail(err); res.writeHead(added ? 201 : 200, {
'Content-Type': 'application/json'}); res.write(self.register_body); res.end(); }); });};

请求的处理其实没什么好说的.关键点在于节点的注册.TB是负载均衡服务,因此采用了常见的一致性哈希(hashring)来作为分配的请求的算法:

NodeList.prototype.register = function(ipaddr, body, callback) {
var self = this; var id = body.id; var target = ipaddr + ':' + body.listen_port; var existing_id = this.ids_by_host[target]; var update_ring = true; //避免重复注册 //重复注册存在双重验证:id重复,或ip+端口重复 if (this.hosts_by_id[id]) {
log.info('pool', target + ' already in pool'); return callback(null, false); } else if (existing_id && existing_id !== id) {
//对与id不重复,但ip+端口出现重复,可能是节点重启了 //释放上一次的id,但不用刷新一致性哈希表,因为除了id,其他的一切都没变 this.unreference(existing_id); update_ring = false; log.warn('pool', target + ' already in pool w/different id (node did not exit cleanly): ' + existing_id + ' != ' + id); } var entry = {
}; //权重 entry[target] = {
weight: body.node_weight || 1}; this.ids_by_host[target] = id; this.hosts_by_id[id] = target; this.layers_by_id[id] = []; //将信息插入一致性哈希中 body.layers.forEach(function(layer) {
self.layers_by_id[id].push(layer.name); self.layer_options[layer.name] = layer.options; if (update_ring) self.ring(layer.name).add(entry); }); //监视该节点 this.watch(id); if (update_ring) log.info('pool', 'Added ' + target); callback(null, update_ring);};

动态的负载均衡,需要根据节点的状态进行不同的负载指向:

//定时检测节点健康程度NodeList.prototype.watch = function(id) {
if (this.check_timers_by_id[id]) return; var self = this; var unhealthy = 0; var interval = this.options.checkInterval; var maxUnhealthy = this.options.unhealthyCount; var target = this.hosts_by_id[id]; //Tilestrata的获取健康度接口 var check_url = 'http://' + target + '/health'; function performCheck() {
self.check_timers_by_id[id] = setTimeout(function() {
request.get(check_url, {
timeout: interval, headers: {
'X-TileStrataBalancer-Token': self.token } }, function(err, res) {
//节点不健康程度超过阈值,就注销节点 if (err || res.statusCode !== 200) ++unhealthy; else unhealthy = Math.max(0, unhealthy-1); if (unhealthy >= maxUnhealthy) {
log.warn(null, target + ' is unhealthy. Removing...'); self.unregister(id); } else {
performCheck(); } }); }, interval); } performCheck();};

其中的获取健康度方法,调用了上一次我没有提到的Tilestrata中的’health’接口.

Tilestrata/routes/health.js中的代码:

module.exports = function(req, res, server) {
var status = 200; var host = process.env.TILESTRATA_HIDEHOSTNAME ? '(hidden)' : hostname; var data = _.extend({
ok: true, version: server.version, host: host, uptime: humanizeDuration(server.uptime().duration), uptime_s: server.uptime().duration / 1000 }, result); if (server.options.balancer) {
var balancer_status = 'initializing'; if (server.balancer) {
if (server.balancer.reconnecting()) {
balancer_status = 'connecting'; } else {
balancer_status = 'connected'; } } data.balancer = {
status: balancer_status}; } //告知Tilestrata,TB已经发现其存在,可以不断地进行心跳ping var incoming_token = req.headers && req.headers['x-tilestratabalancer-token']; if (incoming_token && incoming_token === server.balancer_token) {
server.handleBalancerPing(); } var resbuffer = new Buffer(JSON.stringify(data), 'utf8'); res.writeHead(status, {
'Content-Length': resbuffer.length, 'Content-Type': 'application/json'}); res.write(resbuffer); res.end();};

可以看出,Tilestrata只会主动相应200状态,因此,只有当节点上Tilestrata因负载太大发生超时或错误时,才会让TB注销该节点.

其实如果更加智能一些,可以根据Tilestrata的profile数据中的相应指标,动态的调节该节点在TB中的权重.

处理对外的请求:

Balancer.prototype._listenPublic = function(callback) {
var self = this; this.http_public = http.createServer(function(req, res) {
//处理特殊请求 if (req.url === '/robots.txt') {
res.writeHead(200, {
'Content-Length': BUFFER_ROBOTSTXT.length, 'Content-Type': 'text/plain'}); res.write(BUFFER_ROBOTSTXT); res.end(); } else if (req.url === '/health') {
//这里其实可以返回所有节点的健康状况 res.writeHead(200, {
'Content-Type': 'application/json'}); res.write('{"ok":true}'); res.end(); } else {
//反向代理地图瓦片请求 self.handleProxyRequest(req, res, function() {
res.writeHead(404, {
'Content-Type': 'text/plain'}); res.write('Unrecognized URL'); res.end(); }); } }); //监听对外公开接口 var port = this.options.port; var hostname = this.options.hostname; this.http_public.listen(port, hostname, function(err) {
callback(err); });};

TB的主要任务就是接受客户端的地图瓦片请求,负载均衡然后进行反代:

Balancer.prototype.handleProxyRequest = function(req, res, next) {
//初步筛选一些格式非法的请求 var parts = req.url.substring(1).split('/'); if (parts.length < 5) return next(); var layer = parts[0]; var z = Number(parts[1]); var x = Number(parts[2]); var y = Number(parts[3]); if (isNaN(x) || isNaN(y) || isNaN(z)) {
return next(); } var file = parts[4]; var qspos = file.indexOf('?'); if (qspos > -1) file = file.substring(0, qspos); //通过一致性哈希获取最适宜节点 var target = this.nodes.pick(layer, file, z, x, y); if (!target) {
res.writeHead(404, {
'Content-Type': 'text/plain'}); res.write('No servers found to handle the request'); return res.end(); } //进行反向代理 this.proxy.web(req, res, {
target: 'http://'+target});};

核心还是最适宜节点的选取:

NodeList.prototype.pick = function(layer, file, z, x, y) {
var layer_opts = this.layer_options[layer]; if (!layer_opts) return; //进一步根据缩放级别筛选不合理请求 if (typeof layer_opts.minZoom === 'number' && z < layer_opts.minZoom) return; if (typeof layer_opts.maxZoom === 'number' && z > layer_opts.maxZoom) return; //进一步根据请求范围筛选不合理请求 if (!testbbox(x, y, z, layer_opts.bbox)) return; //可以修复偏移 var metatile = layer_opts.metatile || 1; x -= x % metatile; y -= y % metatile; //这里TB不建议把文件类型加入key, //因为对于同一个图层的不同种类文件,如.png后缀和.jpg后缀,都应放在同一个服务上, //以提高缓存的命中率与利用率,因此不需要作为负载均衡的哈希key. var key = layer + '/' + z + '/' + x + '/' + y; return this.ring(layer).get(key);};

总结

TB的实现给了Tilestrata一种很灵活的分压模式:既可以将所有图层挂载到一个Tilestrata上,启动若干个相同的实例,也可以将图层打散,启动若干实例,每个实例挂载不重复的图层.

前一种模式是很普遍的负载均衡,后一种模式则可能更适用于某些特定情况:几个不常用图层,放到一台低性能主机上,几个常用图层放到另一台高性能主机上,甚至对于常用图层,可以开若干专用节点,达到最大性能利用.

转载地址:http://bgqws.baihongyu.com/

你可能感兴趣的文章
java 流使用
查看>>
java 用流收集数据
查看>>
java并行流
查看>>
CompletableFuture 组合式异步编程
查看>>
mysql查询某一个字段是否包含中文字符
查看>>
Java中equals和==的区别
查看>>
JVM内存管理及GC机制
查看>>
Java:按值传递还是按引用传递详细解说
查看>>
Java中Synchronized的用法
查看>>
阻塞队列
查看>>
linux的基础知识
查看>>
接口技术原理
查看>>
五大串口的基本原理
查看>>
PCB设计技巧与注意事项
查看>>
linux进程之间通讯常用信号
查看>>
main函数带参数
查看>>
PCB布线技巧
查看>>
关于PCB设计中过孔能否打在焊盘上的两种观点
查看>>
PCB反推理念
查看>>
京东技术架构(一)构建亿级前端读服务
查看>>