'use strict';
const test = require('ava').cb;
const amqp = require('amqplib').connect('amqp://rabbit');
const transit = require('../lib/index');
class Var {
constructor() {
this.value = null;
this.callback = null;
}
set(value) {
if (this.callback === null) {
this.value = value;
return;
}
this.callback(value);
this.callback = null;
}
get(callback) {
if (this.value === null) {
this.callback = callback;
return;
}
callback(this.value);
this.value = null;
}
}
const endpointName = 'anEndpoint';
const consumeName = 'Some.Name.Space';
const faultyConsumerName = 'Some.Faulty.Class';
let consumerValue = new Var();
let testBus;
let publishEndpoint;
test.before(t => {
testBus = transit('amqp://rabbit');
testBus.receiveEndpoint(endpointName)
.then(endpoint => {
endpoint.consume(consumeName, (data, done) => {
consumerValue.set(data);
done();
});
endpoint.consume(faultyConsumerName, (data, done) => {
throw new Error('an error occurred');
done();
});
})
.then(() => t.end());
publishEndpoint = testBus.publishEndpoint();
});
test.after(t => {
amqp.then(conn => {
conn
.createChannel()
.then(ch => {
Promise.all([
ch.deleteQueue(endpointName),
ch.deleteQueue(endpointName + '_error'),
ch.deleteExchange(endpointName),
ch.deleteExchange(consumeName),
ch.deleteExchange(faultyConsumerName)
]).then(() => ch.close())
.then(() => conn.close())
.then(() => t.end(), t.end);
});
});
});
test('receiveEndpoint creates a queue', t => {
amqp.then(conn => {
conn
.createChannel()
.then(ch => {
ch.checkQueue(endpointName)
.then(() => ch.close())
.then(() => t.end());
});
});
});
test('receiveEndpoint creates an exchange', t => {
amqp.then(conn => {
conn
.createChannel()
.then(ch => {
ch.checkExchange(endpointName)
.then(() => ch.close())
.then(() => t.end());
});
});
});
test('consume creates an exchange', t => {
amqp.then(conn => {
conn
.createChannel()
.then(ch => {
ch.checkExchange(consumeName)
.then(() => ch.close())
.then(() => t.end());
});
});
});
test.serial('publish to namespace triggers consumer', t => {
const content = 'hey';
amqp.then(conn => {
conn
.createChannel()
.then(ch => {
ch.publish(endpointName, endpointName, Buffer.from(content, 'utf8'), {contentType: 'application/vnd.masstransit+json'});
ch.close();
});
});
consumerValue.get(data => {
t.is(data.content.toString('utf8'), content);
t.is(data.properties.contentType, 'application/vnd.masstransit+json');
t.end();
});
});
test.serial('publish with message triggers consumer', t => {
const content = 'hey';
publishEndpoint
.then(endpoint => endpoint.publish(endpointName, Buffer.from(content, 'utf8')));
consumerValue.get(data => {
t.is(data.content.toString('utf8'), content);
t.is(data.properties.contentType, 'application/vnd.masstransit+json');
t.end();
});
});
test.serial('consumer fault adds to error queue', t => {
const content = 'hey';
publishEndpoint
.then(endpoint => endpoint.publish(faultyConsumerName, Buffer.from(content, 'utf8')));
setTimeout(() => amqp.then(conn => {
conn
.createChannel()
.then(ch => {
ch.checkQueue(endpointName + '_error')
.then(() => {
ch.get(endpointName + '_error')
.then(err => {
t.is(err.content.toString('utf8'), content);
t.is(err.properties.contentType, 'application/vnd.masstransit+json');
ch.ack(err);
})
.then(() => ch.close())
.then(() => t.end(), t.end);
});
});
}), 1000);
});