Publish automatisk ved edit af temphigh og low
This commit is contained in:
parent
fd1b4ace5d
commit
1f372355d4
@ -10,15 +10,13 @@ namespace Api.AMQP
|
|||||||
public class AMQPPublisher
|
public class AMQPPublisher
|
||||||
{
|
{
|
||||||
private readonly IConfiguration _configuration;
|
private readonly IConfiguration _configuration;
|
||||||
private readonly DbAccess _dbAccess;
|
|
||||||
private IConnection _conn;
|
private IConnection _conn;
|
||||||
private IChannel _channel;
|
private IChannel _channel;
|
||||||
private ConnectionFactory _factory;
|
private ConnectionFactory _factory;
|
||||||
private string _queue;
|
private string _queue;
|
||||||
|
|
||||||
public AMQPPublisher(IConfiguration configuration, DbAccess dbAccess)
|
public AMQPPublisher(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
_dbAccess = dbAccess;
|
|
||||||
_configuration = configuration;
|
_configuration = configuration;
|
||||||
_factory = new ConnectionFactory();
|
_factory = new ConnectionFactory();
|
||||||
_queue = "temperature-limits";
|
_queue = "temperature-limits";
|
||||||
@ -26,51 +24,22 @@ namespace Api.AMQP
|
|||||||
InitFactory();
|
InitFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Handle_Push_Device_Limits()
|
public async void Handle_Push_Device_Limits(DeviceLimit deviceLimit)
|
||||||
{
|
{
|
||||||
while (true)
|
// Connecting to rabbitMQ
|
||||||
{
|
await Connect();
|
||||||
await Connect();
|
|
||||||
|
|
||||||
// Publishes all devices limits
|
string message = JsonSerializer.Serialize(deviceLimit);
|
||||||
var devices = _dbAccess.ReadDevices();
|
var body = Encoding.UTF8.GetBytes(message);
|
||||||
foreach (var device in devices)
|
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: _queue, body: body);
|
||||||
{
|
|
||||||
var deviceLimit = new DeviceLimit();
|
|
||||||
deviceLimit.ReferenceId = device.ReferenceId;
|
|
||||||
deviceLimit.TempHigh = device.TempHigh;
|
|
||||||
deviceLimit.TempLow = device.TempLow;
|
|
||||||
string message = JsonSerializer.Serialize(deviceLimit);
|
|
||||||
var body = Encoding.UTF8.GetBytes(message);
|
|
||||||
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: _queue, body: body);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Short delay before disconnecting from rabbitMQ
|
|
||||||
await Task.Delay(1000);
|
|
||||||
|
|
||||||
// Disconnecting from rabbitMQ to save resources
|
// Short delay before disconnecting from rabbitMQ
|
||||||
await Dispose();
|
await Task.Delay(1000);
|
||||||
// 1 hour delay
|
|
||||||
await Task.Delay(3600000);
|
|
||||||
|
|
||||||
await Connect();
|
// Disconnecting from rabbitMQ to save resources
|
||||||
|
await Dispose();
|
||||||
|
|
||||||
// Here all messages is consumed so the queue is empty
|
|
||||||
var consumer = new AsyncEventingBasicConsumer(_channel);
|
|
||||||
consumer.ReceivedAsync += (model, ea) =>
|
|
||||||
{
|
|
||||||
Console.WriteLine("Emptying queue");
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Consumes the data in the queue
|
|
||||||
await _channel.BasicConsumeAsync(_queue, true, consumer);
|
|
||||||
|
|
||||||
// Short delay before disconnecting from rabbitMQ
|
|
||||||
await Task.Delay(1000);
|
|
||||||
await Dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disconnects from rabbitMQ
|
// Disconnects from rabbitMQ
|
||||||
@ -92,7 +61,7 @@ namespace Api.AMQP
|
|||||||
_channel = await _conn.CreateChannelAsync();
|
_channel = await _conn.CreateChannelAsync();
|
||||||
|
|
||||||
// Here we connect to the queue through the channel that got created earlier
|
// Here we connect to the queue through the channel that got created earlier
|
||||||
await _channel.QueueDeclareAsync(queue: _queue, durable: false, exclusive: false, autoDelete: false);
|
await _channel.QueueDeclareAsync(queue: _queue, durable: true, exclusive: false, autoDelete: false);
|
||||||
Console.WriteLine($"{_queue} connected");
|
Console.WriteLine($"{_queue} connected");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ using Api.Models.Devices;
|
|||||||
using Api.Models.Users;
|
using Api.Models.Users;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Api.AMQP;
|
||||||
|
|
||||||
namespace Api.BusinessLogic
|
namespace Api.BusinessLogic
|
||||||
{
|
{
|
||||||
@ -68,11 +69,17 @@ namespace Api.BusinessLogic
|
|||||||
Device device = new Device
|
Device device = new Device
|
||||||
{
|
{
|
||||||
Name = "Undefined",
|
Name = "Undefined",
|
||||||
TempHigh = 0,
|
TempHigh = 100,
|
||||||
TempLow = 0,
|
TempLow = -40,
|
||||||
ReferenceId = referenceId,
|
ReferenceId = referenceId,
|
||||||
Logs = new List<TemperatureLogs>(),
|
Logs = new List<TemperatureLogs>(),
|
||||||
};
|
};
|
||||||
|
DeviceLimit deviceLimit = new DeviceLimit();
|
||||||
|
deviceLimit.TempHigh = device.TempHigh;
|
||||||
|
deviceLimit.TempLow = device.TempLow;
|
||||||
|
deviceLimit.ReferenceId = device.ReferenceId;
|
||||||
|
AMQPPublisher publisher = new AMQPPublisher(_configuration);
|
||||||
|
publisher.Handle_Push_Device_Limits(deviceLimit);
|
||||||
|
|
||||||
user.Devices.Add(device);
|
user.Devices.Add(device);
|
||||||
|
|
||||||
@ -96,7 +103,12 @@ namespace Api.BusinessLogic
|
|||||||
device.Name = request.Name;
|
device.Name = request.Name;
|
||||||
device.TempLow = request.TempLow;
|
device.TempLow = request.TempLow;
|
||||||
device.TempHigh = request.TempHigh;
|
device.TempHigh = request.TempHigh;
|
||||||
|
DeviceLimit deviceLimit = new DeviceLimit();
|
||||||
|
deviceLimit.TempHigh = device.TempHigh;
|
||||||
|
deviceLimit.TempLow = device.TempLow;
|
||||||
|
deviceLimit.ReferenceId = device.ReferenceId;
|
||||||
|
AMQPPublisher publisher = new AMQPPublisher(_configuration);
|
||||||
|
publisher.Handle_Push_Device_Limits(deviceLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
return await _dbAccess.EditDevice(device);
|
return await _dbAccess.EditDevice(device);
|
||||||
|
@ -29,8 +29,6 @@ class Program
|
|||||||
{
|
{
|
||||||
AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess);
|
AMQPReciever amqpReciever = new AMQPReciever(configuration, dbAccess);
|
||||||
amqpReciever.Handle_Received_Application_Message().Wait();
|
amqpReciever.Handle_Received_Application_Message().Wait();
|
||||||
AMQPPublisher aMQPPush = new AMQPPublisher(configuration, dbAccess);
|
|
||||||
aMQPPush.Handle_Push_Device_Limits().Wait();
|
|
||||||
}
|
}
|
||||||
else if (rabbitMQ == "MQTT")
|
else if (rabbitMQ == "MQTT")
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user