From 8c6037b0100858d61bcae33464bde2d4d67bcc5a Mon Sep 17 00:00:00 2001 From: Reimar Date: Tue, 25 Mar 2025 12:21:49 +0100 Subject: [PATCH 1/2] Implement AMQP in IoT --- iot/Makefile | 4 ++-- iot/brokers/amqp.c | 35 +++++++++++++++++++++++++++++++++++ iot/brokers/amqp.h | 4 ++++ iot/{ => brokers}/mqtt.c | 6 +++--- iot/{ => brokers}/mqtt.h | 0 iot/config.example.h | 4 ++++ iot/main.c | 8 ++++---- 7 files changed, 52 insertions(+), 9 deletions(-) create mode 100644 iot/brokers/amqp.c create mode 100644 iot/brokers/amqp.h rename iot/{ => brokers}/mqtt.c (94%) rename iot/{ => brokers}/mqtt.h (100%) diff --git a/iot/Makefile b/iot/Makefile index d9c12c7..0f0e886 100644 --- a/iot/Makefile +++ b/iot/Makefile @@ -1,3 +1,3 @@ -all: main.c mqtt.c temperature.c device_id.c - $(CC) -lmosquitto -lpthread -li2c main.c mqtt.c temperature.c device_id.c +all: main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c + $(CC) -lmosquitto -lrabbitmq -lpthread -li2c main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c diff --git a/iot/brokers/amqp.c b/iot/brokers/amqp.c new file mode 100644 index 0000000..20d7c39 --- /dev/null +++ b/iot/brokers/amqp.c @@ -0,0 +1,35 @@ +#include +#include + +#include "../config.h" + +amqp_connection_state_t conn; +amqp_socket_t *socket; + +void broker_on_connect(void); + +void amqp_send_message(char *queue, char *message) +{ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_literal_bytes("text/plain"); + props.delivery_mode = 2; + + amqp_basic_publish(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message)); +} + +void init_amqp(void) +{ + conn = amqp_new_connection(); + + socket = amqp_tcp_socket_new(conn); + amqp_socket_open(socket, AMQP_IP, AMQP_PORT); + + amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, AMQP_USER, AMQP_PASSWORD); + + amqp_channel_open(conn, 1); + + broker_on_connect(); + + for (;;); +} diff --git a/iot/brokers/amqp.h b/iot/brokers/amqp.h new file mode 100644 index 0000000..be45cbb --- /dev/null +++ b/iot/brokers/amqp.h @@ -0,0 +1,4 @@ +void init_amqp(void); + +void amqp_send_message(char *topic, char *message); + diff --git a/iot/mqtt.c b/iot/brokers/mqtt.c similarity index 94% rename from iot/mqtt.c rename to iot/brokers/mqtt.c index ae9de13..d89d7a7 100644 --- a/iot/mqtt.c +++ b/iot/brokers/mqtt.c @@ -3,11 +3,11 @@ #include #include -#include "config.h" +#include "../config.h" struct mosquitto *mosq; -void mqtt_on_connect(void); +void broker_on_connect(void); void on_connect(struct mosquitto *client, void *obj, int rc) { @@ -18,7 +18,7 @@ void on_connect(struct mosquitto *client, void *obj, int rc) puts("Connected to " MQTT_IP); - mqtt_on_connect(); + broker_on_connect(); } void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) diff --git a/iot/mqtt.h b/iot/brokers/mqtt.h similarity index 100% rename from iot/mqtt.h rename to iot/brokers/mqtt.h diff --git a/iot/config.example.h b/iot/config.example.h index 20d78c9..dbd308f 100644 --- a/iot/config.example.h +++ b/iot/config.example.h @@ -3,3 +3,7 @@ #define MQTT_USER "user" #define MQTT_PASSWORD "password" +#define AMQP_IP "127.0.0.1" +#define AMQP_PORT 5672 +#define AMQP_USER "user" +#define AMQP_PASSWORD "password" diff --git a/iot/main.c b/iot/main.c index fb1571c..ad6014c 100644 --- a/iot/main.c +++ b/iot/main.c @@ -6,7 +6,7 @@ #include #include -#include "mqtt.h" +#include "brokers/amqp.h" #include "temperature.h" #include "device_id.h" @@ -33,7 +33,7 @@ void *watch_temperature(void *arg) char *str = malloc(snprintf(NULL, 0, format, temperature, device_id, timestamp) + 1); sprintf(str, format, temperature, device_id, timestamp); - mqtt_send_message("temperature", str); + amqp_send_message("temperature-logs", str); free(str); @@ -47,7 +47,7 @@ void *watch_temperature(void *arg) return NULL; } -void mqtt_on_connect(void) +void broker_on_connect(void) { pthread_t temperature_thread; pthread_create(&temperature_thread, NULL, watch_temperature, NULL); @@ -57,7 +57,7 @@ int main(void) { srand(time(NULL)); - init_mqtt(); + init_amqp(); return EXIT_SUCCESS; } From 90cbf7ae810140ec5d4df518784df14e7dd94e7d Mon Sep 17 00:00:00 2001 From: Reimar Date: Tue, 25 Mar 2025 12:54:52 +0100 Subject: [PATCH 2/2] Fix listening for AMQP messages, remove test console app --- backend/Api/AMQPReciever/AMQPReciever.cs | 3 +- backend/ConsoleApp1/ConsoleApp1.csproj | 14 ------- backend/ConsoleApp1/ConsoleApp1.sln | 25 ------------ backend/ConsoleApp1/Program.cs | 52 ------------------------ 4 files changed, 1 insertion(+), 93 deletions(-) delete mode 100644 backend/ConsoleApp1/ConsoleApp1.csproj delete mode 100644 backend/ConsoleApp1/ConsoleApp1.sln delete mode 100644 backend/ConsoleApp1/Program.cs diff --git a/backend/Api/AMQPReciever/AMQPReciever.cs b/backend/Api/AMQPReciever/AMQPReciever.cs index 0b6cf56..36b3a44 100644 --- a/backend/Api/AMQPReciever/AMQPReciever.cs +++ b/backend/Api/AMQPReciever/AMQPReciever.cs @@ -67,8 +67,7 @@ namespace Api.AMQPReciever await channel.BasicConsumeAsync(queue, true, consumer); - Console.WriteLine("Press enter to exit."); - Console.ReadLine(); + while (true); } } } diff --git a/backend/ConsoleApp1/ConsoleApp1.csproj b/backend/ConsoleApp1/ConsoleApp1.csproj deleted file mode 100644 index 7dfb515..0000000 --- a/backend/ConsoleApp1/ConsoleApp1.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - Exe - net8.0 - enable - enable - - - - - - - diff --git a/backend/ConsoleApp1/ConsoleApp1.sln b/backend/ConsoleApp1/ConsoleApp1.sln deleted file mode 100644 index b1ff789..0000000 --- a/backend/ConsoleApp1/ConsoleApp1.sln +++ /dev/null @@ -1,25 +0,0 @@ - -Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 17 -VisualStudioVersion = 17.9.34607.119 -MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleApp1", "ConsoleApp1.csproj", "{0BC93CF2-F92D-4DD6-83BE-A985CBB74960}" -EndProject -Global - GlobalSection(SolutionConfigurationPlatforms) = preSolution - Debug|Any CPU = Debug|Any CPU - Release|Any CPU = Release|Any CPU - EndGlobalSection - GlobalSection(ProjectConfigurationPlatforms) = postSolution - {0BC93CF2-F92D-4DD6-83BE-A985CBB74960}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0BC93CF2-F92D-4DD6-83BE-A985CBB74960}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0BC93CF2-F92D-4DD6-83BE-A985CBB74960}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0BC93CF2-F92D-4DD6-83BE-A985CBB74960}.Release|Any CPU.Build.0 = Release|Any CPU - EndGlobalSection - GlobalSection(SolutionProperties) = preSolution - HideSolutionNode = FALSE - EndGlobalSection - GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = {55EE0E94-4585-4E79-AC67-4B0E809E99AB} - EndGlobalSection -EndGlobal diff --git a/backend/ConsoleApp1/Program.cs b/backend/ConsoleApp1/Program.cs deleted file mode 100644 index a5e0e23..0000000 --- a/backend/ConsoleApp1/Program.cs +++ /dev/null @@ -1,52 +0,0 @@ -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using System.Text; - - -var factory = new ConnectionFactory(); -var queue = "test"; - - -factory.UserName = "h5"; -factory.Password = "Merc1234"; -factory.HostName = "10.135.51.116"; -factory.Port = 5672; - -using var conn = await factory.CreateConnectionAsync(); -Console.WriteLine("AMQPClien connected"); -using var channel = await conn.CreateChannelAsync(); - -await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false); -Console.WriteLine($"{queue} connected"); - -var consumer = new AsyncEventingBasicConsumer(channel); -consumer.ReceivedAsync += (model, ea) => -{ - Console.WriteLine("Received application message."); - var body = ea.Body.ToArray(); - var message = Encoding.UTF8.GetString(body); - Console.WriteLine(message); - - return Task.CompletedTask; -}; - -await channel.BasicConsumeAsync(queue, true, consumer); - - -const string message = "Hello World!"; -var body = Encoding.UTF8.GetBytes(message); -await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); -Console.WriteLine(" Press enter to continue."); -Console.ReadLine(); -await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); -Console.WriteLine(" Press enter to continue."); -Console.ReadLine(); -await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); -Console.WriteLine(" Press enter to continue."); -Console.ReadLine(); -await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); -Console.WriteLine(" Press enter to continue."); -Console.ReadLine(); -await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queue, body: body); -Console.WriteLine(" Press enter to exit."); -Console.ReadLine(); \ No newline at end of file