Implement AMQP in IoT

This commit is contained in:
Reimar 2025-03-25 12:21:49 +01:00
parent fccea25296
commit 8c6037b010
Signed by: Reimar
GPG Key ID: 93549FA07F0AE268
7 changed files with 52 additions and 9 deletions

View File

@ -1,3 +1,3 @@
all: main.c mqtt.c temperature.c device_id.c
$(CC) -lmosquitto -lpthread -li2c main.c mqtt.c temperature.c device_id.c
all: main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c
$(CC) -lmosquitto -lrabbitmq -lpthread -li2c main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c

35
iot/brokers/amqp.c Normal file
View File

@ -0,0 +1,35 @@
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "../config.h"
amqp_connection_state_t conn;
amqp_socket_t *socket;
void broker_on_connect(void);
void amqp_send_message(char *queue, char *message)
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_literal_bytes("text/plain");
props.delivery_mode = 2;
amqp_basic_publish(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message));
}
void init_amqp(void)
{
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, AMQP_IP, AMQP_PORT);
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, AMQP_USER, AMQP_PASSWORD);
amqp_channel_open(conn, 1);
broker_on_connect();
for (;;);
}

4
iot/brokers/amqp.h Normal file
View File

@ -0,0 +1,4 @@
void init_amqp(void);
void amqp_send_message(char *topic, char *message);

View File

@ -3,11 +3,11 @@
#include <string.h>
#include <stdbool.h>
#include "config.h"
#include "../config.h"
struct mosquitto *mosq;
void mqtt_on_connect(void);
void broker_on_connect(void);
void on_connect(struct mosquitto *client, void *obj, int rc)
{
@ -18,7 +18,7 @@ void on_connect(struct mosquitto *client, void *obj, int rc)
puts("Connected to " MQTT_IP);
mqtt_on_connect();
broker_on_connect();
}
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)

View File

@ -3,3 +3,7 @@
#define MQTT_USER "user"
#define MQTT_PASSWORD "password"
#define AMQP_IP "127.0.0.1"
#define AMQP_PORT 5672
#define AMQP_USER "user"
#define AMQP_PASSWORD "password"

View File

@ -6,7 +6,7 @@
#include <stdio.h>
#include <time.h>
#include "mqtt.h"
#include "brokers/amqp.h"
#include "temperature.h"
#include "device_id.h"
@ -33,7 +33,7 @@ void *watch_temperature(void *arg)
char *str = malloc(snprintf(NULL, 0, format, temperature, device_id, timestamp) + 1);
sprintf(str, format, temperature, device_id, timestamp);
mqtt_send_message("temperature", str);
amqp_send_message("temperature-logs", str);
free(str);
@ -47,7 +47,7 @@ void *watch_temperature(void *arg)
return NULL;
}
void mqtt_on_connect(void)
void broker_on_connect(void)
{
pthread_t temperature_thread;
pthread_create(&temperature_thread, NULL, watch_temperature, NULL);
@ -57,7 +57,7 @@ int main(void)
{
srand(time(NULL));
init_mqtt();
init_amqp();
return EXIT_SUCCESS;
}