chore: fixup logic for health checks

This commit is contained in:
geek 2017-06-30 14:57:42 -05:00 committed by Sérgio Ramos
parent 9c8ea8c489
commit 28699d0061
5 changed files with 113 additions and 38 deletions

View File

@ -1218,8 +1218,16 @@ module.exports = class Data extends EventEmitter {
});
}
updateInstance ({ id, status }, cb) {
this._db.instances.update([{ id, status }], (err, instances) => {
updateInstance ({ id, status, healthy }, cb) {
const changes = { id };
if (typeof healthy === 'boolean') {
changes.healthy = healthy;
}
if (status) {
changes.status = status;
}
this._db.instances.update([changes], (err, instances) => {
if (err) {
return cb(err);
}

View File

@ -171,7 +171,8 @@ exports.fromInstance = function (instance) {
name: instance.name,
machineId: instance.machine_id,
status: instance.status || '',
ips: instance.ips || []
ips: instance.ips || [],
healthy: instance.healthy
};
};
@ -181,7 +182,8 @@ exports.toInstance = function (clientInstance) {
name: clientInstance.name,
machine_id: clientInstance.machineId,
status: clientInstance.status || '',
ips: clientInstance.ips || []
ips: clientInstance.ips || [],
healthy: clientInstance.healthy
};
};

View File

@ -14,11 +14,6 @@ module.exports = class Health extends Events {
// todo assert options
this._data = options.data;
this._frequency = options.frequency || 2000;
// consul is the base url to the consul cluster
if (options.consul) {
Consulite.config({ consul: options.consul });
}
}
poll () {
@ -38,8 +33,80 @@ module.exports = class Health extends Events {
}, this._frequency);
}
// check() follows these steps:
// 1. grab all services from the db
// 2. filter to only the unique consul hosts
// 3. grab all instances from the db
// 4. query each consul host for service names
// 5. query the respective consul host for service health
// 6. match node to instance using the address and update `healthy` if the status changes
check (cb) {
Consulite.getServiceNames((err, consulServices) => {
// 1. grab all services from the db
this._data.services.all((err, services) => {
if (err) {
return cb(err);
}
if (!services) {
return cb();
}
// 2. filter to only the unique consul hosts
const consulHosts = [];
services.forEach((service) => {
if (!service.consul) {
return;
}
if (consulHosts.indexOf(service.consul) === -1) {
consulHosts.push(service.consul);
}
});
// 3. grab all instances from the db
this._data.instances.all((err, instances) => {
if (err) {
return cb(err);
}
// we match consul nodes using the IP address, remove those that won't match
instances = instances.filter((instance) => {
return instance.ips && instance.ips.length;
});
// include consul host on each instance
// helps to identify the correct instance when comparing addresses
instances.forEach((instance) => {
const service = services.find((service) => {
return (service.instance_ids.indexOf(instance.id) !== -1);
});
if (service) {
instance.consul = service.consul;
}
});
VAsync.forEachParallel({
inputs: consulHosts,
// 4. query each consul host for service names
// 5. query the respective consul host for service health
func: this._checkServicesHealth
}, (err, results) => {
if (err) {
return cb(err);
}
// 6. match node to instance using the address and update `healthy` if the status changes
this._findAndUpdateInstances(instances, results.successes, cb);
});
});
});
}
_checkServicesHealth (consul, cb) {
const consulite = new Consulite({ consul });
consulite.getServiceNames((err, consulServices) => {
if (err) {
return cb(err);
}
@ -49,45 +116,43 @@ module.exports = class Health extends Events {
return consulService !== 'containerpilot';
});
this._data.instances.all((err, instances) => {
if (err) {
return cb(err);
}
VAsync.forEachParallel({
inputs: consulServices,
func: (consulService, next) => {
consulite.getServiceStatus(consulService, (err, nodes) => {
if (err) {
return next(err);
}
// we match consul nodes using the IP address
instances = instances.filter((instance) => {
return instance.ips && instance.ips.length;
});
VAsync.forEachParallel({
inputs: consulServices,
func: (consulService, next) => {
Consulite.getServiceStatus(consulService, (err, nodes) => {
if (err) {
return next(err);
}
this.findAndUpdateInstances(instances, nodes, next);
nodes = nodes.map((node) => {
node.consul = consul;
return node;
});
}
}, cb);
});
next(null, nodes);
});
}
}, cb);
});
}
findAndUpdateInstances (instances, nodes, cb) {
_findAndUpdateInstances (instances, nodes, cb) {
VAsync.forEachPipeline({
inputs: nodes,
func: (node, next) => {
const healthy = (node.status === 'passing');
const instance = instances.find((instance) => {
return (instance.ips.indexOf(nodes.address) !== -1);
return (instance.ips.indexOf(node.address) !== -1) &&
(instance.consul === node.consul) &&
(instance.healthy !== healthy);
});
if (!instance) {
return next();
}
this._data.updateInstance({ id: instance.id, status: node.status }, next);
this._data.updateInstance({ id: instance.id, healthy }, next);
}
}, cb);
}

View File

@ -31,7 +31,7 @@
},
"dependencies": {
"boom": "^5.1.0",
"consulite": "^1.8.0",
"consulite": "^2.0.0",
"docker-compose-client": "^1.0.8",
"dockerode": "^2.5.0",
"graphi": "^2.2.1",

View File

@ -372,9 +372,9 @@ consulite@1.6.x:
or-promise "1.x.x"
wreck "10.x.x"
consulite@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/consulite/-/consulite-1.8.0.tgz#71ca77d7dc19c252e9e6df6752b90e992eee9113"
consulite@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/consulite/-/consulite-2.0.0.tgz#4d35228d24f70a8fb01fdb8ed921d4e54d8bbb16"
dependencies:
or-promise "1.x.x"
wreck "12.x.x"