mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-08 12:10:59 +00:00
fatfsparse.py: add cluster chaining support to enable reading longer-than-sector files
Closes IDF-4975
This commit is contained in:
325
components/fatfs/fatfs_utils/fs_object.py
Normal file
325
components/fatfs/fatfs_utils/fs_object.py
Normal file
@@ -0,0 +1,325 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
from .entry import Entry
|
||||
from .exceptions import FatalError, WriteDirectoryException
|
||||
from .fat import FAT, Cluster
|
||||
from .fatfs_state import FATFSState
|
||||
from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
|
||||
get_required_lfn_entries_count, split_name_to_lfn_entries,
|
||||
split_name_to_lfn_entry_blocks)
|
||||
from .utils import (DATETIME, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults, build_lfn_short_entry_name, lfn_checksum,
|
||||
required_clusters_count, split_content_into_sectors, split_to_name_and_extension)
|
||||
|
||||
|
||||
class File:
|
||||
"""
|
||||
The class File provides API to write into the files. It represents file in the FS.
|
||||
"""
|
||||
ATTR_ARCHIVE: int = 0x20
|
||||
ENTITY_TYPE: int = ATTR_ARCHIVE
|
||||
|
||||
def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
|
||||
self.name: str = name
|
||||
self.extension: str = extension
|
||||
self.fatfs_state: FATFSState = fatfs_state
|
||||
self.fat: FAT = fat
|
||||
self.size: int = 0
|
||||
self._first_cluster: Optional[Cluster] = None
|
||||
self._entry: Entry = entry
|
||||
|
||||
@property
|
||||
def entry(self) -> Entry:
|
||||
return self._entry
|
||||
|
||||
@property
|
||||
def first_cluster(self) -> Optional[Cluster]:
|
||||
return self._first_cluster
|
||||
|
||||
@first_cluster.setter
|
||||
def first_cluster(self, value: Cluster) -> None:
|
||||
self._first_cluster = value
|
||||
|
||||
def name_equals(self, name: str, extension: str) -> bool:
|
||||
return self.name == name and self.extension == extension
|
||||
|
||||
def write(self, content: bytes) -> None:
|
||||
self.entry.update_content_size(len(content))
|
||||
# we assume that the correct amount of clusters is allocated
|
||||
current_cluster = self._first_cluster
|
||||
for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size):
|
||||
content_as_list = content_part
|
||||
if current_cluster is None:
|
||||
raise FatalError('No free space left!')
|
||||
|
||||
address: int = current_cluster.cluster_data_address
|
||||
self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
|
||||
current_cluster = current_cluster.next_cluster
|
||||
|
||||
|
||||
class Directory:
|
||||
"""
|
||||
The Directory class provides API to add files and directories into the directory
|
||||
and to find the file according to path and write it.
|
||||
"""
|
||||
ATTR_DIRECTORY: int = 0x10
|
||||
ATTR_ARCHIVE: int = 0x20
|
||||
ENTITY_TYPE: int = ATTR_DIRECTORY
|
||||
|
||||
CURRENT_DIRECTORY = '.'
|
||||
PARENT_DIRECTORY = '..'
|
||||
|
||||
def __init__(self,
|
||||
name,
|
||||
fat,
|
||||
fatfs_state,
|
||||
entry=None,
|
||||
cluster=None,
|
||||
size=None,
|
||||
extension='',
|
||||
parent=None):
|
||||
# type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
|
||||
self.name: str = name
|
||||
self.fatfs_state: FATFSState = fatfs_state
|
||||
self.extension: str = extension
|
||||
|
||||
self.fat: FAT = fat
|
||||
self.size: int = size or self.fatfs_state.boot_sector_state.sector_size
|
||||
|
||||
# if directory is root its parent is itself
|
||||
self.parent: Directory = parent or self
|
||||
self._first_cluster: Cluster = cluster
|
||||
|
||||
# entries will be initialized after the cluster allocation
|
||||
self.entries: List[Entry] = []
|
||||
self.entities: List[Union[File, Directory]] = [] # type: ignore
|
||||
self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
|
||||
|
||||
@property
|
||||
def is_root(self) -> bool:
|
||||
return self.parent is self
|
||||
|
||||
@property
|
||||
def first_cluster(self) -> Cluster:
|
||||
return self._first_cluster
|
||||
|
||||
@first_cluster.setter
|
||||
def first_cluster(self, value: Cluster) -> None:
|
||||
self._first_cluster = value
|
||||
|
||||
def name_equals(self, name: str, extension: str) -> bool:
|
||||
return self.name == name and self.extension == extension
|
||||
|
||||
@property
|
||||
def entries_count(self) -> int:
|
||||
entries_count_: int = self.size // FATDefaults.ENTRY_SIZE
|
||||
return entries_count_
|
||||
|
||||
def create_entries(self, cluster: Cluster) -> List[Entry]:
|
||||
return [Entry(entry_id=i,
|
||||
parent_dir_entries_address=cluster.cluster_data_address,
|
||||
fatfs_state=self.fatfs_state)
|
||||
for i in range(self.entries_count)]
|
||||
|
||||
def init_directory(self) -> None:
|
||||
self.entries = self.create_entries(self._first_cluster)
|
||||
|
||||
# the root directory doesn't contain link to itself nor the parent
|
||||
if self.is_root:
|
||||
return
|
||||
# if the directory is not root we initialize the reference to itself and to the parent directory
|
||||
for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)):
|
||||
new_dir_: Entry = self.find_free_entry() or self.chain_directory()
|
||||
new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id,
|
||||
entity_name=name_,
|
||||
entity_extension='',
|
||||
entity_type=dir_id.ENTITY_TYPE)
|
||||
|
||||
def lookup_entity(self, object_name: str, extension: str): # type: ignore
|
||||
for entity in self.entities:
|
||||
if entity.name == object_name and entity.extension == extension:
|
||||
return entity
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _if_end_of_path(path_as_list: List[str]) -> bool:
|
||||
"""
|
||||
:param path_as_list: path split into the list
|
||||
|
||||
:returns: True if the file is the leaf of the directory tree, False otherwise
|
||||
The method is part of the base of recursion,
|
||||
determines if the path is target file or directory in the tree folder structure.
|
||||
"""
|
||||
return len(path_as_list) == 1
|
||||
|
||||
def recursive_search(self, path_as_list, current_dir): # type: ignore
|
||||
name, extension = split_to_name_and_extension(path_as_list[0])
|
||||
next_obj = current_dir.lookup_entity(name, extension)
|
||||
if next_obj is None:
|
||||
raise FileNotFoundError('No such file or directory!')
|
||||
if self._if_end_of_path(path_as_list) and next_obj.name_equals(name, extension):
|
||||
return next_obj
|
||||
return self.recursive_search(path_as_list[1:], next_obj)
|
||||
|
||||
def find_free_entry(self) -> Optional[Entry]:
|
||||
for entry in self.entries:
|
||||
if entry.is_empty:
|
||||
return entry
|
||||
return None
|
||||
|
||||
def _extend_directory(self) -> None:
|
||||
current: Cluster = self.first_cluster
|
||||
while current.next_cluster is not None:
|
||||
current = current.next_cluster
|
||||
new_cluster: Cluster = self.fat.find_free_cluster()
|
||||
current.set_in_fat(new_cluster.id)
|
||||
assert current is not new_cluster
|
||||
current.next_cluster = new_cluster
|
||||
self.entries += self.create_entries(new_cluster)
|
||||
|
||||
def chain_directory(self) -> Entry:
|
||||
"""
|
||||
:returns: First free entry
|
||||
|
||||
The method adds new Cluster to the Directory and returns first free entry.
|
||||
"""
|
||||
self._extend_directory()
|
||||
free_entry: Entry = self.find_free_entry()
|
||||
if free_entry is None:
|
||||
raise FatalError('No more space left!')
|
||||
return free_entry
|
||||
|
||||
@staticmethod
|
||||
def allocate_long_name_object(free_entry,
|
||||
name,
|
||||
extension,
|
||||
target_dir,
|
||||
free_cluster,
|
||||
entity_type,
|
||||
date,
|
||||
time):
|
||||
# type: (Entry, str, str, Directory, Cluster, int, DATETIME, DATETIME) -> Tuple[Cluster, Entry, Directory]
|
||||
lfn_full_name: str = build_lfn_full_name(name, extension)
|
||||
lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name)
|
||||
lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order)
|
||||
checksum: int = lfn_checksum(lfn_short_entry_name)
|
||||
entries_count: int = get_required_lfn_entries_count(lfn_full_name)
|
||||
|
||||
# entries in long file name entries chain starts with the last entry
|
||||
split_names_reversed = reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count))))
|
||||
for i, name_split_to_entry in split_names_reversed:
|
||||
order: int = i + 1
|
||||
lfn_names: List[bytes] = list(
|
||||
map(lambda x: x.lower(), split_name_to_lfn_entry_blocks(name_split_to_entry))) # type: ignore
|
||||
free_entry.allocate_entry(first_cluster_id=free_cluster.id,
|
||||
entity_name=name,
|
||||
entity_extension=extension,
|
||||
entity_type=entity_type,
|
||||
lfn_order=order,
|
||||
lfn_names=lfn_names,
|
||||
lfn_checksum_=checksum,
|
||||
lfn_is_last=order == entries_count)
|
||||
free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
|
||||
free_entry.allocate_entry(first_cluster_id=free_cluster.id,
|
||||
entity_name=lfn_short_entry_name[:MAX_NAME_SIZE],
|
||||
entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:],
|
||||
entity_type=entity_type,
|
||||
lfn_order=Entry.SHORT_ENTRY_LN,
|
||||
date=date,
|
||||
time=time)
|
||||
return free_cluster, free_entry, target_dir
|
||||
|
||||
def allocate_object(self,
|
||||
name,
|
||||
entity_type,
|
||||
object_timestamp_,
|
||||
path_from_root=None,
|
||||
extension=''):
|
||||
# type: (str, int, datetime, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
|
||||
"""
|
||||
Method finds the target directory in the path
|
||||
and allocates cluster (both the record in FAT and cluster in the data region)
|
||||
and entry in the specified directory
|
||||
"""
|
||||
free_cluster: Cluster = self.fat.find_free_cluster()
|
||||
target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self)
|
||||
free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory()
|
||||
|
||||
name_fits_short_struct: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE
|
||||
|
||||
fatfs_date_ = (object_timestamp_.year, object_timestamp_.month, object_timestamp_.day)
|
||||
fatfs_time_ = (object_timestamp_.hour, object_timestamp_.minute, object_timestamp_.second)
|
||||
|
||||
if not self.fatfs_state.long_names_enabled or name_fits_short_struct:
|
||||
free_entry.allocate_entry(first_cluster_id=free_cluster.id,
|
||||
entity_name=name,
|
||||
entity_extension=extension,
|
||||
date=fatfs_date_,
|
||||
time=fatfs_time_,
|
||||
entity_type=entity_type)
|
||||
return free_cluster, free_entry, target_dir
|
||||
return self.allocate_long_name_object(free_entry=free_entry,
|
||||
name=name,
|
||||
extension=extension,
|
||||
target_dir=target_dir,
|
||||
free_cluster=free_cluster,
|
||||
entity_type=entity_type,
|
||||
date=fatfs_date_,
|
||||
time=fatfs_time_)
|
||||
|
||||
def new_file(self,
|
||||
name: str,
|
||||
extension: str,
|
||||
path_from_root: Optional[List[str]],
|
||||
object_timestamp_: datetime) -> None:
|
||||
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
|
||||
extension=extension,
|
||||
entity_type=Directory.ATTR_ARCHIVE,
|
||||
path_from_root=path_from_root,
|
||||
object_timestamp_=object_timestamp_)
|
||||
|
||||
file: File = File(name=name,
|
||||
fat=self.fat,
|
||||
extension=extension,
|
||||
fatfs_state=self.fatfs_state,
|
||||
entry=free_entry)
|
||||
file.first_cluster = free_cluster
|
||||
target_dir.entities.append(file)
|
||||
|
||||
def new_directory(self, name, parent, path_from_root, object_timestamp_):
|
||||
# type: (str, Directory, Optional[List[str]], datetime) -> None
|
||||
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
|
||||
entity_type=Directory.ATTR_DIRECTORY,
|
||||
path_from_root=path_from_root,
|
||||
object_timestamp_=object_timestamp_)
|
||||
|
||||
directory: Directory = Directory(name=name,
|
||||
fat=self.fat,
|
||||
parent=parent,
|
||||
fatfs_state=self.fatfs_state,
|
||||
entry=free_entry)
|
||||
directory.first_cluster = free_cluster
|
||||
directory.init_directory()
|
||||
target_dir.entities.append(directory)
|
||||
|
||||
def write_to_file(self, path: List[str], content: bytes) -> None:
|
||||
"""
|
||||
Writes to file existing in the directory structure.
|
||||
|
||||
:param path: path split into the list
|
||||
:param content: content as a string to be written into a file
|
||||
:returns: None
|
||||
:raises WriteDirectoryException: raised is the target object for writing is a directory
|
||||
"""
|
||||
entity_to_write: Entry = self.recursive_search(path, self)
|
||||
if isinstance(entity_to_write, File):
|
||||
clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size,
|
||||
content=content)
|
||||
self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
|
||||
entity_to_write.write(content)
|
||||
else:
|
||||
raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')
|
Reference in New Issue
Block a user