multiple bug fixes (#528)

* fix(portal-data): don't fallback on service instances

* feat(portal-data): run delete service in background

* fix(portal-watch): throtle (by dg-service) changes resolver

* feat(portal-watch): resolve all machines on start

* fix(portal-watch): add missing dependency
This commit is contained in:
Sérgio Ramos 2017-06-28 16:04:34 +01:00 committed by Wyatt Preul
parent 8e56cb0ada
commit 48549e5d38
5 changed files with 94 additions and 58 deletions

View File

@ -1101,42 +1101,42 @@ module.exports = class Data extends EventEmitter {
return cb();
}
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id.split(/-/)[0]);
// Use force in case the container is running. TODO: should we keep force?
container.remove({ force: true }, next);
});
},
inputs: instanceIds
}, (err, results) => {
inputs: ids,
func: (serviceId, next) => {
this.updateService({
id: serviceId,
active: false
}, next);
}
}, (err) => {
if (err) {
return cb(err);
}
VAsync.forEachParallel({
inputs: ids,
func: (serviceId, next) => {
this.updateService({
id: serviceId,
active: false
});
}
}, (err) => {
if (err) {
return cb(err);
}
this.getServices({ ids }, cb);
this.getServices({ ids }, cb);
const instanceIds = services.reduce((instanceIds, service) => {
return instanceIds.concat(service.instance_ids);
}, []);
VAsync.forEachParallel({
func: (instanceId, next) => {
this._db.instances.get(instanceId, (err, instance) => {
if (err) {
return next(err);
}
const container = this._docker.getContainer(instance.machine_id.split(/-/)[0]);
// Use force in case the container is running. TODO: should we keep force?
container.remove({ force: true }, next);
});
},
inputs: instanceIds
}, (err, results) => {
if (err) {
console.error(err);
}
});
});
});

View File

@ -71,7 +71,7 @@ exports.toService = function (clientService) {
name: clientService.name,
slug: clientService.slug,
environment: clientService.environment || {},
instance_ids: clientService.instances ? clientService.instances.map((instance) => { return instance.id; }) : [],
instance_ids: clientService.instances ? clientService.instances.map((instance) => { return instance.id; }) : undefined,
service_dependency_ids: clientService.connections || [],
package_id: clientService.package ? clientService.package.id : '',
parent_id: clientService.parent || '',

View File

@ -1,6 +1,7 @@
'use strict';
// const Assert = require('assert');
const Throat = require('throat');
const TritonWatch = require('triton-watch');
const util = require('util');
@ -28,7 +29,15 @@ module.exports = class Watcher {
}
});
this._queues = {};
this._tritonWatch.on('change', (container) => { return this.onChange(container); });
this._tritonWatch.on('all', (containers) => {
containers.forEach((container) => {
this.onChange(container);
});
});
}
poll () {
@ -39,6 +48,18 @@ module.exports = class Watcher {
return this._tritonWatch.getContainers();
}
pushToQueue ({ serviceName, deploymentGroupId }, cb) {
const name = `${deploymentGroupId}-${serviceName}`;
if (this._queues[name]) {
this._queues[name](cb);
return;
}
this._queues[name] = Throat(1);
this._queues[name](cb);
}
getDeploymentGroupId (name, cb) {
this._data.getDeploymentGroup({ name }, (err, deploymentGroup) => {
if (err) {
@ -69,7 +90,7 @@ module.exports = class Watcher {
.catch((err) => { return cb(err); });
}
resolveChanges ({ machine, service, instances }) {
resolveChanges ({ machine, service, instances }, cb) {
// 1. if instance doesn't exist, create new
// 2. if instance exist, update status
@ -93,15 +114,21 @@ module.exports = class Watcher {
.filter(({ machineId }) => { return machine.id === machineId; })
.pop();
const updateService = (updatedService) => {
const updateService = (updatedService, cb) => {
console.log('-> updating service', util.inspect(updatedService));
return this._data.updateService(updatedService, handleError);
return this._data.updateService(updatedService, handleError(cb));
};
const create = () => {
const create = (cb) => {
const status = (machine.state || '').toUpperCase();
if (status === 'DELETED') {
return cb();
}
const instance = {
name: machine.name,
status: (machine.state || '').toUpperCase(),
status,
machineId: machine.id
};
@ -110,11 +137,11 @@ module.exports = class Watcher {
return updateService({
id: service.id,
instances: instances.concat(instance)
});
}, cb);
}));
};
const update = () => {
const update = (cb) => {
const updatedInstance = {
id: instance.id,
status: (machine.state || '').toUpperCase()
@ -123,7 +150,7 @@ module.exports = class Watcher {
console.log('-> updating instance', util.inspect(updatedInstance));
return this._data.updateInstance(updatedInstance, handleError(() => {
if (['DELETED', 'DESTROYED'].indexOf(machine.state.toUpperCase()) < 0) {
return;
return cb();
}
return setTimeout(() => {
@ -132,14 +159,14 @@ module.exports = class Watcher {
instances: instances.filter(({ id }) => {
return id !== instance.id;
})
});
}, this._frequency * 4);
}, cb);
}, this._frequency * 3);
}));
};
return isNew ?
create() :
update();
create(cb) :
update(cb);
}
onChange (machine) {
@ -182,26 +209,30 @@ module.exports = class Watcher {
};
};
const getInstances = (service) => {
const getInstances = (service, cb) => {
this.getInstances(service, handleError((instances) => {
return this.resolveChanges({
machine,
service,
instances
});
}, cb);
}));
};
// assert that service exists
const assertService = (deploymentGroupId) => {
this.getService({ serviceName, deploymentGroupId }, handleError((service) => {
if (!service) {
console.error(`Service "${serviceName}" form DeploymentGroup "${deploymentGroupName}" for machine ${id} not found`);
return;
}
this.pushToQueue({ serviceName, deploymentGroupId }, () => {
return new Promise((resolve) => {
this.getService({ serviceName, deploymentGroupId }, handleError((service) => {
if (!service) {
console.error(`Service "${serviceName}" form DeploymentGroup "${deploymentGroupName}" for machine ${id} not found`);
return;
}
getInstances(service);
}));
getInstances(service, resolve);
}));
});
});
};
// assert that project managed by this portal

View File

@ -12,6 +12,7 @@
"test-ci": "echo 0 `# lab -c -r console -o stdout -r tap -o $CIRCLE_TEST_REPORTS/test/portal-watch.xml`"
},
"dependencies": {
"throat": "^4.0.0",
"triton-watch": "^1.1.0"
},
"devDependencies": {

View File

@ -895,8 +895,8 @@ babel-plugin-istanbul@^4.1.4:
test-exclude "^4.1.1"
babel-plugin-styled-components@^1.1.4:
version "1.1.5"
resolved "https://registry.yarnpkg.com/babel-plugin-styled-components/-/babel-plugin-styled-components-1.1.5.tgz#ff2c8e0e2f3a0d3279e7454a7aaa2973749e714d"
version "1.1.6"
resolved "https://registry.yarnpkg.com/babel-plugin-styled-components/-/babel-plugin-styled-components-1.1.6.tgz#72ffdc1be310f0521af3b574bffedd730405cda9"
dependencies:
stylis "^3.0.19"
@ -3165,13 +3165,13 @@ encoding@^0.1.11:
dependencies:
iconv-lite "~0.4.13"
end-of-stream@1.0.0:
end-of-stream@1.0.0, end-of-stream@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.0.0.tgz#d4596e702734a93e40e9af864319eabd99ff2f0e"
dependencies:
once "~1.3.0"
end-of-stream@^1.0.0, end-of-stream@^1.1.0:
end-of-stream@^1.1.0:
version "1.4.0"
resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.4.0.tgz#7a90d833efda6cfa6eac0f4949dbb0fad3a63206"
dependencies:
@ -3707,11 +3707,11 @@ extsprintf@1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/extsprintf/-/extsprintf-1.0.2.tgz#e1080e0658e300b06294990cc70e1502235fd550"
extsprintf@1.2.0, extsprintf@^1.2.0:
extsprintf@1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/extsprintf/-/extsprintf-1.2.0.tgz#5ad946c22f5b32ba7f8cd7426711c6e8a3fc2529"
extsprintf@1.3.0:
extsprintf@1.3.0, extsprintf@^1.2.0:
version "1.3.0"
resolved "https://registry.yarnpkg.com/extsprintf/-/extsprintf-1.3.0.tgz#96918440e3041a7a414f8c52e3c574eb3c3e1e05"
@ -8998,6 +8998,10 @@ thenify-all@^1.0.0:
dependencies:
any-promise "^1.0.0"
throat@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/throat/-/throat-4.0.0.tgz#e8d397aeb3f335c3bae404a83dc264b813a41e1b"
through2@^0.6.1, through2@^0.6.3, through2@~0.6.1:
version "0.6.5"
resolved "https://registry.yarnpkg.com/through2/-/through2-0.6.5.tgz#41ab9c67b29d57209071410e1d7a7a968cd3ad48"