# _ _ _ _____ _ _ _____ _ _ ___ ___ _ __
# /_\ | | |_ _| |_ (_)_ _ __ _ __|_ _|_ _| | |__ / __| \| |/ /
# / _ \| | | | | | ' \| | ' \/ _` (_-< | |/ _` | | / / \__ \ |) | ' <
# /_/ \_\_|_| |_| |_||_|_|_||_\__, /__/ |_|\__,_|_|_\_\ |___/___/|_|\_\
# |___/
#
# Copyright 2017 AllThingsTalk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import paho.mqtt.client as paho_mqtt
import requests
from .assets import Asset
from .asset_state import AssetState
from .exceptions import AssetStateRetrievalException, AccessForbiddenException
logger = logging.getLogger('allthingstalk')
[docs]class BaseClient:
"""BaseClient is a base class used for implementing AllThingsTalk Platform
clients, which are used for interfacing the SDK code with the Platform. It
doesn't implement any of the client methods."""
def _attach_device(self, device):
pass
def get_assets(self, device_id):
raise NotImplementedError('get_assets not implemented')
def create_asset(self, device_id, asset):
raise NotImplementedError('create_asset not implemented')
def get_asset_state(self, device_id, asset_name):
raise NotImplementedError('get_asset_state not implemented')
def publish_asset_state(self, device_id, asset_name, value):
raise NotImplementedError('publish_asset_state not implemented')
[docs]class Client(BaseClient):
"""Client is the recommended class used for connecting to AllThingsTalk
Platform, that uses HTTP and MQTT in the background. By default, it
connects to api.allthingstalk.io."""
[docs] def __init__(self, token, *, api='api.allthingstalk.io', http=None, mqtt=None):
"""Initializes the Client with an AllThingsTalk token, and optional endpoints.
:param str token: AllThingsTalk Token, e.g. a Device Token
:param str api: AllThingsTalk API endpoint, shared by HTTP & MQTT
:param str http: AllThingsTalk HTTP endpoint. Resolved from api by default
:param str mqtt: AllThingsTalk MQTT endpoint. Resolved from api by default
"""
self.token = token
def prefix_http(url):
return url if url.startswith('http') else 'https://%s' % url
if api.startswith('http://') or api.startswith('https://'):
api = api.split('//', 1)[1]
# HTTP Client
if http:
self.http = prefix_http(http)
elif api:
self.http = prefix_http(api)
else:
raise ValueError('Either api or http must be set.')
# MQTT Client
if mqtt:
if ':' in mqtt:
host, port = mqtt.split(':')[:2]
else:
host, port = mqtt, '1883'
self.mqtt = self._make_mqtt_client(host, port, token)
elif api:
host, port = host, port = api, '1883'
self.mqtt = self._make_mqtt_client(host, port, token)
else:
self.mqtt = None
self._devices = {}
def _make_mqtt_client(self, host, port, token):
def on_mqtt_connect(client, userdata, rc):
logger.debug('MQTT client connected to %s:%s' % (host, port))
def on_mqtt_disconnect(client, userdata, rc):
logger.debug('MQTT client disconnected with status %s' % rc)
def on_mqtt_subscribe(client, userdata, mid, granted_qos):
pass
def on_mqtt_message(client, userdata, message):
logger.debug('MQTT client received a message on topic "%s": %s'
% (message.topic, message.payload))
parts = message.topic.split('/')
_, device_id, _, asset_name, stream = parts
self._devices[device_id]._on_message(stream, asset_name, message.payload)
client = paho_mqtt.Client()
client.username_pw_set(token, token)
client.on_connect = on_mqtt_connect
client.on_disconnect = on_mqtt_disconnect
client.on_message = on_mqtt_message
client.on_subscribe = on_mqtt_subscribe
client.connect(host, int(port), 60)
client.loop_start()
return client
def _attach_device(self, device):
if self.mqtt:
logger.debug('Client %s attaching device %s' % (self, device.id))
for action in ['feed', 'command']:
logger.debug('Subscribing to %s\'s %ss' % (device.id, action))
self.mqtt.subscribe('device/%s/asset/+/%s' % (device.id, action))
self._devices[device.id] = device
[docs] def get_assets(self, device_id):
"""Retrieves assets for the device identified by device_id.
:param str device_id: AllThingsTalk Device Identifier
:return: Asset list returned by AllThingsTalk API.
:rtype: list of Asset
"""
r = requests.get('%s/device/%s/assets' % (self.http, device_id),
headers={'Authorization': 'Bearer %s' % self.token})
if r.status_code == 403:
raise AccessForbiddenException('Could not use token "%s" to access device "%s" on "%s".'
% (self.token, device_id, self.http))
return [Asset.from_dict(asset_dict) for asset_dict in r.json()]
[docs] def create_asset(self, device_id, asset):
"""Creates a device asset.
:param str device_id: AllThingsTalk Device Identifier
:param Asset asset: The asset
:return: The asset
:rtype: Asset
"""
attalk_asset = {
'Name': asset.name,
'Title': asset.title,
'Description': asset.description,
'Is': asset.kind,
'Profile': asset.profile
}
asset_dict = requests.post('%s/device/%s/assets' % (self.http, device_id),
headers={'Authorization': 'Bearer %s' % self.token},
json=attalk_asset).json()
return Asset.from_dict(asset_dict)
[docs] def get_asset_state(self, device_id, asset_name):
"""Low-level device asset state retrieval. Most of the time,
you should be using device asset getters.
:param str device_id: AllThingsTalk Device Identifier
:param str asset_name: Asset name
:return: The Asset state
:rtype: AssetState
"""
r = requests.get('%s/device/%s/asset/%s/state' % (self.http, device_id, asset_name),
headers={'Authorization': 'Bearer %s' % self.token})
if r.status_code != 200:
raise AssetStateRetrievalException()
response_json = r.json()
return AssetState(
value=response_json['state']['value'],
at=response_json['state']['at'])
[docs] def publish_asset_state(self, device_id, asset_name, state):
"""Low-level device asset state publishing. Most of the time,
you should be using device asset setters.
:param str device_id: AllThingsTalk Device Identifier
:param str asset_name: Asset name
:param AssetState state: The asset state
"""
if isinstance(state, AssetState):
json_state = {'value': state.value, 'at': state.at.isoformat()}
else:
json_state = {'value': state}
requests.put('%s/device/%s/asset/%s/state' % (self.http, device_id, asset_name),
headers={'Authorization': 'Bearer %s' % self.token},
json=json_state)
def __del__(self):
try:
self.mqtt.loop_stop(force=True)
except:
pass