1
0
mirror of https://github.com/yldio/copilot.git synced 2025-01-07 17:40:14 +02:00
copilot/packages/portal-data/lib/index.js
2017-06-28 10:18:10 +01:00

1520 lines
39 KiB
JavaScript

'use strict';
// core modules
const EventEmitter = require('events');
const Util = require('util');
// 3rd party modules
const CPClient = require('cp-client');
const DockerClient = require('docker-compose-client');
const Dockerode = require('dockerode');
const Hoek = require('hoek');
const ParamCase = require('param-case');
const Penseur = require('penseur');
const { DEPLOYMENT_GROUP, SERVICE, HASH } = require('portal-watch');
const UniqBy = require('lodash.uniqby');
const Uuid = require('uuid/v4');
const VAsync = require('vasync');
// local modules
const Transform = require('./transform');
const NON_IMPORTABLE_STATES = [
'EXITED',
'DELETED',
'STOPPED',
'FAILED'
];
const internals = {
defaults: {
name: 'portal',
db: {
test: false
},
dockerComposeHost: 'tcp://0.0.0.0:4242'
},
tables: {
'portals': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'datacenters': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'deployment_groups': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'versions': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'manifests': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'services': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'packages': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'instances': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false },
'users': { id: { type: 'uuid' }, primary: 'id', secondary: false, purge: false }
},
resolveCb: (resolve, reject) => {
return (err, ...args) => {
if (err) {
return reject(err);
}
resolve(...args);
};
}
};
module.exports = class Data extends EventEmitter {
constructor (options) {
super();
const settings = Hoek.applyToDefaults(internals.defaults, options || {});
// Penseur will assert that the options are correct
this._db = new Penseur.Db(settings.name, settings.db);
this._dockerCompose = new DockerClient(settings.dockerComposeHost);
this._docker = new Dockerode(settings.docker);
this._watcher = null;
if (settings.consul && settings.consul.address) {
CPClient.config(settings.consul);
}
this._dockerCompose.on('error', (err) => {
this.emit('error', err);
});
}
setWatcher (watcher) {
this._watcher = watcher;
}
connect (cb) {
this._db.establish(internals.tables, cb);
}
reconnectCompose (dockerComposeHost) {
this._dockerCompose.close();
this._dockerCompose = new DockerClient(dockerComposeHost);
this._dockerCompose.on('error', (err) => {
this.emit('error', err);
});
}
// portals
createPortal (clientPortal, cb) {
const portal = Transform.toPortal(clientPortal);
this._db.portals.insert(portal, (err, key) => {
if (err) {
return cb(err);
}
portal.id = key;
cb(null, Transform.fromPortal({ portal }));
});
}
getPortal (options, cb) {
this._db.portals.all((err, portals) => {
if (err) {
return cb(err);
}
if (!portals) {
return cb();
}
const portal = portals.shift();
// Sub query/filter for deploymentGroups
const deploymentGroups = (args) => {
return new Promise((resolve, reject) => {
this.getDeploymentGroups(args, internals.resolveCb(resolve, reject));
});
};
// Sub query/filter for user
const user = () => {
return new Promise((resolve, reject) => {
this.getUser({}, internals.resolveCb(resolve, reject));
});
};
// Sub query/filter for datacenter
const datacenter = () => {
return new Promise((resolve, reject) => {
this.getDatacenter({ id: portal.datacenter_id }, internals.resolveCb(resolve, reject));
});
};
cb(null, Transform.fromPortal({
portal,
deploymentGroups,
datacenter,
user
}));
});
}
// datacenters
createDatacenter (datacenter, cb) {
this._db.datacenters.insert(datacenter, (err, key) => {
if (err) {
return cb(err);
}
datacenter.id = key;
cb(null, datacenter);
});
}
getDatacenters (cb) {
this._db.datacenters.all(cb);
}
getDatacenter ({ id, region }, cb) {
Hoek.assert(id || region, 'id or region are required to retrieve a datacenter');
if (region) {
return this._db.datacenters.query({ region }, (err, datacenters) => {
if (err) {
return cb(err);
}
return cb(null, datacenters && datacenters.length ? datacenters[0] : null);
});
}
this._db.datacenters.get(id, cb);
}
// users
createUser (clientUser, cb) {
const user = Transform.toUser(clientUser);
this._db.users.insert(user, (err, key) => {
if (err) {
return cb(err);
}
user.id = key;
cb(null, Transform.fromUser(user));
});
}
getUser (options, cb) {
this._db.users.all((err, users) => {
if (err) {
return cb(err);
}
if (!users || !users.length) {
return cb();
}
cb(null, Transform.fromUser(users[0]));
});
}
// deployment_groups
createDeploymentGroup (clientDeploymentGroup, cb) {
const deploymentGroup = Transform.toDeploymentGroup(clientDeploymentGroup);
this._db.deployment_groups.query({
slug: deploymentGroup.slug
}, (err, deploymentGroups) => {
if (err) {
return cb(err);
}
if (deploymentGroups && deploymentGroups.length) {
return cb(new Error(`DeploymentGroup "${deploymentGroup.slug}" already exists (${deploymentGroups[0].id})`));
}
this._db.deployment_groups.insert(deploymentGroup, (err, key) => {
if (err) {
return cb(err);
}
deploymentGroup.id = key;
cb(null, Transform.fromDeploymentGroup(deploymentGroup));
});
});
}
updateDeploymentGroup ({ id, name }, cb) {
this._db.deployment_groups.update([{ id, name }], (err) => {
if (err) {
return cb(err);
}
cb(null, Transform.fromDeploymentGroup({ id, name }));
});
}
_getDeploymentGroupVersion (deploymentGroup) {
const getServices = (args) => {
args = args || {};
args.deploymentGroupId = deploymentGroup.id;
return new Promise((resolve, reject) => {
this.getServices(args, internals.resolveCb(resolve, reject));
});
};
const getVersion = (args) => {
args = args || {};
args.id = deploymentGroup.version_id;
return new Promise((resolve, reject) => {
return deploymentGroup.version_id ?
this.getVersion(args, internals.resolveCb(resolve, reject)) :
resolve(null);
});
};
return Object.assign(deploymentGroup, {
services: getServices,
version: getVersion
});
}
getDeploymentGroups ({ ids, name, slug }, cb) {
const finish = (err, deploymentGroups) => {
if (err) {
return cb(err);
}
if (!deploymentGroups || !deploymentGroups.length) {
return cb(null, []);
}
// todo getHistory
cb(null, deploymentGroups.map((dg) => { return Transform.fromDeploymentGroup(this._getDeploymentGroupVersion(dg)); }));
};
if (ids) {
return this._db.deployment_groups.get(ids, finish);
}
if (name) {
return this._db.deployment_groups.query({ name }, finish);
}
if (slug) {
return this._db.deployment_groups.query({ slug }, finish);
}
return this._db.deployment_groups.all(finish);
}
getDeploymentGroup (query, cb) {
query = query || {};
this._db.deployment_groups.query(query, (err, deploymentGroups) => {
if (err) {
return cb(err);
}
if (!deploymentGroups || !deploymentGroups.length) {
return cb(null, {});
}
// todo getHistory
cb(null, Transform.fromDeploymentGroup(this._getDeploymentGroupVersion(deploymentGroups[0])));
});
}
_versionManifest (version) {
return Object.assign(version, {
manifest: (args) => {
return new Promise((resolve, reject) => {
return this.getManifest({
id: version.manifest_id
}, internals.resolveCb(resolve, reject));
});
}
});
}
// versions
createVersion (clientVersion, cb) {
Hoek.assert(clientVersion, 'version is required');
Hoek.assert(clientVersion.manifest, 'manifest is required');
Hoek.assert(clientVersion.deploymentGroupId, 'deploymentGroupId is required');
console.log(`-> creating new Version for DeploymentGroup ${clientVersion.deploymentGroupId}`);
const version = Transform.toVersion(clientVersion);
this._db.versions.insert(version, (err, key) => {
if (err) {
return cb(err);
}
console.log(`-> new Version for DeploymentGroup ${clientVersion.deploymentGroupId} created: ${key}`);
const changes = {
id: clientVersion.deploymentGroupId,
version_id: key,
history_version_ids: this._db.append(key)
};
if (clientVersion.serviceIds) {
changes['service_ids'] = clientVersion.serviceIds;
}
console.log(`-> updating DeploymentGroup ${clientVersion.deploymentGroupId} to add Version ${key}`);
this._db.deployment_groups.update([changes], (err) => {
if (err) {
return cb(err);
}
version.id = key;
cb(null, Transform.fromVersion(this._versionManifest(version)));
});
});
}
updateVersion (clientVersion, cb) {
this._db.versions.update([Transform.toVersion(clientVersion)], (err, versions) => {
if (err) {
return cb(err);
}
if (!versions || !versions.length) {
return cb(null, null);
}
cb(null, Transform.fromVersion(this._versionManifest(versions[0])));
});
}
getVersion ({ id, manifestId }, cb) {
const query = id ? { id } : { manifest_id: manifestId };
this._db.versions.single(query, (err, version) => {
if (err) {
return cb(err);
}
if (!version) {
return cb(null, null);
}
cb(null, Transform.fromVersion(this._versionManifest(version)));
});
}
getVersions ({ manifestId, deploymentGroupId }, cb) {
const finish = (err, versions) => {
if (err) {
return cb(err);
}
versions = versions || [];
cb(null, versions.map((version) => { return Transform.fromVersion(this._versionManifest(version)); }));
};
// ensure the data is in sync
this._db.versions.sync(() => {
if (manifestId) {
return this._db.versions.query({ manifest_id: manifestId }, finish);
}
this.getDeploymentGroup({ id: deploymentGroupId }, (err, deploymentGroup) => {
if (err) {
return finish(err);
}
this._db.versions.get(deploymentGroup.history, finish);
});
});
}
scale ({ id, replicas }, cb) {
Hoek.assert(id, 'service id is required');
Hoek.assert(typeof replicas === 'number' && replicas >= 0, 'replicas must be a number no less than 0');
// get the service then get the deployment group
// use the deployment group to find the current version and manifest
// scale the service
// maybe update the machine ids and instances
console.log('-> scale request received');
console.log(`-> fetching Service ${id}`);
this._db.services.single({ id }, (err, service) => {
if (err) {
return cb(err);
}
if (!service) {
return cb(new Error(`service not found for id: ${id}`));
}
console.log(`-> fetching DeploymentGroup ${service.deployment_group_id}`);
this._db.deployment_groups.single({ id: service.deployment_group_id }, (err, deployment_group) => {
if (err) {
return cb(err);
}
if (!deployment_group) {
return cb(new Error(`deployment group not found for service with service id: ${id}`));
}
console.log(`-> fetching Version ${deployment_group.version_id}`);
this._db.versions.single({ id: deployment_group.version_id }, (err, version) => {
if (err) {
return cb(err);
}
if (!version) {
return cb(new Error(`version not found for service with service id: ${id}`));
}
console.log(`-> fetching Manifest ${version.manifest_id}`);
this._db.manifests.single({ id: version.manifest_id }, (err, manifest) => {
if (err) {
return cb(err);
}
if (!manifest) {
return cb(new Error(`manifest not found for service with service id: ${id}`));
}
this._scale({ service, deployment_group, version, manifest, replicas }, cb);
});
});
});
});
}
_scale ({ service, deployment_group, version, manifest, replicas }, cb) {
let isFinished = false;
const finish = () => {
if (isFinished) {
return;
}
isFinished = true;
console.log(`-> docker-compose scaled "${service.name}" from DeploymentGroup ${deployment_group.id} to ${replicas} replicas`);
if (!version.service_scales || !version.service_scales.length) {
console.log(`-> no scale data found for service "${service.name}" from DeploymentGroup ${deployment_group.id} in current Version (${version.id})`);
version.service_scales = [{
service_name: service.name
}];
}
const clientVersion = {
deploymentGroupId: deployment_group.id,
manifest,
plan: version.plan,
scale: version.service_scales.map((scale) => {
if (scale.service_name !== service.name) {
return scale;
}
return {
serviceName: service.name,
replicas
};
})
};
console.log(`-> creating new Version for DeploymentGroup ${deployment_group.id}`);
// createVersion updates the deployment group
this.createVersion(clientVersion, (...args) => {
isFinished = true;
cb(...args);
});
};
console.log(`-> requesting docker-compose to scale "${service.name}" from DeploymentGroup ${deployment_group.id} to ${replicas} replicas`);
this._dockerCompose.scale({
projectName: deployment_group.name,
services: {
[service.name]: replicas
},
manifest: manifest.raw
}, (err, res) => {
if (err) {
return cb(err);
}
finish();
});
}
// manifests
provisionManifest (clientManifest, cb) {
// 1. check that the deploymentgroup exists
// 2. create a new manifest
// 3. create a new version
// 4. return said version
// 5. request docker-compose-api to provision manifest
// 6. create/update/prune services by calling provisionServices with the respose from docker-compose-api
// 7. update version with the provision plan and new service ids
// todo we are not doing anything with the action plans right now
// but if we were, we would do that in portal-watch. with that said, we might
// run into a race condition where the event happens before we update the
// new version with the plan
console.log('-> provision request received');
const provision = ({ deploymentGroup, manifest, newVersion }) => {
let isHandled = false;
console.log(`-> requesting docker-compose provision for DeploymentGroup ${deploymentGroup.name}`);
this._dockerCompose.provision({
projectName: deploymentGroup.name,
manifest: clientManifest.raw
}, (err, provisionRes) => {
if (err) {
this.emit('error', err);
return;
}
// callback can execute multiple times, ensure responses are only handled once
if (isHandled) {
return;
}
isHandled = true;
console.log('-> update/create/remove services based on response from docker-compose');
// create/update services based on hashes
// return the new set of service ids
this.provisionServices({
deploymentGroup,
provisionRes
}, (err, newServiceIds) => {
if (err) {
this.emit('error', err);
return;
}
console.log(`-> update Version ${newVersion.id} based on docker-compose response and new service ids`);
const actions = Object.keys(provisionRes).map((serviceName) => {
return ({
type: provisionRes[serviceName].plan.action,
service: serviceName,
machines: provisionRes[serviceName].plan.containers.map(({ id }) => { return id; })
});
});
// create new version
this.updateVersion({
id: newVersion.id,
manifest,
newServiceIds,
plan: {
running: true,
actions: actions
}
}, (err) => {
if (err) {
this.emit('error', err);
return;
}
console.log(`-> updated Version ${newVersion.id}`);
console.log('-> provisionManifest DONE');
});
});
});
};
const createVersion = ({ deploymentGroup, currentVersion, manifest }) => {
// create new version
this.createVersion({
manifest,
deploymentGroupId: deploymentGroup.id,
scale: currentVersion.scale,
plan: {
running: true,
actions: []
}
}, (err, newVersion) => {
if (err) {
return cb(err);
}
console.log(`-> new Version created with id ${newVersion.id}`);
console.log('newVersion', newVersion);
setImmediate(() => {
provision({ deploymentGroup, manifest, newVersion });
});
cb(null, newVersion);
});
};
this.getDeploymentGroup({
id: clientManifest.deploymentGroupId
}, (err, deploymentGroup) => {
if (err) {
return cb(err);
}
if (!deploymentGroup) {
return cb(new Error('Deployment group not found for manifest'));
}
console.log(`-> new DeploymentGroup created with id ${deploymentGroup.id}`);
const newManifest = Transform.toManifest(clientManifest);
this._db.manifests.insert(newManifest, (err, manifestId) => {
if (err) {
return cb(err);
}
console.log(`-> new Manifest created with id ${manifestId}`);
deploymentGroup.version().then((currentVersion) => {
if (!currentVersion) {
console.log(`-> detected first provision for DeploymentGroup ${deploymentGroup.id}`);
} else {
console.log(`-> creating new Version based on old version ${currentVersion.id}`);
}
return createVersion({
deploymentGroup,
manifest: { id: manifestId },
currentVersion: currentVersion || {}
});
}).catch((err) => { return cb(err); });
});
});
}
getManifest ({ id }, cb) {
this._db.manifests.single({ id }, (err, manifest) => {
if (err) {
return cb(err);
}
cb(null, Transform.fromManifest(manifest || {}));
});
}
getManifests ({ type, deploymentGroupId }, cb) {
const query = type ? { type } : { deployment_group_id: deploymentGroupId };
this._db.manifests.query(query, (err, manifests) => {
if (err) {
return cb(err);
}
manifests = manifests || [];
cb(null, manifests.map(Transform.fromManifest));
});
}
// services
provisionServices ({ deploymentGroup, provisionRes }, cb) {
// 1. get current set of services
// 2. compare names and hashes
// 3. if name doesn't exist anymore, disable service
// 4. if hash is new, update service
// 5. compare previous services with new ones
// 6. deactivate pruned ones
console.log('-> provision services in our data layer');
const createService = ({ provision, serviceName }, cb) => {
console.log(`-> creating Service "${serviceName}" from DeploymentGroup ${deploymentGroup.id}`);
this.createService({
hash: provision.hash,
deploymentGroupId: deploymentGroup.id,
name: serviceName,
slug: ParamCase(serviceName)
}, (err, service) => {
if (err) {
return cb(err);
}
cb(null, service.id);
});
};
const updateService = ({ provision, service }, cb) => {
console.log(`-> updating Service "${service.name}" from DeploymentGroup ${deploymentGroup.id}`);
this.updateService({
id: service.id,
hash: provision.hash
}, (err) => {
if (err) {
return cb(err);
}
cb(null, service.id);
});
};
const resolveService = (serviceName, next) => {
console.log(`-> fetching Service "${serviceName}" from DeploymentGroup ${deploymentGroup.id}`);
const provision = provisionRes[serviceName];
this.getServices({
name: serviceName,
deploymentGroupId: deploymentGroup.id
}, (err, services) => {
if (err) {
return cb(err);
}
// no services for given name
if (!services || !services.length) {
return createService({ provision, serviceName }, next);
}
const service = services.shift();
VAsync.forEachPipeline({
inputs: services,
// disable old services
func: ({ id }, next) => {
console.log(`-> deactivating Service ${id} from DeploymentGroup ${deploymentGroup.id}`);
this.updateService({ active: false, id }, next);
}
}, (err) => {
if (err) {
return cb(err);
}
// service changed
if (service.hash !== provision.hash) {
return updateService({ provision, service }, next);
}
console.log(`-> no changes for Service "${serviceName}" from DeploymentGroup ${deploymentGroup.id}`);
return next(null, service.id);
});
});
};
const pruneService = ({ id, instances }, next) => {
// if it has instances, just mark as inactive
console.log(`-> pruning Service ${id} from DeploymentGroup ${deploymentGroup.id}`);
const update = () => { return this.updateService({ active: false, id }, next); };
const remove = () => { return this.deleteServices({ ids: [id] }, next); };
return (instances && instances.length) ?
update() :
remove();
};
// deactivate pruned services
const pruneServices = (err, result) => {
if (err) {
return cb(err);
}
console.log(`-> pruning Services from DeploymentGroup ${deploymentGroup.id}`);
const new_service_ids = result.successes;
this.getServices({
deploymentGroupId: deploymentGroup.id
}, (err, oldServices) => {
if (err) {
return cb(err);
}
const servicesToPrune = oldServices
.filter(({ id }) => { return new_service_ids.indexOf(id) < 0; });
VAsync.forEachPipeline({
inputs: servicesToPrune,
func: pruneService
}, (err) => { return cb(err, new_service_ids); });
});
};
VAsync.forEachPipeline({
inputs: Object.keys(provisionRes),
func: resolveService
}, pruneServices);
}
createService (clientService, cb) {
const newService = Object.assign(Transform.toService(clientService), {
active: true
});
this._db.services.insert(newService, (err, key) => {
if (err) {
return cb(err);
}
clientService.id = key;
cb(null, clientService);
});
}
updateService (clientService, cb) {
this._db.services.update([Transform.toService(clientService)], (err, services) => {
if (err) {
return cb(err);
}
cb(null, services && services.length ? Transform.fromService(services[0]) : {});
});
}
getService ({ id, hash }, cb) {
const query = id ? { id } : { version_hash: hash };
this._db.services.query(query, (err, service) => {
if (err) {
return cb(err);
}
if (!service) {
return cb(null, null);
}
this._db.packages.single({ id: service.package_id }, (err, packages) => {
if (err) {
return cb(err);
}
cb(null, Transform.fromService({ service, instances: this._instancesFilter(service.instance_ids), packages }));
});
});
}
_getDeploymentGroupServices (deploymentGroupSlug, cb) {
this.getDeploymentGroup({ slug: deploymentGroupSlug }, (err, deploymentGroup) => {
if (err) {
return cb(err);
}
if (!deploymentGroup) {
return cb(null, {});
}
return this.getServices({ deploymentGroupId: deploymentGroup.id }, cb);
});
}
getServices (options, cb) {
if (options.deploymentGroupSlug) {
return this._getDeploymentGroupServices(options.deploymentGroupSlug, cb);
}
const query = {};
if (options.ids && options.ids.length) {
query.id = this._db.or(options.ids);
}
if (options.name) {
query.name = options.name;
}
if (options.slug) {
query.slug = options.slug;
}
if (options.parentId) {
query.parent_id = options.parentId;
}
if (options.deploymentGroupId) {
query.deployment_group_id = options.deploymentGroupId;
}
this._db.services.query(query, (err, services) => {
if (err) {
return cb(err);
}
if (!services || !services.length) {
return cb();
}
return cb(null, services.map((service) => {
return Transform.fromService({ service, instances: this._instancesFilter(service.instance_ids) });
}));
});
}
_instancesFilter (instanceIds) {
return (query) => {
query = query || {};
return new Promise((resolve, reject) => {
query.ids = instanceIds;
this.getInstances(query, internals.resolveCb(resolve, reject));
});
};
}
stopServices ({ ids }, cb) {
this._db.services.get(ids, (err, services) => {
if (err) {
return cb(err);
}
if (!services || !services.length) {
return cb();
}
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id);
container.stop(next);
});
},
inputs: instanceIds
}, (err, results) => {
if (err) {
return cb(err);
}
this.getServices({ ids }, cb);
});
});
}
startServices ({ ids }, cb) {
this._db.services.get(ids, (err, services) => {
if (err) {
return cb(err);
}
if (!services || !services.length) {
return cb();
}
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id);
container.start(next);
});
},
inputs: instanceIds
}, (err, results) => {
if (err) {
return cb(err);
}
this.getServices({ ids }, cb);
});
});
}
restartServices ({ ids }, cb) {
this._db.services.get(ids, (err, services) => {
if (err) {
return cb(err);
}
if (!services || !services.length) {
return cb();
}
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id);
container.restart(next);
});
},
inputs: instanceIds
}, (err, results) => {
if (err) {
return cb(err);
}
this.getServices({ ids }, cb);
});
});
}
deleteServices ({ ids }, cb) {
// todo could this be done with scale = 0?
this._db.services.get(ids, (err, services) => {
if (err) {
return cb(err);
}
if (!services || !services.length) {
return cb();
}
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id);
// Use force in case the container is running. TODO: should we keep force?
container.remove({ force: true }, next);
});
},
inputs: instanceIds
}, (err, results) => {
if (err) {
return cb(err);
}
VAsync.forEachParallel({
inputs: ids,
func: (serviceId, next) => {
this.updateService({
id: serviceId,
active: false
});
}
}, (err) => {
if (err) {
return cb(err);
}
this.getServices({ ids }, cb);
});
});
});
}
// instances
createInstance (clientInstance, cb) {
this._db.instances.insert(Transform.toInstance(clientInstance), (err, key) => {
if (err) {
return cb(err);
}
clientInstance.id = key;
cb(null, clientInstance);
});
}
getInstance ({ id }, cb) {
this._db.instances.single({ id }, (err, instance) => {
if (err) {
return cb(err);
}
cb(null, instance ? Transform.fromInstance(instance) : {});
});
}
getInstances ({ ids, name, machineId, status }, cb) {
const query = {};
if (ids) {
query.id = this._db.or(ids);
}
if (name) {
query.name = name;
}
if (machineId) {
query.machine_id = machineId;
}
if (status) {
query.status = status;
}
this._db.instances.query(query, (err, instances) => {
if (err) {
return cb(err);
}
if (!instances || !instances.length) {
return cb(null, []);
}
cb(null, instances.map(Transform.fromInstance));
});
}
updateInstance ({ id, status }, cb) {
this._db.instances.update([{ id, status }], (err, instances) => {
if (err) {
return cb(err);
}
cb(null, instances && instances.length ? Transform.fromInstance(instances[0]) : {});
});
}
stopInstances ({ ids }, cb) {
this._db.instances.get(ids, (err, instances) => {
if (err) {
return cb(err);
}
if (!instances || !instances.length) {
return cb();
}
VAsync.forEachParallel({
func: (instance, next) => {
const container = this._docker.getContainer(instance.machine_id);
container.stop(next);
},
inputs: instances
}, (err, results) => {
if (err) {
return cb(err);
}
this.getInstances({ ids }, cb);
});
});
}
startInstances ({ ids }, cb) {
this._db.instances.get(ids, (err, instances) => {
if (err) {
return cb(err);
}
if (!instances || !instances.length) {
return cb();
}
VAsync.forEachParallel({
func: (instance, next) => {
const container = this._docker.getContainer(instance.machine_id);
container.start((err) => {
if (err) {
return next(err);
}
// Update the IPAddress for the instance
container.inspect((err, details) => {
if (err) {
return next(err);
}
this._db.instances.update(instance.id, { ip_address: details.NetworkSettings.IPAddress }, next);
});
});
},
inputs: instances
}, (err) => {
if (err) {
return cb(err);
}
this.getInstances({ ids }, cb);
});
});
}
restartInstances ({ ids }, cb) {
this._db.instances.get(ids, (err, instances) => {
if (err) {
return cb(err);
}
if (!instances || !instances.length) {
return cb();
}
VAsync.forEachParallel({
func: (instance, next) => {
this.updateInstance({ id: instance.id, status: 'RESTARTING' }, () => {
const container = this._docker.getContainer(instance.machine_id);
container.restart(next);
});
},
inputs: instances
}, (err, results) => {
if (err) {
return cb(err);
}
this.getInstances({ ids }, cb);
});
});
}
// packages
createPackage (clientPackage, cb) {
this._db.packages.insert(Transform.toPackage(clientPackage), (err, key) => {
if (err) {
return cb(err);
}
clientPackage.id = key;
cb(null, clientPackage);
});
}
getPackage ({ id }, cb) {
this._db.packages.single({ id }, (err, dbPackage) => {
if (err) {
return cb(err);
}
cb(null, dbPackage ? Transform.fromPackage(dbPackage) : {});
});
}
getPackages ({ name, type }, cb) {
const query = name ? { name } : { type };
this._db.packages.query(query, (err, dbPackages) => {
if (err) {
return cb(err);
}
cb(null, dbPackages ? dbPackages.map(Transform.fromPackage) : []);
});
}
getConfig ({deploymentGroupName = '', type = '', format = '', raw = '' }, cb) {
if (type.toUpperCase() !== 'COMPOSE') {
return cb(new Error('"COMPOSE" is the only `type` supported'));
}
if (format.toUpperCase() !== 'YAML') {
return cb(new Error('"YAML" is the only `format` supported'));
}
let isFinished = false;
this._dockerCompose.config({
projectName: deploymentGroupName,
manifest: raw
}, (err, config = {}) => {
if (err) {
return cb(err);
}
if (isFinished) {
return;
}
isFinished = true;
const { services } = config;
if (!services || !Object.keys(services).length) {
return cb(null, []);
}
cb(null, Object.keys(services).reduce((acc, serviceName) => {
return acc.concat([{
id: Uuid(),
hash: Uuid(),
name: serviceName,
slug: ParamCase(serviceName),
instances: [],
package: {},
active: true,
image: services[serviceName].image
}]);
}, []));
});
}
getImportableDeploymentGroups (args, cb) {
if (!this._watcher) {
return cb(null, []);
}
const machines = this._watcher.getContainers();
if (!Array.isArray(machines)) {
return cb(null, []);
}
this.getDeploymentGroups({}, (err, dgs) => {
if (err) {
return cb(err);
}
const names = dgs.map(({ name }) => { return name; });
return cb(
null,
UniqBy(
machines
.filter(({ tags = {} }) => { return names.indexOf(tags[DEPLOYMENT_GROUP]) < 0; })
.filter(({ state }) => { return NON_IMPORTABLE_STATES.indexOf(state.toUpperCase()) < 0; })
.filter(({ tags = {} }) => { return [DEPLOYMENT_GROUP, SERVICE, HASH].every((name) => { return tags[name]; }); }
)
.map(({ tags = {} }) => {
return ({
id: Uuid(),
name: tags[DEPLOYMENT_GROUP],
slug: ParamCase(tags[DEPLOYMENT_GROUP])
});
}),
'slug'
)
);
});
}
importDeploymentGroup ({ deploymentGroupSlug }, cb) {
console.log(`-> import requested for ${deploymentGroupSlug}`);
if (!this._watcher) {
console.log('-> watcher not yet defined');
return cb(null, null);
}
const machines = this._watcher.getContainers();
if (!Array.isArray(machines)) {
console.log('-> no machines found');
return cb(null, null);
}
const containers = machines
.filter(
({ tags = {} }) => { return tags[DEPLOYMENT_GROUP] && ParamCase(tags[DEPLOYMENT_GROUP]) === deploymentGroupSlug; }
)
.filter(
({ state }) => { return NON_IMPORTABLE_STATES.indexOf(state.toUpperCase()) < 0; }
);
if (!containers.length) {
console.log(`-> no containers found for ${deploymentGroupSlug}`);
return cb(null, null);
}
const { tags = [] } = containers[0];
const services = containers.reduce((acc, { tags = [], id = '', state = '', name = '' }) => {
const hash = tags[HASH];
const slug = ParamCase(tags[SERVICE]);
const attr = `${hash}-${slug}`;
const instance = {
name: name,
machineId: id,
status: state.toUpperCase()
};
if (acc[attr]) {
acc[attr].instances.push(instance);
return acc;
}
return Object.assign(acc, {
[attr]: {
hash,
name: tags[SERVICE],
slug,
instances: [instance]
}
});
}, {});
const createService = (deploymentGroupId) => {
return (serviceId, next) => {
const service = services[serviceId];
console.log(`-> creating Service ${Util.inspect(service)}`);
VAsync.forEachParallel({
inputs: service.instances,
func: (instance, next) => { return this.createInstance(instance, next); }
}, (err, results) => {
if (err) {
return cb(err);
}
console.log(`-> created Instances ${Util.inspect(results.successes)}`);
this.createService(Object.assign(service, {
instances: results.successes,
deploymentGroupId
}), next);
});
};
};
const deploymentGroup = {
name: tags[DEPLOYMENT_GROUP],
slug: ParamCase(tags[DEPLOYMENT_GROUP]),
imported: true
};
console.log(`-> creating DeploymentGroup ${Util.inspect(deploymentGroup)}`);
this.createDeploymentGroup(deploymentGroup, (err, dg) => {
if (err) {
return cb(err);
}
VAsync.forEachParallel({
inputs: Object.keys(services),
func: createService(dg.id)
}, (err) => { return cb(err, dg); });
});
}
};