ARTICLE AD BOX
I am building a segment-based log storage system in Python using the `mmap` module.
I want to prioritize:
Zero copy as much as possible
Concurrent readers
dynamic growth of underlying file
Current Approach:
class Entry:
    """One memory-mapped segment file.

    Owned by the SegmentMemory class, provides read and write guarantees.

    The backing file is opened and mapped lazily by :meth:`load`. Writers are
    serialised by ``_lock`` and grow the file (and the mapping) on demand in
    steps of at least ``segment_size_inc`` bytes. Reads are lock-free.
    """

    def __init__(self, meta: "SegmentMeta", init_segment_size, segment_size_inc) -> None:
        """Record the segment's configuration; no file is touched until load().

        Args:
            meta: Segment descriptor; supplies the file path and whether the
                segment may be written to.
            init_segment_size: Size in bytes given to a newly created
                (empty) segment file.
            segment_size_inc: Minimum number of bytes added each time the
                segment has to grow.
        """
        self.__mmap: mmap.mmap | None = None
        self.__file_obj: _io.BufferedRandom | None = None
        self.__capacity: int | None = None
        self.__filepath = meta.get_filepath()
        self.__mutable: bool = meta.is_mutable()
        self._refcount: AtomicInt = AtomicInt(0)
        self._lock: threading.Lock = threading.Lock()
        self.init_segment_size = init_segment_size
        self.segment_size_inc = segment_size_inc

    def load(self) -> None:
        """Open and map the segment file; a no-op if it is already mapped.

        Never call this internally while holding ``_lock``: the lock is not
        re-entrant and is acquired here.

        Raises:
            OSError: If the directory or file cannot be created or opened.
            ValueError: If the file is empty and ``init_segment_size`` is 0
                (an empty file cannot be mapped).
        """
        with self._lock:
            # Only load the file once.
            if self.__mmap is not None:
                return

            # A bare file name has an empty dirname, and os.makedirs('')
            # raises FileNotFoundError, so only create a real directory.
            directory = os.path.dirname(self.__filepath)
            if directory:
                os.makedirs(directory, exist_ok=True)

            # O_CREAT without O_TRUNC creates the file when missing but never
            # truncates an existing one, so there is no window in which a
            # file created by someone else could be wiped.
            flags = os.O_RDWR | os.O_CREAT | getattr(os, "O_BINARY", 0)
            fd = os.open(self.__filepath, flags, 0o666)
            try:
                file_obj = os.fdopen(fd, "r+b")
            except BaseException:
                os.close(fd)
                raise

            mapping = None
            try:
                capacity = os.fstat(file_obj.fileno()).st_size
                if capacity == 0:
                    # First time creating the file: give it its initial size.
                    file_obj.truncate(self.init_segment_size)
                    capacity = self.init_segment_size
                mapping = mmap.mmap(file_obj.fileno(), 0)
                # Hint the OS for sequential reads.
                set_sequential_hint(mapping, file_obj.fileno())
            except BaseException:
                # Do not leak the handles when mapping or hinting fails.
                if mapping is not None:
                    mapping.close()
                file_obj.close()
                raise

            # Publish the state only once everything succeeded.
            self.__file_obj = file_obj
            self.__capacity = capacity
            self.__mmap = mapping

    def read_bytes(self, offset: int, length: int) -> bytes:
        """Return up to ``length`` bytes starting at ``offset``.

        The result is a ``bytes`` object produced by slicing the mapping. A
        range that runs past the end of the mapping yields fewer bytes than
        requested rather than raising.

        Raises:
            ValueError: If ``offset`` or ``length`` is negative.
            RuntimeError: If the segment has not been loaded.
        """
        if offset < 0 or length < 0:
            # A negative index would silently address bytes from the end.
            raise ValueError(
                f"offset and length must be non-negative, got {offset} and {length}"
            )
        # Snapshot the attribute so the None check and the slice use the same
        # object even if release() runs in between.
        mapping = self.__mmap
        if mapping is None:
            raise RuntimeError("segment is not loaded; call load() first")
        # NOTE(review): this read is deliberately lock-free. The mmap
        # documentation does not say whether slicing is safe while another
        # thread is inside resize(); confirm for the target interpreter
        # (in particular free-threaded builds) or take self._lock here.
        return mapping[offset:offset + length]

    def write(self, offset: int, msg: bytes) -> None:
        """Copy ``msg`` into the segment at ``offset``, growing it if needed.

        Raises:
            ValueError: If ``offset`` is negative.
            RuntimeError: If the segment is not loaded or is immutable.
        """
        if offset < 0:
            # A negative slice start would overwrite data near the end of the
            # mapping instead of failing.
            raise ValueError(f"offset must be non-negative, got {offset}")
        required_capacity = offset + len(msg)
        with self._lock:
            if self.__mmap is None:
                raise RuntimeError("segment is not loaded; call load() first")
            if not self.__mutable:
                raise RuntimeError("segment is immutable and cannot be written")
            self._ensure_capacity_locked(required_capacity)
            self.__mmap[offset:required_capacity] = msg

    def release(self) -> None:
        """Flush (when mutable) and close the mapping and the file.

        Not thread-safe: the caller must guarantee no reader or writer is
        using the entry. Both handles are closed even if a flush fails.
        """
        mapping, self.__mmap = self.__mmap, None
        file_obj, self.__file_obj = self.__file_obj, None
        # The old capacity is meaningless once the file is closed.
        self.__capacity = None
        try:
            if mapping is not None:
                try:
                    if self.__mutable:
                        mapping.flush()
                finally:
                    mapping.close()
        finally:
            if file_obj is not None:
                try:
                    if self.__mutable:
                        file_obj.flush()
                finally:
                    file_obj.close()

    def _ensure_capacity_locked(self, capacity: int) -> None:
        """Grow the file and mapping so that at least ``capacity`` bytes fit.

        Internal helper: not thread-safe by itself, the caller must hold
        ``_lock``.

        Raises:
            RuntimeError: If the segment is not loaded.
        """
        if self.__capacity is None or self.__file_obj is None or self.__mmap is None:
            raise RuntimeError("segment is not loaded; call load() first")
        if self.__capacity >= capacity:
            return
        # Grow by at least one increment so that many small appends do not
        # each trigger a resize.
        new_capacity = max(capacity, self.__capacity + self.segment_size_inc)
        # Extend the file before the mapping so the enlarged mapping is
        # backed by the file from the moment it exists.
        self.__file_obj.truncate(new_capacity)
        self.__mmap.resize(new_capacity)
        self.__capacity = new_capacity


# Here I am doing reads without any lock in `read_bytes`, and I also need to
# grow the file in `_ensure_capacity_locked`, which first truncates the file
# and then resizes the mmap.
But I am not sure about the current behaviour. Can the mapping change while it is being resized, making concurrent reads invalid or causing a SIGBUS?
The Python documentation also doesn't say anything about thread safety:
https://docs.python.org/3/library/mmap.html#mmap.mmap.resize
