From 43374196846421f57796450aca916b918e642c93 Mon Sep 17 00:00:00 2001
From: Reimar <mail@reim.ar>
Date: Tue, 25 Mar 2025 12:21:49 +0100
Subject: [PATCH] Implement AMQP in IoT

---
 iot/Makefile             |  4 ++--
 iot/brokers/amqp.c       | 35 +++++++++++++++++++++++++++++++++++
 iot/brokers/amqp.h       |  4 ++++
 iot/{ => brokers}/mqtt.c |  6 +++---
 iot/{ => brokers}/mqtt.h |  0
 iot/config.example.h     |  4 ++++
 iot/main.c               |  8 ++++----
 7 files changed, 52 insertions(+), 9 deletions(-)
 create mode 100644 iot/brokers/amqp.c
 create mode 100644 iot/brokers/amqp.h
 rename iot/{ => brokers}/mqtt.c (94%)
 rename iot/{ => brokers}/mqtt.h (100%)

diff --git a/iot/Makefile b/iot/Makefile
index d9c12c7..0f0e886 100644
--- a/iot/Makefile
+++ b/iot/Makefile
@@ -1,3 +1,3 @@
-all: main.c mqtt.c temperature.c device_id.c
-	$(CC) -lmosquitto -lpthread -li2c main.c mqtt.c temperature.c device_id.c
+all: main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c
+	$(CC) -lmosquitto -lrabbitmq -lpthread -li2c main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c
 
diff --git a/iot/brokers/amqp.c b/iot/brokers/amqp.c
new file mode 100644
index 0000000..20d7c39
--- /dev/null
+++ b/iot/brokers/amqp.c
@@ -0,0 +1,35 @@
+#include <rabbitmq-c/amqp.h>
+#include <rabbitmq-c/tcp_socket.h>
+
+#include "../config.h"
+
+amqp_connection_state_t conn;
+amqp_socket_t *socket;
+
+void broker_on_connect(void);
+
+void amqp_send_message(char *queue, char *message)
+{
+	amqp_basic_properties_t props;
+	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+	props.content_type = amqp_literal_bytes("text/plain");
+	props.delivery_mode = 2;
+
+	amqp_basic_publish(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message));
+}
+
+void init_amqp(void)
+{
+	conn = amqp_new_connection();
+
+	socket = amqp_tcp_socket_new(conn);
+	amqp_socket_open(socket, AMQP_IP, AMQP_PORT);
+
+	amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, AMQP_USER, AMQP_PASSWORD);
+
+	amqp_channel_open(conn, 1);
+
+	broker_on_connect();
+
+	for (;;);
+}
diff --git a/iot/brokers/amqp.h b/iot/brokers/amqp.h
new file mode 100644
index 0000000..be45cbb
--- /dev/null
+++ b/iot/brokers/amqp.h
@@ -0,0 +1,4 @@
+void init_amqp(void);
+
+void amqp_send_message(char *topic, char *message);
+
diff --git a/iot/mqtt.c b/iot/brokers/mqtt.c
similarity index 94%
rename from iot/mqtt.c
rename to iot/brokers/mqtt.c
index ae9de13..d89d7a7 100644
--- a/iot/mqtt.c
+++ b/iot/brokers/mqtt.c
@@ -3,11 +3,11 @@
 #include <string.h>
 #include <stdbool.h>
 
-#include "config.h"
+#include "../config.h"
 
 struct mosquitto *mosq;
 
-void mqtt_on_connect(void);
+void broker_on_connect(void);
 
 void on_connect(struct mosquitto *client, void *obj, int rc)
 {
@@ -18,7 +18,7 @@ void on_connect(struct mosquitto *client, void *obj, int rc)
 
 	puts("Connected to " MQTT_IP);
 
-	mqtt_on_connect();
+	broker_on_connect();
 }
 
 void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
diff --git a/iot/mqtt.h b/iot/brokers/mqtt.h
similarity index 100%
rename from iot/mqtt.h
rename to iot/brokers/mqtt.h
diff --git a/iot/config.example.h b/iot/config.example.h
index 20d78c9..dbd308f 100644
--- a/iot/config.example.h
+++ b/iot/config.example.h
@@ -3,3 +3,7 @@
 #define MQTT_USER "user"
 #define MQTT_PASSWORD "password"
 
+#define AMQP_IP "127.0.0.1"
+#define AMQP_PORT 5672
+#define AMQP_USER "user"
+#define AMQP_PASSWORD "password"
diff --git a/iot/main.c b/iot/main.c
index fb1571c..6cf4ec5 100644
--- a/iot/main.c
+++ b/iot/main.c
@@ -6,7 +6,7 @@
 #include <stdio.h>
 #include <time.h>
 
-#include "mqtt.h"
+#include "brokers/amqp.h"
 #include "temperature.h"
 #include "device_id.h"
 
@@ -33,7 +33,7 @@ void *watch_temperature(void *arg)
 		char *str = malloc(snprintf(NULL, 0, format, temperature, device_id, timestamp) + 1);
 		sprintf(str, format, temperature, device_id, timestamp);
 
-		mqtt_send_message("temperature", str);
+		amqp_send_message("temperature", str);
 
 		free(str);
 
@@ -47,7 +47,7 @@ void *watch_temperature(void *arg)
 	return NULL;
 }
 
-void mqtt_on_connect(void)
+void broker_on_connect(void)
 {
 	pthread_t temperature_thread;
 	pthread_create(&temperature_thread, NULL, watch_temperature, NULL);
@@ -57,7 +57,7 @@ int main(void)
 {
 	srand(time(NULL));
 
-	init_mqtt();
+	init_amqp();
 
 	return EXIT_SUCCESS;
 }