esp_local_ctrl: added sec1 and sec2 support in esp_local_ctrl example

This commit is contained in:
harshal.patil
2022-10-11 23:21:40 +05:30
parent 6e65b61c8a
commit f9db1ec366
5 changed files with 205 additions and 20 deletions

View File

@@ -12,6 +12,7 @@ import ssl
import struct
import sys
import textwrap
from getpass import getpass
import proto_lc
@@ -112,10 +113,12 @@ def on_except(err):
print(err)
def get_security(secver, pop=None, verbose=False):
def get_security(secver, username, password, pop='', verbose=False):
if secver == 2:
return security.Security2(username, password, verbose)
if secver == 1:
return security.Security1(pop, verbose)
elif secver == 0:
if secver == 0:
return security.Security0(verbose)
return None
@@ -145,10 +148,10 @@ async def get_transport(sel_transport, service_name, check_hostname):
async def version_match(tp, protover, verbose=False):
try:
response = await tp.send_data('proto-ver', protover)
response = await tp.send_data('esp_local_ctrl/version', protover)
if verbose:
print('proto-ver response : ', response)
print('esp_local_ctrl/version response : ', response)
# First assume this to be a simple version string
if response.lower() == protover.lower():
@@ -176,10 +179,10 @@ async def has_capability(tp, capability='none', verbose=False):
# Note : default value of `capability` argument cannot be empty string
# because protocomm_httpd expects non zero content lengths
try:
response = await tp.send_data('proto-ver', capability)
response = await tp.send_data('esp_local_ctrl/version', capability)
if verbose:
print('proto-ver response : ', response)
print('esp_local_ctrl/version response : ', response)
try:
# Interpret this as JSON structure containing
@@ -281,20 +284,32 @@ async def main():
parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
help=desc_format(
'Protocomm security scheme used by the provisioning service for secure '
'Protocomm security scheme used for secure '
'session establishment. Accepted values are :',
'\t- 0 : No security',
'\t- 1 : X25519 key exchange + AES-CTR encryption',
'\t + Authentication using Proof of Possession (PoP)',
'In case device side application uses IDF\'s provisioning manager, '
'the compatible security version is automatically determined from '
'capabilities retrieved via the version endpoint'))
'\t- 2 : SRP6a + AES-GCM encryption',
'\t + Authentication using Proof of Possession (PoP)'))
parser.add_argument('--pop', dest='pop', type=str, default='',
help=desc_format(
'This specifies the Proof of possession (PoP) when security scheme 1 '
'is used'))
parser.add_argument('--sec2_username', dest='sec2_usr', type=str, default='',
help=desc_format(
'Username for security scheme 2 (SRP6a)'))
parser.add_argument('--sec2_pwd', dest='sec2_pwd', type=str, default='',
help=desc_format(
'Password for security scheme 2 (SRP6a)'))
parser.add_argument('--sec2_gen_cred', help='Generate salt and verifier for security scheme 2 (SRP6a)', action='store_true')
parser.add_argument('--sec2_salt_len', dest='sec2_salt_len', type=int, default=16,
help=desc_format(
'Salt length for security scheme 2 (SRP6a)'))
parser.add_argument('--dont-check-hostname', action='store_true',
# If enabled, the certificate won't be rejected for hostname mismatch.
# This option is hidden because it should be used only for testing purposes.
@@ -304,6 +319,14 @@ async def main():
args = parser.parse_args()
if args.secver == 2 and args.sec2_gen_cred:
if not args.sec2_usr or not args.sec2_pwd:
raise ValueError('Username/password cannot be empty for security scheme 2 (SRP6a)')
print('==== Salt-verifier for security scheme 2 (SRP6a) ====')
security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
sys.exit()
if args.version != '':
print(f'==== Esp_Ctrl Version: {args.version} ====')
@@ -327,7 +350,8 @@ async def main():
args.secver = int(not await has_capability(obj_transport, 'no_sec'))
print(f'==== Security Scheme: {args.secver} ====')
if (args.secver != 0) and not await has_capability(obj_transport, 'no_pop'):
if (args.secver == 1):
if not await has_capability(obj_transport, 'no_pop'):
if len(args.pop) == 0:
print('---- Proof of Possession argument not provided ----')
exit(2)
@@ -335,7 +359,14 @@ async def main():
print('---- Proof of Possession will be ignored ----')
args.pop = ''
obj_security = get_security(args.secver, args.pop, args.verbose)
if (args.secver == 2):
if len(args.sec2_usr) == 0:
args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
if len(args.sec2_pwd) == 0:
prompt_str = 'Security Scheme 2 - SRP6a Password required: '
args.sec2_pwd = getpass(prompt_str)
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.pop, args.verbose)
if obj_security is None:
raise ValueError('Invalid Security Version')