439 lines
10 KiB
JavaScript
439 lines
10 KiB
JavaScript
'use strict';
|
|
|
|
const Events = require('events');
|
|
const Flatten = require('lodash.flatten');
|
|
const Triton = require('triton');
|
|
const VAsync = require('vasync');
|
|
const Wreck = require('wreck');
|
|
const CIDRMatcher = require('cidr-matcher');
|
|
const ForceArray = require('force-array');
|
|
const Get = require('lodash.get');
|
|
const Uniq = require('lodash.uniq');
|
|
const Queue = require('./queue');
|
|
|
|
module.exports = class ContainerPilotWatcher extends Events {
|
|
constructor (options) {
|
|
super();
|
|
|
|
options = options || {};
|
|
|
|
// todo assert options
|
|
this._data = options.data;
|
|
this._frequency = options.frequency || 1000;
|
|
|
|
Triton.createClient({
|
|
profile: {
|
|
url: options.url || process.env.SDC_URL,
|
|
account: options.account || process.env.SDC_ACCOUNT,
|
|
keyId: options.keyId || process.env.SDC_KEY_ID
|
|
}
|
|
}, (err, client) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return;
|
|
}
|
|
|
|
this._triton = client.cloudapi;
|
|
});
|
|
}
|
|
|
|
poll () {
|
|
if (this._timeoutId) {
|
|
return;
|
|
}
|
|
|
|
this._timeoutId = setTimeout(() => {
|
|
this.check((err) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
}
|
|
|
|
delete this._timeoutId;
|
|
this.poll();
|
|
});
|
|
}, this._frequency);
|
|
}
|
|
|
|
_getDeploymentGroups (cb) {
|
|
const getInstances = (service, next) => {
|
|
service.instances({}, (err, instances) => {
|
|
if (err) {
|
|
return next(err);
|
|
}
|
|
|
|
next(null, Object.assign({}, service, {
|
|
instances
|
|
}));
|
|
});
|
|
};
|
|
|
|
const getServices = (deploymentGroup, next) => {
|
|
deploymentGroup.services({}, (err, services) => {
|
|
if (err) {
|
|
return next(err);
|
|
}
|
|
|
|
if (!services || !services.length) {
|
|
return next();
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: services,
|
|
func: getInstances
|
|
}, (err, result) => {
|
|
if (err) {
|
|
return next(err);
|
|
}
|
|
|
|
next(null, Object.assign({}, deploymentGroup, {
|
|
services: result.successes
|
|
}));
|
|
});
|
|
});
|
|
};
|
|
|
|
const handleDeploymentGroups = (err, deploymentGroups) => {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: deploymentGroups,
|
|
func: getServices
|
|
}, (err, result) => {
|
|
cb(err, result.successes);
|
|
});
|
|
};
|
|
|
|
const getDeploymentGroups = (err, portal) => {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
portal.deploymentGroups({}, handleDeploymentGroups);
|
|
};
|
|
|
|
this._data.getPortal({}, getDeploymentGroups);
|
|
}
|
|
|
|
_getNetworks (networkIds = [], cb) {
|
|
VAsync.forEachParallel({
|
|
inputs: networkIds,
|
|
func: (id, next) => {
|
|
this._triton.getNetwork(id, next);
|
|
}
|
|
}, (err, results) => {
|
|
cb(err, ForceArray((results || {}).successes));
|
|
});
|
|
}
|
|
|
|
_getPublicIps (machine, cb) {
|
|
this._getNetworks(machine.networks, (err, networks) => {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
const privateNetworkSubnets = networks
|
|
.filter((network) => {
|
|
return !network['public'];
|
|
})
|
|
.map((network) => {
|
|
return network.subnet;
|
|
})
|
|
.filter(Boolean);
|
|
|
|
const cidr = new CIDRMatcher(privateNetworkSubnets);
|
|
|
|
const nonPrivateIps = machine.ips.filter((ip) => {
|
|
return !cidr.contains(ip);
|
|
});
|
|
|
|
cb(null, nonPrivateIps);
|
|
});
|
|
}
|
|
|
|
_fetchInstanceStatus (instance, cb) {
|
|
const { machineId } = instance;
|
|
|
|
const handleStatuses = (err, results) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return cb();
|
|
}
|
|
|
|
const statuses = ForceArray((results || {}).successes);
|
|
|
|
const status = statuses.filter(Boolean).shift();
|
|
|
|
if (!status) {
|
|
return cb(null, instance);
|
|
}
|
|
|
|
const services = ForceArray(status.Services).filter(({ Name }) => {
|
|
return Name !== 'containerpilot';
|
|
});
|
|
|
|
cb(null, Object.assign({}, instance, {
|
|
cp: Object.assign({}, status, {
|
|
Services: services
|
|
})
|
|
}));
|
|
};
|
|
|
|
const fetchStatus = (ip, next) => {
|
|
Wreck.get(`http://${ip}:9090/status`, {
|
|
timeout: 1000 // 1s
|
|
}, (err, res, payload) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return next();
|
|
}
|
|
|
|
if (Buffer.isBuffer(payload)) {
|
|
payload = payload.toString();
|
|
}
|
|
|
|
try {
|
|
const status = JSON.parse(payload);
|
|
next(null, status);
|
|
} catch (err) {
|
|
next();
|
|
}
|
|
});
|
|
};
|
|
|
|
const handlePublicIps = (err, ips) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return cb();
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: ips,
|
|
func: fetchStatus
|
|
}, handleStatuses);
|
|
};
|
|
|
|
this._triton.getMachine(machineId, (err, machine) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return cb();
|
|
}
|
|
|
|
this._getPublicIps(machine, handlePublicIps);
|
|
});
|
|
}
|
|
|
|
_fetchServiceStatus (service, cb) {
|
|
VAsync.forEachParallel({
|
|
inputs: service.instances,
|
|
func: (instance, next) => {
|
|
this._fetchInstanceStatus(instance, next);
|
|
}
|
|
}, (err, results) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
}
|
|
|
|
cb(null, Object.assign({}, service, {
|
|
instances: ForceArray((results || {}).successes)
|
|
}));
|
|
});
|
|
}
|
|
|
|
_fetchDeploymentGroupStatus (dg, cb) {
|
|
if (!dg || !dg.services || !dg.services.length) {
|
|
return cb();
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: dg.services,
|
|
func: (service, next) => {
|
|
this._fetchServiceStatus(service, next);
|
|
}
|
|
}, (err, results) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
}
|
|
|
|
cb(null, Object.assign({}, dg, {
|
|
services: ForceArray((results || {}).successes)
|
|
}));
|
|
});
|
|
}
|
|
|
|
_saveInstance ({ id, healthy, watchers, jobs }, cb) {
|
|
if (!id) {
|
|
return cb();
|
|
}
|
|
|
|
this._data.updateInstance({
|
|
id,
|
|
healthy,
|
|
watchers,
|
|
jobs
|
|
}, cb);
|
|
}
|
|
|
|
_saveService ({ id, instances, connections }, cb) {
|
|
if (!id) {
|
|
return cb();
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: instances,
|
|
func: (instance, next) => {
|
|
this._saveInstance(instance, next);
|
|
}
|
|
}, (err) => {
|
|
if (err) {
|
|
return cb(err);
|
|
}
|
|
|
|
this._data.updateService({
|
|
id,
|
|
connections
|
|
}, cb);
|
|
});
|
|
}
|
|
|
|
_saveDeploymentGroup (dg, cb) {
|
|
VAsync.forEachParallel({
|
|
inputs: dg.services,
|
|
func: (service, next) => {
|
|
this._saveService(service, next);
|
|
}
|
|
}, cb);
|
|
}
|
|
|
|
check (cb) {
|
|
if (!this._triton) {
|
|
return cb();
|
|
}
|
|
|
|
const resolveServiceConnections = ({ services, service }) => {
|
|
const watches = Uniq(
|
|
Flatten(ForceArray(service.instances).map(({ watches }) => { return watches; }))
|
|
);
|
|
|
|
return watches
|
|
.map((jobName) => {
|
|
return services.reduce((serviceId, service) => {
|
|
if (serviceId) {
|
|
return serviceId;
|
|
}
|
|
|
|
const thisServiceJobs = Uniq(
|
|
Flatten(ForceArray(service.instances).map(({ jobs }) => { return jobs; }))
|
|
);
|
|
|
|
if (thisServiceJobs.indexOf(jobName) >= 0) {
|
|
return service.id;
|
|
}
|
|
|
|
return serviceId;
|
|
}, null);
|
|
})
|
|
.filter(Boolean);
|
|
};
|
|
|
|
const resolveInstanceHealth = ({ name }, instance) => {
|
|
if (!instance) {
|
|
return;
|
|
}
|
|
|
|
if (!instance.cp) {
|
|
return 'UNAVAILABLE';
|
|
}
|
|
|
|
const jobNames = Get(instance, 'cp.Services');
|
|
|
|
const serviceJobs = jobNames.filter(({ Name }) => { return Name === name; });
|
|
|
|
if (serviceJobs.length) {
|
|
return serviceJobs.shift().Status.toUpperCase();
|
|
}
|
|
|
|
const almostJobNameRegexp = new RegExp(`${name}-.*`);
|
|
const almostServiceJobs = jobNames.filter((n) => {
|
|
return almostJobNameRegexp.test(n);
|
|
});
|
|
|
|
if (almostServiceJobs.length) {
|
|
return almostServiceJobs.shift().Status.toUpperCase();
|
|
}
|
|
|
|
return 'UNKNOWN';
|
|
};
|
|
|
|
const handleStatuses = (err, results) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
}
|
|
|
|
const dgs = ForceArray((results || {}).successes)
|
|
.filter(Boolean)
|
|
.map((dg) => {
|
|
return Object.assign({}, dg, {
|
|
services: ForceArray(dg.services).map((service) => {
|
|
return Object.assign({}, service, {
|
|
instances: ForceArray(service.instances).map((instance) => {
|
|
return Object.assign({}, instance, {
|
|
healthy: resolveInstanceHealth(service, instance),
|
|
jobs: Get(instance, 'cp.Services', []).map(
|
|
({ Name }) => { return Name; }
|
|
),
|
|
watches: Get(instance, 'cp.Watches', [])
|
|
});
|
|
})
|
|
});
|
|
})
|
|
});
|
|
})
|
|
.map((dg) => {
|
|
return Object.assign({}, dg, {
|
|
services: ForceArray(dg.services).map((service) => {
|
|
return Object.assign({}, service, {
|
|
connections: resolveServiceConnections({
|
|
services: dg.services,
|
|
service
|
|
})
|
|
});
|
|
})
|
|
});
|
|
});
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: dgs,
|
|
func: (dg, next) => {
|
|
Queue(dg.id, () => {
|
|
return new Promise((resolve) => {
|
|
this._saveDeploymentGroup(dg, (err) => {
|
|
resolve();
|
|
next(err);
|
|
});
|
|
});
|
|
});
|
|
}
|
|
}, cb);
|
|
};
|
|
|
|
const fetchStatuses = (err, dgs) => {
|
|
if (err) {
|
|
this.emit('error', err);
|
|
return cb();
|
|
}
|
|
|
|
VAsync.forEachParallel({
|
|
inputs: dgs,
|
|
func: (dg, next) => {
|
|
this._fetchDeploymentGroupStatus(dg, next);
|
|
}
|
|
}, handleStatuses);
|
|
};
|
|
|
|
this._getDeploymentGroups(fetchStatuses);
|
|
}
|
|
};
|
|
|