mirror of
https://github.com/11notes/docker-traefik-labels.git
synced 2026-03-11 20:43:43 -05:00
main/worker model for better scalability
This commit is contained in:
@@ -89,6 +89,7 @@ docker run --name traefik-labels \
|
||||
* Only use rootless container runtime (podman, rootless docker)
|
||||
* Allow non-root ports < 1024 via `echo "net.ipv4.ip_unprivileged_port_start=53" > /etc/sysctl.d/ports.conf`
|
||||
* Use a reverse proxy like Traefik, Nginx to terminate TLS with a valid certificate
|
||||
* Use Let’s Encrypt certificates to protect your SSL endpoints
|
||||
|
||||
# DISCLAIMERS
|
||||
* <sup>1</sup> For TLS to work you need proper certificates in place for your dockerd and your clients. The CN in the certificate needs to match the FQDN or IP you have set on the docker node, you can set multiple by using SAN. See an example of a daemon.json configuration to enable TLS.
|
||||
|
||||
@@ -16,8 +16,8 @@ labels:
|
||||
# timeout in seconds for the connection to a Docker node
|
||||
timeout: 2.5
|
||||
rfc2136:
|
||||
# only nsupdate on entries which are different (remove existing entry)
|
||||
update-only: true
|
||||
# only nsupdate on entries which are different (do not update same data)
|
||||
verify: false
|
||||
poll:
|
||||
# polling all containers on a node every {n} seconds
|
||||
interval: 300
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
const { fork } = require('node:child_process');
|
||||
const fs = require('fs');
|
||||
const Docker = require('dockerode');
|
||||
const yaml = require('js-yaml');
|
||||
const redis = require('redis');
|
||||
const { nsupdate } = require('./nsupdate');
|
||||
@@ -8,98 +8,47 @@ const { elevenLogJSON } = require('/labels/lib/util.js');
|
||||
|
||||
process
|
||||
.on('unhandledRejection', (e, p) => {
|
||||
elevenLogJSON('error', JSON.stringify({rejection:{exception:e.toString()}}));
|
||||
elevenLogJSON('error', {unhandledRejection:e.toString()});
|
||||
})
|
||||
.on('uncaughtException', e => {
|
||||
elevenLogJSON('error', JSON.stringify({exception:{exception:e.toString()}}));
|
||||
elevenLogJSON('error', {uncaughtException:e.toString()});
|
||||
});
|
||||
|
||||
|
||||
class Labels{
|
||||
#config = yaml.load(fs.readFileSync(`${process.env.APP_ROOT}/etc/config.yaml`, 'utf8'))?.labels;
|
||||
#defaults = {
|
||||
interval:0,
|
||||
timeout:2.5,
|
||||
tls:{
|
||||
ca:`${process.env.APP_ROOT}/ssl/ca.crt`,
|
||||
crt:`${process.env.APP_ROOT}/ssl/labels.crt`,
|
||||
key:`${process.env.APP_ROOT}/ssl/labels.key`,
|
||||
port:2376
|
||||
},poll:{interval:300}, ping:{interval:2.5}, redis:{url:'rediss://localhost:6379/0'}, rfc2136:{'update-only':false}};
|
||||
#intervals = {ping:false, poll:false, nodes:false};
|
||||
#loops = {ping:false, poll:false, nodes:false};
|
||||
#config = {webhook:{headers:{'Content-Type':'application/json'}}};
|
||||
#workers = {};
|
||||
#interval = {run:false, fok:false};
|
||||
#redis;
|
||||
#nodes = {};
|
||||
#tls = {ca:'', crt:'', key:''};
|
||||
|
||||
constructor(){
|
||||
this.#tls.ca = fs.readFileSync(this.#config?.tls?.ca || this.#defaults.tls.ca);
|
||||
this.#tls.crt = fs.readFileSync(this.#config?.tls?.crt || this.#defaults.tls.crt);
|
||||
this.#tls.key = fs.readFileSync(this.#config?.tls?.key || this.#defaults.tls.key);
|
||||
const config = yaml.load(fs.readFileSync(`${process.env.APP_ROOT}/etc/config.yaml`, 'utf8'))?.labels;
|
||||
this.#config.redis = {url:(config?.redis?.url || 'rediss://localhost:6379/0')};
|
||||
this.#config.webhook.url = (config?.webhook?.url || null);
|
||||
if(this.#config.webhook.url && config?.webhook?.auth?.basic){
|
||||
this.#config.webhook.headers['Authorization'] = 'Basic ' + Buffer.from(config.webhook.auth.basic).toString('base64');
|
||||
elevenLogJSON('info', `using webhook ${this.#config.webhook.url} with basic authentication`);
|
||||
}
|
||||
this.#config.rfc2136 = {verify:(config?.rfc2136?.verify || false)};
|
||||
this.#config.poll = {interval:(config?.poll?.interval || 300)};
|
||||
this.#config.ping = {interval:(config?.ping?.interval || 2.5)};
|
||||
this.#config.port = (config?.port || 2376);
|
||||
this.#config.timeout = (config?.timeout || 5);
|
||||
this.#config.interval = (config?.interval || 0);
|
||||
this.#config.tls = {
|
||||
ca:(config?.tls?.ca || `${process.env.APP_ROOT}/ssl/ca.crt`),
|
||||
crt:(config?.tls?.crt || `${process.env.APP_ROOT}/ssl/labels.crt`),
|
||||
key:(config?.tls?.key || `${process.env.APP_ROOT}/ssl/labels.key`),
|
||||
};
|
||||
|
||||
this.#loadNodes(true);
|
||||
|
||||
if(this.#config?.webhook?.url){
|
||||
this.#config.webhook.headers = {'Content-Type':'application/json'};
|
||||
switch(true){
|
||||
case this.#config?.webhook?.auth?.basic:
|
||||
this.#config.webhook.headers['Authorization'] = 'Basic ' + Buffer.from(this.#config.webhook.auth.basic).toString('base64');
|
||||
break;
|
||||
}
|
||||
if(this.#config.interval > 0){
|
||||
elevenLogJSON('info', `${process.env.APP_ROOT}/etc/config.yaml will reload labels.nodes every ${this.#config.interval}s`);
|
||||
}
|
||||
}
|
||||
|
||||
async #loadNodes(init){
|
||||
if(!this.#intervals.nodes && (this.#config?.interval || this.#defaults.interval) > 0){
|
||||
this.#intervals.nodes = true;
|
||||
setInterval(async() => {
|
||||
if(!this.#loops.nodes){
|
||||
this.#loops.nodes = true;
|
||||
try{
|
||||
elevenLogJSON('info', `reload nodes from config.yaml`);
|
||||
this.#config = yaml.load(fs.readFileSync(`${process.env.APP_ROOT}/etc/config.yaml`, 'utf8'))?.labels;
|
||||
await this.#loadNodes(false);
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({nodes:{exception:e.toString()}}));
|
||||
}finally{
|
||||
this.#loops.nodes = false;
|
||||
}
|
||||
}
|
||||
}, (this.#config?.interval || this.#defaults.interval)*1000);
|
||||
}
|
||||
|
||||
for(const node of this.#config?.nodes){
|
||||
if(!this.#nodes[node]){
|
||||
this.#nodes[node] = new Docker({
|
||||
protocol:'https',
|
||||
host:node,
|
||||
port:this.#config?.tls?.port || this.#defaults.tls.port,
|
||||
ca:this.#tls.ca,
|
||||
cert:this.#tls.crt,
|
||||
key:this.#tls.key,
|
||||
timeout:(this.#config?.timeout || this.#defaults.timeout)*1000
|
||||
});
|
||||
this.#nodes[node].labels = {ping:false, firstConnect:init};
|
||||
}
|
||||
}
|
||||
|
||||
for(const pnode in this.#nodes){
|
||||
let valid = false;
|
||||
for(const node of this.#config?.nodes){
|
||||
if(pnode === node){
|
||||
valid = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!valid){
|
||||
elevenLogJSON('info', `removed node [${pnode}] from configuration`);
|
||||
delete(this.#nodes[pnode]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async watch(){
|
||||
async run(){
|
||||
this.#redis = await redis.createClient({
|
||||
url:this.#config?.redis?.url || this.#defaults.redis.url,
|
||||
url:this.#config.redis.url,
|
||||
pingInterval:30000,
|
||||
socket:{
|
||||
rejectUnauthorized:false,
|
||||
@@ -110,173 +59,131 @@ class Labels{
|
||||
|
||||
this.#redis.on('ready', async()=>{
|
||||
elevenLogJSON('info', `connected to redis`);
|
||||
await this.#ping();
|
||||
elevenLogJSON('info', `start initial poll after container start`);
|
||||
await this.#poll();
|
||||
|
||||
this.#run();
|
||||
|
||||
if(this.#config.interval > 0){
|
||||
setInterval(async ()=>{
|
||||
if(!this.#interval.run){
|
||||
this.#interval.run = true;
|
||||
try{
|
||||
await this.#run();
|
||||
}catch(e){
|
||||
elevenLogJSON('error', {error:e});
|
||||
}finally{
|
||||
this.#interval.run = false;
|
||||
}
|
||||
}
|
||||
}, this.#config.interval*1000);
|
||||
}
|
||||
|
||||
setInterval(async()=>{
|
||||
if(!this.#interval.fork){
|
||||
this.#interval.fork = true;
|
||||
try{
|
||||
await this.#fork();
|
||||
}catch(e){
|
||||
elevenLogJSON('error', {error:e});
|
||||
}finally{
|
||||
this.#interval.fork = false;
|
||||
}
|
||||
}
|
||||
}, this.#config.ping.interval*1000);
|
||||
});
|
||||
|
||||
this.#redis.on('error', error =>{
|
||||
elevenLogJSON('error', JSON.stringify({redis:{exception:error.toString()}}));
|
||||
elevenLogJSON('error', {redis:error.toString()});
|
||||
});
|
||||
|
||||
this.#redis.connect();
|
||||
}
|
||||
|
||||
async #ping(){
|
||||
if(!this.#intervals.ping){
|
||||
this.#intervals.ping = true;
|
||||
setInterval(async() => {
|
||||
if(!this.#loops.ping){
|
||||
this.#loops.ping = true;
|
||||
try{
|
||||
await this.#ping()
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({ping:{exception:e.toString()}}));
|
||||
}finally{
|
||||
this.#loops.ping = false;
|
||||
}
|
||||
}
|
||||
}, (this.#config?.ping?.interval || this.#defaults.ping.interval)*1000);
|
||||
#run(){
|
||||
const nodes = yaml.load(fs.readFileSync(`${process.env.APP_ROOT}/etc/config.yaml`, 'utf8'))?.labels?.nodes;
|
||||
for(const node of nodes){
|
||||
if(!this.#workers[node]){
|
||||
this.#workers[node] = new Worker(this.#config, node, this);
|
||||
this.#workers[node].fork();
|
||||
elevenLogJSON('info', `created new worker for node [${node}]`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(const node in this.#nodes){
|
||||
try{
|
||||
await this.#nodes[node].ping();
|
||||
if(!this.#nodes[node].labels.ping){
|
||||
elevenLogJSON('info', `connected to node [${node}]`);
|
||||
this.#nodes[node].getEvents({}, (error, data) => {
|
||||
if(!error){
|
||||
data.on('data', async(chunk) =>{
|
||||
const event = JSON.parse(chunk.toString('utf8'));
|
||||
if(/Container/i.test(event?.Type) && /^(start|die)$/i.test(event?.status)){
|
||||
await this.#inspect(node, event.id, event.status);
|
||||
}
|
||||
});
|
||||
async #fork(){
|
||||
for(const node in this.#workers){
|
||||
if(!this.#workers[node].run){
|
||||
await this.#workers[node].fork();
|
||||
if(!this.#workers[node].log.disconnect){
|
||||
this.#workers[node].log.disconnect = true;
|
||||
elevenLogJSON('info', `trying to fork existing worker for node [${node}] after disconnect`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async inspect(container){
|
||||
try{
|
||||
const counter = {
|
||||
add:0,
|
||||
del:0,
|
||||
};
|
||||
const rfc2136 = {
|
||||
WAN:{server:'', key:'', commands:[]},
|
||||
LAN:{server:'', key:'', commands:[]},
|
||||
}
|
||||
|
||||
for(const label in container.labels){
|
||||
switch(true){
|
||||
case /traefik\//i.test(label):
|
||||
if(container.start){
|
||||
counter.add++;
|
||||
await this.#redis.set(label, container.labels[label], {EX:this.#config.poll.interval + 30});
|
||||
}else{
|
||||
elevenLogJSON('error', JSON.stringify({getEvents:{exception:error.toString()}}));
|
||||
counter.del++;
|
||||
await this.#redis.del(label);
|
||||
}
|
||||
});
|
||||
}
|
||||
this.#nodes[node].labels.ping = true;
|
||||
}catch(e){
|
||||
if(this.#nodes[node].labels.ping && !this.#nodes[node].labels.firstConnect){
|
||||
elevenLogJSON('warning', `connection to node [${node}] lost!`);
|
||||
}else if(this.#nodes[node].labels.firstConnect){
|
||||
this.#nodes[node].labels.firstConnect = false;
|
||||
elevenLogJSON('warning', `connection to node [${node}] failed!`);
|
||||
}
|
||||
this.#nodes[node].labels.ping = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
async #poll(){
|
||||
if(!this.#intervals.poll){
|
||||
this.#intervals.poll = true;
|
||||
setInterval(async() => {
|
||||
if(!this.#loops.poll){
|
||||
this.#loops.poll = true;
|
||||
try{
|
||||
await this.#poll()
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({poll:{exception:e.toString()}}));
|
||||
}finally{
|
||||
this.#loops.poll = false;
|
||||
}
|
||||
}
|
||||
}, (this.#config?.poll?.interval || this.#defaults.poll.interval)*1000);
|
||||
}
|
||||
|
||||
for(const node in this.#nodes){
|
||||
if(this.#nodes[node].labels.ping){
|
||||
try{
|
||||
await this.#nodes[node].listContainers((error, containers) => {
|
||||
elevenLogJSON('info', `poll started on node [${node}]`);
|
||||
if(!error){
|
||||
containers.forEach(async(container) => {
|
||||
await this.#inspect(node, container.Id, 'poll');
|
||||
});
|
||||
}
|
||||
});
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({listContainers:{exception:e.toString()}}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async #inspect(node, id, event){
|
||||
return(new Promise((resolve, reject) => {
|
||||
const container = this.#nodes[node].getContainer(id);
|
||||
container.inspect(async(error, data) => {
|
||||
if(!error){
|
||||
const container = {
|
||||
name:(data?.Name || data?.id).replace(/^\//i, ''),
|
||||
event:event,
|
||||
run:(/start|poll/i.test(event)) ? true : false,
|
||||
labels:{
|
||||
traefik:[],
|
||||
rfc2136:[],
|
||||
},
|
||||
};
|
||||
|
||||
elevenLogJSON('info', `[${node}] container [${container.name}] event [${container.event}]`);
|
||||
|
||||
const rfc2136 = {
|
||||
WAN:{server:'', key:'', commands:[]},
|
||||
LAN:{server:'', key:'', commands:[]},
|
||||
}
|
||||
|
||||
for(const label in data?.Config?.Labels){
|
||||
case /rfc2136\//i.test(label):
|
||||
const type = ((label.match(/rfc2136\/WAN\//i)) ? 'WAN' : 'LAN');
|
||||
switch(true){
|
||||
case /traefik\//i.test(label):
|
||||
if(container.run){
|
||||
await this.#redis.set(label, data.Config.Labels[label], {EX:(this.#config?.poll?.interval || this.#defaults.poll.interval) + 30});
|
||||
}else{
|
||||
await this.#redis.del(label);
|
||||
}
|
||||
container.labels.traefik[label] = data.Config.Labels[label];
|
||||
case /rfc2136\/\S+\/server/i.test(label):
|
||||
rfc2136[type].server = container.labels[label];
|
||||
break;
|
||||
|
||||
case /rfc2136\//i.test(label):
|
||||
container.labels.rfc2136[label] = data.Config.Labels[label];
|
||||
const type = ((label.match(/rfc2136\/WAN\//i)) ? 'WAN' : 'LAN');
|
||||
switch(true){
|
||||
case /rfc2136\/\S+\/server/i.test(label):
|
||||
rfc2136[type].server = data.Config.Labels[label];
|
||||
break;
|
||||
|
||||
case /rfc2136\/\S+\/key/i.test(label):
|
||||
rfc2136[type].key = data.Config.Labels[label];
|
||||
break;
|
||||
|
||||
default:
|
||||
if(!container.run){
|
||||
data.Config.Labels[label] = data.Config.Labels[label].replace(/update add/i, 'update delete');
|
||||
}
|
||||
rfc2136[type].commands.push(data.Config.Labels[label]);
|
||||
}
|
||||
case /rfc2136\/\S+\/key/i.test(label):
|
||||
rfc2136[type].key = container.labels[label];
|
||||
break;
|
||||
|
||||
default:
|
||||
if(!container.start){
|
||||
container.labels[label] = container.labels[label].replace(/update add/i, 'update delete');
|
||||
}
|
||||
rfc2136[type].commands.push(container.labels[label]);
|
||||
}
|
||||
}
|
||||
|
||||
if(rfc2136.LAN.commands.length > 0 || rfc2136.WAN.commands.length){
|
||||
await this.#rfc2136(rfc2136);
|
||||
}
|
||||
|
||||
if(this.#config?.webhook?.url){
|
||||
await this.#webhook(container);
|
||||
}
|
||||
break;
|
||||
}
|
||||
})
|
||||
resolve(true);
|
||||
}));
|
||||
}
|
||||
|
||||
if(rfc2136.LAN.commands.length > 0 || rfc2136.WAN.commands.length){
|
||||
await this.#rfc2136(rfc2136);
|
||||
}
|
||||
|
||||
if(this.#config?.webhook?.url){
|
||||
await this.#webhook(container);
|
||||
}
|
||||
|
||||
elevenLogJSON('info', `[${container.worker.node}] container [${container.name}] event [${container.event}]; Traefik: add ${counter.add} / del ${counter.del}; rfc2136: WAN ${rfc2136.WAN.commands.length} / LAN ${rfc2136.LAN.commands.length}`);
|
||||
|
||||
}catch(e){
|
||||
elevenLogJSON('error', {inspect:e.toString(), exception:e});
|
||||
}
|
||||
}
|
||||
|
||||
async #rfc2136(rfc2136){
|
||||
for(const type in rfc2136){
|
||||
if(rfc2136[type].commands.length > 0 && rfc2136[type].server && rfc2136[type].key){
|
||||
if(this.#config?.rfc2136?.['update-only'] || this.#defaults?.rfc2136?.['update-only']){
|
||||
if(this.#config.rfc2136.verify){
|
||||
for(let i=0; i<rfc2136[type].commands.length; i++){
|
||||
if(await this.#rfc2136knownRecord(rfc2136[type].server, rfc2136[type].commands[i])){
|
||||
rfc2136[type].commands.splice(i, 1);
|
||||
@@ -288,7 +195,7 @@ class Labels{
|
||||
await nsupdate(rfc2136[type].server, rfc2136[type].key, rfc2136[type].commands);
|
||||
}
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({nsupdate:{exception:e.toString()}}));
|
||||
elevenLogJSON('error', {nsupdate:{exception:e.toString()}});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -309,12 +216,63 @@ class Labels{
|
||||
async #webhook(container){
|
||||
try{
|
||||
await fetch(this.#config.webhook.url, {method:(
|
||||
(container.run) ? 'PUT' : 'DELETE'
|
||||
(container.start) ? 'PUT' : 'DELETE'
|
||||
), body:JSON.stringify(container), headers:this.#config.webhook.headers, signal:AbortSignal.timeout(2500)});
|
||||
}catch(e){
|
||||
elevenLogJSON('error', JSON.stringify({webhook:{exception:e.toString()}}));
|
||||
elevenLogJSON('error', {webhook:{exception:e.toString()}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
new Labels().watch();
|
||||
class Worker{
|
||||
run = false;
|
||||
#fork;
|
||||
#config;
|
||||
#parent;
|
||||
log = {
|
||||
disconnect:false,
|
||||
}
|
||||
|
||||
constructor(config, node, parent){
|
||||
this.#config = {
|
||||
tls:config.tls,
|
||||
poll:config.poll.interval,
|
||||
ping:config.ping.interval,
|
||||
port:config.port,
|
||||
node:node,
|
||||
};
|
||||
this.#parent = parent;
|
||||
}
|
||||
|
||||
async fork(){
|
||||
return(new Promise((resolve, reject) => {
|
||||
this.#fork = fork(`${process.env.APP_ROOT}/worker.js`, [JSON.stringify(this.#config)], {stdio: 'inherit'});
|
||||
this.#fork.on('spawn', () =>{
|
||||
this.run = true;
|
||||
resolve();
|
||||
});
|
||||
this.#fork.on('error', () =>{
|
||||
this.run = false;
|
||||
reject();
|
||||
});
|
||||
this.#fork.on('close', (code) =>{
|
||||
this.run = false;
|
||||
});
|
||||
this.#fork.on('message', (message) =>{
|
||||
if(message.error){
|
||||
if(!this.log.disconnect){
|
||||
elevenLogJSON('error', {fork:message.error});
|
||||
}
|
||||
}else{
|
||||
this.run = true;
|
||||
this.log.disconnect = false;
|
||||
if(message?.labels){
|
||||
this.#parent.inspect.apply(this.#parent, [message]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
new Labels().run();
|
||||
146
rootfs/labels/worker.js
Normal file
146
rootfs/labels/worker.js
Normal file
@@ -0,0 +1,146 @@
|
||||
const fs = require('fs');
|
||||
const Docker = require('dockerode');
|
||||
const args = process.argv.slice(2);
|
||||
|
||||
process
|
||||
.on('unhandledRejection', (e, p) => {
|
||||
console.error(e, p);
|
||||
process.send({error:e.toString()});
|
||||
})
|
||||
.on('uncaughtException', e => {
|
||||
console.error(e);
|
||||
process.send({error:e.toString()});
|
||||
});
|
||||
|
||||
class Worker{
|
||||
#config;
|
||||
#docker;
|
||||
#interval = {poll:false, ping:false};
|
||||
|
||||
constructor(config){
|
||||
this.#config = config;
|
||||
this.#docker = new Docker({
|
||||
protocol:'https',
|
||||
host:this.#config.node,
|
||||
port:this.#config.port,
|
||||
ca:fs.readFileSync(this.#config.tls.ca),
|
||||
cert:fs.readFileSync(this.#config.tls.crt),
|
||||
key:fs.readFileSync(this.#config.tls.key),
|
||||
timeout:parseInt(this.#config.timeout*1000)
|
||||
});
|
||||
}
|
||||
|
||||
async run(){
|
||||
try{
|
||||
this.#docker.getEvents({}, (error, data) => {
|
||||
if(!error){
|
||||
data.on('data', async(chunk) =>{
|
||||
const response = chunk.toString('utf8').replace(',"ti"', '');
|
||||
try{
|
||||
const event = JSON.parse(response);
|
||||
if(/Container/i.test(event?.Type) && /^(start|die)$/i.test(event?.status)){
|
||||
this.#labels(event.id, event.status);
|
||||
}
|
||||
}catch(e){
|
||||
//JSON parse error
|
||||
}
|
||||
});
|
||||
}else{
|
||||
process.send({error:error.toString()});
|
||||
}
|
||||
});
|
||||
|
||||
setInterval(async()=>{
|
||||
if(!this.#interval.poll){
|
||||
this.#interval.poll = true;
|
||||
try{
|
||||
await this.#poll();
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
}finally{
|
||||
this.#interval.poll = false;
|
||||
}
|
||||
}
|
||||
}, this.#config.poll*1000);
|
||||
|
||||
setInterval(async()=>{
|
||||
if(!this.#interval.ping){
|
||||
this.#interval.ping = true;
|
||||
try{
|
||||
await this.#ping();
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
}finally{
|
||||
this.#interval.ping = false;
|
||||
}
|
||||
}
|
||||
}, this.#config.ping*1000);
|
||||
|
||||
await this.#ping();
|
||||
await this.#poll();
|
||||
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
}
|
||||
}
|
||||
|
||||
async #poll(){
|
||||
try{
|
||||
await this.#docker.listContainers((error, containers) => {
|
||||
if(!error){
|
||||
containers.forEach(async(container) => {
|
||||
this.#labels(container.Id, 'poll');
|
||||
});
|
||||
}else{
|
||||
process.send({error:e.toString()});
|
||||
}
|
||||
});
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
}
|
||||
}
|
||||
|
||||
async #ping(){
|
||||
try{
|
||||
await this.#docker.ping();
|
||||
}catch(e){
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
async #labels(id, event){
|
||||
try{
|
||||
const container = this.#docker.getContainer(id);
|
||||
container.inspect(async(error, data) => {
|
||||
if(!error){
|
||||
const result = {
|
||||
name:(data?.Name || data?.id).replace(/^\//i, ''),
|
||||
event:event,
|
||||
labels:{},
|
||||
start:(/start|poll/i.test(event)) ? true : false,
|
||||
worker:this.#config.node,
|
||||
};
|
||||
for(const label in data?.Config?.Labels){
|
||||
if(/traefik\/|rfc2136\//i.test(label)){
|
||||
result.labels[label] = data.Config.Labels[label];
|
||||
}
|
||||
}
|
||||
process.send(result);
|
||||
}else{
|
||||
throw(error);
|
||||
}
|
||||
});
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(Array.isArray(args) && args.length > 0){
|
||||
try{
|
||||
new Worker(JSON.parse(args)).run();
|
||||
}catch(e){
|
||||
process.send({error:e.toString()});
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user