Made AMQPPush for pushing the device limits to RabbitMQ and consume after 1 hour
This commit is contained in:
parent
6f311fabd4
commit
47797d18a0
88
backend/Api/AMQP/AMQPPush.cs
Normal file
88
backend/Api/AMQP/AMQPPush.cs
Normal file
@ -0,0 +1,88 @@
|
||||
using Api.DBAccess;
|
||||
using Api.Models;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace Api.AMQP
|
||||
{
|
||||
public class AMQPPush
|
||||
{
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly DbAccess _dbAccess;
|
||||
|
||||
public AMQPPush(IConfiguration configuration, DbAccess dbAccess)
|
||||
{
|
||||
_dbAccess = dbAccess;
|
||||
_configuration = configuration;
|
||||
}
|
||||
|
||||
public async Task Handle_Push_Device_Limits()
|
||||
{
|
||||
var factory = new ConnectionFactory();
|
||||
var queue = "temperature-limits";
|
||||
|
||||
factory.UserName = _configuration["AMQP:username"];
|
||||
factory.Password = _configuration["AMQP:password"];
|
||||
factory.HostName = _configuration["AMQP:host"];
|
||||
factory.Port = Convert.ToInt32(_configuration["AMQP:port"]);
|
||||
|
||||
// Connecting to our rabbitmq and after that it create's a channel where you can connect to a queue
|
||||
var conn = await factory.CreateConnectionAsync();
|
||||
Console.WriteLine("AMQPClient connected");
|
||||
var channel = await conn.CreateChannelAsync();
|
||||
|
||||
// Here we connect to the queue through the channel that got created earlier
|
||||
await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false);
|
||||
Console.WriteLine($"{queue} connected");
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
||||
var devices = _dbAccess.ReadDevices();
|
||||
foreach (var device in devices)
|
||||
{
|
||||
var deviceLimit = new DeviceLimit();
|
||||
deviceLimit.ReferenceId = device.ReferenceId;
|
||||
deviceLimit.TempHigh = device.TempHigh;
|
||||
deviceLimit.TempLow = device.TempLow;
|
||||
string message = JsonSerializer.Serialize(deviceLimit);
|
||||
var body = Encoding.UTF8.GetBytes(message);
|
||||
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body);
|
||||
}
|
||||
|
||||
await Task.Delay(10000);
|
||||
|
||||
await channel.CloseAsync();
|
||||
Console.WriteLine($"{queue} disconnected");
|
||||
await conn.CloseAsync();
|
||||
Console.WriteLine("AMQPClient disconnected");
|
||||
await channel.DisposeAsync();
|
||||
await conn.DisposeAsync();
|
||||
await Task.Delay(3600000);
|
||||
|
||||
conn = await factory.CreateConnectionAsync();
|
||||
Console.WriteLine("AMQPClient connected");
|
||||
channel = await conn.CreateChannelAsync();
|
||||
|
||||
// Here we connect to the queue through the channel that got created earlier
|
||||
await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false);
|
||||
Console.WriteLine($"{queue} connected");
|
||||
|
||||
// Everytime a message is recieved from the queue it goes into this consumer.ReceivedAsync
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += (model, ea) =>
|
||||
{
|
||||
Console.WriteLine("Emptying queue");
|
||||
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
// Consumes the data in the queue
|
||||
await channel.BasicConsumeAsync(queue, true, consumer);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -30,7 +30,7 @@ namespace Api.AMQPReciever
|
||||
|
||||
// Connecting to our rabbitmq and after that it create's a channel where you can connect to a queue
|
||||
using var conn = await factory.CreateConnectionAsync();
|
||||
Console.WriteLine("AMQPClien connected");
|
||||
Console.WriteLine("AMQPClient connected");
|
||||
using var channel = await conn.CreateChannelAsync();
|
||||
|
||||
// Here we connect to the queue through the channel that got created earlier
|
||||
@ -45,7 +45,7 @@ namespace Api.AMQPReciever
|
||||
var body = ea.Body.ToArray();
|
||||
var message = Encoding.UTF8.GetString(body);
|
||||
|
||||
var messageReceive = JsonSerializer.Deserialize<MQTTMessageReceive>(message);
|
||||
var messageReceive = JsonSerializer.Deserialize<MessageReceive>(message);
|
||||
|
||||
// Checks if the message has the data we need
|
||||
if (messageReceive == null || messageReceive.device_id == null || messageReceive.timestamp == 0)
|
@ -189,6 +189,12 @@ namespace Api.DBAccess
|
||||
return _context.Devices.FirstOrDefault(d => d.ReferenceId == refenreId);
|
||||
}
|
||||
|
||||
// Returns all devices
|
||||
public List<Device> ReadDevices()
|
||||
{
|
||||
return _context.Devices.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates a device in the database
|
||||
/// </summary>
|
||||
|
@ -43,7 +43,7 @@ namespace Api.MQTTReciever
|
||||
|
||||
string sensorData = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
|
||||
|
||||
var messageReceive = JsonSerializer.Deserialize<MQTTMessageReceive>(sensorData);
|
||||
var messageReceive = JsonSerializer.Deserialize<MessageReceive>(sensorData);
|
||||
|
||||
// Checks if the message has the data we need
|
||||
if (messageReceive == null || messageReceive.device_id == null || messageReceive.timestamp == 0)
|
||||
|
11
backend/Api/Models/DeviceLimit.cs
Normal file
11
backend/Api/Models/DeviceLimit.cs
Normal file
@ -0,0 +1,11 @@
|
||||
namespace Api.Models
|
||||
{
|
||||
public class DeviceLimit
|
||||
{
|
||||
public double TempHigh { get; set; }
|
||||
|
||||
public double TempLow { get; set; }
|
||||
|
||||
public string? ReferenceId { get; set; }
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
namespace Api.Models
|
||||
{
|
||||
public class MQTTMessageReceive
|
||||
public class MessageReceive
|
||||
{
|
||||
public double temperature { get; set; }
|
||||
|
@ -1,4 +1,5 @@
|
||||
using Api;
|
||||
using Api.AMQP;
|
||||
using Api.AMQPReciever;
|
||||
using Api.DBAccess;
|
||||
using Api.MQTTReciever;
|
||||
@ -27,8 +28,10 @@ class Program
|
||||
// Choose to either connect AMQP or MQTT
|
||||
if (rabbitMQ == "AMQP")
|
||||
{
|
||||
AMQPReciever amqp = new AMQPReciever(configuration, dbAccess);
|
||||
amqp.Handle_Received_Application_Message().Wait();
|
||||
AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess);
|
||||
amqpReciever.Handle_Received_Application_Message().Wait();
|
||||
AMQPPush aMQPPush = new AMQPPush(configuration, dbAccess);
|
||||
aMQPPush.Handle_Push_Device_Limits().Wait();
|
||||
}
|
||||
else if (rabbitMQ == "MQTT")
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user