diff --git a/backend/Api/AMQP/AMQPPush.cs b/backend/Api/AMQP/AMQPPush.cs new file mode 100644 index 0000000..fe3065f --- /dev/null +++ b/backend/Api/AMQP/AMQPPush.cs @@ -0,0 +1,88 @@ +using Api.DBAccess; +using Api.Models; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Text; +using System.Text.Json; + +namespace Api.AMQP +{ + public class AMQPPush + { + private readonly IConfiguration _configuration; + private readonly DbAccess _dbAccess; + + public AMQPPush(IConfiguration configuration, DbAccess dbAccess) + { + _dbAccess = dbAccess; + _configuration = configuration; + } + + public async Task Handle_Push_Device_Limits() + { + var factory = new ConnectionFactory(); + var queue = "temperature-limits"; + + factory.UserName = _configuration["AMQP:username"]; + factory.Password = _configuration["AMQP:password"]; + factory.HostName = _configuration["AMQP:host"]; + factory.Port = Convert.ToInt32(_configuration["AMQP:port"]); + + // Connecting to our rabbitmq and after that it create's a channel where you can connect to a queue + var conn = await factory.CreateConnectionAsync(); + Console.WriteLine("AMQPClient connected"); + var channel = await conn.CreateChannelAsync(); + + // Here we connect to the queue through the channel that got created earlier + await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false); + Console.WriteLine($"{queue} connected"); + + while (true) + { + + var devices = _dbAccess.ReadDevices(); + foreach (var device in devices) + { + var deviceLimit = new DeviceLimit(); + deviceLimit.ReferenceId = device.ReferenceId; + deviceLimit.TempHigh = device.TempHigh; + deviceLimit.TempLow = device.TempLow; + string message = JsonSerializer.Serialize(deviceLimit); + var body = Encoding.UTF8.GetBytes(message); + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); + } + + await Task.Delay(10000); + + await channel.CloseAsync(); + Console.WriteLine($"{queue} disconnected"); + await conn.CloseAsync(); + Console.WriteLine("AMQPClient disconnected"); + await channel.DisposeAsync(); + await conn.DisposeAsync(); + await Task.Delay(3600000); + + conn = await factory.CreateConnectionAsync(); + Console.WriteLine("AMQPClient connected"); + channel = await conn.CreateChannelAsync(); + + // Here we connect to the queue through the channel that got created earlier + await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false); + Console.WriteLine($"{queue} connected"); + + // Everytime a message is recieved from the queue it goes into this consumer.ReceivedAsync + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += (model, ea) => + { + Console.WriteLine("Emptying queue"); + + return Task.CompletedTask; + }; + + // Consumes the data in the queue + await channel.BasicConsumeAsync(queue, true, consumer); + + } + } + } +} diff --git a/backend/Api/AMQPReciever/AMQPReciever.cs b/backend/Api/AMQP/AMQPReciever.cs similarity index 97% rename from backend/Api/AMQPReciever/AMQPReciever.cs rename to backend/Api/AMQP/AMQPReciever.cs index 149d205..558d1ca 100644 --- a/backend/Api/AMQPReciever/AMQPReciever.cs +++ b/backend/Api/AMQP/AMQPReciever.cs @@ -30,7 +30,7 @@ namespace Api.AMQPReciever // Connecting to our rabbitmq and after that it create's a channel where you can connect to a queue using var conn = await factory.CreateConnectionAsync(); - Console.WriteLine("AMQPClien connected"); + Console.WriteLine("AMQPClient connected"); using var channel = await conn.CreateChannelAsync(); // Here we connect to the queue through the channel that got created earlier @@ -45,7 +45,7 @@ namespace Api.AMQPReciever var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); - var messageReceive = JsonSerializer.Deserialize(message); + var messageReceive = JsonSerializer.Deserialize(message); // Checks if the message has the data we need if (messageReceive == null || messageReceive.device_id == null || messageReceive.timestamp == 0) diff --git a/backend/Api/DBAccess/DBAccess.cs b/backend/Api/DBAccess/DBAccess.cs index a239f8c..3500641 100644 --- a/backend/Api/DBAccess/DBAccess.cs +++ b/backend/Api/DBAccess/DBAccess.cs @@ -189,6 +189,12 @@ namespace Api.DBAccess return _context.Devices.FirstOrDefault(d => d.ReferenceId == refenreId); } + // Returns all devices + public List ReadDevices() + { + return _context.Devices.ToList(); + } + /// /// Updates a device in the database /// diff --git a/backend/Api/MQTTReciever/MQTTReciever.cs b/backend/Api/MQTTReciever/MQTTReciever.cs index 8c72341..f68cbe0 100644 --- a/backend/Api/MQTTReciever/MQTTReciever.cs +++ b/backend/Api/MQTTReciever/MQTTReciever.cs @@ -43,7 +43,7 @@ namespace Api.MQTTReciever string sensorData = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); - var messageReceive = JsonSerializer.Deserialize(sensorData); + var messageReceive = JsonSerializer.Deserialize(sensorData); // Checks if the message has the data we need if (messageReceive == null || messageReceive.device_id == null || messageReceive.timestamp == 0) diff --git a/backend/Api/Models/DeviceLimit.cs b/backend/Api/Models/DeviceLimit.cs new file mode 100644 index 0000000..ee2e3d7 --- /dev/null +++ b/backend/Api/Models/DeviceLimit.cs @@ -0,0 +1,11 @@ +namespace Api.Models +{ + public class DeviceLimit + { + public double TempHigh { get; set; } + + public double TempLow { get; set; } + + public string? ReferenceId { get; set; } + } +} diff --git a/backend/Api/Models/MQTTMessageReceive.cs b/backend/Api/Models/MessageReceive.cs similarity index 83% rename from backend/Api/Models/MQTTMessageReceive.cs rename to backend/Api/Models/MessageReceive.cs index 16d264a..cb27d59 100644 --- a/backend/Api/Models/MQTTMessageReceive.cs +++ b/backend/Api/Models/MessageReceive.cs @@ -1,6 +1,6 @@ namespace Api.Models { - public class MQTTMessageReceive + public class MessageReceive { public double temperature { get; set; } diff --git a/backend/Api/Program.cs b/backend/Api/Program.cs index 8ab6897..c9a0c33 100644 --- a/backend/Api/Program.cs +++ b/backend/Api/Program.cs @@ -1,4 +1,5 @@ using Api; +using Api.AMQP; using Api.AMQPReciever; using Api.DBAccess; using Api.MQTTReciever; @@ -27,8 +28,10 @@ class Program // Choose to either connect AMQP or MQTT if (rabbitMQ == "AMQP") { - AMQPReciever amqp = new AMQPReciever(configuration, dbAccess); - amqp.Handle_Received_Application_Message().Wait(); + AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess); + amqpReciever.Handle_Received_Application_Message().Wait(); + AMQPPush aMQPPush = new AMQPPush(configuration, dbAccess); + aMQPPush.Handle_Push_Device_Limits().Wait(); } else if (rabbitMQ == "MQTT") {