Use fanout exchange for receiving temperature updates

This commit is contained in:
Reimar 2025-04-08 13:54:09 +02:00
parent 0d77807d56
commit 9a04cf14ac
Signed by: Reimar
GPG Key ID: 93549FA07F0AE268
5 changed files with 28 additions and 10 deletions

View File

@ -22,12 +22,18 @@ void amqp_send_message(char *queue, char *message)
props.content_type = amqp_literal_bytes("text/plain"); props.content_type = amqp_literal_bytes("text/plain");
props.delivery_mode = 2; props.delivery_mode = 2;
amqp_basic_publish(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message)); amqp_basic_publish(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), false, false, &props, amqp_cstring_bytes(message));
} }
void amqp_subscribe(char *queue) void amqp_subscribe(char *exchange, char *queue)
{ {
amqp_basic_consume(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes("iot"), 1, 0, 0, amqp_empty_table); amqp_exchange_declare(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes("fanout"), false, false, false, false, amqp_empty_table);
amqp_queue_declare(conn, channel, amqp_cstring_bytes(queue), false, false, false, true, amqp_empty_table);
amqp_queue_bind(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes("iot"), true, true, false, amqp_empty_table);
} }
char *amqp_bytes_to_cstring(amqp_bytes_t bytes) char *amqp_bytes_to_cstring(amqp_bytes_t bytes)
@ -57,12 +63,16 @@ void init_amqp(void)
amqp_envelope_t envelope; amqp_envelope_t envelope;
amqp_consume_message(conn, &envelope, NULL, 0); amqp_consume_message(conn, &envelope, NULL, 0);
if (envelope.exchange.len == 0) {
continue;
}
char *exchange_name = amqp_bytes_to_cstring(envelope.exchange); char *exchange_name = amqp_bytes_to_cstring(envelope.exchange);
char *message = amqp_bytes_to_cstring(envelope.message.body); char *message = amqp_bytes_to_cstring(envelope.message.body);
if (broker_on_message(exchange_name, message)) { broker_on_message(exchange_name, message);
amqp_basic_ack(conn, channel, envelope.delivery_tag, 0);
} amqp_basic_ack(conn, channel, envelope.delivery_tag, false);
free(exchange_name); free(exchange_name);
free(message); free(message);

View File

@ -2,5 +2,5 @@ void init_amqp(void);
void amqp_send_message(char *topic, char *message); void amqp_send_message(char *topic, char *message);
void amqp_subscribe(char *queue); void amqp_subscribe(char *exchange, char *queue);

View File

@ -4,8 +4,6 @@
#include "device_properties.h" #include "device_properties.h"
#define DEVICE_ID_SIZE 5
char *get_device_id(void) char *get_device_id(void)
{ {
FILE *file = fopen("device_id.txt", "r"); FILE *file = fopen("device_id.txt", "r");

View File

@ -1,3 +1,5 @@
#define DEVICE_ID_SIZE 5
char *get_device_id(void); char *get_device_id(void);
char *generate_device_id(void); char *generate_device_id(void);

View File

@ -97,8 +97,16 @@ void *watch_temperature(void *arg)
void broker_on_connect(void) void broker_on_connect(void)
{ {
amqp_subscribe("temperature-limits"); // Subscribe to messages from queue
char *queue = malloc(strlen("device-") + DEVICE_ID_SIZE + 1);
strcpy(queue, "device-");
strcat(queue, get_device_id());
amqp_subscribe("temperature-limits", queue);
free(queue);
// Start thread for watching temperature
pthread_t temperature_thread; pthread_t temperature_thread;
pthread_create(&temperature_thread, NULL, watch_temperature, NULL); pthread_create(&temperature_thread, NULL, watch_temperature, NULL);
} }