diff --git a/iot/brokers/amqp.c b/iot/brokers/amqp.c index 32223e8..85ab0ec 100644 --- a/iot/brokers/amqp.c +++ b/iot/brokers/amqp.c @@ -22,12 +22,18 @@ void amqp_send_message(char *queue, char *message) props.content_type = amqp_literal_bytes("text/plain"); props.delivery_mode = 2; - amqp_basic_publish(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message)); + amqp_basic_publish(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), false, false, &props, amqp_cstring_bytes(message)); } -void amqp_subscribe(char *queue) +void amqp_subscribe(char *exchange, char *queue) { - amqp_basic_consume(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes("iot"), 1, 0, 0, amqp_empty_table); + amqp_exchange_declare(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes("fanout"), false, false, false, false, amqp_empty_table); + + amqp_queue_declare(conn, channel, amqp_cstring_bytes(queue), false, false, false, true, amqp_empty_table); + + amqp_queue_bind(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes(exchange), amqp_empty_bytes, amqp_empty_table); + + amqp_basic_consume(conn, channel, amqp_cstring_bytes(queue), amqp_cstring_bytes("iot"), true, true, false, amqp_empty_table); } char *amqp_bytes_to_cstring(amqp_bytes_t bytes) @@ -57,12 +63,16 @@ void init_amqp(void) amqp_envelope_t envelope; amqp_consume_message(conn, &envelope, NULL, 0); + if (envelope.exchange.len == 0) { + continue; + } + char *exchange_name = amqp_bytes_to_cstring(envelope.exchange); char *message = amqp_bytes_to_cstring(envelope.message.body); - if (broker_on_message(exchange_name, message)) { - amqp_basic_ack(conn, channel, envelope.delivery_tag, 0); - } + broker_on_message(exchange_name, message); + + amqp_basic_ack(conn, channel, envelope.delivery_tag, false); free(exchange_name); free(message); diff --git a/iot/brokers/amqp.h b/iot/brokers/amqp.h index 735c778..9a6fd8d 100644 --- a/iot/brokers/amqp.h +++ b/iot/brokers/amqp.h @@ -2,5 +2,5 @@ void init_amqp(void); void amqp_send_message(char *topic, char *message); -void amqp_subscribe(char *queue); +void amqp_subscribe(char *exchange, char *queue); diff --git a/iot/device_properties.c b/iot/device_properties.c index 19bef32..53cdcaa 100644 --- a/iot/device_properties.c +++ b/iot/device_properties.c @@ -4,8 +4,6 @@ #include "device_properties.h" -#define DEVICE_ID_SIZE 5 - char *get_device_id(void) { FILE *file = fopen("device_id.txt", "r"); diff --git a/iot/device_properties.h b/iot/device_properties.h index 76a1b34..23b6e0c 100644 --- a/iot/device_properties.h +++ b/iot/device_properties.h @@ -1,3 +1,5 @@ +#define DEVICE_ID_SIZE 5 + char *get_device_id(void); char *generate_device_id(void); diff --git a/iot/main.c b/iot/main.c index 0a598b8..1b509a4 100644 --- a/iot/main.c +++ b/iot/main.c @@ -97,8 +97,16 @@ void *watch_temperature(void *arg) void broker_on_connect(void) { - amqp_subscribe("temperature-limits"); + // Subscribe to messages from queue + char *queue = malloc(strlen("device-") + DEVICE_ID_SIZE + 1); + strcpy(queue, "device-"); + strcat(queue, get_device_id()); + amqp_subscribe("temperature-limits", queue); + + free(queue); + + // Start thread for watching temperature pthread_t temperature_thread; pthread_create(&temperature_thread, NULL, watch_temperature, NULL); }