Improve mqtt code

This commit is contained in:
Lara 2019-01-16 22:46:12 +01:00
parent 10f4743b25
commit cd2c88b43b
8 changed files with 240 additions and 89 deletions

3
Cargo.lock generated
View file

@ -478,7 +478,10 @@ name = "lifx-mqtt-bridge"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lifxi 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lifxi 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rumqtt 0.30.0 (git+https://github.com/AtherEnergy/rumqtt)", "rumqtt 0.30.0 (git+https://github.com/AtherEnergy/rumqtt)",
"serde 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -10,3 +10,6 @@ lifxi = "0.1"
clap = "2" clap = "2"
serde = "1" serde = "1"
serde_derive = "1" serde_derive = "1"
regex = "1"
lazy_static = "1"
crossbeam-channel = "0.3"

View file

@ -1,14 +1,22 @@
use crate::light::Light; use crate::light::{Command, Light, Status};
use lifxi::http::prelude::*; use lifxi::http::prelude::*;
pub struct Lifx { pub struct Lifx {
client: Client, client: Client,
updates: crossbeam_channel::Sender<Status>,
commands: crossbeam_channel::Receiver<Command>,
} }
impl Lifx { impl Lifx {
pub fn new<S: ToString>(secret: S) -> Self { pub fn new<S: ToString>(
secret: S,
updates: crossbeam_channel::Sender<Status>,
commands: crossbeam_channel::Receiver<Command>,
) -> Self {
Lifx { Lifx {
client: Client::new(secret), client: Client::new(secret),
updates,
commands,
} }
} }
@ -22,7 +30,7 @@ impl Lifx {
.unwrap() .unwrap()
} }
pub fn set_power(&self, id: String, state: bool) -> Result<(), lifxi::http::Error> { fn set_power(&self, id: String, state: bool) -> Result<(), lifxi::http::Error> {
self.client self.client
.select(Selector::Id(id)) .select(Selector::Id(id))
.change_state() .change_state()
@ -31,7 +39,7 @@ impl Lifx {
.and(Ok(())) .and(Ok(()))
} }
pub fn set_brightness(&self, id: String, brightness: f32) -> Result<(), lifxi::http::Error> { fn set_brightness(&self, id: String, brightness: f32) -> Result<(), lifxi::http::Error> {
self.client self.client
.select(Selector::Id(id)) .select(Selector::Id(id))
.change_state() .change_state()

View file

@ -1,4 +1,3 @@
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct Color { pub struct Color {
pub hue: f32, pub hue: f32,
@ -16,3 +15,54 @@ pub struct Light {
pub brightness: f32, pub brightness: f32,
} }
const POWER: &str = "power";
const BRIGHTNESS: &str = "brightness";
pub enum Value {
Power(String),
Brightness(f32),
}
impl Value {
pub fn new(label: &str, value: Vec<u8>) -> Self {
match label {
POWER => Value::Power(String::from_utf8(value).unwrap()),
BRIGHTNESS => Value::Brightness(Self::vec_to_f32(value)),
_ => unimplemented!(),
}
}
fn vec_to_f32(vec: Vec<u8>) -> f32 {
assert!(vec.len() == 4);
let mut value_u32: u32 = 0;
for val in vec.clone() {
value_u32 = value_u32 << 8;
value_u32 = value_u32 | val as u32;
}
println!("{:?} -> {}", vec, value_u32);
f32::from_bits(value_u32)
}
pub fn unravel(self) -> (&'static str, Vec<u8>) {
match self {
Value::Power(val) => (POWER, val.into_bytes()),
Value::Brightness(val) => (BRIGHTNESS, Vec::new()),
}
}
}
pub struct Command {
pub lampname: String,
pub command: Value,
}
pub struct Update {
pub lampname: String,
pub status: Value,
}
pub enum Status {
Update(Update),
New(Light),
Remove(String),
}

View file

@ -1,14 +1,20 @@
extern crate clap; extern crate clap;
#[macro_use]
extern crate lazy_static;
extern crate regex;
#[macro_use]
extern crate serde_derive;
mod lifx; mod lifx;
mod light; mod light;
mod mqtt; mod mqtt;
mod mqtt_commands;
mod mqtt_updates;
use crate::mqtt::Mqtt;
use clap::App; use clap::App;
use clap::Arg; use clap::Arg;
#[macro_use] use crossbeam_channel::unbounded;
extern crate serde_derive; use std::thread;
pub const MQTT_ID: &str = "lifx-mqtt-bridge"; pub const MQTT_ID: &str = "lifx-mqtt-bridge";
@ -45,28 +51,19 @@ fn main() {
let lifx_secret = matches.value_of("lifx_secret").unwrap(); let lifx_secret = matches.value_of("lifx_secret").unwrap();
println!("Connecting to {}:{}", host, port); println!("Connecting to {}:{}", host, port);
let mut mqtt = match Mqtt::connect(host, port) { let (s_commands, r_commands) = unbounded();
let (s_updates, r_updates) = unbounded();
let (mqtt_commands, mut mqtt_updates) = match mqtt::mqtt_connect(host, port, s_commands, r_updates)
{
Ok(mqtt) => mqtt, Ok(mqtt) => mqtt,
Err(err) => panic!("Error connecting: {}", err), Err(err) => panic!("Error connecting: {}", err),
}; };
let lifx_client = lifx::Lifx::new(lifx_secret); let lifx_client = lifx::Lifx::new(lifx_secret, s_updates, r_commands);
let lights = lifx_client.find_lights();
println!("lights: {:#?}", lights);
for light in lights { thread::spawn(move || mqtt_commands.listen());
mqtt.add_light(&light.id, &light.label); thread::spawn(move || mqtt_updates.listen());
}
loop { loop {}
match mqtt.notifications.recv() {
Ok(notification) => {
println!("MQTT notification received: {:#?}", notification);
}
Err(recv_error) => {
println!("MQTT channel closed: {}", recv_error);
return;
}
}
}
} }

View file

@ -1,17 +1,15 @@
use crate::light::{Command, Status};
use crate::mqtt_commands::MqttCommands;
use crate::mqtt_updates::MqttUpdates;
use rumqtt; use rumqtt;
use rumqtt::LastWill; use rumqtt::{ConnectionMethod, LastWill, MqttClient, MqttOptions, QoS, ReconnectOptions};
use rumqtt::MqttClient;
use rumqtt::Notification;
use rumqtt::Receiver;
use rumqtt::{ConnectionMethod, MqttOptions, QoS, ReconnectOptions};
pub struct Mqtt { pub fn mqtt_connect(
pub client: MqttClient, host: &str,
pub notifications: Receiver<Notification>, port: u16,
} commands: crossbeam_channel::Sender<Command>,
updates: crossbeam_channel::Receiver<Status>,
impl Mqtt { ) -> Result<(MqttCommands, MqttUpdates), String> {
pub fn connect(host: &str, port: u16) -> Result<Self, String> {
let last_will = LastWill { let last_will = LastWill {
topic: format!("{}/status", crate::MQTT_ID), topic: format!("{}/status", crate::MQTT_ID),
message: "disconnected".to_string(), message: "disconnected".to_string(),
@ -33,38 +31,13 @@ impl Mqtt {
true, true,
"connected", "connected",
) { ) {
Ok(()) => Ok(Mqtt { Ok(()) => Ok((
client, MqttCommands::new(notifications, commands),
notifications, MqttUpdates::new(client, updates),
}), )),
Err(conn_err) => Err(conn_err.to_string()), Err(conn_err) => Err(conn_err.to_string()),
} }
} }
Err(conn_err) => Err(conn_err.to_string()), Err(conn_err) => Err(conn_err.to_string()),
} }
} }
pub fn add_light(&mut self, id: &str, label: &str) -> Result<(), rumqtt::ClientError> {
self.client.publish(
format!("{}/lights", crate::MQTT_ID),
QoS::AtLeastOnce,
false,
format!("{}:{}", id, label),
)?;
self.client.publish(
format!("{}/{}/status/connected", crate::MQTT_ID, label),
QoS::AtLeastOnce,
false,
"true",
)?;
self.client.subscribe(
format!("{}/{}/command/power", crate::MQTT_ID, label),
QoS::AtLeastOnce,
)?;
self.client.subscribe(
format!("{}/{}/command/brightness", crate::MQTT_ID, label),
QoS::AtLeastOnce,
)?;
Ok(())
}
}

60
src/mqtt_commands.rs Normal file
View file

@ -0,0 +1,60 @@
use crate::light::{Command, Value};
use regex::Regex;
use rumqtt;
use rumqtt::{Notification, Publish, Receiver};
pub struct MqttCommands {
notifications: Receiver<Notification>,
commands: crossbeam_channel::Sender<Command>,
}
impl MqttCommands {
pub fn new(
notifications: Receiver<Notification>,
commands: crossbeam_channel::Sender<Command>,
) -> Self {
MqttCommands {
notifications,
commands,
}
}
pub fn listen(&self) {
loop {
match self.notifications.recv() {
Ok(notification) => {
println!("MQTT notification received: {:#?}", notification);
match notification {
Notification::Publish(data) => self.handle_publish(data),
Notification::PubAck(_) => {}
Notification::PubRec(_) => {}
Notification::PubRel(_) => {}
Notification::PubComp(_) => {}
Notification::SubAck(_) => {}
Notification::None => {}
}
}
Err(recv_error) => {
println!("MQTT channel closed: {}", recv_error);
}
}
}
}
fn handle_publish(&self, data: Publish) {
lazy_static! {
static ref matchStr: String = format!(r"^{}/(\w+)/command/(\w+)$", crate::MQTT_ID);
static ref RE: Regex = Regex::new(&matchStr).unwrap();
}
let mut matching = RE.find_iter(&data.topic_name);
let lamp = matching.next().unwrap().as_str();
let command = matching.next().unwrap().as_str();
self.commands
.send(Command {
lampname: lamp.to_owned(),
command: Value::new(command, data.payload.to_vec()),
})
.unwrap();
}
}

57
src/mqtt_updates.rs Normal file
View file

@ -0,0 +1,57 @@
use crate::light::Status;
use rumqtt;
use rumqtt::{MqttClient, QoS};
pub struct MqttUpdates {
client: MqttClient,
updates: crossbeam_channel::Receiver<Status>,
}
impl MqttUpdates {
pub fn new(client: MqttClient, updates: crossbeam_channel::Receiver<Status>) -> Self {
MqttUpdates { client, updates }
}
pub fn add_light(&mut self, id: &str, lampname: &str) -> Result<(), rumqtt::ClientError> {
self.client.publish(
format!("{}/lights", crate::MQTT_ID),
QoS::AtLeastOnce,
false,
format!("{}:{}", id, lampname),
)?;
self.client.publish(
format!("{}/{}/status/connected", crate::MQTT_ID, lampname),
QoS::AtLeastOnce,
false,
"true",
)?;
self.client.subscribe(
format!("{}/{}/command/power", crate::MQTT_ID, lampname),
QoS::AtLeastOnce,
)?;
self.client.subscribe(
format!("{}/{}/command/brightness", crate::MQTT_ID, lampname),
QoS::AtLeastOnce,
)?;
Ok(())
}
pub fn listen(&mut self) {
while let Ok(status) = self.updates.recv() {
match status {
Status::New(light) => self.add_light(&light.id, &light.label).unwrap(),
Status::Remove(_name) => unimplemented!(),
Status::Update(update) => {
let (detail, value) = update.status.unravel();
self.client
.publish(
format!("{}/{}/status/{}", crate::MQTT_ID, update.lampname, detail),
QoS::AtLeastOnce,
true,
value,
)
.unwrap();
}
}
}
}
}