Changed the name of the publisher
This commit is contained in:
parent
47797d18a0
commit
998cb90acc
@ -7,12 +7,12 @@ using System.Text.Json;
|
|||||||
|
|
||||||
namespace Api.AMQP
|
namespace Api.AMQP
|
||||||
{
|
{
|
||||||
public class AMQPPush
|
public class AMQPPublisher
|
||||||
{
|
{
|
||||||
private readonly IConfiguration _configuration;
|
private readonly IConfiguration _configuration;
|
||||||
private readonly DbAccess _dbAccess;
|
private readonly DbAccess _dbAccess;
|
||||||
|
|
||||||
public AMQPPush(IConfiguration configuration, DbAccess dbAccess)
|
public AMQPPublisher(IConfiguration configuration, DbAccess dbAccess)
|
||||||
{
|
{
|
||||||
_dbAccess = dbAccess;
|
_dbAccess = dbAccess;
|
||||||
_configuration = configuration;
|
_configuration = configuration;
|
||||||
@ -39,7 +39,7 @@ namespace Api.AMQP
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
// Publishes all devices limits
|
||||||
var devices = _dbAccess.ReadDevices();
|
var devices = _dbAccess.ReadDevices();
|
||||||
foreach (var device in devices)
|
foreach (var device in devices)
|
||||||
{
|
{
|
||||||
@ -52,16 +52,20 @@ namespace Api.AMQP
|
|||||||
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body);
|
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Short delay before disconnecting from rabbitMQ
|
||||||
await Task.Delay(10000);
|
await Task.Delay(10000);
|
||||||
|
|
||||||
|
// Disconnecting from rabbitMQ to save resources
|
||||||
await channel.CloseAsync();
|
await channel.CloseAsync();
|
||||||
Console.WriteLine($"{queue} disconnected");
|
Console.WriteLine($"{queue} disconnected");
|
||||||
await conn.CloseAsync();
|
await conn.CloseAsync();
|
||||||
Console.WriteLine("AMQPClient disconnected");
|
Console.WriteLine("AMQPClient disconnected");
|
||||||
await channel.DisposeAsync();
|
await channel.DisposeAsync();
|
||||||
await conn.DisposeAsync();
|
await conn.DisposeAsync();
|
||||||
|
// 1 hour delay
|
||||||
await Task.Delay(3600000);
|
await Task.Delay(3600000);
|
||||||
|
|
||||||
|
// Creating a new connection to rabbitMQ
|
||||||
conn = await factory.CreateConnectionAsync();
|
conn = await factory.CreateConnectionAsync();
|
||||||
Console.WriteLine("AMQPClient connected");
|
Console.WriteLine("AMQPClient connected");
|
||||||
channel = await conn.CreateChannelAsync();
|
channel = await conn.CreateChannelAsync();
|
||||||
@ -70,7 +74,7 @@ namespace Api.AMQP
|
|||||||
await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false);
|
await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false);
|
||||||
Console.WriteLine($"{queue} connected");
|
Console.WriteLine($"{queue} connected");
|
||||||
|
|
||||||
// Everytime a message is recieved from the queue it goes into this consumer.ReceivedAsync
|
// Here all messages is consumed so the queue is empty
|
||||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||||
consumer.ReceivedAsync += (model, ea) =>
|
consumer.ReceivedAsync += (model, ea) =>
|
||||||
{
|
{
|
@ -30,7 +30,7 @@ class Program
|
|||||||
{
|
{
|
||||||
AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess);
|
AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess);
|
||||||
amqpReciever.Handle_Received_Application_Message().Wait();
|
amqpReciever.Handle_Received_Application_Message().Wait();
|
||||||
AMQPPush aMQPPush = new AMQPPush(configuration, dbAccess);
|
AMQPPublisher aMQPPush = new AMQPPublisher(configuration, dbAccess);
|
||||||
aMQPPush.Handle_Push_Device_Limits().Wait();
|
aMQPPush.Handle_Push_Device_Limits().Wait();
|
||||||
}
|
}
|
||||||
else if (rabbitMQ == "MQTT")
|
else if (rabbitMQ == "MQTT")
|
||||||
|
Loading…
Reference in New Issue
Block a user