joyent-portal/packages/portal-api/lib/watch/container-pilot.js

430 lines
9.8 KiB
JavaScript
Raw Normal View History

'use strict';
const Events = require('events');
const Flatten = require('lodash.flatten');
const Triton = require('triton');
const VAsync = require('vasync');
const Wreck = require('wreck');
const CIDRMatcher = require('cidr-matcher');
const ForceArray = require('force-array');
const Get = require('lodash.get');
const Uniq = require('lodash.uniq');
const Queue = require('./queue');
module.exports = class ContainerPilotWatcher extends Events {
constructor (options) {
super();
options = options || {};
// todo assert options
this._data = options.data;
this._frequency = options.frequency || 1000;
Triton.createClient({
profile: {
url: options.url || process.env.SDC_URL,
account: options.account || process.env.SDC_ACCOUNT,
keyId: options.keyId || process.env.SDC_KEY_ID
}
}, (err, client) => {
if (err) {
this.emit('error', err);
return;
}
this._triton = client.cloudapi;
});
}
poll () {
if (this._timeoutId) {
return;
}
this._timeoutId = setTimeout(() => {
this.check((err) => {
if (err) {
this.emit('error', err);
}
delete this._timeoutId;
this.poll();
});
}, this._frequency);
}
_getDeploymentGroups (cb) {
const getInstances = (service, next) => {
service.instances({}, (err, instances) => {
if (err) {
return next(err);
}
next(null, Object.assign({}, service, {
instances
}));
});
};
const getServices = (deploymentGroup, next) => {
deploymentGroup.services({}, (err, services) => {
if (err) {
return next(err);
}
VAsync.forEachParallel({
inputs: services,
func: getInstances
}, (err, result) => {
if (err) {
return next(err);
}
next(null, Object.assign({}, deploymentGroup, {
services: result.successes
}));
});
});
};
const handleDeploymentGroups = (err, deploymentGroups) => {
if (err) {
return cb(err);
}
VAsync.forEachParallel({
inputs: deploymentGroups,
func: getServices
}, (err, result) => {
cb(err, result.successes);
});
};
const getDeploymentGroups = (err, portal) => {
if (err) {
return cb(err);
}
portal.deploymentGroups({}, handleDeploymentGroups);
};
this._data.getPortal({}, getDeploymentGroups);
}
_getNetworks (networkIds = [], cb) {
VAsync.forEachParallel({
inputs: networkIds,
func: (id, next) => {
this._triton.getNetwork(id, next);
}
}, (err, results) => {
cb(err, ForceArray((results || {}).successes));
});
}
_getPublicIps (machine, cb) {
this._getNetworks(machine.networks, (err, networks) => {
if (err) {
return cb(err);
}
const privateNetworkSubnets = networks
.filter((network) => {
return !network['public'];
})
.map((network) => {
return network.subnet;
})
.filter(Boolean);
const cidr = new CIDRMatcher(privateNetworkSubnets);
const nonPrivateIps = machine.ips.filter((ip) => {
return !cidr.contains(ip);
});
cb(null, nonPrivateIps);
});
}
_fetchInstanceStatus (instance, cb) {
const { machineId } = instance;
const handleStatuses = (err, results) => {
if (err) {
this.emit('error', err);
return cb();
}
const statuses = ForceArray((results || {}).successes);
const status = statuses.filter(Boolean).shift();
if (!status) {
return cb(null, instance);
}
const services = ForceArray(status.Services).filter(({ Name }) => {
return Name !== 'containerpilot';
});
cb(null, Object.assign({}, instance, {
cp: Object.assign({}, status, {
Services: services
})
}));
};
const fetchStatus = (ip, next) => {
Wreck.get(`http://${ip}:9090/status`, {
timeout: 1000 // 1s
}, (err, res, payload) => {
if (err) {
this.emit('error', err);
return next();
}
if (Buffer.isBuffer(payload)) {
payload = payload.toString();
}
try {
const status = JSON.parse(payload);
next(null, status);
} catch (err) {
next();
}
});
};
const handlePublicIps = (err, ips) => {
if (err) {
this.emit('error', err);
return cb();
}
VAsync.forEachParallel({
inputs: ips,
func: fetchStatus
}, handleStatuses);
};
this._triton.getMachine(machineId, (err, machine) => {
if (err) {
this.emit('error', err);
return cb();
}
this._getPublicIps(machine, handlePublicIps);
});
}
_fetchServiceStatus (service, cb) {
VAsync.forEachParallel({
inputs: service.instances,
func: (instance, next) => {
this._fetchInstanceStatus(instance, next);
}
}, (err, results) => {
if (err) {
this.emit('error', err);
}
cb(null, Object.assign({}, service, {
instances: ForceArray((results || {}).successes)
}));
});
}
_fetchDeploymentGroupStatus (dg, cb) {
VAsync.forEachParallel({
inputs: dg.services,
func: (service, next) => {
this._fetchServiceStatus(service, next);
}
}, (err, results) => {
if (err) {
this.emit('error', err);
}
cb(null, Object.assign({}, dg, {
services: ForceArray((results || {}).successes)
}));
});
}
_saveInstance ({ id, healthy, watchers, jobs }, cb) {
if (!id) {
return cb();
}
this._data.updateInstance({
id,
healthy,
watchers,
jobs
}, cb);
}
_saveService ({ id, instances, connections }, cb) {
if (!id) {
return cb();
}
VAsync.forEachParallel({
inputs: instances,
func: (instance, next) => {
this._saveInstance(instance, next);
}
}, (err) => {
if (err) {
return cb(err);
}
this._data.updateService({
id,
connections
}, cb);
});
}
_saveDeploymentGroup (dg, cb) {
VAsync.forEachParallel({
inputs: dg.services,
func: (service, next) => {
this._saveService(service, next);
}
}, cb);
}
check (cb) {
if (!this._triton) {
return cb();
}
const resolveServiceConnections = ({ services, service }) => {
const watches = Uniq(
Flatten(ForceArray(service.instances).map(({ watches }) => { return watches; }))
);
return watches
.map((jobName) => {
return services.reduce((serviceId, service) => {
if (serviceId) {
return serviceId;
}
const thisServiceJobs = Uniq(
Flatten(ForceArray(service.instances).map(({ jobs }) => { return jobs; }))
);
if (thisServiceJobs.indexOf(jobName) >= 0) {
return service.id;
}
return serviceId;
}, null);
})
.filter(Boolean);
};
const resolveInstanceHealth = ({ name }, instance) => {
if (!instance) {
return;
}
if (!instance.cp) {
return 'UNAVAILABLE';
}
const jobNames = Get(instance, 'cp.Services');
const serviceJobs = jobNames.filter(({ Name }) => { return Name === name; });
if (serviceJobs.length) {
return serviceJobs.shift().Status.toUpperCase();
}
const almostJobNameRegexp = new RegExp(`${name}-.*`);
const almostServiceJobs = jobNames.filter((n) => {
return almostJobNameRegexp.test(n);
});
if (almostServiceJobs.length) {
return almostServiceJobs.shift().Status.toUpperCase();
}
return 'UNKNOWN';
};
const handleStatuses = (err, results) => {
if (err) {
this.emit('error', err);
}
const dgs = ForceArray((results || {}).successes)
.map((dg) => {
return Object.assign({}, dg, {
services: ForceArray(dg.services).map((service) => {
return Object.assign({}, service, {
instances: ForceArray(service.instances).map((instance) => {
return Object.assign({}, instance, {
healthy: resolveInstanceHealth(service, instance),
jobs: Get(instance, 'cp.Services', []).map(
({ Name }) => { return Name; }
),
watches: Get(instance, 'cp.Watches', [])
});
})
});
})
});
})
.map((dg) => {
return Object.assign({}, dg, {
services: ForceArray(dg.services).map((service) => {
return Object.assign({}, service, {
connections: resolveServiceConnections({
services: dg.services,
service
})
});
})
});
});
VAsync.forEachParallel({
inputs: dgs,
func: (dg, next) => {
Queue(dg.id, () => {
return new Promise((resolve) => {
this._saveDeploymentGroup(dg, (err) => {
resolve();
next(err);
});
});
});
}
}, cb);
};
const fetchStatuses = (err, dgs) => {
if (err) {
this.emit('error', err);
return cb();
}
VAsync.forEachParallel({
inputs: dgs,
func: (dg, next) => {
this._fetchDeploymentGroupStatus(dg, next);
}
}, handleStatuses);
};
this._getDeploymentGroups(fetchStatuses);
}
};