X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/maxminddb
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
maxminddb
/
??
..
??
__init__.py
(2.5 KB)
??
__pycache__
??
const.py
(122 B)
??
decoder.py
(6.84 KB)
??
errors.py
(226 B)
??
extension.pyi
(1.41 KB)
??
file.py
(2.1 KB)
??
py.typed
(0 B)
??
reader.py
(10.74 KB)
??
types.py
(558 B)
Editing: decoder.py
""" maxminddb.decoder ~~~~~~~~~~~~~~~~~ This package contains code for decoding the MaxMind DB data section. """ import struct from typing import cast, Dict, List, Tuple, Union try: # pylint: disable=unused-import import mmap except ImportError: # pylint: disable=invalid-name mmap = None # type: ignore from maxminddb.errors import InvalidDatabaseError from maxminddb.file import FileBuffer from maxminddb.types import Record class Decoder: # pylint: disable=too-few-public-methods """Decoder for the data section of the MaxMind DB""" def __init__( self, database_buffer: Union[FileBuffer, "mmap.mmap", bytes], pointer_base: int = 0, pointer_test: bool = False, ) -> None: """Created a Decoder for a MaxMind DB Arguments: database_buffer -- an mmap'd MaxMind DB file. pointer_base -- the base number to use when decoding a pointer pointer_test -- used for internal unit testing of pointer code """ self._pointer_test = pointer_test self._buffer = database_buffer self._pointer_base = pointer_base def _decode_array(self, size: int, offset: int) -> Tuple[List[Record], int]: array = [] for _ in range(size): (value, offset) = self.decode(offset) array.append(value) return array, offset def _decode_boolean(self, size: int, offset: int) -> Tuple[bool, int]: return size != 0, offset def _decode_bytes(self, size: int, offset: int) -> Tuple[bytes, int]: new_offset = offset + size return self._buffer[offset:new_offset], new_offset def _decode_double(self, size: int, offset: int) -> Tuple[float, int]: self._verify_size(size, 8) new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] (value,) = struct.unpack(b"!d", packed_bytes) return value, new_offset def _decode_float(self, size: int, offset: int) -> Tuple[float, int]: self._verify_size(size, 4) new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] (value,) = struct.unpack(b"!f", packed_bytes) return value, new_offset def _decode_int32(self, size: int, offset: int) -> Tuple[int, int]: if size == 0: return 0, offset new_offset = offset + size packed_bytes = self._buffer[offset:new_offset] if size != 4: packed_bytes = packed_bytes.rjust(4, b"\x00") (value,) = struct.unpack(b"!i", packed_bytes) return value, new_offset def _decode_map(self, size: int, offset: int) -> Tuple[Dict[str, Record], int]: container: Dict[str, Record] = {} for _ in range(size): (key, offset) = self.decode(offset) (value, offset) = self.decode(offset) container[cast(str, key)] = value return container, offset def _decode_pointer(self, size: int, offset: int) -> Tuple[Record, int]: pointer_size = (size >> 3) + 1 buf = self._buffer[offset : offset + pointer_size] new_offset = offset + pointer_size if pointer_size == 1: buf = bytes([size & 0x7]) + buf pointer = struct.unpack(b"!H", buf)[0] + self._pointer_base elif pointer_size == 2: buf = b"\x00" + bytes([size & 0x7]) + buf pointer = struct.unpack(b"!I", buf)[0] + 2048 + self._pointer_base elif pointer_size == 3: buf = bytes([size & 0x7]) + buf pointer = struct.unpack(b"!I", buf)[0] + 526336 + self._pointer_base else: pointer = struct.unpack(b"!I", buf)[0] + self._pointer_base if self._pointer_test: return pointer, new_offset (value, _) = self.decode(pointer) return value, new_offset def _decode_uint(self, size: int, offset: int) -> Tuple[int, int]: new_offset = offset + size uint_bytes = self._buffer[offset:new_offset] return int.from_bytes(uint_bytes, "big"), new_offset def _decode_utf8_string(self, size: int, offset: int) -> Tuple[str, int]: new_offset = offset + size return self._buffer[offset:new_offset].decode("utf-8"), new_offset _type_decoder = { 1: _decode_pointer, 2: _decode_utf8_string, 3: _decode_double, 4: _decode_bytes, 5: _decode_uint, # uint16 6: _decode_uint, # uint32 7: _decode_map, 8: _decode_int32, 9: _decode_uint, # uint64 10: _decode_uint, # uint128 11: _decode_array, 14: _decode_boolean, 15: _decode_float, } def decode(self, offset: int) -> Tuple[Record, int]: """Decode a section of the data section starting at offset Arguments: offset -- the location of the data structure to decode """ new_offset = offset + 1 ctrl_byte = self._buffer[offset] type_num = ctrl_byte >> 5 # Extended type if not type_num: (type_num, new_offset) = self._read_extended(new_offset) try: decoder = self._type_decoder[type_num] except KeyError as ex: raise InvalidDatabaseError( f"Unexpected type number ({type_num}) encountered" ) from ex (size, new_offset) = self._size_from_ctrl_byte(ctrl_byte, new_offset, type_num) return decoder(self, size, new_offset) def _read_extended(self, offset: int) -> Tuple[int, int]: next_byte = self._buffer[offset] type_num = next_byte + 7 if type_num < 7: raise InvalidDatabaseError( "Something went horribly wrong in the decoder. An " f"extended type resolved to a type number < 8 ({type_num})" ) return type_num, offset + 1 @staticmethod def _verify_size(expected: int, actual: int) -> None: if expected != actual: raise InvalidDatabaseError( "The MaxMind DB file's data section contains bad data " "(unknown data type or corrupt data)" ) def _size_from_ctrl_byte( self, ctrl_byte: int, offset: int, type_num: int ) -> Tuple[int, int]: size = ctrl_byte & 0x1F if type_num == 1 or size < 29: return size, offset if size == 29: size = 29 + self._buffer[offset] return size, offset + 1 # Using unpack rather than int_from_bytes as it is faster # here and below. if size == 30: new_offset = offset + 2 size_bytes = self._buffer[offset:new_offset] size = 285 + struct.unpack(b"!H", size_bytes)[0] return size, new_offset new_offset = offset + 3 size_bytes = self._buffer[offset:new_offset] size = struct.unpack(b"!I", b"\x00" + size_bytes)[0] + 65821 return size, new_offset
Upload File
Create Folder