Firt of all ,Let’s take a look the following receive code,You will find that If the message is 1,The consumer will sleep 1 seconds, if the message is 10 ,The consumer will sleep 10 seconds.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30TasksDistributeLimit(args, "DirectTestQueue");
static void TasksDistributeLimit(string[] args, string queueName)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
// channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine("queue:" + queueName);
Console.WriteLine("[x] Received {0}", message);
int len = int.Parse(message);
Console.WriteLine("Sleep:{0} seconds", len);
Thread.Sleep(len*1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //Manually confirm message
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
RabbitMQ_Consumer Acknowledgements and Publisher Confirms(6)
Background:
Sometimes the consumers may crash or have connection issues. In this case,We may miss the queue messages if we set the acknowledgement to auto confirm, Please ses the following code:
Receive project code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27ConsumeMsg(args, "DirectTestQueue");
static void ConsumeMsg(string[] args, string queueName)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine("queue:"+queueName);
Console.WriteLine("[x] Received {0}", message);
Thread.Sleep(60000);
Console.WriteLine(" [x] Done");
//channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //Manually send message acknowledgments
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
RabbitMQ_message persistent-Durable queue(5)
Background:
Queue message may miss once RabbitMQ restart or server crash. How can we handle this situation ?
Solution:
1.Declare a durable exchange
2.Declare a durable queue
3.Set Persistent to true when we publish message
Send project code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23DurableQueueTest(args, "DurableQueue");
static void DurableQueueTest(string[] args, string quename)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchangeName = "DurableQueueTest";
string routingKey = "DRK";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
channel.QueueDeclare(queue: quename, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(quename, exchangeName, routingKey, null);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
string message = args.Length > 0 ? exchangeName + " " + args[0] : "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, basicProperties: properties, body: body);
Console.WriteLine("[x] Sent {0} ", message);
}
}
}
RabbitMQ_ExchangeType Topic(4)
Exchange: Exchange is a rout map
ExchangeType: Topic, the message will be pushed to the specified queues which are matched to the Routingkey like regular expression.
1.Send project code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 TopicExchangeTest(args, "TopicQueue");
static void TopicExchangeTest(string[] args, string quename)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchangeName = "TopicTest";
string routingKey = "TopicRK.*"; //* for single word ,# for multiple words
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
channel.QueueDeclare(queue: quename, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(quename, exchangeName, routingKey, null);
string message = args.Length > 0 ? exchangeName + " " + args[0] : "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, "TopicRK.one", basicProperties: null, body: body);
Console.WriteLine("[x] Sent {0} ", message);
}
}
}
RabbitMQ_ExchangeType Fanout(3)
Exchange: Exchange is a rout map
ExchangeType: FanOut, the message will be pushed to all queues which bind to queue,Routingkey is not needed
1.Send project code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25FanOutExchangeTest(args);
static void FanOutExchangeTest(string[] args)
{
string queueName1 = "Queue1";
string queueName2 = "Queue2";
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchangeName = "FanOutTest";
string routingKey = "";
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
channel.QueueDeclare(queue: queueName1, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: queueName2, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queueName1, exchangeName, routingKey, null);
channel.QueueBind(queueName2, exchangeName, routingKey, null);
string message = args.Length > 0 ? exchangeName + " " + args[0] : "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey, basicProperties: null, body: body);
Console.WriteLine("[x] Sent {0} ", message);
}
}
}
RabbitMQ_ExchangeType Direct(2)
DirectExchangeTest
Exchange: Exchange is a rout map
ExchangeType: Direct, the message will be pushed to specified queue which have binded with the exchange.
1.Send project code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 DirectExchangeTest(args, "DirectTestQueue");
static void DirectExchangeTest(string[] args,string quename)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchangeName = "DirectTest";
string routingKey = "DirectRK";
channel.ExchangeDeclare(exchangeName,ExchangeType.Direct,false,false,null);
channel.QueueDeclare(queue: "DirectTestQueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(quename, exchangeName, routingKey, null);
string message = args.Length > 0 ? exchangeName+" "+args[0] : "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName,routingKey, basicProperties: null, body: body);
Console.WriteLine("[x] Sent {0} ", message);
}
}
}
2.Receive project code:
1
ConsumeMsg(args, "DirectTestQueue");
RabbitMQ_No Exchange (1)
No Exchange test
1.Install RabbitMQ :https://www.rabbitmq.com/download.html
2.Create two .net core projects for testing: Send and Receive
3.Send project Code:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21static void Main(string[] args)
{
NoExchangeTest(args,"NoExchangeQueue");
}
static void NoExchangeTest(string[] args,string queuename)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queuename, durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: queuename, basicProperties: null, body: body);
Console.WriteLine("[x] Sent {0} ", message);
}
}
}