mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-10-31 13:09:38 +00:00 
			
		
		
		
	Added support for security1 in local control
1. Added config options to chose from protocom security.
    It can be chosen 0/1 or custom.
    Possible to set POP as well
2. Added support in `esp_local_ctrl.py` test script for sec_ver selection
Signed-off-by: Vikram Dattu <vikram.dattu@espressif.com>
			
			
This commit is contained in:
		| @@ -18,19 +18,28 @@ | ||||
| from __future__ import print_function | ||||
|  | ||||
| import argparse | ||||
| import json | ||||
| import os | ||||
| import ssl | ||||
| import struct | ||||
| import sys | ||||
| import textwrap | ||||
| from builtins import input | ||||
|  | ||||
| import proto | ||||
| import proto_lc | ||||
| from future.utils import tobytes | ||||
|  | ||||
| # The tools directory is already in the PATH in environment prepared by install.sh which would allow to import | ||||
| # esp_prov as file but not as complete module. | ||||
| sys.path.insert(0, os.path.join(os.environ['IDF_PATH'], 'tools/esp_prov')) | ||||
| import esp_prov  # noqa: E402 | ||||
| try: | ||||
|     import esp_prov | ||||
|     import security | ||||
|  | ||||
| except ImportError: | ||||
|     idf_path = os.environ['IDF_PATH'] | ||||
|     sys.path.insert(0, idf_path + '/components/protocomm/python') | ||||
|     sys.path.insert(1, idf_path + '/tools/esp_prov') | ||||
|  | ||||
|     import esp_prov | ||||
|     import security | ||||
|  | ||||
| # Set this to true to allow exceptions to be thrown | ||||
| config_throw_except = False | ||||
| @@ -118,6 +127,14 @@ def on_except(err): | ||||
|         print(err) | ||||
|  | ||||
|  | ||||
| def get_security(secver, pop=None, verbose=False): | ||||
|     if secver == 1: | ||||
|         return security.Security1(pop, verbose) | ||||
|     elif secver == 0: | ||||
|         return security.Security0(verbose) | ||||
|     return None | ||||
|  | ||||
|  | ||||
| def get_transport(sel_transport, service_name, check_hostname): | ||||
|     try: | ||||
|         tp = None | ||||
| @@ -140,29 +157,99 @@ def get_transport(sel_transport, service_name, check_hostname): | ||||
|         return None | ||||
|  | ||||
|  | ||||
| def version_match(tp, expected, verbose=False): | ||||
| def version_match(tp, protover, verbose=False): | ||||
|     try: | ||||
|         response = tp.send_data('esp_local_ctrl/version', expected) | ||||
|         return (response.lower() == expected.lower()) | ||||
|         response = tp.send_data('proto-ver', protover) | ||||
|  | ||||
|         if verbose: | ||||
|             print('proto-ver response : ', response) | ||||
|  | ||||
|         # First assume this to be a simple version string | ||||
|         if response.lower() == protover.lower(): | ||||
|             return True | ||||
|  | ||||
|         try: | ||||
|             # Else interpret this as JSON structure containing | ||||
|             # information with versions and capabilities of both | ||||
|             # provisioning service and application | ||||
|             info = json.loads(response) | ||||
|             if info['prov']['ver'].lower() == protover.lower(): | ||||
|                 return True | ||||
|  | ||||
|         except ValueError: | ||||
|             # If decoding as JSON fails, it means that capabilities | ||||
|             # are not supported | ||||
|             return False | ||||
|  | ||||
|     except Exception as e: | ||||
|         on_except(e) | ||||
|         return None | ||||
|  | ||||
|  | ||||
| def get_all_property_values(tp): | ||||
| def has_capability(tp, capability='none', verbose=False): | ||||
|     # Note : default value of `capability` argument cannot be empty string | ||||
|     # because protocomm_httpd expects non zero content lengths | ||||
|     try: | ||||
|         response = tp.send_data('proto-ver', capability) | ||||
|  | ||||
|         if verbose: | ||||
|             print('proto-ver response : ', response) | ||||
|  | ||||
|         try: | ||||
|             # Interpret this as JSON structure containing | ||||
|             # information with versions and capabilities of both | ||||
|             # provisioning service and application | ||||
|             info = json.loads(response) | ||||
|             supported_capabilities = info['prov']['cap'] | ||||
|             if capability.lower() == 'none': | ||||
|                 # No specific capability to check, but capabilities | ||||
|                 # feature is present so return True | ||||
|                 return True | ||||
|             elif capability in supported_capabilities: | ||||
|                 return True | ||||
|             return False | ||||
|  | ||||
|         except ValueError: | ||||
|             # If decoding as JSON fails, it means that capabilities | ||||
|             # are not supported | ||||
|             return False | ||||
|  | ||||
|     except RuntimeError as e: | ||||
|         on_except(e) | ||||
|  | ||||
|     return False | ||||
|  | ||||
|  | ||||
| def establish_session(tp, sec): | ||||
|     try: | ||||
|         response = None | ||||
|         while True: | ||||
|             request = sec.security_session(response) | ||||
|             if request is None: | ||||
|                 break | ||||
|             response = tp.send_data('esp_local_ctrl/session', request) | ||||
|             if (response is None): | ||||
|                 return False | ||||
|         return True | ||||
|     except RuntimeError as e: | ||||
|         on_except(e) | ||||
|         return None | ||||
|  | ||||
|  | ||||
| def get_all_property_values(tp, security_ctx): | ||||
|     try: | ||||
|         props = [] | ||||
|         message = proto.get_prop_count_request() | ||||
|         message = proto_lc.get_prop_count_request(security_ctx) | ||||
|         response = tp.send_data('esp_local_ctrl/control', message) | ||||
|         count = proto.get_prop_count_response(response) | ||||
|         count = proto_lc.get_prop_count_response(security_ctx, response) | ||||
|         if count == 0: | ||||
|             raise RuntimeError('No properties found!') | ||||
|         indices = [i for i in range(count)] | ||||
|         message = proto.get_prop_vals_request(indices) | ||||
|         message = proto_lc.get_prop_vals_request(security_ctx, indices) | ||||
|         response = tp.send_data('esp_local_ctrl/control', message) | ||||
|         props = proto.get_prop_vals_response(response) | ||||
|         props = proto_lc.get_prop_vals_response(security_ctx, response) | ||||
|         if len(props) != count: | ||||
|             raise RuntimeError('Incorrect count of properties!') | ||||
|             raise RuntimeError('Incorrect count of properties!', len(props), count) | ||||
|         for p in props: | ||||
|             p['value'] = decode_prop_value(p, p['value']) | ||||
|         return props | ||||
| @@ -171,20 +258,27 @@ def get_all_property_values(tp): | ||||
|         return [] | ||||
|  | ||||
|  | ||||
| def set_property_values(tp, props, indices, values, check_readonly=False): | ||||
| def set_property_values(tp, security_ctx, props, indices, values, check_readonly=False): | ||||
|     try: | ||||
|         if check_readonly: | ||||
|             for index in indices: | ||||
|                 if prop_is_readonly(props[index]): | ||||
|                     raise RuntimeError('Cannot set value of Read-Only property') | ||||
|         message = proto.set_prop_vals_request(indices, values) | ||||
|         message = proto_lc.set_prop_vals_request(security_ctx, indices, values) | ||||
|         response = tp.send_data('esp_local_ctrl/control', message) | ||||
|         return proto.set_prop_vals_response(response) | ||||
|         return proto_lc.set_prop_vals_response(security_ctx, response) | ||||
|     except RuntimeError as e: | ||||
|         on_except(e) | ||||
|         return False | ||||
|  | ||||
|  | ||||
| def desc_format(*args): | ||||
|     desc = '' | ||||
|     for arg in args: | ||||
|         desc += textwrap.fill(replace_whitespace=False, text=arg) + '\n' | ||||
|     return desc | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     parser = argparse.ArgumentParser(add_help=False) | ||||
|  | ||||
| @@ -199,6 +293,22 @@ if __name__ == '__main__': | ||||
|     parser.add_argument('--name', dest='service_name', type=str, | ||||
|                         help='BLE Device Name / HTTP Server hostname or IP', default='') | ||||
|  | ||||
|     parser.add_argument('--sec_ver', dest='secver', type=int, default=None, | ||||
|                         help=desc_format( | ||||
|                             'Protocomm security scheme used by the provisioning service for secure ' | ||||
|                             'session establishment. Accepted values are :', | ||||
|                             '\t- 0 : No security', | ||||
|                             '\t- 1 : X25519 key exchange + AES-CTR encryption', | ||||
|                             '\t      + Authentication using Proof of Possession (PoP)', | ||||
|                             'In case device side application uses IDF\'s provisioning manager, ' | ||||
|                             'the compatible security version is automatically determined from ' | ||||
|                             'capabilities retrieved via the version endpoint')) | ||||
|  | ||||
|     parser.add_argument('--pop', dest='pop', type=str, default='', | ||||
|                         help=desc_format( | ||||
|                             'This specifies the Proof of possession (PoP) when security scheme 1 ' | ||||
|                             'is used')) | ||||
|  | ||||
|     parser.add_argument('--dont-check-hostname', action='store_true', | ||||
|                         # If enabled, the certificate won't be rejected for hostname mismatch. | ||||
|                         # This option is hidden because it should be used only for testing purposes. | ||||
| @@ -220,6 +330,31 @@ if __name__ == '__main__': | ||||
|         print('---- Invalid transport ----') | ||||
|         exit(1) | ||||
|  | ||||
|     # If security version not specified check in capabilities | ||||
|     if args.secver is None: | ||||
|         # First check if capabilities are supported or not | ||||
|         if not has_capability(obj_transport): | ||||
|             print('Security capabilities could not be determined. Please specify \'--sec_ver\' explicitly') | ||||
|             print('---- Invalid Security Version ----') | ||||
|             exit(2) | ||||
|  | ||||
|         # When no_sec is present, use security 0, else security 1 | ||||
|         args.secver = int(not has_capability(obj_transport, 'no_sec')) | ||||
|         print('Security scheme determined to be :', args.secver) | ||||
|  | ||||
|         if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'): | ||||
|             if len(args.pop) == 0: | ||||
|                 print('---- Proof of Possession argument not provided ----') | ||||
|                 exit(2) | ||||
|         elif len(args.pop) != 0: | ||||
|             print('---- Proof of Possession will be ignored ----') | ||||
|             args.pop = '' | ||||
|  | ||||
|     obj_security = get_security(args.secver, args.pop, False) | ||||
|     if obj_security is None: | ||||
|         print('---- Invalid Security Version ----') | ||||
|         exit(2) | ||||
|  | ||||
|     if args.version != '': | ||||
|         print('\n==== Verifying protocol version ====') | ||||
|         if not version_match(obj_transport, args.version, args.verbose): | ||||
| @@ -227,8 +362,15 @@ if __name__ == '__main__': | ||||
|             exit(2) | ||||
|         print('==== Verified protocol version successfully ====') | ||||
|  | ||||
|     print('\n==== Starting Session ====') | ||||
|     if not establish_session(obj_transport, obj_security): | ||||
|         print('Failed to establish session. Ensure that security scheme and proof of possession are correct') | ||||
|         print('---- Error in establishing session ----') | ||||
|         exit(3) | ||||
|     print('==== Session Established ====') | ||||
|  | ||||
|     while True: | ||||
|         properties = get_all_property_values(obj_transport) | ||||
|         properties = get_all_property_values(obj_transport, obj_security) | ||||
|         if len(properties) == 0: | ||||
|             print('---- Error in reading property values ----') | ||||
|             exit(4) | ||||
| @@ -245,7 +387,7 @@ if __name__ == '__main__': | ||||
|         select = 0 | ||||
|         while True: | ||||
|             try: | ||||
|                 inval = input("\nSelect properties to set (0 to re-read, 'q' to quit) : ") | ||||
|                 inval = input('\nSelect properties to set (0 to re-read, \'q\' to quit) : ') | ||||
|                 if inval.lower() == 'q': | ||||
|                     print('Quitting...') | ||||
|                     exit(5) | ||||
| @@ -274,5 +416,5 @@ if __name__ == '__main__': | ||||
|             set_values += [value] | ||||
|             set_indices += [select - 1] | ||||
|  | ||||
|         if not set_property_values(obj_transport, properties, set_indices, set_values): | ||||
|         if not set_property_values(obj_transport, obj_security, properties, set_indices, set_values): | ||||
|             print('Failed to set values!') | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Vikram Dattu
					Vikram Dattu