Files
esp-rainmaker/cli/rmaker_lib/configmanager.py
Shivani Tipnis 42ad3f89ac cli-provisioning: Fixed connectivity and import checks
Added checks and fixed error handling for poor internet connectivity

Fixed error handling checks while importing modules

Fixed mock imports in docs
2020-05-18 19:40:02 +05:30

380 lines
14 KiB
Python

# Copyright 2020 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import errno
import os
import base64
import time
import requests
import socket
from pathlib import Path
from os import path
from rmaker_lib import serverconfig
from rmaker_lib.exceptions import NetworkError,\
InvalidConfigError,\
InvalidUserError,\
InvalidApiVersionError,\
ExpiredSessionError,\
SSLError,\
RequestTimeoutError
from rmaker_lib.logger import log
CONFIG_DIRECTORY = '.espressif/rainmaker'
CONFIG_FILE = CONFIG_DIRECTORY + '/rainmaker_config.json'
HOME_DIRECTORY = '~/'
CURR_DIR = os.path.dirname(__file__)
CERT_FILE = CURR_DIR + '/../server_cert/server_cert.pem'
class Config:
"""
Config class used to instantiate instances of config to
perform various get/set configuration operations
"""
def set_config(self, data, config_file=CONFIG_FILE):
"""
Set the configuration file.
:params data: Config Data to write to file
:type data: dict
:params config_file: Config filename to write config data to
:type data: str
:raises OSError: If there is an OS issue while creating new directory
for config file
:raises Exception: If there is a File Handling error while saving
config to file
:return: None on Success and Failure
:rtype: None
"""
log.info("Configuring config file.")
file_dir = Path(path.expanduser(HOME_DIRECTORY + CONFIG_DIRECTORY))
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
if not file.exists():
try:
if not file_dir.exists():
log.debug('Config directory does not exist,'
'creating new directory.')
os.makedirs(path.expanduser(HOME_DIRECTORY) +
CONFIG_DIRECTORY)
except OSError as set_config_err:
log.error(set_config_err)
if set_config_err.errno != errno.EEXIST:
raise set_config_err
try:
with open(path.join(path.expanduser(HOME_DIRECTORY),
config_file), 'w') as config_file:
json.dump(data, config_file)
except Exception as set_config_err:
raise set_config_err
log.info("Configured config file successfully.")
def get_config(self, config_file=CONFIG_FILE):
"""
Get the configuration details from config file.
:params config_file: Config filename to read config data from
:type data: str
:raises Exception: If there is a File Handling error while reading
from config file
:return:
idtoken - Id Token from config saved\n
refreshtoken - Refresh Token from config saved\n
accesstoken - Access Token from config saved\n
:rtype: str
"""
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
if not file.exists():
raise InvalidUserError
try:
with open(path.join(path.expanduser(HOME_DIRECTORY),
config_file), 'r') as config_file:
data = json.load(config_file)
idtoken = data['idtoken']
refresh_token = data['refreshtoken']
access_token = data['accesstoken']
except Exception as get_config_err:
raise get_config_err
return idtoken, refresh_token, access_token
def get_binary_config(self, config_file=CONFIG_FILE):
"""
Get the configuration details from binary config file.
:params config_file: Config filename to read config data from
:type data: str
:raises Exception: If there is a File Handling error while reading
from config file
:return: Config data read from file on Success, None on Failure
:rtype: str | None
"""
file = Path(path.expanduser(HOME_DIRECTORY) + config_file)
if not file.exists():
return None
try:
with open(file, 'rb') as cfg_file:
data = cfg_file.read()
return data
except Exception as get_config_err:
raise get_config_err
return
def update_config(self, access_token, id_token):
"""
Update the configuration file.
:params access_token: Access Token to update in config file
:type access_token: str
:params id_token: Id Token to update in config file
:type id_token: str
:raises OSError: If there is an OS issue while creating new directory
for config file
:raises Exception: If there is a FILE Handling error while reading
from/writing config to file
:return: None on Success and Failure
:rtype: None
"""
file = Path(path.expanduser(HOME_DIRECTORY) + CONFIG_FILE)
if not file.exists():
try:
os.makedirs(path.expanduser(HOME_DIRECTORY) + CONFIG_DIRECTORY)
except OSError as set_config_err:
if set_config_err.errno != errno.EEXIST:
raise set_config_err
try:
with open(path.join(path.expanduser(HOME_DIRECTORY),
CONFIG_FILE), 'r') as config_file:
config_data = json.load(config_file)
config_data['accesstoken'] = access_token
config_data['idtoken'] = id_token
with open(path.join(path.expanduser(HOME_DIRECTORY),
CONFIG_FILE), 'w') as config_file:
json.dump(config_data, config_file)
except Exception as set_config_err:
raise set_config_err
def get_token_attribute(self, attribute_name, is_access_token=False):
"""
Get access token attributes.
:params attribute_name: Attribute Name
:type attribute_name: str
:params is_access_token: Is Access Token
:type is_access_token: bool
:raises InvalidConfigError: If there is an error in the config
:raises Exception: If there is a File Handling error while reading
from/writing config to file
:return: Attribute Value on Success, None on Failure
:rtype: int | str | None
"""
if is_access_token:
log.debug('Getting access token for attribute ' + attribute_name)
_, _, token = self.get_config()
else:
log.debug('Getting idtoken for attribute ' + attribute_name)
token, _, _ = self.get_config()
token_payload = token.split('.')[1]
if len(token_payload) % 4:
token_payload += '=' * (4 - len(token_payload) % 4)
try:
str_token_payload = base64.b64decode(token_payload).decode("utf-8")
# If user is logged in through github then to extend session
# we need 'cognito:username' (Github generated username) as email
if attribute_name == 'email':
if 'identities' in json.loads(str_token_payload):
return json.loads(str_token_payload)['cognito:username']
attribute_value = json.loads(str_token_payload)[attribute_name]
except Exception:
raise InvalidConfigError
if attribute_value is None:
raise InvalidConfigError
return attribute_value
def get_access_token(self):
"""
Get Access Token for User
:raises InvalidConfigError: If there is an issue in getting config
from file
:return: Access Token on Success
:rtype: str
"""
_, _, access_token = self.get_config()
if access_token is None:
raise InvalidConfigError
if self.__is_valid_token() is False:
print('Previous Session expired. Initialising new session...')
log.info('Previous Session expired. Initialising new session...')
username = self.get_token_attribute('email')
refresh_token = self.get_refresh_token()
access_token, id_token = self.__get_new_token(username,
refresh_token)
self.update_config(access_token, id_token)
print('Previous Session expired. Initialising new session...'
'Success')
log.info('Previous Session expired. Initialising new session...'
'Success')
return access_token
def get_user_id(self):
"""
Get User Id
:return: Attribute value for attribute name passed
:rtype: str
"""
return self.get_token_attribute('custom:user_id')
def get_refresh_token(self):
"""
Get Refresh Token
:raises InvalidApiVersionError: If current API version is not supported
:return: Refresh Token
:rtype: str
"""
if self.__is_valid_version() is False:
raise InvalidApiVersionError
_, refresh_token, _ = self.get_config()
return refresh_token
def __is_valid_token(self):
"""
Check if access token is valid i.e. login session is still active
or session is expired
:return True on Success and False on Failure
:rtype: bool
"""
log.info("Checking for session timeout.")
exp_timestamp = self.get_token_attribute('exp', is_access_token=True)
current_timestamp = int(time.time())
if exp_timestamp > current_timestamp:
return True
return False
def __is_valid_version(self):
"""
Check if API Version is valid
:raises NetworkError: If there is a network connection issue during
HTTP request for getting version
:raises Exception: If there is an HTTP issue or JSON format issue in
HTTP response
:return: True on Success, False on Failure
:rtype: bool
"""
socket.setdefaulttimeout(10)
log.info("Checking for supported version.")
path = 'apiversions'
request_url = serverconfig.HOST.split(serverconfig.VERSION)[0] + path
try:
log.debug("Version check request url : " + request_url)
response = requests.get(url=request_url, verify=CERT_FILE,
timeout=(5.0, 5.0))
log.debug("Version check response : " + response.text)
response.raise_for_status()
except requests.exceptions.SSLError:
raise SSLError
except requests.exceptions.Timeout:
raise RequestTimeoutError
except requests.exceptions.ConnectionError:
raise NetworkError
except Exception as ver_err:
raise ver_err
try:
response = json.loads(response.text)
except Exception as json_decode_err:
raise json_decode_err
if 'supported_versions' in response:
supported_versions = response['supported_versions']
if serverconfig.VERSION in supported_versions:
supported_versions.sort()
latest_version = supported_versions[len(supported_versions)
- 1]
if serverconfig.VERSION < latest_version:
print('Please check the updates on GitHub for newer'
'functionality enabled by ' + latest_version +
' APIs.')
return True
return False
def __get_new_token(self, username, refresh_token):
"""
Get new token for User Login Session
:raises NetworkError: If there is a network connection issue during
HTTP request for getting token
:raises Exception: If there is an HTTP issue or JSON format issue in
HTTP response
:return: accesstoken and idtoken on Success, None on Failure
:rtype: str | None
"""
socket.setdefaulttimeout(10)
log.info("Extending user login session.")
path = 'login'
request_payload = {
'user_name': username,
'refreshtoken': refresh_token
}
request_url = serverconfig.HOST + path
try:
log.debug("Extend session url : " + request_url)
response = requests.post(url=request_url,
data=json.dumps(request_payload),
verify=CERT_FILE,
timeout=(5.0, 5.0))
response.raise_for_status()
log.debug("Extend session response : " + response.text)
except requests.exceptions.SSLError:
raise SSLError
except requests.exceptions.ConnectionError:
raise NetworkError
except requests.exceptions.Timeout:
raise RequestTimeoutError
except Exception:
raise ExpiredSessionError
try:
response = json.loads(response.text)
except Exception:
raise ExpiredSessionError
if 'accesstoken' in response and 'idtoken' in response:
log.info("User session extended successfully.")
return response['accesstoken'], response['idtoken']
return None