#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
from typing import List
from typing import Optional
from typing import Set

from nvs_logger import NVS_Logger
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Page
from nvs_parser import NVS_Partition


# Sentinel standing in for blob chunks that are referenced but not found
EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')

# Module-level accumulators filled by check_page_entries() and consumed by the
# blob and namespace checks; reset_global_variables() clears them between runs
used_namespaces: Dict[int, Optional[str]] = {}
found_namespaces: Dict[int, str] = {}
blobs: Dict = {}
blob_chunks: List[NVS_Entry] = []


def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger, read_only: bool=False) -> bool:
    """Checks if the partition is large enough and has enough pages"""
    if len(nvs_partition.raw_data) // 0x1000 < 3 and not read_only:
        nvs_log.info(
            nvs_log.yellow(
                'NVS Partition size must be at least 0x3000 (4kiB * 3 pages == 12kiB)!'
            )
        )
        return False
    if len(nvs_partition.raw_data) % 0x1000 != 0:
        nvs_log.info(
            nvs_log.yellow(
                'NVS Partition size must be a multiple of 0x1000 (4kiB)!'
            )
        )
        return False
    if len(nvs_partition.pages) < 3 and not read_only:
        nvs_log.info(
            nvs_log.yellow(
                'NVS Partition must contain at least 3 pages (sectors) to function properly!'
            )
        )
        return False
    return True


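# Note: pages map 1:1 onto 4 KiB (0x1000 B) flash sectors, so the size checks
# above agree with each other (len(raw_data) // 0x1000 is the page count), and
# the 3-page minimum for writable partitions leaves room for the one free page
# that check_empty_page_present() below insists on.

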
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
    """Checks that the partition contains at least one free (empty) page"""
    if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
        nvs_log.info(
            nvs_log.red(
                '''No free (empty) page found in the NVS partition,
at least one free page is required for proper function!'''
            )
        )
        nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
        return False
    return True


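# NVS reclaims space by copying the still-valid entries of a full page into
# the free page and erasing the original afterwards, so a partition with no
# empty page cannot relocate any data; that is why the check above hints at a
# truncated dump.

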
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
    """Checks that a page reported as Empty really contains no data"""
    result = True
    nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))

    # Erased flash reads back as all 0xFF, so an empty page's entry state
    # bitmap must consist of 0xFF bytes only
    if nvs_page.raw_entry_state_bitmap != bytearray([0xFF]) * nvs_const.entry_size:
        result = False
        nvs_log.info(
            nvs_log.red(
                'The page is reported as Empty but its entry state bitmap is not empty!'
            )
        )

    if any(not e.is_empty for e in nvs_page.entries):
        result = False
        nvs_log.info(
            nvs_log.red('The page is reported as Empty but it contains written data!')
        )

    return result


def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
    """Compares the stored page header CRC32 with the value computed from the header data"""
    if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
        nvs_log.info(
            nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
        )
        return True
    else:
        nvs_log.info(
            nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
            'Original CRC32:',
            nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'),
            'Generated CRC32:',
            nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
        )
        return False


def identify_entry_duplicates(entry: NVS_Entry, entry_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    """Collects written entries into a dictionary keyed by the entry key

    Part 1 of the duplicate entry check mechanism
    """
    if entry.state == 'Written':
        entry_dict.setdefault(entry.key, []).append(entry)
    return entry_dict


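# After a full pass the dictionary has the shape
#   {'some_key': [entry], 'other_key': [entry_a, entry_b], ...}
# where any list longer than one is only a duplicate *candidate*: the same key
# may legitimately repeat under different namespaces or as blob index/data
# pairs, which filter_entry_duplicates() sorts out in part 2.

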
def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]:
    """Checks the entries of the given page (entry state, entry and children CRC32,
    entry type, span) and gathers blobs and namespaces
    """
    seen_written_entries: Dict[str, List[NVS_Entry]] = {}

    for entry in nvs_page.entries:
        # entry: NVS_Entry
        entry.page = nvs_page

        # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data

        # Variable length values occupy whole 32 byte entries (so their metadata fields are
        # meaningless) and are stored as entries in the parent string/blob entry's
        # 'entry.children' list

        # Duplicate entry check (1) - same key, different index - find duplicates
        seen_written_entries = identify_entry_duplicates(entry, seen_written_entries)

        # Entry state check - skips variable length values (state metadata is meaningless
        # there, as all 32 bytes are pure data)
        if entry.is_empty:
            if entry.state == 'Written':
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} is reported as Written but it is empty!'
                    )
                )
                continue
            elif entry.state == 'Erased':
                nvs_log.info(
                    nvs_log.yellow(
                        f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)'
                    )
                )

        if entry.state == 'Written':
            # Entry CRC32 check
            if (
                entry.metadata['crc']['original']
                != entry.metadata['crc']['computed']
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
                    ),
                    'Written:',
                    nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
                    'Generated:',
                    nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
                )

            # Entry children CRC32 check
            if (
                entry.metadata['span'] > 1
                and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed'])
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!'
                    ),
                    'Written:',
                    nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'),
                    'Generated:',
                    nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'),
                )

            # Entry type check
            if entry.metadata['type'] not in nvs_const.item_type.values():
                nvs_log.info(
                    nvs_log.yellow(
                        f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
                    ),
                    f'Type: {entry.metadata["type"]}',
                )

            # Span check - the first two 32 B slots of a page hold the page header and the
            # entry state bitmap, leaving page_size // entry_size - 2 data entries per page
            if (
                entry.index + entry.metadata['span'] - 1
                >= nvs_const.page_size // nvs_const.entry_size - 2
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
                    )
                )
            # Spanned entry state checks
            elif entry.metadata['span'] > 1:
                parent_state = entry.state
                for kid in entry.children:
                    if parent_state != kid.state:
                        nvs_log.info(
                            nvs_log.yellow(' Inconsistent data state!'),
                            f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
                            f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
                        )

            # Gather blobs & namespaces
            if entry.metadata['type'] == 'blob_index':
                blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
                    EMPTY_ENTRY
                ] * entry.data['chunk_count']
            elif entry.metadata['type'] == 'blob_data':
                blob_chunks.append(entry)

            # Namespace definitions live under namespace index 0; all other
            # entries merely reference a namespace index
            if entry.metadata['namespace'] == 0:
                found_namespaces[entry.data['value']] = entry.key
            else:
                used_namespaces[entry.metadata['namespace']] = None

    return seen_written_entries


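# The global 'blobs' dictionary built above maps '<3-digit namespace index><key>'
# to a list [blob_index_entry, chunk_0, ..., chunk_n] whose chunk slots start
# out as EMPTY_ENTRY and get filled in by assemble_blobs(); any slot still
# holding EMPTY_ENTRY afterwards is reported as missing by check_blob_data().

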
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    """Takes a dictionary of written entries and returns a new dictionary with the "fake"
    duplicates removed: entries which share a key but live under different namespaces
    are legitimate and are filtered out

    Use `filter_entry_duplicates()` to properly filter out all duplicates
    """
    new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
    for key, duplicate_entries in duplicate_entries_dict.items():
        seen_entries: List[NVS_Entry] = []
        entry_same_namespace_collisions_list: Set[NVS_Entry] = set()

        # Search through the "duplicates" and see if there are real duplicates
        # E.g. the key can be the same if the namespace is different
        for entry in duplicate_entries:
            if entry.metadata['type'] in nvs_const.item_type.values():

                entry_same_namespace_collisions = set()
                for other_entry in seen_entries:
                    if entry.metadata['namespace'] == other_entry.metadata['namespace']:
                        entry_same_namespace_collisions.add(entry)
                        entry_same_namespace_collisions.add(other_entry)

                if len(entry_same_namespace_collisions) != 0:
                    entry_same_namespace_collisions_list.update(entry_same_namespace_collisions)
                seen_entries.append(entry)

        # Catch real duplicates
        new_duplicate_entries: List[NVS_Entry] = []
        if len(seen_entries) > 1:
            for entry in seen_entries:
                if entry in entry_same_namespace_collisions_list:
                    new_duplicate_entries.append(entry)

        if len(new_duplicate_entries) > 0:
            new_duplicate_entries_dict[key] = new_duplicate_entries

    return new_duplicate_entries_dict


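# Example for the filter above: two 'counter' entries stored under namespace
# indexes 1 and 2 are independent values and are dropped from the duplicate
# list, while two 'counter' entries under the same namespace index are kept
# as real duplicates.

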
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    """Takes a dictionary of written entries and returns a new dictionary with the "fake"
    duplicates removed: blob index and blob data entries sharing a key under the same
    namespace are legitimate and are filtered out

    Use `filter_entry_duplicates()` to properly filter out all duplicates
    """
    new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
    for key, duplicate_entries in duplicate_entries_dict.items():
        seen_blob_index: List[NVS_Entry] = []
        seen_blob_data: List[NVS_Entry] = []
        seen_another_type_data: List[NVS_Entry] = []
        blob_index_chunk_index_collisions_list: Set[NVS_Entry] = set()
        blob_data_chunk_index_collisions_list: Set[NVS_Entry] = set()

        # Search through the "duplicates" and see if there are real duplicates
        # E.g. the key can be the same for blob_index and blob_data
        # (and even for more blob_data entries if they have a different chunk_index)
        for entry in duplicate_entries:
            if entry.metadata['type'] == 'blob_index':

                blob_index_chunk_index_collisions = set()
                for other_entry in seen_blob_index:
                    if entry.metadata['namespace'] == other_entry.metadata['namespace']:
                        blob_index_chunk_index_collisions.add(entry)
                        blob_index_chunk_index_collisions.add(other_entry)

                if len(blob_index_chunk_index_collisions) != 0:
                    blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions)
                seen_blob_index.append(entry)

            elif entry.metadata['type'] == 'blob_data':

                blob_data_chunk_index_collisions = set()
                for other_entry in seen_blob_data:
                    if (entry.metadata['namespace'] == other_entry.metadata['namespace']
                            and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']):
                        blob_data_chunk_index_collisions.add(entry)
                        blob_data_chunk_index_collisions.add(other_entry)

                if len(blob_data_chunk_index_collisions) != 0:
                    blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions)
                seen_blob_data.append(entry)

            else:
                seen_another_type_data.append(entry)

        # Catch real duplicates
        new_duplicate_entries: List[NVS_Entry] = []
        if len(seen_blob_index) > 1:
            for entry in seen_blob_index:
                if entry in blob_index_chunk_index_collisions_list:
                    new_duplicate_entries.append(entry)

        if len(seen_blob_data) > 1:
            for entry in seen_blob_data:
                if entry in blob_data_chunk_index_collisions_list:
                    new_duplicate_entries.append(entry)

        # Any duplicates of other types go straight through
        for entry in seen_another_type_data:
            new_duplicate_entries.append(entry)

        if len(new_duplicate_entries) > 0:
            new_duplicate_entries_dict[key] = new_duplicate_entries

    return new_duplicate_entries_dict


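# Example for the filter above: one blob_index 'img' plus its blob_data 'img'
# chunks (each with a distinct chunk_index) under one namespace are legitimate,
# whereas two blob_data 'img' chunks sharing both namespace and chunk_index are
# kept as real duplicates.

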
def filter_entry_duplicates(entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    """Takes a dictionary of seen written entries and outputs a new dictionary with the
    "fake" duplicates filtered out, keeping only the real duplicates

    (duplicate keys under different namespaces, as well as blob index and blob data
    sharing a key under the same namespace, are allowed and are filtered out)

    Part 2 of the duplicate entry check mechanism
    """
    # Only keep seen written entries which have been observed multiple times (duplicate candidates)
    duplicate_entries_list = {key: v for key, v in entries.items() if len(v) > 1}
    # Filter out "fake" duplicates 1 (duplicate keys under different namespaces are allowed)
    duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
    # Filter out "fake" duplicates 2 (blob index and blob data are allowed to have the same key even in the same namespace)
    duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
    return duplicate_entries_list_2


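# Pipeline sketch: {'key': [e1]} is dropped by the first comprehension right
# away, while {'key': [e1, e2]} survives to the returned dictionary only if e1
# and e2 actually collide under the rules above (same namespace, and for
# blob_data entries also the same chunk_index).

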
def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
    """Prints a table of the real duplicate entries found"""
    if len(duplicate_entries_list) > 0:
        nvs_log.info(nvs_log.red('Found duplicate entries:'))
        nvs_log.info(nvs_log.red('Entry\tKey\t\t\tType\t\tNamespace idx\tPage\tPage status'))

    for _, duplicate_entries in duplicate_entries_list.items():
        # duplicate_entries: List[NVS_Entry]
        for entry in duplicate_entries:
            # entry: NVS_Entry
            if entry.metadata['namespace'] == 0:
                entry_type = f'namespace ({entry.data["value"]})'
            else:
                entry_type = entry.metadata['type']

            if entry.page is not None:
                page_num = entry.page.header['page_index']
                page_status = entry.page.header['status']
            else:
                page_num = 'Unknown'
                page_status = 'Unknown'

            # Pad the variable width columns with tabs so the rows line up
            # with the 8-space tab stops of the header above
            entry_key_tab_cnt = len(entry.key) // 8
            entry_key_tab = '\t' * (3 - entry_key_tab_cnt)

            namespace_tab_cnt = len(entry_type) // 8
            namespace_tab = '\t' * (2 - namespace_tab_cnt)
            namespace_str = f'{entry.metadata["namespace"]}'

            nvs_log.info(
                nvs_log.red(
                    f'#{entry.index:03d}\t{entry.key}{entry_key_tab}{entry_type}{namespace_tab}{namespace_str}\t\t{page_num}\t{page_status}'
                )
            )


def assemble_blobs(nvs_log: NVS_Logger) -> None:
    """Assembles blob data from blob chunks"""
    for chunk in blob_chunks:
        # chunk: NVS_Entry
        parent = blobs.get(
            f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY]
        )[0]
        # Blob chunk without blob index check
        if parent is EMPTY_ENTRY:
            nvs_log.info(
                nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
                f'Namespace index: {chunk.metadata["namespace"]:03d}',
                f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
                f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
            )
        else:
            blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
            # chunk_index counts from the blob index's chunk_start; the +1 below
            # skips the blob_index entry stored at position 0 of the list
            chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
            blobs[blob_key][chunk_index + 1] = chunk


def check_blob_data(nvs_log: NVS_Logger) -> None:
    """Checks blob data for missing chunks or data"""
    for blob_key in blobs:
        blob_index = blobs[blob_key][0]
        chunks = blobs[blob_key][1:]
        blob_size = blob_index.data['size']

        for i, chunk in enumerate(chunks):
            # chunk: NVS_Entry
            # Blob missing chunk check
            if chunk is EMPTY_ENTRY:
                nvs_log.info(
                    nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
                    f'Namespace index: {blob_index.metadata["namespace"]:03d}',
                    f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
                    f'Chunk Index: {i:03d}',
                )
            else:
                blob_size -= len(chunk.children) * nvs_const.entry_size

        # Blob missing data check
        if blob_size > 0:
            nvs_log.info(
                nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
                f'Namespace index: {blob_index.metadata["namespace"]:03d}',
            )


def check_blobs(nvs_log: NVS_Logger) -> None:
    """Assembles the gathered blobs and checks them for missing chunks or data"""
    # Assemble blobs
    assemble_blobs(nvs_log)
    # Blob data check
    check_blob_data(nvs_log)


def check_namespaces(nvs_log: NVS_Logger) -> None:
    """Checks namespaces (entries using undefined namespace indexes, unused namespaces)"""
    # Undefined namespace index check
    for used_ns in used_namespaces:
        key = found_namespaces.pop(used_ns, None)
        if key is None:
            nvs_log.info(
                nvs_log.red('Undefined namespace index!'),
                f'Namespace index: {used_ns:03d}',
                '[undefined]',
            )

    # Unused namespace index check
    for unused_ns in found_namespaces:
        nvs_log.info(
            nvs_log.yellow('Found unused namespace.'),
            f'Namespace index: {unused_ns:03d}',
            f'[{found_namespaces[unused_ns]}]',
        )


def reset_global_variables() -> None:
    """Global variables need to be cleared out before calling `integrity_check()` multiple
    times from a script (e.g. when running tests) to avoid incorrect output
    """
    global used_namespaces, found_namespaces, blobs, blob_chunks
    used_namespaces = {}
    found_namespaces = {}
    blobs = {}
    blob_chunks = []


def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
    """Performs a multi-stage integrity check of an NVS partition"""
    # Partition size check
    check_partition_size(nvs_partition, nvs_log)

    # Free/empty page check
    check_empty_page_present(nvs_partition, nvs_log)

    seen_written_entries_all: Dict[str, List[NVS_Entry]] = {}

    # Loop through all pages in the partition
    for page in nvs_partition.pages:
        # page: NVS_Page

        # Print a page header
        if page.header['status'] == 'Empty':
            # Check if a page is truly empty
            check_empty_page_content(page, nvs_log)
        else:
            # Check a page header CRC32
            check_page_crc(page, nvs_log)

        # Check all entries in a page
        seen_written_entries = check_page_entries(page, nvs_log)

        # Collect all seen written entries
        for key in seen_written_entries:
            if key in seen_written_entries_all:
                seen_written_entries_all[key].extend(seen_written_entries[key])
            else:
                seen_written_entries_all[key] = seen_written_entries[key]

    # Duplicate entry check (2) - same key, different index
    duplicates = filter_entry_duplicates(seen_written_entries_all)
    # Print duplicate entries
    print_entry_duplicates(duplicates, nvs_log)

    nvs_log.info()  # Empty line

    # Blob checks
    check_blobs(nvs_log)

    # Namespace checks
    check_namespaces(nvs_log)

    reset_global_variables()
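

# ---------------------------------------------------------------------------
# Minimal usage sketch, kept as a comment because the exact constructor
# signatures of NVS_Partition and NVS_Logger are defined in nvs_parser.py and
# nvs_logger.py; the file name 'nvs.bin' and the partition name below are
# assumptions for illustration only:
#
#     with open('nvs.bin', 'rb') as f:
#         raw = bytearray(f.read())
#     integrity_check(NVS_Partition('nvs', raw), NVS_Logger())
# ---------------------------------------------------------------------------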