mirror of
https://github.com/espressif/esp-rainmaker.git
synced 2026-01-19 05:37:09 +00:00
380 lines
14 KiB
Python
380 lines
14 KiB
Python
# Copyright 2020 Espressif Systems (Shanghai) PTE LTD
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import json
|
|
import errno
|
|
import os
|
|
import base64
|
|
import time
|
|
import requests
|
|
from pathlib import Path
|
|
from os import path
|
|
|
|
from rmaker_lib import serverconfig
|
|
from rmaker_lib.exceptions import NetworkError,\
|
|
InvalidConfigError,\
|
|
InvalidUserError,\
|
|
InvalidApiVersionError,\
|
|
ExpiredSessionError,\
|
|
SSLError
|
|
from rmaker_lib.logger import log
|
|
|
|
CONFIG_DIRECTORY = '.espressif/rainmaker'
|
|
CONFIG_FILE = CONFIG_DIRECTORY + '/rainmaker_config.json'
|
|
HOME_DIRECTORY = '~/'
|
|
CURR_DIR = os.path.dirname(__file__)
|
|
CERT_FILE = CURR_DIR + '/../server_cert/server_cert.pem'
|
|
|
|
|
|
class Config:
|
|
"""
|
|
Config class used to instantiate instances of config to
|
|
perform various get/set configuration operations
|
|
"""
|
|
def set_config(self, data, config_file=CONFIG_FILE):
|
|
"""
|
|
Set the configuration file.
|
|
|
|
:params data: Config Data to write to file
|
|
:type data: dict
|
|
|
|
:params config_file: Config filename to write config data to
|
|
:type data: str
|
|
|
|
:raises OSError: If there is an OS issue while creating new directory
|
|
for config file
|
|
:raises Exception: If there is a File Handling error while saving
|
|
config to file
|
|
|
|
:return: None on Success and Failure
|
|
:rtype: None
|
|
"""
|
|
log.info("Configuring config file.")
|
|
file_dir = Path(path.expanduser(HOME_DIRECTORY + CONFIG_DIRECTORY))
|
|
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
|
|
if not file.exists():
|
|
try:
|
|
if not file_dir.exists():
|
|
log.debug('Config directory does not exist,'
|
|
'creating new directory.')
|
|
os.makedirs(path.expanduser(HOME_DIRECTORY) +
|
|
CONFIG_DIRECTORY)
|
|
except OSError as set_config_err:
|
|
log.error(set_config_err)
|
|
if set_config_err.errno != errno.EEXIST:
|
|
raise set_config_err
|
|
try:
|
|
with open(path.join(path.expanduser(HOME_DIRECTORY),
|
|
config_file), 'w') as config_file:
|
|
json.dump(data, config_file)
|
|
except Exception as set_config_err:
|
|
raise set_config_err
|
|
log.info("Configured config file successfully.")
|
|
|
|
def get_config(self, node=None, config_file=CONFIG_FILE):
|
|
"""
|
|
Get the configuration details from config file.
|
|
|
|
:params node: Name of node
|
|
:type node: str
|
|
|
|
:params config_file: Config filename to read config data from
|
|
:type data: str
|
|
|
|
:raises Exception: If there is a File Handling error while reading
|
|
from config file
|
|
|
|
:return:
|
|
idtoken - Id Token from config saved\n
|
|
refreshtoken - Refresh Token from config saved\n
|
|
accesstoken - Access Token from config saved\n
|
|
:rtype: str
|
|
"""
|
|
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
|
|
if not file.exists():
|
|
if node and node.lower() == 'esp32':
|
|
return None
|
|
else:
|
|
raise InvalidUserError
|
|
try:
|
|
with open(path.join(path.expanduser(HOME_DIRECTORY),
|
|
config_file), 'r') as config_file:
|
|
data = json.load(config_file)
|
|
if node and node.lower() == "esp32":
|
|
return data
|
|
else:
|
|
idtoken = data['idtoken']
|
|
refresh_token = data['refreshtoken']
|
|
access_token = data['accesstoken']
|
|
except Exception as get_config_err:
|
|
raise get_config_err
|
|
return idtoken, refresh_token, access_token
|
|
|
|
def get_binary_config(self, node=None, config_file=CONFIG_FILE):
|
|
"""
|
|
Get the configuration details from binary config file.
|
|
|
|
:params node: Name of node
|
|
:type node: str
|
|
|
|
:params config_file: Config filename to read config data from
|
|
:type data: str
|
|
|
|
:raises Exception: If there is a File Handling error while reading
|
|
from config file
|
|
|
|
:return: Config data read from file on Success, None on Failure
|
|
:rtype: str | None
|
|
"""
|
|
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
|
|
if not file.exists():
|
|
if node and node.lower() == 'esp32':
|
|
return None
|
|
try:
|
|
with open(path.join(path.expanduser(HOME_DIRECTORY),
|
|
config_file), 'rb') as config_file:
|
|
data = config_file.read()
|
|
if node and node.lower() == "esp32":
|
|
return data
|
|
except Exception as get_config_err:
|
|
raise get_config_err
|
|
return
|
|
|
|
def update_config(self, access_token, id_token):
|
|
"""
|
|
Update the configuration file.
|
|
|
|
:params access_token: Access Token to update in config file
|
|
:type access_token: str
|
|
|
|
:params id_token: Id Token to update in config file
|
|
:type id_token: str
|
|
|
|
:raises OSError: If there is an OS issue while creating new directory
|
|
for config file
|
|
:raises Exception: If there is a FILE Handling error while reading
|
|
from/writing config to file
|
|
|
|
:return: None on Success and Failure
|
|
:rtype: None
|
|
"""
|
|
file = Path(path.expanduser(HOME_DIRECTORY) + CONFIG_FILE)
|
|
if not file.exists():
|
|
try:
|
|
os.makedirs(path.expanduser(HOME_DIRECTORY) + CONFIG_DIRECTORY)
|
|
except OSError as set_config_err:
|
|
if set_config_err.errno != errno.EEXIST:
|
|
raise set_config_err
|
|
try:
|
|
with open(path.join(path.expanduser(HOME_DIRECTORY),
|
|
CONFIG_FILE), 'r') as config_file:
|
|
config_data = json.load(config_file)
|
|
config_data['accesstoken'] = access_token
|
|
config_data['idtoken'] = id_token
|
|
with open(path.join(path.expanduser(HOME_DIRECTORY),
|
|
CONFIG_FILE), 'w') as config_file:
|
|
json.dump(config_data, config_file)
|
|
except Exception as set_config_err:
|
|
raise set_config_err
|
|
|
|
def get_token_attribute(self, attribute_name, is_access_token=False):
|
|
"""
|
|
Get access token attributes.
|
|
|
|
:params attribute_name: Attribute Name
|
|
:type attribute_name: str
|
|
|
|
:params is_access_token: Is Access Token
|
|
:type is_access_token: bool
|
|
|
|
:raises InvalidConfigError: If there is an error in the config
|
|
:raises Exception: If there is a File Handling error while reading
|
|
from/writing config to file
|
|
|
|
:return: Attribute Value on Success, None on Failure
|
|
:rtype: int | str | None
|
|
"""
|
|
if is_access_token:
|
|
log.debug('Getting access token for attribute ' + attribute_name)
|
|
_, _, token = self.get_config()
|
|
else:
|
|
log.debug('Getting idtoken for attribute ' + attribute_name)
|
|
token, _, _ = self.get_config()
|
|
token_payload = token.split('.')[1]
|
|
if len(token_payload) % 4:
|
|
token_payload += '=' * (4 - len(token_payload) % 4)
|
|
try:
|
|
str_token_payload = base64.b64decode(token_payload).decode("utf-8")
|
|
# If user is logged in through github then to extend session
|
|
# we need 'cognito:username' (Github generated username) as email
|
|
if attribute_name == 'email':
|
|
if 'identities' in json.loads(str_token_payload):
|
|
return json.loads(str_token_payload)['cognito:username']
|
|
attribute_value = json.loads(str_token_payload)[attribute_name]
|
|
except Exception:
|
|
raise InvalidConfigError
|
|
if attribute_value is None:
|
|
raise InvalidConfigError
|
|
return attribute_value
|
|
|
|
def get_access_token(self):
|
|
"""
|
|
Get Access Token for User
|
|
|
|
:raises InvalidConfigError: If there is an issue in getting config
|
|
from file
|
|
|
|
:return: Access Token on Success
|
|
:rtype: str
|
|
"""
|
|
_, _, access_token = self.get_config()
|
|
if access_token is None:
|
|
raise InvalidConfigError
|
|
if self.__is_valid_token() is False:
|
|
username = self.get_token_attribute('email')
|
|
refresh_token = self.get_refresh_token()
|
|
access_token, id_token = self.__get_new_token(username,
|
|
refresh_token)
|
|
self.update_config(access_token, id_token)
|
|
return access_token
|
|
|
|
def get_user_id(self):
|
|
"""
|
|
Get User Id
|
|
|
|
:return: Attribute value for attribute name passed
|
|
:rtype: str
|
|
"""
|
|
return self.get_token_attribute('custom:user_id')
|
|
|
|
def get_refresh_token(self):
|
|
"""
|
|
Get Refresh Token
|
|
|
|
:raises InvalidApiVersionError: If current API version is not supported
|
|
|
|
:return: Refresh Token
|
|
:rtype: str
|
|
"""
|
|
if self.__is_valid_version() is False:
|
|
raise InvalidApiVersionError
|
|
_, refresh_token, _ = self.get_config()
|
|
return refresh_token
|
|
|
|
def __is_valid_token(self):
|
|
"""
|
|
Check if access token is valid i.e. login session is still active
|
|
or session is expired
|
|
|
|
:return True on Success and False on Failure
|
|
:rtype: bool
|
|
"""
|
|
log.info("Checking for session timeout.")
|
|
exp_timestamp = self.get_token_attribute('exp', is_access_token=True)
|
|
current_timestamp = int(time.time())
|
|
if exp_timestamp > current_timestamp:
|
|
return True
|
|
log.info("Session expired.")
|
|
return False
|
|
|
|
def __is_valid_version(self):
|
|
"""
|
|
Check if API Version is valid
|
|
|
|
:raises NetworkError: If there is a network connection issue during
|
|
HTTP request for getting version
|
|
:raises Exception: If there is an HTTP issue or JSON format issue in
|
|
HTTP response
|
|
|
|
:return: True on Success, False on Failure
|
|
:rtype: bool
|
|
"""
|
|
log.info("Checking for supported version.")
|
|
path = 'apiversions'
|
|
request_url = serverconfig.HOST.split(serverconfig.VERSION)[0] + path
|
|
try:
|
|
log.debug("Version check request url : " + request_url)
|
|
response = requests.get(url=request_url, verify=CERT_FILE)
|
|
log.debug("Version check response : " + response.text)
|
|
response.raise_for_status()
|
|
except requests.exceptions.SSLError:
|
|
raise SSLError
|
|
except requests.exceptions.ConnectionError:
|
|
raise NetworkError
|
|
except Exception as ver_err:
|
|
raise ver_err
|
|
|
|
try:
|
|
response = json.loads(response.text)
|
|
except Exception as json_decode_err:
|
|
raise json_decode_err
|
|
|
|
if 'supported_versions' in response:
|
|
supported_versions = response['supported_versions']
|
|
if serverconfig.VERSION in supported_versions:
|
|
supported_versions.sort()
|
|
latest_version = supported_versions[len(supported_versions)
|
|
- 1]
|
|
if serverconfig.VERSION < latest_version:
|
|
print('Please check the updates on GitHub for newer'
|
|
'functionality enabled by ' + latest_version +
|
|
' APIs.')
|
|
return True
|
|
return False
|
|
|
|
def __get_new_token(self, username, refresh_token):
|
|
"""
|
|
Get new token for User Login Session
|
|
|
|
:raises NetworkError: If there is a network connection issue during
|
|
HTTP request for getting token
|
|
:raises Exception: If there is an HTTP issue or JSON format issue in
|
|
HTTP response
|
|
|
|
:return: accesstoken and idtoken on Success, None on Failure
|
|
:rtype: str | None
|
|
|
|
"""
|
|
log.info("Extending user login session.")
|
|
path = 'login'
|
|
request_payload = {
|
|
'user_name': username,
|
|
'refreshtoken': refresh_token
|
|
}
|
|
|
|
request_url = serverconfig.HOST + path
|
|
try:
|
|
log.debug("Extend session url : " + request_url)
|
|
response = requests.post(url=request_url,
|
|
data=json.dumps(request_payload),
|
|
verify=CERT_FILE)
|
|
response.raise_for_status()
|
|
log.debug("Extend session response : " + response.text)
|
|
except requests.exceptions.SSLError:
|
|
raise SSLError
|
|
except requests.exceptions.ConnectionError:
|
|
raise NetworkError
|
|
except Exception:
|
|
raise ExpiredSessionError
|
|
|
|
try:
|
|
response = json.loads(response.text)
|
|
except Exception:
|
|
raise ExpiredSessionError
|
|
|
|
if 'accesstoken' in response and 'idtoken' in response:
|
|
log.info("User session extended successfully.")
|
|
return response['accesstoken'], response['idtoken']
|
|
return None
|