IP : 3.145.170.18 Hostname : host45.registrar-servers.com Kernel : Linux host45.registrar-servers.com 4.18.0-513.18.1.lve.2.el8.x86_64 #1 SMP Sat Mar 30 15:36:11 UTC 2024 x86_64 Disable Function : None :) OS : Linux PATH: / home/ jackpotjunglegam/ .cagefs/ ../ ./ ../ ../ lib64/ libnl/ ./ ../ man-db/ ../ python3.6/ _compression.py/ /

"""Internal classes used by the gzip, lzma and bz2 modules"""

import io


# Number of bytes of compressed input requested from the underlying file
# object per read call; taken from the io module's default buffer size.
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE # Compressed data read chunk size


class BaseStream(io.BufferedIOBase):
    """Mode-checking helper functions.

    Every helper returns None when its precondition holds and raises
    otherwise, so callers can invoke them as one-line guards.
    """

    def _check_not_closed(self):
        """Raise ValueError if the stream has already been closed."""
        if not self.closed:
            return
        raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        """Raise io.UnsupportedOperation unless readable() is true."""
        if self.readable():
            return
        raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        """Raise io.UnsupportedOperation unless writable() is true."""
        if self.writable():
            return
        raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        """Raise io.UnsupportedOperation unless the stream can seek.

        Readability is tested first; seekable() is only consulted for
        streams that are readable.
        """
        if not self.readable():
            reason = "Seeking is only supported on files open for reading"
        elif not self.seekable():
            reason = "The underlying file object does not support seeking"
        else:
            return
        raise io.UnsupportedOperation(reason)


class DecompressReader(io.RawIOBase):
    """Adapts the decompressor API to a RawIOBase reader API.

    Wraps a file object holding compressed data together with a factory
    for decompressor objects, and exposes the decompressed bytes through
    read()/readinto()/seek()/tell().  The decompressor objects must
    provide ``eof``, ``unused_data``, ``needs_input`` and
    ``decompress(data, max_length)``.
    """

    def readable(self):
        """Return True; this object always supports reading."""
        return True

    def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
        """Create a reader over the compressed file object *fp*.

        fp             -- file object to read compressed data from.
        decomp_factory -- callable returning a new decompressor object.
        trailing_error -- exception class (or tuple of classes) raised by
                          the decompressor for invalid data that follows
                          a complete stream; such data is ignored.  The
                          default empty tuple catches nothing.
        decomp_args    -- keyword arguments passed to decomp_factory on
                          every call.
        """
        self._fp = fp
        self._eof = False
        self._pos = 0  # Current offset in decompressed stream

        # Set to size of decompressed stream once it is known, for SEEK_END
        self._size = -1

        # Save the decompressor factory and arguments.
        # If the file contains multiple compressed streams, each
        # stream will need a separate decompressor object. A new
        # decompressor object is also needed when implementing a
        # backwards seek().
        self._decomp_factory = decomp_factory
        self._decomp_args = decomp_args
        self._decompressor = self._decomp_factory(**self._decomp_args)

        # Exception class to catch from decompressor signifying invalid
        # trailing data to ignore
        self._trailing_error = trailing_error

    def close(self):
        """Drop the decompressor reference and close this reader."""
        self._decompressor = None
        return super().close()

    def seekable(self):
        """Return whether the underlying file object supports seeking."""
        return self._fp.seekable()

    def readinto(self, b):
        """Read decompressed bytes into the writable buffer *b*.

        Returns the number of bytes stored, which may be less than the
        size of *b*; 0 means end of data.
        """
        # Cast to unsigned bytes so len() and slicing count bytes no
        # matter what item format the caller's buffer uses.
        with memoryview(b) as view, view.cast("B") as byte_view:
            data = self.read(len(byte_view))
            byte_view[:len(data)] = data
        return len(data)

    def read(self, size=-1):
        """Return up to *size* decompressed bytes.

        A negative *size* or None reads everything up to the end of the
        data via readall().  Returns b"" when *size* is 0 or the end of
        the data has been reached.

        Raises EOFError if the underlying file ends before the current
        compressed stream is complete.
        """
        # Accept None as "read everything", as other io read() methods
        # do; comparing None with 0 would raise TypeError.
        if size is None or size < 0:
            return self.readall()

        if not size or self._eof:
            return b""
        data = None  # Default if EOF is encountered
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another
        # block.
        while True:
            if self._decompressor.eof:
                # Bytes left over after the finished stream take priority
                # over fresh input from the file.
                rawblock = (self._decompressor.unused_data or
                            self._fp.read(BUFFER_SIZE))
                if not rawblock:
                    break
                # Continue to next stream.
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)
                try:
                    data = self._decompressor.decompress(rawblock, size)
                except self._trailing_error:
                    # Trailing data isn't a valid compressed stream;
                    # ignore it.
                    break
            else:
                if self._decompressor.needs_input:
                    rawblock = self._fp.read(BUFFER_SIZE)
                    if not rawblock:
                        raise EOFError("Compressed file ended before the "
                                       "end-of-stream marker was reached")
                else:
                    # The decompressor still holds buffered input; feed it
                    # nothing so it emits more of that output.
                    rawblock = b""
                data = self._decompressor.decompress(rawblock, size)
            if data:
                break
        if not data:
            # End of data: the total decompressed size is now known.
            self._eof = True
            self._size = self._pos
            return b""
        self._pos += len(data)
        return data

    def _rewind(self):
        """Rewind the file to the beginning of the data stream.

        Seeks the underlying file to offset 0 and starts over with a new
        decompressor.  A previously recorded stream size is kept.
        """
        self._fp.seek(0)
        self._eof = False
        self._pos = 0
        self._decompressor = self._decomp_factory(**self._decomp_args)

    def seek(self, offset, whence=io.SEEK_SET):
        """Move to a position in the decompressed data and return it.

        *whence* is io.SEEK_SET (default), io.SEEK_CUR or io.SEEK_END;
        any other value raises ValueError.  Seeking is emulated by
        reading and discarding data: a backwards seek rewinds and
        decompresses again from the start, and the first SEEK_END reads
        to the end of the data to learn its size.  The returned position
        is smaller than requested if the data ends first.
        """
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset = self._pos + offset
        elif whence == io.SEEK_END:
            # Seeking relative to EOF - we need to know the file's size.
            if self._size < 0:
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset = self._size + offset
        else:
            raise ValueError("Invalid value for whence: {}".format(whence))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position.
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos

    def tell(self):
        """Return the current file position."""
        return self._pos