fix(provisioning): fix incorrect AES-GCM IV usage in security2 scheme

Using same IV in AES-GCM across multiple invocation of
encryption/decryption operations can pose a security risk. It can help
to reveal co-relation between different plaintexts.

This commit introduces a change to use part of IV as a monotonic
counter, which must be incremented after every AES-GCM invocation
on both the client and the device side.

Concept of patch version for a security scheme has been introduced here
which can help to differentiate a protocol behavior for the provisioning
entity. The security patch version will be available in the JSON
response for `proto-ver` endpoint request with the field
`sec_patch_ver`.

Please refer to documentation for more details on the changes required
on the provisioning entity side (e.g., PhoneApps).
This commit is contained in:
Mahavir Jain
2025-03-10 09:58:49 +05:30
parent 7e8251d16b
commit af1bd1472c
7 changed files with 183 additions and 47 deletions

View File

@@ -1,18 +1,19 @@
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2018-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
# APIs for interpreting and creating protobuf packets for
# protocomm endpoint with security type protocomm_security2
from typing import Any, Type
import struct
from typing import Any
from typing import Type
import proto
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from utils import long_to_bytes, str_to_bytes
from utils import long_to_bytes
from utils import str_to_bytes
from .security import Security
from .srp6a import Srp6a, generate_salt_and_verifier
from .srp6a import generate_salt_and_verifier
from .srp6a import Srp6a
AES_KEY_LEN = 256 // 8
@@ -30,17 +31,18 @@ def sec2_gen_salt_verifier(username: str, password: str, salt_len: int) -> Any:
salt_str = ', '.join([format(b, '#04x') for b in salt])
salt_c_arr = '\n '.join(salt_str[i: i + 96] for i in range(0, len(salt_str), 96))
print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n')
print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n') # noqa E702
verifier_str = ', '.join([format(b, '#04x') for b in verifier])
verifier_c_arr = '\n '.join(verifier_str[i: i + 96] for i in range(0, len(verifier_str), 96))
print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n')
print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n') # noqa E702
class Security2(Security):
def __init__(self, username: str, password: str, verbose: bool) -> None:
def __init__(self, sec_patch_ver:int, username: str, password: str, verbose: bool) -> None:
# Initialize state of the security2 FSM
self.session_state = security_state.REQUEST1
self.sec_patch_ver = sec_patch_ver
self.username = username
self.password = password
self.verbose = verbose
@@ -49,7 +51,7 @@ class Security2(Security):
self.cipher: Type[AESGCM]
self.client_pop_key = None
self.nonce = None
self.nonce = bytearray()
Security.__init__(self, self.security2_session)
@@ -75,7 +77,7 @@ class Security2(Security):
def _print_verbose(self, data: str) -> None:
if (self.verbose):
print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
print(f'\x1b[32;20m++++ {data} ++++\x1b[0m') # noqa E702
def setup0_request(self) -> Any:
# Form SessionCmd0 request packet using client public key
@@ -148,7 +150,7 @@ class Security2(Security):
self._print_verbose(f'Session Key:\t0x{session_key.hex()}')
# 96-bit nonce
self.nonce = setup_resp.sec2.sr1.device_nonce
self.nonce = bytearray(setup_resp.sec2.sr1.device_nonce)
if self.nonce is None:
raise RuntimeError('Received invalid nonce from device!')
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
@@ -158,8 +160,23 @@ class Security2(Security):
if self.cipher is None:
raise RuntimeError('Failed to initialize AES-GCM cryptographic engine!')
def _increment_nonce(self) -> None:
"""Increment the last 4 bytes of nonce (big-endian counter)."""
if self.sec_patch_ver == 1:
counter = struct.unpack('>I', self.nonce[8:])[0] # Read last 4 bytes as big-endian integer
counter += 1 # Increment counter
if counter > 0xFFFFFFFF: # Check for overflow
raise RuntimeError('Nonce counter overflow')
self.nonce[8:] = struct.pack('>I', counter) # Store back as big-endian
def encrypt_data(self, data: bytes) -> Any:
return self.cipher.encrypt(self.nonce, data, None)
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
ciphertext = self.cipher.encrypt(self.nonce, data, None)
self._increment_nonce()
return ciphertext
def decrypt_data(self, data: bytes) -> Any:
return self.cipher.decrypt(self.nonce, data, None)
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
plaintext = self.cipher.decrypt(self.nonce, data, None)
self._increment_nonce()
return plaintext