'use strict';
module.exports = class ReceiveEndpoint {
constructor(conn, name) {
this.conn = conn;
this.name = name;
this.errName = name + '_error';
}
bind() {
return new Promise((resolve, reject) => {
this.conn.createChannel()
.then(ch => {
Promise.all([
ch.assertQueue(this.name),
ch.assertExchange(this.name, 'fanout'),
ch.bindQueue(this.name, this.name, '*')
]).then(() => ch.close(), reject)
.then(() => resolve(this), reject);
}, reject);
});
}
consume(namespace, callback) {
return new Promise((resolve, reject) => {
this.conn.createChannel()
.then(ch => {
Promise.all([
ch.assertExchange(namespace, 'fanout'),
ch.bindExchange(this.name, namespace)
]).then(() => {
ch.consume(this.name, msg => {
if (msg !== null) {
try {
callback(msg, () => ch.ack(msg));
} catch (err) {
ch.assertQueue(this.errName)
.then(() => ch.sendToQueue(this.errName, Buffer.from(msg.content, 'utf8'), {contentType: 'application/vnd.masstransit+json'}))
.then(() => ch.ack(msg));
}
}
});
resolve();
}, reject);
}, reject);
});
}
};