diff --git a/iot/Makefile b/iot/Makefile index d9c12c7..0f0e886 100644 --- a/iot/Makefile +++ b/iot/Makefile @@ -1,3 +1,3 @@ -all: main.c mqtt.c temperature.c device_id.c - $(CC) -lmosquitto -lpthread -li2c main.c mqtt.c temperature.c device_id.c +all: main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c + $(CC) -lmosquitto -lrabbitmq -lpthread -li2c main.c brokers/mqtt.c brokers/amqp.c temperature.c device_id.c diff --git a/iot/brokers/amqp.c b/iot/brokers/amqp.c new file mode 100644 index 0000000..20d7c39 --- /dev/null +++ b/iot/brokers/amqp.c @@ -0,0 +1,35 @@ +#include +#include + +#include "../config.h" + +amqp_connection_state_t conn; +amqp_socket_t *socket; + +void broker_on_connect(void); + +void amqp_send_message(char *queue, char *message) +{ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_literal_bytes("text/plain"); + props.delivery_mode = 2; + + amqp_basic_publish(conn, 1, amqp_cstring_bytes(queue), amqp_cstring_bytes(queue), 0, 0, &props, amqp_cstring_bytes(message)); +} + +void init_amqp(void) +{ + conn = amqp_new_connection(); + + socket = amqp_tcp_socket_new(conn); + amqp_socket_open(socket, AMQP_IP, AMQP_PORT); + + amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, AMQP_USER, AMQP_PASSWORD); + + amqp_channel_open(conn, 1); + + broker_on_connect(); + + for (;;); +} diff --git a/iot/brokers/amqp.h b/iot/brokers/amqp.h new file mode 100644 index 0000000..be45cbb --- /dev/null +++ b/iot/brokers/amqp.h @@ -0,0 +1,4 @@ +void init_amqp(void); + +void amqp_send_message(char *topic, char *message); + diff --git a/iot/mqtt.c b/iot/brokers/mqtt.c similarity index 94% rename from iot/mqtt.c rename to iot/brokers/mqtt.c index ae9de13..d89d7a7 100644 --- a/iot/mqtt.c +++ b/iot/brokers/mqtt.c @@ -3,11 +3,11 @@ #include #include -#include "config.h" +#include "../config.h" struct mosquitto *mosq; -void mqtt_on_connect(void); +void broker_on_connect(void); void on_connect(struct mosquitto *client, void *obj, int rc) { @@ -18,7 +18,7 @@ void on_connect(struct mosquitto *client, void *obj, int rc) puts("Connected to " MQTT_IP); - mqtt_on_connect(); + broker_on_connect(); } void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) diff --git a/iot/mqtt.h b/iot/brokers/mqtt.h similarity index 100% rename from iot/mqtt.h rename to iot/brokers/mqtt.h diff --git a/iot/config.example.h b/iot/config.example.h index 20d78c9..dbd308f 100644 --- a/iot/config.example.h +++ b/iot/config.example.h @@ -3,3 +3,7 @@ #define MQTT_USER "user" #define MQTT_PASSWORD "password" +#define AMQP_IP "127.0.0.1" +#define AMQP_PORT 5672 +#define AMQP_USER "user" +#define AMQP_PASSWORD "password" diff --git a/iot/main.c b/iot/main.c index fb1571c..ad6014c 100644 --- a/iot/main.c +++ b/iot/main.c @@ -6,7 +6,7 @@ #include #include -#include "mqtt.h" +#include "brokers/amqp.h" #include "temperature.h" #include "device_id.h" @@ -33,7 +33,7 @@ void *watch_temperature(void *arg) char *str = malloc(snprintf(NULL, 0, format, temperature, device_id, timestamp) + 1); sprintf(str, format, temperature, device_id, timestamp); - mqtt_send_message("temperature", str); + amqp_send_message("temperature-logs", str); free(str); @@ -47,7 +47,7 @@ void *watch_temperature(void *arg) return NULL; } -void mqtt_on_connect(void) +void broker_on_connect(void) { pthread_t temperature_thread; pthread_create(&temperature_thread, NULL, watch_temperature, NULL); @@ -57,7 +57,7 @@ int main(void) { srand(time(NULL)); - init_mqtt(); + init_amqp(); return EXIT_SUCCESS; }