261 lines
8.8 KiB
Python
261 lines
8.8 KiB
Python
|
|
""" View and control devices of your HomeAssistant.io instance.
|
|
|
|
Synopsis: <trigger> <entity filter>"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import requests
|
|
from albert import Item, ClipAction, FuncAction, UrlAction, configLocation
|
|
from albert import critical, warning, debug
|
|
|
|
# Extension metadata (module-level dunder attributes).
__title__ = "Home Assistant"
__version__ = "0.0.1"
__triggers__ = "ha "  # trailing space is part of the trigger
__authors__ = "hoellen"
__py_deps__ = ["requests"]
|
|
|
|
|
|
# global variables

# Location of the JSON file that stores the user's settings.
configurationFileName = "homeassistant_config.json"
configuration_directory = os.path.join(configLocation())
configuration_file = os.path.join(configuration_directory, configurationFileName)

# Settings shared by every function in this module. "sort_order" maps an
# entity domain to its rank in the result list (lower ranks are listed first).
config = {
    "sort_order": {"light": 1, "switch": 1, "scene": 2, "group": 2, "automation": 3},
}
|
|
|
|
# Icon paths, relative to the directory containing this file. "logo" is the
# fallback used for entity domains that have no icon of their own.
icon_files = dict(
    logo="icons/icon.png",
    automation="icons/automation.png",
    cover="icons/cover.png",
    group="icons/group.png",
    light="icons/light.png",
    scene="icons/scene.png",
    switch="icons/switch.png",
)

# Entity domains that get a "Toggle" action.
toggle_types = [
    "light",
    "switch",
    "automation",
    "group",
    "script",
    "input_boolean",
    "climate",
    "camera",
]

# Entity domains that use turn_{on,off} instead of toggle.
on_off_types = ["scene", "media_player"]
|
|
|
|
|
|
def initialize():
    """Load the Home Assistant settings and derive the request parameters.

    Reads ``configuration_file`` into the module-level ``config`` dict. When
    the file does not exist, default values are stored in ``config`` and
    written to a freshly created file. Afterwards the following keys are
    (re)computed in ``config``:

    * ``hass_url``    - the configured URL without surrounding slashes
    * ``hass_key``    - the configured API key (empty string when missing)
    * ``state_query`` - URL of the ``/api/states`` endpoint
    * ``headers``     - HTTP headers carrying the bearer token
    * ``sort_order``  - a RegexDict with a trailing ``"."`` wildcard entry

    Problems (unreadable config, missing URL, missing key) are reported via
    ``critical`` instead of being raised.
    """
    # load HASS config
    if os.path.exists(configuration_file):
        try:
            with open(configuration_file) as json_config:
                config.update(json.load(json_config))
        except (OSError, ValueError, TypeError) as error:
            # Malformed JSON raises a ValueError subclass; a top-level value
            # that is not an object makes dict.update fail.
            critical("There was an error reading the file: %s (%s)"
                     % (configuration_file, error))
    else:
        config["hass_url"] = "http://192.168.1.5:8123"
        config["hass_key"] = ""
        try:
            os.makedirs(configuration_directory, exist_ok=True)
        except OSError:
            critical("There was an error making the directory: %s" % configuration_directory)
        else:
            try:
                with open(configuration_file, "w") as output_file:
                    json.dump(config, output_file, indent=4)
            except OSError:
                critical("There was an error opening the file: %s" % configuration_file)

    # Normalise the two user-supplied values first, so that a config file
    # lacking either key is reported below instead of raising KeyError here.
    config["hass_url"] = str(config.get("hass_url") or "").strip("/")
    config["hass_key"] = str(config.get("hass_key") or "")

    # Set up HASS state query
    config["state_query"] = config["hass_url"] + "/api/states"
    config["headers"] = {
        "Authorization": "Bearer " + config["hass_key"],
        "content-type": "application/json",
    }

    # Set up sort_order; the "." pattern matches any non-empty domain and
    # therefore acts as the catch-all rank.
    sort_order = RegexDict(config.get("sort_order") or {})
    sort_order["."] = 999  # Wildcard for sorting
    config["sort_order"] = sort_order

    # Keep the API token (also embedded in the headers) out of the log.
    loggable = {key: value for key, value in config.items()
                if key not in ("hass_key", "headers")}
    debug("Loaded config: " + str(loggable))

    # check config
    if not config["hass_url"]:
        critical("Invalid Home Assistant URL")

    if not config["hass_key"]:
        critical("Empty Home Assistant API Key")
|
|
|
|
|
|
def handleQuery(query):
    """Answer a triggered query with Home Assistant entities or a setup hint.

    Returns ``None`` when the query is not triggered or is no longer valid.
    Returns a single explanatory Item when the Home Assistant URL or the API
    key is missing from ``config``. Otherwise returns the result of
    ``showEntities(query)``.
    """
    # NOTE(review): the reviewed copy of this file had lost its indentation;
    # this assumes the whole body was guarded by `query.isTriggered`.
    if not query.isTriggered:
        return None

    query.disableSort()

    if not query.isValid:
        return None

    logo_icon = os.path.dirname(__file__) + "/" + icon_files["logo"]

    # `.get` covers both a missing key and an empty value, so this guard can
    # no longer raise KeyError while building the very item that reports it.
    if not config.get("hass_url"):
        return Item(id=__title__,
                    icon=logo_icon,
                    text="Invalid Home Assistant URL",
                    subtext=str(config.get("hass_url") or ""))

    if not config.get("hass_key"):
        return Item(id=__title__,
                    icon=logo_icon,
                    text="Empty Home Assistant API Key",
                    subtext="Please add you Home Assistant API Key to your config.")

    return showEntities(query)
|
|
|
|
|
|
def _service_action(label, data, service):
    """Return a FuncAction named `label` that sends `service` for `data`.

    `service` and `data` are bound as function arguments, so the action keeps
    the values it was created with even though the caller builds many actions
    in a loop. sendCommand receives a copy of `data`, so several actions of
    the same item never observe each other's modifications.
    """
    return FuncAction(label, lambda: sendCommand(dict(data), service))


def showEntities(query):
    """Build the result items for the entities matching ``query.string``.

    Returns a single Item for an empty query or when the state request
    fails; otherwise a list of Items (a lone "Entity not found!" item when
    nothing matches).

    Every whitespace-separated word of the query must occur in the entity's
    friendly name or entity id. Matches are ordered so that available
    entities precede unavailable ones, names starting with the query precede
    the rest, and ties follow ``config["sort_order"]``.
    """
    logo_icon = os.path.dirname(__file__) + "/" + icon_files["logo"]

    # empty query
    if not query.string.strip():
        return Item(id=__title__,
                    icon=logo_icon,
                    text=__title__,
                    subtext="Enter the name of the entity which you want to control",
                    actions=[UrlAction("Open in Browser", config["hass_url"])])

    search_terms = query.string.lower().split()
    name_prefix = query.string.strip().lower()

    # Query entities from HASS. The timeout keeps an unreachable server from
    # blocking the query indefinitely; ValueError covers a non-JSON body.
    try:
        response = requests.get(config["state_query"],
                                headers=config["headers"],
                                timeout=10)
        response.raise_for_status()
        states = response.json()
    except (requests.exceptions.RequestException, ValueError) as error:
        critical(str(error))
        return Item(id=__title__,
                    icon=logo_icon,
                    text="Error while getting entity states from Home Assistant",
                    subtext=str(error))

    # Drop malformed entries *before* sorting, because the sort key indexes
    # into them. Not likely, but worth checking.
    entities = []
    for entity in states:
        if (
            not isinstance(entity, dict)
            or "entity_id" not in entity
            or not isinstance(entity.get("attributes"), dict)
        ):
            continue
        # Use entity_id if friendly_name is not given (or is empty)
        if not entity["attributes"].get("friendly_name"):
            entity["attributes"]["friendly_name"] = entity["entity_id"]
        entities.append(entity)

    def sort_key(entity):
        # Plain string comparison instead of a regex built from user input,
        # so characters such as "(" or "[" in the query cannot raise.
        display_name = str(entity["attributes"]["friendly_name"]).lower()
        domain = entity["entity_id"].split(".")[0]
        return (
            entity.get("state") == "unavailable",       # unavailable last
            not display_name.startswith(name_prefix),   # prefix matches first
            config["sort_order"].get_matching(domain),  # then by class
        )

    entities.sort(key=sort_key)

    results = []

    # Parse all entities and states
    for entity in entities:

        # number of items
        if len(results) > 10:
            break

        entity_id = entity["entity_id"]
        friendly_name = str(entity["attributes"]["friendly_name"])
        entity_class = entity_id.split(".")[0]

        # Don't add this item if the query doesn't appear in either friendly_name or id
        lowered_name = friendly_name.lower()
        if not all(term in lowered_name or term in entity_id for term in search_terms):
            debug("Exclude from search %s" % entity_id)
            continue

        entity_icon = os.path.dirname(__file__) + "/" + (
            icon_files[entity_class]
            if entity_class in icon_files
            else icon_files["logo"]
        )

        data = {
            "endpoint": "{}/api/services/".format(config["hass_url"]),
            "service_data": {"entity_id": entity_id},
        }
        state = str(entity.get("state", ""))
        state_colored = "<font color=\"{}\">{}</font>".format(
            "Green" if state == "on" else
            "Red" if state == "off" else "",
            state.capitalize()
        )

        # Build item for list
        item = Item(id=__title__,
                    icon=entity_icon,
                    text=friendly_name,
                    completion="%s%s" % (__triggers__, friendly_name),
                    subtext="%s | %s" % (state_colored, entity_class.capitalize()))

        # Add actions depending on class
        if entity_class in toggle_types:
            item.addAction(_service_action("Toggle", data, "homeassistant/toggle"))

        if entity_class in on_off_types:
            on_or_off = "on" if state != "on" else "off"
            item.addAction(_service_action("Turn %s" % on_or_off, data,
                                           "homeassistant/turn_%s" % on_or_off))

        if entity_class == "cover":
            open_or_close = "open" if state != "open" else "close"
            # The service name is formatted here, before the call is made.
            item.addAction(_service_action("%s Cover" % open_or_close.capitalize(), data,
                                           "cover/%s_cover" % open_or_close))

        if entity_class == "automation":
            item.addAction(_service_action("Trigger", data, "automation/trigger"))

        item.addAction(ClipAction("Copy ID", entity_id))

        results.append(item)

    # No entity found
    if not results:
        results.append(
            Item(id=__title__,
                 icon=logo_icon,
                 text="Entity not found!",
                 subtext="Please search for another entity.")
        )

    return results
|
|
|
|
|
|
def sendCommand(data, service):
    """POST a service call to Home Assistant.

    Args:
        data: dict with ``"endpoint"`` (base URL of the services API, ending
            in a slash) and ``"service_data"`` (the JSON-serialisable request
            payload containing ``"entity_id"``). It is not modified.
        service: path appended to the endpoint, e.g. ``"homeassistant/toggle"``.

    Request failures are logged with ``warning`` and not raised.
    """
    # Build the URL locally: `data` may be shared by several actions, and
    # appending to data["endpoint"] in place would corrupt it for later calls.
    url = data["endpoint"] + service
    debug(__title__ + ": Sending command \"" + service + "\" to " + data["service_data"]["entity_id"])

    # Make POST request to HA service
    try:
        response = requests.post(
            url,
            data=json.dumps(data["service_data"]),
            headers=config["headers"],
            timeout=10,  # do not hang forever on an unreachable server
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as error:
        warning("Error while sending command to Home Assistant:\n%s" % (str(error)))
|
|
|
|
# New dictionary class to match keys with regex
class RegexDict(dict):
    """dict whose keys are regular expressions used for pattern lookups."""

    # Sentinel distinguishing "no default given" from an explicit None.
    _MISSING = object()

    def get_matching(self, event, default=_MISSING):
        """Return the value of the first key whose pattern matches `event`.

        Keys are tried in insertion order with ``re.match``, so a pattern
        only has to match at the start of `event`.

        Args:
            event: the string to look up.
            default: value returned when no key matches (optional).

        Raises:
            KeyError: when no key matches and no `default` was supplied.
        """
        for pattern, value in self.items():
            if re.match(pattern, event):
                return value
        if default is RegexDict._MISSING:
            # A bare next() would leak StopIteration here, which silently
            # terminates any iteration it propagates through.
            raise KeyError(event)
        return default
|
|
|