fix(esp_local_ctrl): update for changes in protocomm security2 scheme

This commit is contained in:
Mahavir Jain
2025-03-04 22:51:27 +05:30
parent ab7d37d014
commit 00e8d1a832
4 changed files with 75 additions and 24 deletions

View File

@@ -1,9 +1,8 @@
#!/usr/bin/env python
#
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2018-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
import argparse
import asyncio
import json
@@ -113,9 +112,9 @@ def on_except(err):
print(err)
def get_security(secver, username, password, pop='', verbose=False):
def get_security(secver, sec_patch_ver, username, password, pop='', verbose=False):
if secver == 2:
return security.Security2(username, password, verbose)
return security.Security2(sec_patch_ver, username, password, verbose)
if secver == 1:
return security.Security1(pop, verbose)
if secver == 0:
@@ -148,6 +147,32 @@ async def get_transport(sel_transport, service_name, check_hostname):
return None
async def get_sec_patch_ver(tp, verbose=False):
try:
response = await tp.send_data('esp_local_ctrl/version', '---')
if verbose:
print('esp_local_ctrl/version response : ', response)
try:
# Interpret this as JSON structure containing
# information with security version information
info = json.loads(response)
try:
sec_patch_ver = info['local_ctrl']['sec_patch_ver']
except KeyError:
sec_patch_ver = 0
return sec_patch_ver
except ValueError:
# If decoding as JSON fails, we assume default patch level
return 0
except Exception as e:
on_except(e)
return None
async def version_match(tp, protover, verbose=False):
try:
response = await tp.send_data('esp_local_ctrl/version', protover)
@@ -164,7 +189,7 @@ async def version_match(tp, protover, verbose=False):
# information with versions and capabilities of both
# provisioning service and application
info = json.loads(response)
if info['prov']['ver'].lower() == protover.lower():
if info['local_ctrl']['ver'].lower() == protover.lower():
return True
except ValueError:
@@ -191,14 +216,19 @@ async def has_capability(tp, capability='none', verbose=False):
# information with versions and capabilities of both
# provisioning service and application
info = json.loads(response)
supported_capabilities = info['prov']['cap']
if capability.lower() == 'none':
# No specific capability to check, but capabilities
# feature is present so return True
return True
elif capability in supported_capabilities:
return True
return False
try:
supported_capabilities = info['local_ctrl']['cap']
if capability.lower() == 'none':
# No specific capability to check, but capabilities
# feature is present so return True
return True
elif capability in supported_capabilities:
return True
return False
except KeyError:
# If capabilities field is not present, it means
# that capabilities are not supported
return False
except ValueError:
# If decoding as JSON fails, it means that capabilities
@@ -341,6 +371,7 @@ async def main():
if obj_transport is None:
raise RuntimeError('Failed to establish connection')
sec_patch_ver = 0
# If security version not specified check in capabilities
if args.secver is None:
# First check if capabilities are supported or not
@@ -362,13 +393,14 @@ async def main():
args.pop = ''
if (args.secver == 2):
sec_patch_ver = await get_sec_patch_ver(obj_transport, args.verbose)
if len(args.sec2_usr) == 0:
args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
if len(args.sec2_pwd) == 0:
prompt_str = 'Security Scheme 2 - SRP6a Password required: '
args.sec2_pwd = getpass(prompt_str)
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.pop, args.verbose)
obj_security = get_security(args.secver, sec_patch_ver, args.sec2_usr, args.sec2_pwd, args.pop, args.verbose)
if obj_security is None:
raise ValueError('Invalid Security Version')