mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
Upgrade charset-normalizer to 2.0.7
This commit is contained in:
+30
-14
@@ -1,3 +1,4 @@
|
||||
# -*- coding: utf_8 -*-
|
||||
"""
|
||||
Charset-Normalizer
|
||||
~~~~~~~~~~~~~~
|
||||
@@ -8,24 +9,39 @@ All IANA character set names for which the Python core library provides codecs a
|
||||
|
||||
Basic usage:
|
||||
>>> from charset_normalizer import from_bytes
|
||||
>>> results = from_bytes('Bсеки човек има право на образование. Oбразованието трябва да бъде безплатно, поне що се отнася до началното и основното образование.'.encode('utf_8'))
|
||||
>>> "utf_8" in results
|
||||
True
|
||||
>>> best_result = results.best()
|
||||
>>> str(best_result)
|
||||
'Bсеки човек има право на образование. Oбразованието трябва да бъде безплатно, поне що се отнася до началното и основното образование.'
|
||||
>>> results = from_bytes('Bсеки човек има право на образование. Oбразованието!'.encode('utf_8'))
|
||||
>>> best_guess = results.best()
|
||||
>>> str(best_guess)
|
||||
'Bсеки човек има право на образование. Oбразованието!'
|
||||
|
||||
Others methods and usages are available - see the full documentation
|
||||
at <https://github.com/Ousret/charset_normalizer>.
|
||||
:copyright: (c) 2021 by Ahmed TAHRI
|
||||
:license: MIT, see LICENSE for more details.
|
||||
"""
|
||||
from pipenv.vendor.charset_normalizer.api import from_fp, from_path, from_bytes, normalize
|
||||
from pipenv.vendor.charset_normalizer.legacy import detect
|
||||
from pipenv.vendor.charset_normalizer.version import __version__, VERSION
|
||||
from pipenv.vendor.charset_normalizer.models import CharsetMatch, CharsetMatches
|
||||
from .api import from_bytes, from_fp, from_path, normalize
|
||||
from .legacy import (
|
||||
CharsetDetector,
|
||||
CharsetDoctor,
|
||||
CharsetNormalizerMatch,
|
||||
CharsetNormalizerMatches,
|
||||
detect,
|
||||
)
|
||||
from .models import CharsetMatch, CharsetMatches
|
||||
from .version import VERSION, __version__
|
||||
|
||||
# Backward-compatible v1 imports
|
||||
from pipenv.vendor.charset_normalizer.models import CharsetNormalizerMatch
|
||||
import pipenv.vendor.charset_normalizer.api as CharsetDetector
|
||||
CharsetNormalizerMatches = CharsetDetector
|
||||
__all__ = (
|
||||
"from_fp",
|
||||
"from_path",
|
||||
"from_bytes",
|
||||
"normalize",
|
||||
"detect",
|
||||
"CharsetMatch",
|
||||
"CharsetMatches",
|
||||
"CharsetNormalizerMatch",
|
||||
"CharsetNormalizerMatches",
|
||||
"CharsetDetector",
|
||||
"CharsetDoctor",
|
||||
"__version__",
|
||||
"VERSION",
|
||||
)
|
||||
|
||||
+253
-146
@@ -1,38 +1,48 @@
|
||||
from os.path import splitext, basename
|
||||
from typing import List, BinaryIO, Optional, Set, Union
|
||||
from os.path import basename, splitext
|
||||
from typing import BinaryIO, List, Optional, Set
|
||||
|
||||
try:
|
||||
from os import PathLike
|
||||
except ImportError:
|
||||
PathLike = Union[str, 'os.PathLike[str]'] # type: ignore
|
||||
except ImportError: # pragma: no cover
|
||||
PathLike = str # type: ignore
|
||||
|
||||
from pipenv.vendor.charset_normalizer.constant import TOO_SMALL_SEQUENCE, TOO_BIG_SEQUENCE, IANA_SUPPORTED
|
||||
from pipenv.vendor.charset_normalizer.md import mess_ratio
|
||||
from pipenv.vendor.charset_normalizer.models import CharsetMatches, CharsetMatch
|
||||
from warnings import warn
|
||||
import logging
|
||||
|
||||
from pipenv.vendor.charset_normalizer.utils import any_specified_encoding, is_multi_byte_encoding, identify_sig_or_bom, \
|
||||
should_strip_sig_or_bom, is_cp_similar, iana_name
|
||||
from pipenv.vendor.charset_normalizer.cd import coherence_ratio, encoding_languages, mb_encoding_languages, merge_coherence_ratios
|
||||
from .cd import (
|
||||
coherence_ratio,
|
||||
encoding_languages,
|
||||
mb_encoding_languages,
|
||||
merge_coherence_ratios,
|
||||
)
|
||||
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE
|
||||
from .md import mess_ratio
|
||||
from .models import CharsetMatch, CharsetMatches
|
||||
from .utils import (
|
||||
any_specified_encoding,
|
||||
iana_name,
|
||||
identify_sig_or_bom,
|
||||
is_cp_similar,
|
||||
is_multi_byte_encoding,
|
||||
should_strip_sig_or_bom,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("charset_normalizer")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter('%(asctime)s | %(levelname)s | %(message)s'))
|
||||
handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
|
||||
logger.addHandler(handler)
|
||||
|
||||
|
||||
def from_bytes(
|
||||
sequences: bytes,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.2,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False
|
||||
sequences: bytes,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.2,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False,
|
||||
) -> CharsetMatches:
|
||||
"""
|
||||
Given a raw bytes sequence, return the best possibles charset usable to render str objects.
|
||||
@@ -49,6 +59,13 @@ def from_bytes(
|
||||
This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
|
||||
"""
|
||||
|
||||
if not isinstance(sequences, (bytearray, bytes)):
|
||||
raise TypeError(
|
||||
"Expected object of type bytes or bytearray, got: {0}".format(
|
||||
type(sequences)
|
||||
)
|
||||
)
|
||||
|
||||
if not explain:
|
||||
logger.setLevel(logging.CRITICAL)
|
||||
else:
|
||||
@@ -57,41 +74,38 @@ def from_bytes(
|
||||
length = len(sequences) # type: int
|
||||
|
||||
if length == 0:
|
||||
logger.warning("Given content is empty, stopping the process very early, returning empty utf_8 str match")
|
||||
return CharsetMatches(
|
||||
[
|
||||
CharsetMatch(
|
||||
sequences,
|
||||
"utf_8",
|
||||
0.,
|
||||
False,
|
||||
[],
|
||||
""
|
||||
)
|
||||
]
|
||||
logger.warning(
|
||||
"Given content is empty, stopping the process very early, returning empty utf_8 str match"
|
||||
)
|
||||
return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])
|
||||
|
||||
if cp_isolation is not None:
|
||||
logger.warning('cp_isolation is set. use this flag for debugging purpose. '
|
||||
'limited list of encoding allowed : %s.',
|
||||
', '.join(cp_isolation))
|
||||
logger.warning(
|
||||
"cp_isolation is set. use this flag for debugging purpose. "
|
||||
"limited list of encoding allowed : %s.",
|
||||
", ".join(cp_isolation),
|
||||
)
|
||||
cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
|
||||
else:
|
||||
cp_isolation = []
|
||||
|
||||
if cp_exclusion is not None:
|
||||
logger.warning(
|
||||
'cp_exclusion is set. use this flag for debugging purpose. '
|
||||
'limited list of encoding excluded : %s.',
|
||||
', '.join(cp_exclusion))
|
||||
"cp_exclusion is set. use this flag for debugging purpose. "
|
||||
"limited list of encoding excluded : %s.",
|
||||
", ".join(cp_exclusion),
|
||||
)
|
||||
cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
|
||||
else:
|
||||
cp_exclusion = []
|
||||
|
||||
if length <= (chunk_size * steps):
|
||||
logger.warning(
|
||||
'override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.',
|
||||
steps, chunk_size, length)
|
||||
"override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
|
||||
steps,
|
||||
chunk_size,
|
||||
length,
|
||||
)
|
||||
steps = 1
|
||||
chunk_size = length
|
||||
|
||||
@@ -102,15 +116,30 @@ def from_bytes(
|
||||
is_too_large_sequence = len(sequences) >= TOO_BIG_SEQUENCE # type: bool
|
||||
|
||||
if is_too_small_sequence:
|
||||
warn('Trying to detect encoding from a tiny portion of ({}) byte(s).'.format(length))
|
||||
logger.warning(
|
||||
"Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
|
||||
length
|
||||
)
|
||||
)
|
||||
elif is_too_large_sequence:
|
||||
logger.info(
|
||||
"Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
|
||||
length
|
||||
)
|
||||
)
|
||||
|
||||
prioritized_encodings = [] # type: List[str]
|
||||
|
||||
specified_encoding = any_specified_encoding(sequences) if preemptive_behaviour is True else None # type: Optional[str]
|
||||
specified_encoding = (
|
||||
any_specified_encoding(sequences) if preemptive_behaviour is True else None
|
||||
) # type: Optional[str]
|
||||
|
||||
if specified_encoding is not None:
|
||||
prioritized_encodings.append(specified_encoding)
|
||||
logger.info('Detected declarative mark in sequence. Priority +1 given for %s.', specified_encoding)
|
||||
logger.info(
|
||||
"Detected declarative mark in sequence. Priority +1 given for %s.",
|
||||
specified_encoding,
|
||||
)
|
||||
|
||||
tested = set() # type: Set[str]
|
||||
tested_but_hard_failure = [] # type: List[str]
|
||||
@@ -118,9 +147,7 @@ def from_bytes(
|
||||
|
||||
fallback_ascii = None # type: Optional[CharsetMatch]
|
||||
fallback_u8 = None # type: Optional[CharsetMatch]
|
||||
|
||||
single_byte_hard_failure_count = 0 # type: int
|
||||
single_byte_soft_failure_count = 0 # type: int
|
||||
fallback_specified = None # type: Optional[CharsetMatch]
|
||||
|
||||
results = CharsetMatches() # type: CharsetMatches
|
||||
|
||||
@@ -128,14 +155,18 @@ def from_bytes(
|
||||
|
||||
if sig_encoding is not None:
|
||||
prioritized_encodings.append(sig_encoding)
|
||||
logger.info('Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.', len(sig_payload), sig_encoding)
|
||||
logger.info(
|
||||
"Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
|
||||
len(sig_payload),
|
||||
sig_encoding,
|
||||
)
|
||||
|
||||
prioritized_encodings.append("ascii")
|
||||
|
||||
if "utf_8" not in prioritized_encodings:
|
||||
prioritized_encodings.append("utf_8")
|
||||
|
||||
for encoding_iana in prioritized_encodings+IANA_SUPPORTED:
|
||||
for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
|
||||
|
||||
if cp_isolation and encoding_iana not in cp_isolation:
|
||||
continue
|
||||
@@ -150,39 +181,48 @@ def from_bytes(
|
||||
|
||||
decoded_payload = None # type: Optional[str]
|
||||
bom_or_sig_available = sig_encoding == encoding_iana # type: bool
|
||||
strip_sig_or_bom = bom_or_sig_available and should_strip_sig_or_bom(encoding_iana) # type: bool
|
||||
strip_sig_or_bom = bom_or_sig_available and should_strip_sig_or_bom(
|
||||
encoding_iana
|
||||
) # type: bool
|
||||
|
||||
if encoding_iana in {"utf_16", "utf_32"} and bom_or_sig_available is False:
|
||||
logger.info("Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.", encoding_iana)
|
||||
logger.info(
|
||||
"Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
|
||||
encoding_iana,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
is_multi_byte_decoder = is_multi_byte_encoding(encoding_iana) # type: bool
|
||||
except (ModuleNotFoundError, ImportError):
|
||||
logger.debug("Encoding %s does not provide an IncrementalDecoder", encoding_iana)
|
||||
logger.debug(
|
||||
"Encoding %s does not provide an IncrementalDecoder", encoding_iana
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
if is_too_large_sequence and is_multi_byte_decoder is False:
|
||||
str(
|
||||
sequences[:int(50e4)] if strip_sig_or_bom is False else sequences[len(sig_payload):int(50e4)],
|
||||
encoding=encoding_iana
|
||||
sequences[: int(50e4)]
|
||||
if strip_sig_or_bom is False
|
||||
else sequences[len(sig_payload) : int(50e4)],
|
||||
encoding=encoding_iana,
|
||||
)
|
||||
else:
|
||||
decoded_payload = str(
|
||||
sequences if strip_sig_or_bom is False else sequences[len(sig_payload):],
|
||||
encoding=encoding_iana
|
||||
sequences
|
||||
if strip_sig_or_bom is False
|
||||
else sequences[len(sig_payload) :],
|
||||
encoding=encoding_iana,
|
||||
)
|
||||
except (UnicodeDecodeError, LookupError) as e:
|
||||
if not isinstance(e, LookupError):
|
||||
logger.warning(
|
||||
"Code page %s does not fit given bytes sequence at ALL. %s",
|
||||
encoding_iana,
|
||||
str(e),
|
||||
)
|
||||
except UnicodeDecodeError as e:
|
||||
logger.warning('Code page %s does not fit given bytes sequence at ALL. %s', encoding_iana, str(e))
|
||||
tested_but_hard_failure.append(encoding_iana)
|
||||
if not is_multi_byte_decoder:
|
||||
single_byte_hard_failure_count += 1
|
||||
continue
|
||||
except LookupError:
|
||||
tested_but_hard_failure.append(encoding_iana)
|
||||
if not is_multi_byte_decoder:
|
||||
single_byte_hard_failure_count += 1
|
||||
continue
|
||||
|
||||
similar_soft_failure_test = False # type: bool
|
||||
@@ -193,19 +233,31 @@ def from_bytes(
|
||||
break
|
||||
|
||||
if similar_soft_failure_test:
|
||||
logger.warning("%s is deemed too similar to code page %s and was consider unsuited already. Continuing!", encoding_iana, encoding_soft_failed)
|
||||
logger.warning(
|
||||
"%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
|
||||
encoding_iana,
|
||||
encoding_soft_failed,
|
||||
)
|
||||
continue
|
||||
|
||||
r_ = range(
|
||||
0 if bom_or_sig_available is False else len(sig_payload),
|
||||
length,
|
||||
int(length / steps)
|
||||
int(length / steps),
|
||||
)
|
||||
|
||||
multi_byte_bonus = is_multi_byte_decoder and decoded_payload is not None and len(decoded_payload) < length # type: bool
|
||||
multi_byte_bonus = (
|
||||
is_multi_byte_decoder
|
||||
and decoded_payload is not None
|
||||
and len(decoded_payload) < length
|
||||
) # type: bool
|
||||
|
||||
if multi_byte_bonus:
|
||||
logger.info('Code page %s is a multi byte encoding table and it appear that at least one character was encoded using n-bytes. Should not be a coincidence. Priority +1 given.', encoding_iana)
|
||||
logger.info(
|
||||
"Code page %s is a multi byte encoding table and it appear that at least one character "
|
||||
"was encoded using n-bytes.",
|
||||
encoding_iana,
|
||||
)
|
||||
|
||||
max_chunk_gave_up = int(len(r_) / 4) # type: int
|
||||
|
||||
@@ -218,62 +270,79 @@ def from_bytes(
|
||||
md_ratios = []
|
||||
|
||||
for i in r_:
|
||||
cut_sequence = sequences[i:i + chunk_size]
|
||||
cut_sequence = sequences[i : i + chunk_size]
|
||||
|
||||
if bom_or_sig_available and strip_sig_or_bom is False:
|
||||
cut_sequence = sig_payload+cut_sequence
|
||||
cut_sequence = sig_payload + cut_sequence
|
||||
|
||||
chunk = cut_sequence.decode(encoding_iana, errors="ignore") # type: str
|
||||
|
||||
# multi-byte bad cutting detector and adjustment
|
||||
# not the cleanest way to perform that fix but clever enough for now.
|
||||
if is_multi_byte_decoder and i > 0 and sequences[i] >= 0x80:
|
||||
|
||||
chunk_partial_size_chk = (
|
||||
16 if chunk_size > 16 else chunk_size
|
||||
) # type: int
|
||||
|
||||
if (
|
||||
decoded_payload
|
||||
and chunk[:chunk_partial_size_chk] not in decoded_payload
|
||||
):
|
||||
for j in range(i, i - 4, -1):
|
||||
cut_sequence = sequences[j : i + chunk_size]
|
||||
|
||||
if bom_or_sig_available and strip_sig_or_bom is False:
|
||||
cut_sequence = sig_payload + cut_sequence
|
||||
|
||||
chunk = cut_sequence.decode(encoding_iana, errors="ignore")
|
||||
|
||||
if chunk[:chunk_partial_size_chk] in decoded_payload:
|
||||
break
|
||||
|
||||
md_chunks.append(chunk)
|
||||
|
||||
md_ratios.append(
|
||||
mess_ratio(
|
||||
chunk,
|
||||
threshold
|
||||
)
|
||||
)
|
||||
md_ratios.append(mess_ratio(chunk, threshold))
|
||||
|
||||
if md_ratios[-1] >= threshold:
|
||||
early_stop_count += 1
|
||||
|
||||
if (early_stop_count >= max_chunk_gave_up) or (bom_or_sig_available and strip_sig_or_bom is False):
|
||||
if (early_stop_count >= max_chunk_gave_up) or (
|
||||
bom_or_sig_available and strip_sig_or_bom is False
|
||||
):
|
||||
break
|
||||
|
||||
if md_ratios:
|
||||
mean_mess_ratio = sum(md_ratios) / len(md_ratios) # type: float
|
||||
else:
|
||||
mean_mess_ratio = 0.
|
||||
mean_mess_ratio = 0.0
|
||||
|
||||
if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
|
||||
tested_but_soft_failure.append(encoding_iana)
|
||||
if not is_multi_byte_decoder:
|
||||
single_byte_soft_failure_count += 1
|
||||
logger.warning('%s was excluded because of initial chaos probing. Gave up %i time(s). '
|
||||
'Computed mean chaos is %f %%.',
|
||||
encoding_iana,
|
||||
early_stop_count,
|
||||
round(mean_mess_ratio * 100, ndigits=3))
|
||||
logger.warning(
|
||||
"%s was excluded because of initial chaos probing. Gave up %i time(s). "
|
||||
"Computed mean chaos is %f %%.",
|
||||
encoding_iana,
|
||||
early_stop_count,
|
||||
round(mean_mess_ratio * 100, ndigits=3),
|
||||
)
|
||||
# Preparing those fallbacks in case we got nothing.
|
||||
if encoding_iana in ["ascii", "utf_8"]:
|
||||
if encoding_iana in ["ascii", "utf_8", specified_encoding]:
|
||||
fallback_entry = CharsetMatch(
|
||||
sequences,
|
||||
encoding_iana,
|
||||
threshold,
|
||||
False,
|
||||
[],
|
||||
decoded_payload
|
||||
sequences, encoding_iana, threshold, False, [], decoded_payload
|
||||
)
|
||||
if encoding_iana == "ascii":
|
||||
if encoding_iana == specified_encoding:
|
||||
fallback_specified = fallback_entry
|
||||
elif encoding_iana == "ascii":
|
||||
fallback_ascii = fallback_entry
|
||||
else:
|
||||
fallback_u8 = fallback_entry
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
'%s passed initial chaos probing. Mean measured chaos is %f %%',
|
||||
"%s passed initial chaos probing. Mean measured chaos is %f %%",
|
||||
encoding_iana,
|
||||
round(mean_mess_ratio * 100, ndigits=3)
|
||||
round(mean_mess_ratio * 100, ndigits=3),
|
||||
)
|
||||
|
||||
if not is_multi_byte_decoder:
|
||||
@@ -282,21 +351,29 @@ def from_bytes(
|
||||
target_languages = mb_encoding_languages(encoding_iana)
|
||||
|
||||
if target_languages:
|
||||
logger.info("{} should target any language(s) of {}".format(encoding_iana, str(target_languages)))
|
||||
logger.info(
|
||||
"{} should target any language(s) of {}".format(
|
||||
encoding_iana, str(target_languages)
|
||||
)
|
||||
)
|
||||
|
||||
cd_ratios = []
|
||||
|
||||
for chunk in md_chunks:
|
||||
chunk_languages = coherence_ratio(chunk, 0.1, ",".join(target_languages) if target_languages else None)
|
||||
|
||||
cd_ratios.append(
|
||||
chunk_languages
|
||||
chunk_languages = coherence_ratio(
|
||||
chunk, 0.1, ",".join(target_languages) if target_languages else None
|
||||
)
|
||||
|
||||
cd_ratios.append(chunk_languages)
|
||||
|
||||
cd_ratios_merged = merge_coherence_ratios(cd_ratios)
|
||||
|
||||
if cd_ratios_merged:
|
||||
logger.info("We detected language {} using {}".format(cd_ratios_merged, encoding_iana))
|
||||
logger.info(
|
||||
"We detected language {} using {}".format(
|
||||
cd_ratios_merged, encoding_iana
|
||||
)
|
||||
)
|
||||
|
||||
results.append(
|
||||
CharsetMatch(
|
||||
@@ -305,37 +382,46 @@ def from_bytes(
|
||||
mean_mess_ratio,
|
||||
bom_or_sig_available,
|
||||
cd_ratios_merged,
|
||||
decoded_payload
|
||||
decoded_payload,
|
||||
)
|
||||
)
|
||||
|
||||
if encoding_iana in [specified_encoding, "ascii", "utf_8"] and mean_mess_ratio < 0.1:
|
||||
logger.info("%s is most likely the one. Stopping the process.", encoding_iana)
|
||||
return CharsetMatches(
|
||||
[results[encoding_iana]]
|
||||
if (
|
||||
encoding_iana in [specified_encoding, "ascii", "utf_8"]
|
||||
and mean_mess_ratio < 0.1
|
||||
):
|
||||
logger.info(
|
||||
"%s is most likely the one. Stopping the process.", encoding_iana
|
||||
)
|
||||
return CharsetMatches([results[encoding_iana]])
|
||||
|
||||
if encoding_iana == sig_encoding:
|
||||
logger.info(
|
||||
"%s is most likely the one as we detected a BOM or SIG within the beginning of the sequence.",
|
||||
encoding_iana
|
||||
)
|
||||
return CharsetMatches(
|
||||
[results[encoding_iana]]
|
||||
)
|
||||
|
||||
if results[-1].languages:
|
||||
logger.info(
|
||||
"Using %s code page we detected the following languages: %s",
|
||||
encoding_iana,
|
||||
results[-1]._languages
|
||||
)
|
||||
return CharsetMatches([results[encoding_iana]])
|
||||
|
||||
if len(results) == 0:
|
||||
if fallback_u8 or fallback_ascii:
|
||||
logger.warning("Nothing got out of the detection process. Using ASCII/UTF-8 fallback.")
|
||||
if fallback_u8 or fallback_ascii or fallback_specified:
|
||||
logger.warning(
|
||||
"Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback."
|
||||
)
|
||||
|
||||
if (fallback_u8 and fallback_ascii is None) or (fallback_u8 and fallback_u8.fingerprint != fallback_ascii.fingerprint):
|
||||
if fallback_specified:
|
||||
logger.warning(
|
||||
"%s will be used as a fallback match", fallback_specified.encoding
|
||||
)
|
||||
results.append(fallback_specified)
|
||||
elif (
|
||||
(fallback_u8 and fallback_ascii is None)
|
||||
or (
|
||||
fallback_u8
|
||||
and fallback_ascii
|
||||
and fallback_u8.fingerprint != fallback_ascii.fingerprint
|
||||
)
|
||||
or (fallback_u8 is not None)
|
||||
):
|
||||
logger.warning("utf_8 will be used as a fallback match")
|
||||
results.append(fallback_u8)
|
||||
elif fallback_ascii:
|
||||
@@ -346,14 +432,14 @@ def from_bytes(
|
||||
|
||||
|
||||
def from_fp(
|
||||
fp: BinaryIO,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.20,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False
|
||||
fp: BinaryIO,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.20,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False,
|
||||
) -> CharsetMatches:
|
||||
"""
|
||||
Same thing than the function from_bytes but using a file pointer that is already ready.
|
||||
@@ -367,29 +453,46 @@ def from_fp(
|
||||
cp_isolation,
|
||||
cp_exclusion,
|
||||
preemptive_behaviour,
|
||||
explain
|
||||
explain,
|
||||
)
|
||||
|
||||
|
||||
def from_path(
|
||||
path: PathLike,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.20,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False
|
||||
path: PathLike,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.20,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
explain: bool = False,
|
||||
) -> CharsetMatches:
|
||||
"""
|
||||
Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
|
||||
Can raise IOError.
|
||||
"""
|
||||
with open(path, 'rb') as fp:
|
||||
return from_fp(fp, steps, chunk_size, threshold, cp_isolation, cp_exclusion, preemptive_behaviour, explain)
|
||||
with open(path, "rb") as fp:
|
||||
return from_fp(
|
||||
fp,
|
||||
steps,
|
||||
chunk_size,
|
||||
threshold,
|
||||
cp_isolation,
|
||||
cp_exclusion,
|
||||
preemptive_behaviour,
|
||||
explain,
|
||||
)
|
||||
|
||||
|
||||
def normalize(path: PathLike, steps: int = 5, chunk_size: int = 512, threshold: float = 0.20, cp_isolation: List[str] = None, cp_exclusion: List[str] = None, preemptive_behaviour: bool = True) -> CharsetMatch:
|
||||
def normalize(
|
||||
path: PathLike,
|
||||
steps: int = 5,
|
||||
chunk_size: int = 512,
|
||||
threshold: float = 0.20,
|
||||
cp_isolation: List[str] = None,
|
||||
cp_exclusion: List[str] = None,
|
||||
preemptive_behaviour: bool = True,
|
||||
) -> CharsetMatch:
|
||||
"""
|
||||
Take a (text-based) file path and try to create another file next to it, this time using UTF-8.
|
||||
"""
|
||||
@@ -400,22 +503,26 @@ def normalize(path: PathLike, steps: int = 5, chunk_size: int = 512, threshold:
|
||||
threshold,
|
||||
cp_isolation,
|
||||
cp_exclusion,
|
||||
preemptive_behaviour
|
||||
preemptive_behaviour,
|
||||
)
|
||||
|
||||
filename = basename(path)
|
||||
target_extensions = list(splitext(filename))
|
||||
|
||||
if len(results) == 0:
|
||||
raise IOError('Unable to normalize "{}", no encoding charset seems to fit.'.format(filename))
|
||||
raise IOError(
|
||||
'Unable to normalize "{}", no encoding charset seems to fit.'.format(
|
||||
filename
|
||||
)
|
||||
)
|
||||
|
||||
result = results.best()
|
||||
|
||||
target_extensions[0] += '-' + result.encoding # type: ignore
|
||||
target_extensions[0] += "-" + result.encoding # type: ignore
|
||||
|
||||
with open('{}'.format(path.replace(filename, ''.join(target_extensions))), 'wb') as fp:
|
||||
fp.write(
|
||||
result.output() # type: ignore
|
||||
)
|
||||
with open(
|
||||
"{}".format(str(path).replace(filename, "".join(target_extensions))), "wb"
|
||||
) as fp:
|
||||
fp.write(result.output()) # type: ignore
|
||||
|
||||
return result # type: ignore
|
||||
|
||||
+1239
-47
File diff suppressed because it is too large
Load Diff
+138
-58
@@ -1,13 +1,20 @@
|
||||
from codecs import IncrementalDecoder
|
||||
from functools import lru_cache
|
||||
from typing import List, Set, Optional, Tuple, Dict
|
||||
import importlib
|
||||
from codecs import IncrementalDecoder
|
||||
from collections import Counter, OrderedDict
|
||||
from functools import lru_cache
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from pipenv.vendor.charset_normalizer.models import CoherenceMatches
|
||||
from pipenv.vendor.charset_normalizer.utils import unicode_range, is_unicode_range_secondary, is_multi_byte_encoding
|
||||
from pipenv.vendor.charset_normalizer.md import is_suspiciously_successive_range
|
||||
from pipenv.vendor.charset_normalizer.assets import FREQUENCIES
|
||||
from collections import Counter
|
||||
from .assets import FREQUENCIES
|
||||
from .constant import KO_NAMES, TOO_SMALL_SEQUENCE, ZH_NAMES
|
||||
from .md import is_suspiciously_successive_range
|
||||
from .models import CoherenceMatches
|
||||
from .utils import (
|
||||
is_accentuated,
|
||||
is_latin,
|
||||
is_multi_byte_encoding,
|
||||
is_unicode_range_secondary,
|
||||
unicode_range,
|
||||
)
|
||||
|
||||
|
||||
def encoding_unicode_range(iana_name: str) -> List[str]:
|
||||
@@ -17,15 +24,14 @@ def encoding_unicode_range(iana_name: str) -> List[str]:
|
||||
if is_multi_byte_encoding(iana_name):
|
||||
raise IOError("Function not supported on multi-byte code page")
|
||||
|
||||
decoder = importlib.import_module('encodings.{}'.format(iana_name)).IncrementalDecoder # type: ignore
|
||||
decoder = importlib.import_module("encodings.{}".format(iana_name)).IncrementalDecoder # type: ignore
|
||||
|
||||
p = decoder(errors="ignore") # type: IncrementalDecoder
|
||||
seen_ranges = set() # type: Set[str]
|
||||
seen_ranges = {} # type: Dict[str, int]
|
||||
character_count = 0 # type: int
|
||||
|
||||
for i in range(48, 255):
|
||||
chunk = p.decode(
|
||||
bytes([i])
|
||||
) # type: str
|
||||
for i in range(0x40, 0xFF):
|
||||
chunk = p.decode(bytes([i])) # type: str
|
||||
|
||||
if chunk:
|
||||
character_range = unicode_range(chunk) # type: Optional[str]
|
||||
@@ -34,9 +40,18 @@ def encoding_unicode_range(iana_name: str) -> List[str]:
|
||||
continue
|
||||
|
||||
if is_unicode_range_secondary(character_range) is False:
|
||||
seen_ranges.add(character_range)
|
||||
if character_range not in seen_ranges:
|
||||
seen_ranges[character_range] = 0
|
||||
seen_ranges[character_range] += 1
|
||||
character_count += 1
|
||||
|
||||
return sorted(list(seen_ranges))
|
||||
return sorted(
|
||||
[
|
||||
character_range
|
||||
for character_range in seen_ranges
|
||||
if seen_ranges[character_range] / character_count >= 0.15
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def unicode_range_languages(primary_range: str) -> List[str]:
|
||||
@@ -74,42 +89,78 @@ def encoding_languages(iana_name: str) -> List[str]:
|
||||
return unicode_range_languages(primary_range)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def mb_encoding_languages(iana_name: str) -> List[str]:
|
||||
"""
|
||||
Multi-byte encoding language association. Some code page are heavily linked to particular language(s).
|
||||
This function does the correspondence.
|
||||
"""
|
||||
if iana_name.startswith("shift_") or iana_name.startswith("iso2022_jp") or iana_name.startswith("euc_j") or iana_name in {"cp932"}:
|
||||
if (
|
||||
iana_name.startswith("shift_")
|
||||
or iana_name.startswith("iso2022_jp")
|
||||
or iana_name.startswith("euc_j")
|
||||
or iana_name == "cp932"
|
||||
):
|
||||
return ["Japanese"]
|
||||
if iana_name.startswith("gb") or iana_name in {"big5", "cp950", "big5hkscs"}:
|
||||
if iana_name.startswith("gb") or iana_name in ZH_NAMES:
|
||||
return ["Chinese", "Classical Chinese"]
|
||||
if iana_name.startswith("iso2022_kr") or iana_name in {"johab", "cp949", "euc_kr"}:
|
||||
if iana_name.startswith("iso2022_kr") or iana_name in KO_NAMES:
|
||||
return ["Korean"]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def alphabet_languages(characters: List[str]) -> List[str]:
|
||||
def alphabet_languages(
|
||||
characters: List[str], ignore_non_latin: bool = False
|
||||
) -> List[str]:
|
||||
"""
|
||||
Return associated languages associated to given characters.
|
||||
"""
|
||||
languages = [] # type: List[str]
|
||||
languages = [] # type: List[Tuple[str, float]]
|
||||
|
||||
source_have_accents = False # type: bool
|
||||
|
||||
for character in characters:
|
||||
if is_accentuated(character):
|
||||
source_have_accents = True
|
||||
break
|
||||
|
||||
for language, language_characters in FREQUENCIES.items():
|
||||
character_match_count = 0 # type: int
|
||||
|
||||
target_have_accents = False # type: bool
|
||||
target_pure_latin = True # type: bool
|
||||
|
||||
for language_character in language_characters:
|
||||
if target_have_accents is False and is_accentuated(language_character):
|
||||
target_have_accents = True
|
||||
if target_pure_latin is True and is_latin(language_character) is False:
|
||||
target_pure_latin = False
|
||||
|
||||
if ignore_non_latin and target_pure_latin is False:
|
||||
continue
|
||||
|
||||
if target_have_accents is False and source_have_accents:
|
||||
continue
|
||||
|
||||
character_count = len(language_characters) # type: int
|
||||
|
||||
for character in language_characters:
|
||||
if character in characters:
|
||||
character_match_count += 1
|
||||
character_match_count = len(
|
||||
[c for c in language_characters if c in characters]
|
||||
) # type: int
|
||||
|
||||
if character_match_count / character_count >= 0.2:
|
||||
languages.append(language)
|
||||
ratio = character_match_count / character_count # type: float
|
||||
|
||||
return languages
|
||||
if ratio >= 0.2:
|
||||
languages.append((language, ratio))
|
||||
|
||||
languages = sorted(languages, key=lambda x: x[1], reverse=True)
|
||||
|
||||
return [compatible_language[0] for compatible_language in languages]
|
||||
|
||||
|
||||
def characters_popularity_compare(language: str, ordered_characters: List[str]) -> float:
|
||||
def characters_popularity_compare(
|
||||
language: str, ordered_characters: List[str]
|
||||
) -> float:
|
||||
"""
|
||||
Determine if a ordered characters list (by occurrence from most appearance to rarest) match a particular language.
|
||||
The result is a ratio between 0. (absolutely no correspondence) and 1. (near perfect fit).
|
||||
@@ -124,14 +175,30 @@ def characters_popularity_compare(language: str, ordered_characters: List[str])
|
||||
if character not in FREQUENCIES[language]:
|
||||
continue
|
||||
|
||||
characters_before_source = FREQUENCIES[language][0:FREQUENCIES[language].index(character)] # type: List[str]
|
||||
characters_after_source = FREQUENCIES[language][FREQUENCIES[language].index(character):] # type: List[str]
|
||||
characters_before_source = FREQUENCIES[language][
|
||||
0 : FREQUENCIES[language].index(character)
|
||||
] # type: List[str]
|
||||
characters_after_source = FREQUENCIES[language][
|
||||
FREQUENCIES[language].index(character) :
|
||||
] # type: List[str]
|
||||
|
||||
characters_before = ordered_characters[0:ordered_characters.index(character)] # type: List[str]
|
||||
characters_after = ordered_characters[ordered_characters.index(character):] # type: List[str]
|
||||
characters_before = ordered_characters[
|
||||
0 : ordered_characters.index(character)
|
||||
] # type: List[str]
|
||||
characters_after = ordered_characters[
|
||||
ordered_characters.index(character) :
|
||||
] # type: List[str]
|
||||
|
||||
before_match_count = [e in characters_before for e in characters_before_source].count(True) # type: int
|
||||
after_match_count = [e in characters_after for e in characters_after_source].count(True) # type: int
|
||||
before_match_count = [
|
||||
e in characters_before for e in characters_before_source
|
||||
].count(
|
||||
True
|
||||
) # type: int
|
||||
after_match_count = [
|
||||
e in characters_after for e in characters_after_source
|
||||
].count(
|
||||
True
|
||||
) # type: int
|
||||
|
||||
if len(characters_before_source) == 0 and before_match_count <= 4:
|
||||
character_approved_count += 1
|
||||
@@ -141,7 +208,10 @@ def characters_popularity_compare(language: str, ordered_characters: List[str])
|
||||
character_approved_count += 1
|
||||
continue
|
||||
|
||||
if before_match_count / len(characters_before_source) >= 0.4 or after_match_count / len(characters_after_source) >= 0.4:
|
||||
if (
|
||||
before_match_count / len(characters_before_source) >= 0.4
|
||||
or after_match_count / len(characters_after_source) >= 0.4
|
||||
):
|
||||
character_approved_count += 1
|
||||
continue
|
||||
|
||||
@@ -154,18 +224,24 @@ def alpha_unicode_split(decoded_sequence: str) -> List[str]:
|
||||
Ex. a text containing English/Latin with a bit a Hebrew will return two items in the resulting list;
|
||||
One containing the latin letters and the other hebrew.
|
||||
"""
|
||||
layers = {} # type: Dict[str, str]
|
||||
layers = OrderedDict() # type: Dict[str, str]
|
||||
|
||||
for character in decoded_sequence:
|
||||
if character.isalpha() is False:
|
||||
continue
|
||||
|
||||
character_range = unicode_range(character) # type: str
|
||||
character_range = unicode_range(character) # type: Optional[str]
|
||||
|
||||
if character_range is None:
|
||||
continue
|
||||
|
||||
layer_target_range = None # type: Optional[str]
|
||||
|
||||
for discovered_range in layers:
|
||||
if is_suspiciously_successive_range(discovered_range, character_range) is False:
|
||||
if (
|
||||
is_suspiciously_successive_range(discovered_range, character_range)
|
||||
is False
|
||||
):
|
||||
layer_target_range = discovered_range
|
||||
break
|
||||
|
||||
@@ -186,7 +262,7 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
|
||||
This function merge results previously given by the function coherence_ratio.
|
||||
The return type is the same as coherence_ratio.
|
||||
"""
|
||||
per_language_ratios = {} # type: Dict[str, List[float]]
|
||||
per_language_ratios = OrderedDict() # type: Dict[str, List[float]]
|
||||
merge = [] # type: CoherenceMatches
|
||||
|
||||
for result in results:
|
||||
@@ -195,20 +271,17 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
|
||||
if language not in per_language_ratios:
|
||||
per_language_ratios[language] = [ratio]
|
||||
continue
|
||||
per_language_ratios[language].append(
|
||||
ratio
|
||||
)
|
||||
per_language_ratios[language].append(ratio)
|
||||
|
||||
for language in per_language_ratios:
|
||||
merge.append(
|
||||
(
|
||||
language,
|
||||
round(
|
||||
sum(
|
||||
per_language_ratios[language]
|
||||
) / len(per_language_ratios[language]),
|
||||
4
|
||||
)
|
||||
sum(per_language_ratios[language])
|
||||
/ len(per_language_ratios[language]),
|
||||
4,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -216,21 +289,26 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
|
||||
|
||||
|
||||
@lru_cache(maxsize=2048)
|
||||
def coherence_ratio(decoded_sequence: str, threshold: float = 0.1, lg_inclusion: Optional[str] = None) -> CoherenceMatches:
|
||||
def coherence_ratio(
|
||||
decoded_sequence: str, threshold: float = 0.1, lg_inclusion: Optional[str] = None
|
||||
) -> CoherenceMatches:
|
||||
"""
|
||||
Detect ANY language that can be identified in given sequence. The sequence will be analysed by layers.
|
||||
A layer = Character extraction by alphabets/ranges.
|
||||
"""
|
||||
|
||||
results = [] # type: List[Tuple[str, float]]
|
||||
lg_inclusion_list = [] # type: List[str]
|
||||
ignore_non_latin = False # type: bool
|
||||
|
||||
sufficient_match_count = 0 # type: int
|
||||
|
||||
if lg_inclusion is not None:
|
||||
lg_inclusion = lg_inclusion.split(",")
|
||||
lg_inclusion_list = lg_inclusion.split(",")
|
||||
|
||||
if lg_inclusion is not None and "Latin Based" in lg_inclusion:
|
||||
lg_inclusion.remove("Latin Based")
|
||||
if "Latin Based" in lg_inclusion_list:
|
||||
ignore_non_latin = True
|
||||
lg_inclusion_list.remove("Latin Based")
|
||||
|
||||
for layer in alpha_unicode_split(decoded_sequence):
|
||||
sequence_frequencies = Counter(layer) # type: Counter
|
||||
@@ -238,22 +316,24 @@ def coherence_ratio(decoded_sequence: str, threshold: float = 0.1, lg_inclusion:
|
||||
|
||||
character_count = sum([o for c, o in most_common]) # type: int
|
||||
|
||||
if character_count <= 32:
|
||||
if character_count <= TOO_SMALL_SEQUENCE:
|
||||
continue
|
||||
|
||||
popular_character_ordered = [c for c, o in most_common] # type: List[str]
|
||||
|
||||
for language in lg_inclusion or alphabet_languages(popular_character_ordered):
|
||||
ratio = characters_popularity_compare(language, popular_character_ordered) # type: float
|
||||
for language in lg_inclusion_list or alphabet_languages(
|
||||
popular_character_ordered, ignore_non_latin
|
||||
):
|
||||
ratio = characters_popularity_compare(
|
||||
language, popular_character_ordered
|
||||
) # type: float
|
||||
|
||||
if ratio < threshold:
|
||||
continue
|
||||
elif ratio >= 0.8:
|
||||
sufficient_match_count += 1
|
||||
|
||||
results.append(
|
||||
(language, round(ratio, 4))
|
||||
)
|
||||
results.append((language, round(ratio, 4)))
|
||||
|
||||
if sufficient_match_count >= 3:
|
||||
break
|
||||
|
||||
+202
-121
@@ -1,16 +1,16 @@
|
||||
import argparse
|
||||
import sys
|
||||
from os.path import abspath
|
||||
from json import dumps
|
||||
|
||||
from pipenv.vendor.charset_normalizer import from_fp
|
||||
from pipenv.vendor.charset_normalizer.models import CliDetectionResult
|
||||
from pipenv.vendor.charset_normalizer.version import __version__
|
||||
|
||||
from os.path import abspath
|
||||
from platform import python_version
|
||||
from typing import List
|
||||
|
||||
from charset_normalizer import from_fp
|
||||
from charset_normalizer.models import CliDetectionResult
|
||||
from charset_normalizer.version import __version__
|
||||
|
||||
|
||||
def query_yes_no(question, default="yes"):
|
||||
def query_yes_no(question: str, default: str = "yes") -> bool:
|
||||
"""Ask a yes/no question via input() and return their answer.
|
||||
|
||||
"question" is a string that is presented to the user.
|
||||
@@ -22,8 +22,7 @@ def query_yes_no(question, default="yes"):
|
||||
|
||||
Credit goes to (c) https://stackoverflow.com/questions/3041986/apt-command-line-interface-like-yes-no-input
|
||||
"""
|
||||
valid = {"yes": True, "y": True, "ye": True,
|
||||
"no": False, "n": False}
|
||||
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
|
||||
if default is None:
|
||||
prompt = " [y/n] "
|
||||
elif default == "yes":
|
||||
@@ -36,16 +35,15 @@ def query_yes_no(question, default="yes"):
|
||||
while True:
|
||||
sys.stdout.write(question + prompt)
|
||||
choice = input().lower()
|
||||
if default is not None and choice == '':
|
||||
if default is not None and choice == "":
|
||||
return valid[default]
|
||||
elif choice in valid:
|
||||
return valid[choice]
|
||||
else:
|
||||
sys.stdout.write("Please respond with 'yes' or 'no' "
|
||||
"(or 'y' or 'n').\n")
|
||||
sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
|
||||
|
||||
|
||||
def cli_detect(argv=None):
|
||||
def cli_detect(argv: List[str] = None) -> int:
|
||||
"""
|
||||
CLI assistant using ARGV and ArgumentParser
|
||||
:param argv:
|
||||
@@ -53,133 +51,215 @@ def cli_detect(argv=None):
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="The Real First Universal Charset Detector. "
|
||||
"Discover originating encoding used on text file. "
|
||||
"Normalize text to unicode."
|
||||
"Discover originating encoding used on text file. "
|
||||
"Normalize text to unicode."
|
||||
)
|
||||
|
||||
parser.add_argument('files', type=argparse.FileType('rb'), nargs='+', help='File(s) to be analysed')
|
||||
parser.add_argument('-v', '--verbose', action="store_true", default=False, dest='verbose',
|
||||
help='Display complementary information about file if any. Stdout will contain logs about the detection process.')
|
||||
parser.add_argument('-a', '--with-alternative', action="store_true", default=False, dest='alternatives',
|
||||
help='Output complementary possibilities if any. Top-level JSON WILL be a list.')
|
||||
parser.add_argument('-n', '--normalize', action="store_true", default=False, dest='normalize',
|
||||
help='Permit to normalize input file. If not set, program does not write anything.')
|
||||
parser.add_argument('-m', '--minimal', action="store_true", default=False, dest='minimal',
|
||||
help='Only output the charset detected to STDOUT. Disabling JSON output.')
|
||||
parser.add_argument('-r', '--replace', action="store_true", default=False, dest='replace',
|
||||
help='Replace file when trying to normalize it instead of creating a new one.')
|
||||
parser.add_argument('-f', '--force', action="store_true", default=False, dest='force',
|
||||
help='Replace file without asking if you are sure, use this flag with caution.')
|
||||
parser.add_argument('-t', '--threshold', action="store", default=0.1, type=float, dest='threshold',
|
||||
help="Define a custom maximum amount of chaos allowed in decoded content. 0. <= chaos <= 1.")
|
||||
parser.add_argument(
|
||||
"files", type=argparse.FileType("rb"), nargs="+", help="File(s) to be analysed"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="verbose",
|
||||
help="Display complementary information about file if any. "
|
||||
"Stdout will contain logs about the detection process.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-a",
|
||||
"--with-alternative",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="alternatives",
|
||||
help="Output complementary possibilities if any. Top-level JSON WILL be a list.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-n",
|
||||
"--normalize",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="normalize",
|
||||
help="Permit to normalize input file. If not set, program does not write anything.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-m",
|
||||
"--minimal",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="minimal",
|
||||
help="Only output the charset detected to STDOUT. Disabling JSON output.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--replace",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="replace",
|
||||
help="Replace file when trying to normalize it instead of creating a new one.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--force",
|
||||
action="store_true",
|
||||
default=False,
|
||||
dest="force",
|
||||
help="Replace file without asking if you are sure, use this flag with caution.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--threshold",
|
||||
action="store",
|
||||
default=0.1,
|
||||
type=float,
|
||||
dest="threshold",
|
||||
help="Define a custom maximum amount of chaos allowed in decoded content. 0. <= chaos <= 1.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--version",
|
||||
action="version",
|
||||
version="Charset-Normalizer {} - Python {}".format(__version__, python_version()),
|
||||
help="Show version information and exit."
|
||||
version="Charset-Normalizer {} - Python {}".format(
|
||||
__version__, python_version()
|
||||
),
|
||||
help="Show version information and exit.",
|
||||
)
|
||||
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.replace is True and args.normalize is False:
|
||||
print('Use --replace in addition of --normalize only.', file=sys.stderr)
|
||||
print("Use --replace in addition of --normalize only.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
if args.force is True and args.replace is False:
|
||||
print('Use --force in addition of --replace only.', file=sys.stderr)
|
||||
print("Use --force in addition of --replace only.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
if args.threshold < 0. or args.threshold > 1.:
|
||||
print('--threshold VALUE should be between 0. AND 1.', file=sys.stderr)
|
||||
if args.threshold < 0.0 or args.threshold > 1.0:
|
||||
print("--threshold VALUE should be between 0. AND 1.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
x_ = []
|
||||
|
||||
for my_file in args.files:
|
||||
|
||||
matches = from_fp(
|
||||
my_file,
|
||||
threshold=args.threshold,
|
||||
explain=args.verbose
|
||||
)
|
||||
matches = from_fp(my_file, threshold=args.threshold, explain=args.verbose)
|
||||
|
||||
if len(matches) == 0:
|
||||
print('Unable to identify originating encoding for "{}". {}'.format(my_file.name, 'Maybe try increasing maximum amount of chaos.' if args.threshold < 1. else ''), file=sys.stderr)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
continue
|
||||
best_guess = matches.best()
|
||||
|
||||
x_ = []
|
||||
|
||||
r_ = matches.best()
|
||||
p_ = r_.first()
|
||||
|
||||
x_.append(
|
||||
CliDetectionResult(
|
||||
abspath(my_file.name),
|
||||
p_.encoding,
|
||||
p_.encoding_aliases,
|
||||
[cp for cp in p_.could_be_from_charset if cp != p_.encoding],
|
||||
p_.language,
|
||||
p_.alphabets,
|
||||
p_.bom,
|
||||
p_.percent_chaos,
|
||||
p_.percent_coherence,
|
||||
None,
|
||||
True
|
||||
if best_guess is None:
|
||||
print(
|
||||
'Unable to identify originating encoding for "{}". {}'.format(
|
||||
my_file.name,
|
||||
"Maybe try increasing maximum amount of chaos."
|
||||
if args.threshold < 1.0
|
||||
else "",
|
||||
),
|
||||
file=sys.stderr,
|
||||
)
|
||||
x_.append(
|
||||
CliDetectionResult(
|
||||
abspath(my_file.name),
|
||||
None,
|
||||
[],
|
||||
[],
|
||||
"Unknown",
|
||||
[],
|
||||
False,
|
||||
1.0,
|
||||
0.0,
|
||||
None,
|
||||
True,
|
||||
)
|
||||
)
|
||||
else:
|
||||
x_.append(
|
||||
CliDetectionResult(
|
||||
abspath(my_file.name),
|
||||
best_guess.encoding,
|
||||
best_guess.encoding_aliases,
|
||||
[
|
||||
cp
|
||||
for cp in best_guess.could_be_from_charset
|
||||
if cp != best_guess.encoding
|
||||
],
|
||||
best_guess.language,
|
||||
best_guess.alphabets,
|
||||
best_guess.bom,
|
||||
best_guess.percent_chaos,
|
||||
best_guess.percent_coherence,
|
||||
None,
|
||||
True,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if len(matches) > 1 and args.alternatives:
|
||||
for el in matches:
|
||||
if el != p_:
|
||||
x_.append(
|
||||
CliDetectionResult(
|
||||
abspath(my_file.name),
|
||||
el.encoding,
|
||||
el.encoding_aliases,
|
||||
[cp for cp in el.could_be_from_charset if cp != el.encoding],
|
||||
el.language,
|
||||
el.alphabets,
|
||||
el.bom,
|
||||
el.percent_chaos,
|
||||
el.percent_coherence,
|
||||
None,
|
||||
False
|
||||
if len(matches) > 1 and args.alternatives:
|
||||
for el in matches:
|
||||
if el != best_guess:
|
||||
x_.append(
|
||||
CliDetectionResult(
|
||||
abspath(my_file.name),
|
||||
el.encoding,
|
||||
el.encoding_aliases,
|
||||
[
|
||||
cp
|
||||
for cp in el.could_be_from_charset
|
||||
if cp != el.encoding
|
||||
],
|
||||
el.language,
|
||||
el.alphabets,
|
||||
el.bom,
|
||||
el.percent_chaos,
|
||||
el.percent_coherence,
|
||||
None,
|
||||
False,
|
||||
)
|
||||
)
|
||||
|
||||
if args.normalize is True:
|
||||
|
||||
if best_guess.encoding.startswith("utf") is True:
|
||||
print(
|
||||
'"{}" file does not need to be normalized, as it already came from unicode.'.format(
|
||||
my_file.name
|
||||
),
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
if args.normalize is True:
|
||||
|
||||
if p_.encoding.startswith('utf') is True:
|
||||
print('"{}" file does not need to be normalized, as it already came from unicode.'.format(my_file.name), file=sys.stderr)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
continue
|
||||
|
||||
o_ = my_file.name.split('.') # type: list[str]
|
||||
|
||||
if args.replace is False:
|
||||
o_.insert(-1, p_.encoding)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
else:
|
||||
if args.force is False and query_yes_no(
|
||||
'Are you sure to normalize "{}" by replacing it ?'.format(my_file.name), 'no') is False:
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
continue
|
||||
|
||||
try:
|
||||
x_[0].unicode_path = './{}'.format('.'.join(o_))
|
||||
o_ = my_file.name.split(".") # type: List[str]
|
||||
|
||||
with open(x_[0].unicode_path, 'w', encoding='utf-8') as fp:
|
||||
fp.write(
|
||||
str(p_)
|
||||
)
|
||||
except IOError as e:
|
||||
print(str(e), file=sys.stderr)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
return 2
|
||||
if args.replace is False:
|
||||
o_.insert(-1, best_guess.encoding)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
else:
|
||||
if (
|
||||
args.force is False
|
||||
and query_yes_no(
|
||||
'Are you sure to normalize "{}" by replacing it ?'.format(
|
||||
my_file.name
|
||||
),
|
||||
"no",
|
||||
)
|
||||
is False
|
||||
):
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
continue
|
||||
|
||||
try:
|
||||
x_[0].unicode_path = abspath("./{}".format(".".join(o_)))
|
||||
|
||||
with open(x_[0].unicode_path, "w", encoding="utf-8") as fp:
|
||||
fp.write(str(best_guess))
|
||||
except IOError as e:
|
||||
print(str(e), file=sys.stderr)
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
return 2
|
||||
|
||||
if my_file.closed is False:
|
||||
my_file.close()
|
||||
@@ -187,24 +267,25 @@ def cli_detect(argv=None):
|
||||
if args.minimal is False:
|
||||
print(
|
||||
dumps(
|
||||
[
|
||||
el.__dict__ for el in x_
|
||||
] if args.alternatives else x_[0].__dict__,
|
||||
[el.__dict__ for el in x_] if len(x_) > 1 else x_[0].__dict__,
|
||||
ensure_ascii=True,
|
||||
indent=4
|
||||
indent=4,
|
||||
)
|
||||
)
|
||||
else:
|
||||
print(
|
||||
', '.join(
|
||||
[
|
||||
el.encoding for el in x_
|
||||
]
|
||||
for my_file in args.files:
|
||||
print(
|
||||
", ".join(
|
||||
[
|
||||
el.encoding if el.encoding else "undefined"
|
||||
for el in x_
|
||||
if el.path == abspath(my_file.name)
|
||||
]
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
cli_detect()
|
||||
|
||||
+420
-267
File diff suppressed because one or more lines are too long
+68
-11
@@ -1,7 +1,10 @@
|
||||
from pipenv.vendor.charset_normalizer.api import from_bytes
|
||||
from pipenv.vendor.charset_normalizer.constant import CHARDET_CORRESPONDENCE
|
||||
import warnings
|
||||
from typing import Dict, Optional, Union
|
||||
|
||||
from .api import from_bytes, from_fp, from_path, normalize
|
||||
from .constant import CHARDET_CORRESPONDENCE
|
||||
from .models import CharsetMatch, CharsetMatches
|
||||
|
||||
|
||||
def detect(byte_str: bytes) -> Dict[str, Optional[Union[str, float]]]:
|
||||
"""
|
||||
@@ -14,8 +17,10 @@ def detect(byte_str: bytes) -> Dict[str, Optional[Union[str, float]]]:
|
||||
:param byte_str: The byte sequence to examine.
|
||||
"""
|
||||
if not isinstance(byte_str, (bytearray, bytes)):
|
||||
raise TypeError('Expected object of type bytes or bytearray, got: '
|
||||
'{0}'.format(type(byte_str)))
|
||||
raise TypeError( # pragma: nocover
|
||||
"Expected object of type bytes or bytearray, got: "
|
||||
"{0}".format(type(byte_str))
|
||||
)
|
||||
|
||||
if isinstance(byte_str, bytearray):
|
||||
byte_str = bytes(byte_str)
|
||||
@@ -23,16 +28,68 @@ def detect(byte_str: bytes) -> Dict[str, Optional[Union[str, float]]]:
|
||||
r = from_bytes(byte_str).best()
|
||||
|
||||
encoding = r.encoding if r is not None else None
|
||||
language = r.language if r is not None and r.language != 'Unknown' else ''
|
||||
confidence = 1. - r.chaos if r is not None else None
|
||||
language = r.language if r is not None and r.language != "Unknown" else ""
|
||||
confidence = 1.0 - r.chaos if r is not None else None
|
||||
|
||||
# Note: CharsetNormalizer does not return 'UTF-8-SIG' as the sig get stripped in the detection/normalization process
|
||||
# but chardet does return 'utf-8-sig' and it is a valid codec name.
|
||||
if r is not None and encoding == 'utf_8' and r.bom:
|
||||
encoding += '_sig'
|
||||
if r is not None and encoding == "utf_8" and r.bom:
|
||||
encoding += "_sig"
|
||||
|
||||
return {
|
||||
'encoding': encoding if encoding not in CHARDET_CORRESPONDENCE else CHARDET_CORRESPONDENCE[encoding],
|
||||
'language': language,
|
||||
'confidence': confidence
|
||||
"encoding": encoding
|
||||
if encoding not in CHARDET_CORRESPONDENCE
|
||||
else CHARDET_CORRESPONDENCE[encoding],
|
||||
"language": language,
|
||||
"confidence": confidence,
|
||||
}
|
||||
|
||||
|
||||
class CharsetNormalizerMatch(CharsetMatch):
|
||||
pass
|
||||
|
||||
|
||||
class CharsetNormalizerMatches(CharsetMatches):
|
||||
@staticmethod
|
||||
def from_fp(*args, **kwargs): # type: ignore
|
||||
warnings.warn( # pragma: nocover
|
||||
"staticmethod from_fp, from_bytes, from_path and normalize are deprecated "
|
||||
"and scheduled to be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return from_fp(*args, **kwargs) # pragma: nocover
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(*args, **kwargs): # type: ignore
|
||||
warnings.warn( # pragma: nocover
|
||||
"staticmethod from_fp, from_bytes, from_path and normalize are deprecated "
|
||||
"and scheduled to be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return from_bytes(*args, **kwargs) # pragma: nocover
|
||||
|
||||
@staticmethod
|
||||
def from_path(*args, **kwargs): # type: ignore
|
||||
warnings.warn( # pragma: nocover
|
||||
"staticmethod from_fp, from_bytes, from_path and normalize are deprecated "
|
||||
"and scheduled to be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return from_path(*args, **kwargs) # pragma: nocover
|
||||
|
||||
@staticmethod
|
||||
def normalize(*args, **kwargs): # type: ignore
|
||||
warnings.warn( # pragma: nocover
|
||||
"staticmethod from_fp, from_bytes, from_path and normalize are deprecated "
|
||||
"and scheduled to be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return normalize(*args, **kwargs) # pragma: nocover
|
||||
|
||||
|
||||
class CharsetDetector(CharsetNormalizerMatches):
|
||||
pass
|
||||
|
||||
|
||||
class CharsetDoctor(CharsetNormalizerMatches):
|
||||
pass
|
||||
|
||||
+176
-87
@@ -1,9 +1,24 @@
|
||||
from functools import lru_cache
|
||||
from typing import Optional, List
|
||||
from typing import List, Optional
|
||||
|
||||
from pipenv.vendor.charset_normalizer.constant import UNICODE_SECONDARY_RANGE_KEYWORD
|
||||
from pipenv.vendor.charset_normalizer.utils import is_punctuation, is_symbol, unicode_range, is_accentuated, is_latin, \
|
||||
remove_accent, is_separator, is_cjk
|
||||
from .constant import COMMON_SAFE_ASCII_CHARACTERS, UNICODE_SECONDARY_RANGE_KEYWORD
|
||||
from .utils import (
|
||||
is_accentuated,
|
||||
is_ascii,
|
||||
is_case_variable,
|
||||
is_cjk,
|
||||
is_emoticon,
|
||||
is_hangul,
|
||||
is_hiragana,
|
||||
is_katakana,
|
||||
is_latin,
|
||||
is_punctuation,
|
||||
is_separator,
|
||||
is_symbol,
|
||||
is_thai,
|
||||
remove_accent,
|
||||
unicode_range,
|
||||
)
|
||||
|
||||
|
||||
class MessDetectorPlugin:
|
||||
@@ -41,8 +56,7 @@ class MessDetectorPlugin:
|
||||
|
||||
|
||||
class TooManySymbolOrPunctuationPlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._punctuation_count = 0 # type: int
|
||||
self._symbol_count = 0 # type: int
|
||||
self._character_count = 0 # type: int
|
||||
@@ -56,10 +70,17 @@ class TooManySymbolOrPunctuationPlugin(MessDetectorPlugin):
|
||||
def feed(self, character: str) -> None:
|
||||
self._character_count += 1
|
||||
|
||||
if character != self._last_printable_char and character not in ["<", ">", "=", ":", "/", "&", ";", "{", "}", "[", "]"]:
|
||||
if (
|
||||
character != self._last_printable_char
|
||||
and character not in COMMON_SAFE_ASCII_CHARACTERS
|
||||
):
|
||||
if is_punctuation(character):
|
||||
self._punctuation_count += 1
|
||||
elif character.isdigit() is False and is_symbol(character):
|
||||
elif (
|
||||
character.isdigit() is False
|
||||
and is_symbol(character)
|
||||
and is_emoticon(character) is False
|
||||
):
|
||||
self._symbol_count += 2
|
||||
|
||||
self._last_printable_char = character
|
||||
@@ -72,16 +93,17 @@ class TooManySymbolOrPunctuationPlugin(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
ratio_of_punctuation = (self._punctuation_count + self._symbol_count) / self._character_count # type: float
|
||||
ratio_of_punctuation = (
|
||||
self._punctuation_count + self._symbol_count
|
||||
) / self._character_count # type: float
|
||||
|
||||
return ratio_of_punctuation if ratio_of_punctuation >= 0.3 else 0.
|
||||
return ratio_of_punctuation if ratio_of_punctuation >= 0.3 else 0.0
|
||||
|
||||
|
||||
class TooManyAccentuatedPlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._character_count = 0 # type: int
|
||||
self._accentuated_count = 0 # type: int
|
||||
|
||||
@@ -101,14 +123,15 @@ class TooManyAccentuatedPlugin(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
ratio_of_accentuation = self._accentuated_count / self._character_count # type: float
|
||||
return ratio_of_accentuation if ratio_of_accentuation >= 0.35 else 0.
|
||||
return 0.0
|
||||
ratio_of_accentuation = (
|
||||
self._accentuated_count / self._character_count
|
||||
) # type: float
|
||||
return ratio_of_accentuation if ratio_of_accentuation >= 0.35 else 0.0
|
||||
|
||||
|
||||
class UnprintablePlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._unprintable_count = 0 # type: int
|
||||
self._character_count = 0 # type: int
|
||||
|
||||
@@ -116,7 +139,11 @@ class UnprintablePlugin(MessDetectorPlugin):
|
||||
return True
|
||||
|
||||
def feed(self, character: str) -> None:
|
||||
if character not in {'\n', '\t', '\r'} and character.isprintable() is False:
|
||||
if (
|
||||
character.isspace() is False # includes \n \t \r \v
|
||||
and character.isprintable() is False
|
||||
and character != "\x1A" # Why? Its the ASCII substitute character.
|
||||
):
|
||||
self._unprintable_count += 1
|
||||
self._character_count += 1
|
||||
|
||||
@@ -126,26 +153,31 @@ class UnprintablePlugin(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
return (self._unprintable_count * 8) / self._character_count
|
||||
|
||||
|
||||
class SuspiciousDuplicateAccentPlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._successive_count = 0 # type: int
|
||||
self._character_count = 0 # type: int
|
||||
|
||||
self._last_latin_character = None # type: Optional[str]
|
||||
|
||||
def eligible(self, character: str) -> bool:
|
||||
return is_latin(character)
|
||||
return character.isalpha() and is_latin(character)
|
||||
|
||||
def feed(self, character: str) -> None:
|
||||
self._character_count += 1
|
||||
if self._last_latin_character is not None:
|
||||
if is_accentuated(character) and is_accentuated(self._last_latin_character):
|
||||
if remove_accent(character) == remove_accent(self._last_latin_character):
|
||||
if character.isupper() and self._last_latin_character.isupper():
|
||||
self._successive_count += 1
|
||||
# Worse if its the same char duplicated with different accent.
|
||||
if remove_accent(character) == remove_accent(
|
||||
self._last_latin_character
|
||||
):
|
||||
self._successive_count += 1
|
||||
self._last_latin_character = character
|
||||
|
||||
@@ -157,14 +189,13 @@ class SuspiciousDuplicateAccentPlugin(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
return (self._successive_count * 2) / self._character_count
|
||||
|
||||
|
||||
class SuspiciousRange(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._suspicious_successive_range_count = 0 # type: int
|
||||
self._character_count = 0 # type: int
|
||||
self._last_printable_seen = None # type: Optional[str]
|
||||
@@ -175,15 +206,21 @@ class SuspiciousRange(MessDetectorPlugin):
|
||||
def feed(self, character: str) -> None:
|
||||
self._character_count += 1
|
||||
|
||||
if (
|
||||
character.isspace()
|
||||
or is_punctuation(character)
|
||||
or character in COMMON_SAFE_ASCII_CHARACTERS
|
||||
):
|
||||
self._last_printable_seen = None
|
||||
return
|
||||
|
||||
if self._last_printable_seen is None:
|
||||
self._last_printable_seen = character
|
||||
return
|
||||
|
||||
if character.isspace() or is_punctuation(character):
|
||||
self._last_printable_seen = None
|
||||
return
|
||||
|
||||
unicode_range_a = unicode_range(self._last_printable_seen) # type: Optional[str]
|
||||
unicode_range_a = unicode_range(
|
||||
self._last_printable_seen
|
||||
) # type: Optional[str]
|
||||
unicode_range_b = unicode_range(character) # type: Optional[str]
|
||||
|
||||
if is_suspiciously_successive_range(unicode_range_a, unicode_range_b):
|
||||
@@ -199,22 +236,24 @@ class SuspiciousRange(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
ratio_of_suspicious_range_usage = (self._suspicious_successive_range_count * 2) / self._character_count # type: float
|
||||
ratio_of_suspicious_range_usage = (
|
||||
self._suspicious_successive_range_count * 2
|
||||
) / self._character_count # type: float
|
||||
|
||||
if ratio_of_suspicious_range_usage < 0.1:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
return ratio_of_suspicious_range_usage
|
||||
|
||||
|
||||
class SuperWeirdWordPlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._word_count = 0 # type: int
|
||||
self._bad_word_count = 0 # type: int
|
||||
self._is_current_word_bad = False # type: bool
|
||||
self._foreign_long_watch = False # type: bool
|
||||
|
||||
self._character_count = 0 # type: int
|
||||
self._bad_character_count = 0 # type: int
|
||||
@@ -230,16 +269,30 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
|
||||
self._buffer = "".join([self._buffer, character])
|
||||
if is_accentuated(character):
|
||||
self._buffer_accent_count += 1
|
||||
if (
|
||||
self._foreign_long_watch is False
|
||||
and is_latin(character) is False
|
||||
and is_cjk(character) is False
|
||||
and is_hangul(character) is False
|
||||
and is_katakana(character) is False
|
||||
and is_hiragana(character) is False
|
||||
and is_thai(character) is False
|
||||
):
|
||||
self._foreign_long_watch = True
|
||||
return
|
||||
if not self._buffer:
|
||||
return
|
||||
if (character.isspace() or is_punctuation(character) or is_separator(character)) and self._buffer:
|
||||
if (
|
||||
character.isspace() or is_punctuation(character) or is_separator(character)
|
||||
) and self._buffer:
|
||||
self._word_count += 1
|
||||
buffer_length = len(self._buffer) # type: int
|
||||
|
||||
self._character_count += buffer_length
|
||||
|
||||
if buffer_length >= 4 and self._buffer_accent_count / buffer_length >= 0.3:
|
||||
if buffer_length >= 4 and self._buffer_accent_count / buffer_length > 0.34:
|
||||
self._is_current_word_bad = True
|
||||
if buffer_length >= 24 and self._foreign_long_watch:
|
||||
self._is_current_word_bad = True
|
||||
|
||||
if self._is_current_word_bad:
|
||||
@@ -247,15 +300,21 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
|
||||
self._bad_character_count += len(self._buffer)
|
||||
self._is_current_word_bad = False
|
||||
|
||||
self._foreign_long_watch = False
|
||||
self._buffer = ""
|
||||
self._buffer_accent_count = 0
|
||||
elif character not in {"<", ">", "-", "="} and character.isdigit() is False and is_symbol(character):
|
||||
elif (
|
||||
character not in {"<", ">", "-", "="}
|
||||
and character.isdigit() is False
|
||||
and is_symbol(character)
|
||||
):
|
||||
self._is_current_word_bad = True
|
||||
self._buffer += character
|
||||
|
||||
def reset(self) -> None:
|
||||
self._buffer = ""
|
||||
self._is_current_word_bad = False
|
||||
self._foreign_long_watch = False
|
||||
self._bad_word_count = 0
|
||||
self._word_count = 0
|
||||
self._character_count = 0
|
||||
@@ -263,19 +322,19 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
|
||||
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._word_count <= 16:
|
||||
return 0.
|
||||
if self._word_count <= 10:
|
||||
return 0.0
|
||||
|
||||
return self._bad_character_count / self._character_count
|
||||
|
||||
|
||||
class CjkInvalidStopPlugin(MessDetectorPlugin):
|
||||
"""
|
||||
GB(Chinese) based encoding often render the stop incorrectly when the content does not fit and can be easily detected.
|
||||
Searching for the overuse of '丅' and '丄'.
|
||||
GB(Chinese) based encoding often render the stop incorrectly when the content does not fit and
|
||||
can be easily detected. Searching for the overuse of '丅' and '丄'.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._wrong_stop_count = 0 # type: int
|
||||
self._cjk_character_count = 0 # type: int
|
||||
|
||||
@@ -296,13 +355,12 @@ class CjkInvalidStopPlugin(MessDetectorPlugin):
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._cjk_character_count < 16:
|
||||
return 0.
|
||||
return 0.0
|
||||
return self._wrong_stop_count / self._cjk_character_count
|
||||
|
||||
|
||||
class ArchaicUpperLowerPlugin(MessDetectorPlugin):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self._buf = False # type: bool
|
||||
|
||||
self._character_count_since_last_sep = 0 # type: int
|
||||
@@ -313,27 +371,51 @@ class ArchaicUpperLowerPlugin(MessDetectorPlugin):
|
||||
self._character_count = 0 # type: int
|
||||
|
||||
self._last_alpha_seen = None # type: Optional[str]
|
||||
self._current_ascii_only = True # type: bool
|
||||
|
||||
def eligible(self, character: str) -> bool:
|
||||
return character.isspace() or character.isalpha()
|
||||
return True
|
||||
|
||||
def feed(self, character: str) -> None:
|
||||
if is_separator(character):
|
||||
if self._character_count_since_last_sep < 24:
|
||||
self._successive_upper_lower_count_final += self._successive_upper_lower_count
|
||||
is_concerned = character.isalpha() and is_case_variable(character)
|
||||
chunk_sep = is_concerned is False
|
||||
|
||||
if chunk_sep and self._character_count_since_last_sep > 0:
|
||||
if (
|
||||
self._character_count_since_last_sep <= 64
|
||||
and character.isdigit() is False
|
||||
and self._current_ascii_only is False
|
||||
):
|
||||
self._successive_upper_lower_count_final += (
|
||||
self._successive_upper_lower_count
|
||||
)
|
||||
|
||||
self._successive_upper_lower_count = 0
|
||||
self._character_count_since_last_sep = 0
|
||||
self._last_alpha_seen = None
|
||||
self._buf = False
|
||||
self._character_count += 1
|
||||
self._current_ascii_only = True
|
||||
|
||||
return
|
||||
|
||||
if self._current_ascii_only is True and is_ascii(character) is False:
|
||||
self._current_ascii_only = False
|
||||
|
||||
if self._last_alpha_seen is not None:
|
||||
if (character.isupper() and self._last_alpha_seen.islower()) or (character.islower() and self._last_alpha_seen.isupper()):
|
||||
if (character.isupper() and self._last_alpha_seen.islower()) or (
|
||||
character.islower() and self._last_alpha_seen.isupper()
|
||||
):
|
||||
if self._buf is True:
|
||||
self._successive_upper_lower_count += 1
|
||||
self._successive_upper_lower_count += 2
|
||||
self._buf = False
|
||||
else:
|
||||
self._buf = True
|
||||
else:
|
||||
self._buf = False
|
||||
|
||||
self._character_count += 1
|
||||
self._character_count_since_last_sep += 1
|
||||
self._last_alpha_seen = character
|
||||
|
||||
def reset(self) -> None:
|
||||
@@ -342,16 +424,20 @@ class ArchaicUpperLowerPlugin(MessDetectorPlugin):
|
||||
self._successive_upper_lower_count = 0
|
||||
self._successive_upper_lower_count_final = 0
|
||||
self._last_alpha_seen = None
|
||||
self._buf = False
|
||||
self._current_ascii_only = True
|
||||
|
||||
@property
|
||||
def ratio(self) -> float:
|
||||
if self._character_count == 0:
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
return (self._successive_upper_lower_count_final * 2) / self._character_count
|
||||
return self._successive_upper_lower_count_final / self._character_count
|
||||
|
||||
|
||||
def is_suspiciously_successive_range(unicode_range_a: Optional[str], unicode_range_b: Optional[str]) -> bool:
|
||||
def is_suspiciously_successive_range(
|
||||
unicode_range_a: Optional[str], unicode_range_b: Optional[str]
|
||||
) -> bool:
|
||||
"""
|
||||
Determine if two Unicode range seen next to each other can be considered as suspicious.
|
||||
"""
|
||||
@@ -367,7 +453,9 @@ def is_suspiciously_successive_range(unicode_range_a: Optional[str], unicode_ran
|
||||
if "Emoticons" in unicode_range_a or "Emoticons" in unicode_range_b:
|
||||
return False
|
||||
|
||||
keywords_range_a, keywords_range_b = unicode_range_a.split(" "), unicode_range_b.split(" ")
|
||||
keywords_range_a, keywords_range_b = unicode_range_a.split(
|
||||
" "
|
||||
), unicode_range_b.split(" ")
|
||||
|
||||
for el in keywords_range_a:
|
||||
if el in UNICODE_SECONDARY_RANGE_KEYWORD:
|
||||
@@ -376,12 +464,19 @@ def is_suspiciously_successive_range(unicode_range_a: Optional[str], unicode_ran
|
||||
return False
|
||||
|
||||
# Japanese Exception
|
||||
if unicode_range_a in ['Katakana', 'Hiragana'] and unicode_range_b in ['Katakana', 'Hiragana']:
|
||||
return False
|
||||
|
||||
if unicode_range_a in ['Katakana', 'Hiragana'] or unicode_range_b in ['Katakana', 'Hiragana']:
|
||||
range_a_jp_chars, range_b_jp_chars = (
|
||||
unicode_range_a
|
||||
in (
|
||||
"Hiragana",
|
||||
"Katakana",
|
||||
),
|
||||
unicode_range_b in ("Hiragana", "Katakana"),
|
||||
)
|
||||
if range_a_jp_chars or range_b_jp_chars:
|
||||
if "CJK" in unicode_range_a or "CJK" in unicode_range_b:
|
||||
return False
|
||||
if range_a_jp_chars and range_b_jp_chars:
|
||||
return False
|
||||
|
||||
if "Hangul" in unicode_range_a or "Hangul" in unicode_range_b:
|
||||
if "CJK" in unicode_range_a or "CJK" in unicode_range_b:
|
||||
@@ -390,30 +485,33 @@ def is_suspiciously_successive_range(unicode_range_a: Optional[str], unicode_ran
|
||||
return False
|
||||
|
||||
# Chinese/Japanese use dedicated range for punctuation and/or separators.
|
||||
if ('CJK' in unicode_range_a or 'CJK' in unicode_range_b) or (unicode_range_a in ['Katakana', 'Hiragana'] and unicode_range_b in ['Katakana', 'Hiragana']):
|
||||
if 'Punctuation' in unicode_range_a or 'Punctuation' in unicode_range_b:
|
||||
if ("CJK" in unicode_range_a or "CJK" in unicode_range_b) or (
|
||||
unicode_range_a in ["Katakana", "Hiragana"]
|
||||
and unicode_range_b in ["Katakana", "Hiragana"]
|
||||
):
|
||||
if "Punctuation" in unicode_range_a or "Punctuation" in unicode_range_b:
|
||||
return False
|
||||
if 'Forms' in unicode_range_a or 'Forms' in unicode_range_b:
|
||||
if "Forms" in unicode_range_a or "Forms" in unicode_range_b:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@lru_cache(maxsize=2048)
|
||||
def mess_ratio(decoded_sequence: str, maximum_threshold: float = 0.2, debug: bool = False) -> float:
|
||||
def mess_ratio(
|
||||
decoded_sequence: str, maximum_threshold: float = 0.2, debug: bool = False
|
||||
) -> float:
|
||||
"""
|
||||
Compute a mess ratio given a decoded bytes sequence. The maximum threshold does stop the computation earlier.
|
||||
"""
|
||||
detectors = [] # type: List[MessDetectorPlugin]
|
||||
|
||||
for md_class in MessDetectorPlugin.__subclasses__():
|
||||
detectors.append(
|
||||
md_class()
|
||||
)
|
||||
detectors = [
|
||||
md_class() for md_class in MessDetectorPlugin.__subclasses__()
|
||||
] # type: List[MessDetectorPlugin]
|
||||
|
||||
length = len(decoded_sequence) # type: int
|
||||
|
||||
mean_mess_ratio = 0. # type: float
|
||||
mean_mess_ratio = 0.0 # type: float
|
||||
|
||||
if length < 512:
|
||||
intermediary_mean_mess_ratio_calc = 32 # type: int
|
||||
@@ -427,25 +525,16 @@ def mess_ratio(decoded_sequence: str, maximum_threshold: float = 0.2, debug: boo
|
||||
if detector.eligible(character):
|
||||
detector.feed(character)
|
||||
|
||||
if (index > 0 and index % intermediary_mean_mess_ratio_calc == 0) or index == length-1:
|
||||
mean_mess_ratio = sum(
|
||||
[
|
||||
dt.ratio for dt in detectors
|
||||
]
|
||||
)
|
||||
if (
|
||||
index > 0 and index % intermediary_mean_mess_ratio_calc == 0
|
||||
) or index == length - 1:
|
||||
mean_mess_ratio = sum([dt.ratio for dt in detectors])
|
||||
|
||||
if mean_mess_ratio >= maximum_threshold:
|
||||
break
|
||||
|
||||
if debug:
|
||||
for dt in detectors: # pragma: nocover
|
||||
print(
|
||||
dt.__class__,
|
||||
dt.ratio
|
||||
)
|
||||
|
||||
return round(
|
||||
mean_mess_ratio,
|
||||
3
|
||||
)
|
||||
print(dt.__class__, dt.ratio)
|
||||
|
||||
return round(mean_mess_ratio, 3)
|
||||
|
||||
+102
-64
@@ -1,25 +1,25 @@
|
||||
import warnings
|
||||
from collections import Counter
|
||||
from encodings.aliases import aliases
|
||||
from hashlib import sha256
|
||||
from json import dumps
|
||||
from typing import Optional, List, Tuple, Set
|
||||
from collections import Counter
|
||||
from re import sub, compile as re_compile
|
||||
from re import sub
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
|
||||
|
||||
from pipenv.vendor.charset_normalizer.constant import TOO_BIG_SEQUENCE
|
||||
from pipenv.vendor.charset_normalizer.md import mess_ratio
|
||||
from pipenv.vendor.charset_normalizer.utils import iana_name, is_multi_byte_encoding, unicode_range
|
||||
from .constant import NOT_PRINTABLE_PATTERN, TOO_BIG_SEQUENCE
|
||||
from .md import mess_ratio
|
||||
from .utils import iana_name, is_multi_byte_encoding, unicode_range
|
||||
|
||||
|
||||
class CharsetMatch:
|
||||
def __init__(
|
||||
self,
|
||||
payload: bytes,
|
||||
guessed_encoding: str,
|
||||
mean_mess_ratio: float,
|
||||
has_sig_or_bom: bool,
|
||||
languages: "CoherenceMatches",
|
||||
decoded_payload: Optional[str] = None
|
||||
self,
|
||||
payload: bytes,
|
||||
guessed_encoding: str,
|
||||
mean_mess_ratio: float,
|
||||
has_sig_or_bom: bool,
|
||||
languages: "CoherenceMatches",
|
||||
decoded_payload: Optional[str] = None,
|
||||
):
|
||||
self._payload = payload # type: bytes
|
||||
|
||||
@@ -30,19 +30,23 @@ class CharsetMatch:
|
||||
self._unicode_ranges = None # type: Optional[List[str]]
|
||||
|
||||
self._leaves = [] # type: List[CharsetMatch]
|
||||
self._mean_coherence_ratio = 0. # type: float
|
||||
self._mean_coherence_ratio = 0.0 # type: float
|
||||
|
||||
self._output_payload = None # type: Optional[bytes]
|
||||
self._output_encoding = None # type: Optional[str]
|
||||
|
||||
self._string = decoded_payload # type: Optional[str]
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, CharsetMatch):
|
||||
raise TypeError('__eq__ cannot be invoked on {} and {}.'.format(str(other.__class__), str(self.__class__)))
|
||||
raise TypeError(
|
||||
"__eq__ cannot be invoked on {} and {}.".format(
|
||||
str(other.__class__), str(self.__class__)
|
||||
)
|
||||
)
|
||||
return self.encoding == other.encoding and self.fingerprint == other.fingerprint
|
||||
|
||||
def __lt__(self, other) -> bool:
|
||||
def __lt__(self, other: object) -> bool:
|
||||
"""
|
||||
Implemented to make sorted available upon CharsetMatches items.
|
||||
"""
|
||||
@@ -50,13 +54,21 @@ class CharsetMatch:
|
||||
raise ValueError
|
||||
|
||||
chaos_difference = abs(self.chaos - other.chaos) # type: float
|
||||
coherence_difference = abs(self.coherence - other.coherence) # type: float
|
||||
|
||||
# Bellow 1% difference --> Use Coherence
|
||||
if chaos_difference < 0.01:
|
||||
if chaos_difference < 0.01 and coherence_difference > 0.02:
|
||||
# When having a tough decision, use the result that decoded as many multi-byte as possible.
|
||||
if chaos_difference == 0.0 and self.coherence == other.coherence:
|
||||
return self.multi_byte_usage > other.multi_byte_usage
|
||||
return self.coherence > other.coherence
|
||||
|
||||
return self.chaos < other.chaos
|
||||
|
||||
@property
|
||||
def multi_byte_usage(self) -> float:
|
||||
return 1.0 - len(str(self)) / len(self.raw)
|
||||
|
||||
@property
|
||||
def chaos_secondary_pass(self) -> float:
|
||||
"""
|
||||
@@ -64,11 +76,11 @@ class CharsetMatch:
|
||||
Use with caution, this can be very slow.
|
||||
Notice: Will be removed in 3.0
|
||||
"""
|
||||
warnings.warn("chaos_secondary_pass is deprecated and will be removed in 3.0", DeprecationWarning)
|
||||
return mess_ratio(
|
||||
str(self),
|
||||
1.
|
||||
warnings.warn(
|
||||
"chaos_secondary_pass is deprecated and will be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return mess_ratio(str(self), 1.0)
|
||||
|
||||
@property
|
||||
def coherence_non_latin(self) -> float:
|
||||
@@ -76,8 +88,11 @@ class CharsetMatch:
|
||||
Coherence ratio on the first non-latin language detected if ANY.
|
||||
Notice: Will be removed in 3.0
|
||||
"""
|
||||
warnings.warn("coherence_non_latin is deprecated and will be removed in 3.0", DeprecationWarning)
|
||||
return 0.
|
||||
warnings.warn(
|
||||
"coherence_non_latin is deprecated and will be removed in 3.0",
|
||||
DeprecationWarning,
|
||||
)
|
||||
return 0.0
|
||||
|
||||
@property
|
||||
def w_counter(self) -> Counter:
|
||||
@@ -85,9 +100,11 @@ class CharsetMatch:
|
||||
Word counter instance on decoded text.
|
||||
Notice: Will be removed in 3.0
|
||||
"""
|
||||
warnings.warn("w_counter is deprecated and will be removed in 3.0", DeprecationWarning)
|
||||
not_printable_pattern = re_compile(r'[0-9\W\n\r\t]+')
|
||||
string_printable_only = sub(not_printable_pattern, ' ', str(self).lower())
|
||||
warnings.warn(
|
||||
"w_counter is deprecated and will be removed in 3.0", DeprecationWarning
|
||||
)
|
||||
|
||||
string_printable_only = sub(NOT_PRINTABLE_PATTERN, " ", str(self).lower())
|
||||
|
||||
return Counter(string_printable_only.split())
|
||||
|
||||
@@ -102,7 +119,11 @@ class CharsetMatch:
|
||||
|
||||
def add_submatch(self, other: "CharsetMatch") -> None:
|
||||
if not isinstance(other, CharsetMatch) or other == self:
|
||||
raise ValueError("Unable to add instance <{}> as a submatch of a CharsetMatch".format(other.__class__))
|
||||
raise ValueError(
|
||||
"Unable to add instance <{}> as a submatch of a CharsetMatch".format(
|
||||
other.__class__
|
||||
)
|
||||
)
|
||||
|
||||
other._string = None # Unload RAM usage; dirty trick.
|
||||
self._leaves.append(other)
|
||||
@@ -153,9 +174,13 @@ class CharsetMatch:
|
||||
return "English"
|
||||
|
||||
# doing it there to avoid circular import
|
||||
from pipenv.vendor.charset_normalizer.cd import mb_encoding_languages, encoding_languages
|
||||
from charset_normalizer.cd import encoding_languages, mb_encoding_languages
|
||||
|
||||
languages = mb_encoding_languages(self.encoding) if is_multi_byte_encoding(self.encoding) else encoding_languages(self.encoding)
|
||||
languages = (
|
||||
mb_encoding_languages(self.encoding)
|
||||
if is_multi_byte_encoding(self.encoding)
|
||||
else encoding_languages(self.encoding)
|
||||
)
|
||||
|
||||
if len(languages) == 0 or "Latin Based" in languages:
|
||||
return "Unknown"
|
||||
@@ -171,7 +196,7 @@ class CharsetMatch:
|
||||
@property
|
||||
def coherence(self) -> float:
|
||||
if not self._languages:
|
||||
return 0.
|
||||
return 0.0
|
||||
return self._languages[0][1]
|
||||
|
||||
@property
|
||||
@@ -201,12 +226,12 @@ class CharsetMatch:
|
||||
def alphabets(self) -> List[str]:
|
||||
if self._unicode_ranges is not None:
|
||||
return self._unicode_ranges
|
||||
detected_ranges = set() # type: Set[str]
|
||||
for character in str(self):
|
||||
detected_ranges.add(
|
||||
unicode_range(character)
|
||||
)
|
||||
self._unicode_ranges = sorted(list(detected_ranges))
|
||||
# list detected ranges
|
||||
detected_ranges = [
|
||||
unicode_range(char) for char in str(self)
|
||||
] # type: List[Optional[str]]
|
||||
# filter and sort
|
||||
self._unicode_ranges = sorted(list({r for r in detected_ranges if r}))
|
||||
return self._unicode_ranges
|
||||
|
||||
@property
|
||||
@@ -254,14 +279,15 @@ class CharsetMatches:
|
||||
Container with every CharsetMatch items ordered by default from most probable to the less one.
|
||||
Act like a list(iterable) but does not implements all related methods.
|
||||
"""
|
||||
|
||||
def __init__(self, results: List[CharsetMatch] = None):
|
||||
self._results = sorted(results) if results else [] # type: List[CharsetMatch]
|
||||
|
||||
def __iter__(self):
|
||||
def __iter__(self) -> Iterator[CharsetMatch]:
|
||||
for result in self._results:
|
||||
yield result
|
||||
|
||||
def __getitem__(self, item) -> CharsetMatch:
|
||||
def __getitem__(self, item: Union[int, str]) -> CharsetMatch:
|
||||
"""
|
||||
Retrieve a single item either by its position or encoding name (alias may be used here).
|
||||
Raise KeyError upon invalid index or encoding not present in results.
|
||||
@@ -278,17 +304,24 @@ class CharsetMatches:
|
||||
def __len__(self) -> int:
|
||||
return len(self._results)
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return len(self._results) > 0
|
||||
|
||||
def append(self, item: CharsetMatch) -> None:
|
||||
"""
|
||||
Insert a single match. Will be inserted accordingly to preserve sort.
|
||||
Can be inserted as a submatch.
|
||||
"""
|
||||
if not isinstance(item, CharsetMatch):
|
||||
raise ValueError("Cannot append instance '{}' to CharsetMatches".format(str(item.__class__)))
|
||||
raise ValueError(
|
||||
"Cannot append instance '{}' to CharsetMatches".format(
|
||||
str(item.__class__)
|
||||
)
|
||||
)
|
||||
# We should disable the submatch factoring when the input file is too heavy (conserve RAM usage)
|
||||
if len(item.raw) <= TOO_BIG_SEQUENCE:
|
||||
for match in self._results:
|
||||
if match.fingerprint == item.fingerprint:
|
||||
if match.fingerprint == item.fingerprint and match.chaos == item.chaos:
|
||||
match.add_submatch(item)
|
||||
return
|
||||
self._results.append(item)
|
||||
@@ -314,11 +347,23 @@ CoherenceMatches = List[CoherenceMatch]
|
||||
|
||||
|
||||
class CliDetectionResult:
|
||||
|
||||
def __init__(self, path: str, encoding: str, encoding_aliases: List[str], alternative_encodings: List[str], language: str, alphabets: List[str], has_sig_or_bom: bool, chaos: float, coherence: float, unicode_path: Optional[str], is_preferred: bool):
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
encoding: Optional[str],
|
||||
encoding_aliases: List[str],
|
||||
alternative_encodings: List[str],
|
||||
language: str,
|
||||
alphabets: List[str],
|
||||
has_sig_or_bom: bool,
|
||||
chaos: float,
|
||||
coherence: float,
|
||||
unicode_path: Optional[str],
|
||||
is_preferred: bool,
|
||||
):
|
||||
self.path = path # type: str
|
||||
self.unicode_path = unicode_path # type: Optional[str]
|
||||
self.encoding = encoding # type: str
|
||||
self.encoding = encoding # type: Optional[str]
|
||||
self.encoding_aliases = encoding_aliases # type: List[str]
|
||||
self.alternative_encodings = alternative_encodings # type: List[str]
|
||||
self.language = language # type: str
|
||||
@@ -329,27 +374,20 @@ class CliDetectionResult:
|
||||
self.is_preferred = is_preferred # type: bool
|
||||
|
||||
@property
|
||||
def __dict__(self):
|
||||
def __dict__(self) -> Dict[str, Any]: # type: ignore
|
||||
return {
|
||||
'path': self.path,
|
||||
'encoding': self.encoding,
|
||||
'encoding_aliases': self.encoding_aliases,
|
||||
'alternative_encodings': self.alternative_encodings,
|
||||
'language': self.language,
|
||||
'alphabets': self.alphabets,
|
||||
'has_sig_or_bom': self.has_sig_or_bom,
|
||||
'chaos': self.chaos,
|
||||
'coherence': self.coherence,
|
||||
'unicode_path': self.unicode_path,
|
||||
'is_preferred': self.is_preferred
|
||||
"path": self.path,
|
||||
"encoding": self.encoding,
|
||||
"encoding_aliases": self.encoding_aliases,
|
||||
"alternative_encodings": self.alternative_encodings,
|
||||
"language": self.language,
|
||||
"alphabets": self.alphabets,
|
||||
"has_sig_or_bom": self.has_sig_or_bom,
|
||||
"chaos": self.chaos,
|
||||
"coherence": self.coherence,
|
||||
"unicode_path": self.unicode_path,
|
||||
"is_preferred": self.is_preferred,
|
||||
}
|
||||
|
||||
def to_json(self) -> str:
|
||||
return dumps(
|
||||
self.__dict__,
|
||||
ensure_ascii=True,
|
||||
indent=4
|
||||
)
|
||||
|
||||
|
||||
CharsetNormalizerMatch = CharsetMatch
|
||||
return dumps(self.__dict__, ensure_ascii=True, indent=4)
|
||||
|
||||
+114
-29
@@ -1,19 +1,25 @@
|
||||
try:
|
||||
import unicodedata2 as unicodedata
|
||||
except ImportError:
|
||||
import unicodedata
|
||||
import unicodedata # type: ignore[no-redef]
|
||||
|
||||
from codecs import IncrementalDecoder
|
||||
from re import findall
|
||||
from typing import Optional, Tuple, Union, List, Set
|
||||
import importlib
|
||||
from _multibytecodec import MultibyteIncrementalDecoder # type: ignore
|
||||
|
||||
from codecs import IncrementalDecoder
|
||||
from encodings.aliases import aliases
|
||||
from functools import lru_cache
|
||||
from re import findall
|
||||
from typing import List, Optional, Set, Tuple, Union
|
||||
|
||||
from pipenv.vendor.charset_normalizer.constant import UNICODE_RANGES_COMBINED, UNICODE_SECONDARY_RANGE_KEYWORD, \
|
||||
RE_POSSIBLE_ENCODING_INDICATION, ENCODING_MARKS, UTF8_MAXIMAL_ALLOCATION, IANA_SUPPORTED_SIMILAR
|
||||
from _multibytecodec import MultibyteIncrementalDecoder # type: ignore
|
||||
|
||||
from .constant import (
|
||||
ENCODING_MARKS,
|
||||
IANA_SUPPORTED_SIMILAR,
|
||||
RE_POSSIBLE_ENCODING_INDICATION,
|
||||
UNICODE_RANGES_COMBINED,
|
||||
UNICODE_SECONDARY_RANGE_KEYWORD,
|
||||
UTF8_MAXIMAL_ALLOCATION,
|
||||
)
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
@@ -22,7 +28,14 @@ def is_accentuated(character: str) -> bool:
|
||||
description = unicodedata.name(character) # type: str
|
||||
except ValueError:
|
||||
return False
|
||||
return "WITH GRAVE" in description or "WITH ACUTE" in description or "WITH CEDILLA" in description
|
||||
return (
|
||||
"WITH GRAVE" in description
|
||||
or "WITH ACUTE" in description
|
||||
or "WITH CEDILLA" in description
|
||||
or "WITH DIAERESIS" in description
|
||||
or "WITH CIRCUMFLEX" in description
|
||||
or "WITH TILDE" in description
|
||||
)
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
@@ -33,12 +46,7 @@ def remove_accent(character: str) -> str:
|
||||
|
||||
codes = decomposed.split(" ") # type: List[str]
|
||||
|
||||
return chr(
|
||||
int(
|
||||
codes[0],
|
||||
16
|
||||
)
|
||||
)
|
||||
return chr(int(codes[0], 16))
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
@@ -64,6 +72,14 @@ def is_latin(character: str) -> bool:
|
||||
return "LATIN" in description
|
||||
|
||||
|
||||
def is_ascii(character: str) -> bool:
|
||||
try:
|
||||
character.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_punctuation(character: str) -> bool:
|
||||
character_category = unicodedata.category(character) # type: str
|
||||
@@ -94,9 +110,19 @@ def is_symbol(character: str) -> bool:
|
||||
return "Forms" in character_range
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_emoticon(character: str) -> bool:
|
||||
character_range = unicode_range(character) # type: Optional[str]
|
||||
|
||||
if character_range is None:
|
||||
return False
|
||||
|
||||
return "Emoticons" in character_range
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_separator(character: str) -> bool:
|
||||
if character.isspace() or character in ["|", "+"]:
|
||||
if character.isspace() or character in ["|", "+", ",", ";", "<", ">"]:
|
||||
return True
|
||||
|
||||
character_category = unicodedata.category(character) # type: str
|
||||
@@ -104,12 +130,18 @@ def is_separator(character: str) -> bool:
|
||||
return "Z" in character_category
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_case_variable(character: str) -> bool:
|
||||
return character.islower() != character.isupper()
|
||||
|
||||
|
||||
def is_private_use_only(character: str) -> bool:
|
||||
character_category = unicodedata.category(character) # type: str
|
||||
|
||||
return "Co" == character_category
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_cjk(character: str) -> bool:
|
||||
try:
|
||||
character_name = unicodedata.name(character)
|
||||
@@ -119,6 +151,46 @@ def is_cjk(character: str) -> bool:
|
||||
return "CJK" in character_name
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_hiragana(character: str) -> bool:
|
||||
try:
|
||||
character_name = unicodedata.name(character)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
return "HIRAGANA" in character_name
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_katakana(character: str) -> bool:
|
||||
try:
|
||||
character_name = unicodedata.name(character)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
return "KATAKANA" in character_name
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_hangul(character: str) -> bool:
|
||||
try:
|
||||
character_name = unicodedata.name(character)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
return "HANGUL" in character_name
|
||||
|
||||
|
||||
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
|
||||
def is_thai(character: str) -> bool:
|
||||
try:
|
||||
character_name = unicodedata.name(character)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
return "THAI" in character_name
|
||||
|
||||
|
||||
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
|
||||
def is_unicode_range_secondary(range_name: str) -> bool:
|
||||
for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
|
||||
@@ -139,14 +211,16 @@ def any_specified_encoding(sequence: bytes, search_zone: int = 4096) -> Optional
|
||||
|
||||
results = findall(
|
||||
RE_POSSIBLE_ENCODING_INDICATION,
|
||||
sequence[:seq_len if seq_len <= search_zone else search_zone].decode('ascii', errors='ignore')
|
||||
sequence[: seq_len if seq_len <= search_zone else search_zone].decode(
|
||||
"ascii", errors="ignore"
|
||||
),
|
||||
) # type: List[str]
|
||||
|
||||
if len(results) == 0:
|
||||
return None
|
||||
|
||||
for specified_encoding in results:
|
||||
specified_encoding = specified_encoding.lower().replace('-', '_')
|
||||
specified_encoding = specified_encoding.lower().replace("-", "_")
|
||||
|
||||
for encoding_alias, encoding_iana in aliases.items():
|
||||
if encoding_alias == specified_encoding:
|
||||
@@ -162,9 +236,19 @@ def is_multi_byte_encoding(name: str) -> bool:
|
||||
"""
|
||||
Verify is a specific encoding is a multi byte one based on it IANA name
|
||||
"""
|
||||
return name in {"utf_8", "utf_8_sig", "utf_16", "utf_16_be", "utf_16_le", "utf_32", "utf_32_le", "utf_32_be", "utf_7"} or issubclass(
|
||||
importlib.import_module('encodings.{}'.format(name)).IncrementalDecoder, # type: ignore
|
||||
MultibyteIncrementalDecoder
|
||||
return name in {
|
||||
"utf_8",
|
||||
"utf_8_sig",
|
||||
"utf_16",
|
||||
"utf_16_be",
|
||||
"utf_16_le",
|
||||
"utf_32",
|
||||
"utf_32_le",
|
||||
"utf_32_be",
|
||||
"utf_7",
|
||||
} or issubclass(
|
||||
importlib.import_module("encodings.{}".format(name)).IncrementalDecoder, # type: ignore
|
||||
MultibyteIncrementalDecoder,
|
||||
)
|
||||
|
||||
|
||||
@@ -191,7 +275,7 @@ def should_strip_sig_or_bom(iana_encoding: str) -> bool:
|
||||
|
||||
|
||||
def iana_name(cp_name: str, strict: bool = True) -> str:
|
||||
cp_name = cp_name.lower().replace('-', '_')
|
||||
cp_name = cp_name.lower().replace("-", "_")
|
||||
|
||||
for encoding_alias, encoding_iana in aliases.items():
|
||||
if cp_name == encoding_alias or cp_name == encoding_iana:
|
||||
@@ -212,9 +296,7 @@ def range_scan(decoded_sequence: str) -> List[str]:
|
||||
if character_range is None:
|
||||
continue
|
||||
|
||||
ranges.add(
|
||||
character_range
|
||||
)
|
||||
ranges.add(character_range)
|
||||
|
||||
return list(ranges)
|
||||
|
||||
@@ -222,10 +304,10 @@ def range_scan(decoded_sequence: str) -> List[str]:
|
||||
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
|
||||
|
||||
if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
|
||||
return 0.
|
||||
return 0.0
|
||||
|
||||
decoder_a = importlib.import_module('encodings.{}'.format(iana_name_a)).IncrementalDecoder # type: ignore
|
||||
decoder_b = importlib.import_module('encodings.{}'.format(iana_name_b)).IncrementalDecoder # type: ignore
|
||||
decoder_a = importlib.import_module("encodings.{}".format(iana_name_a)).IncrementalDecoder # type: ignore
|
||||
decoder_b = importlib.import_module("encodings.{}".format(iana_name_b)).IncrementalDecoder # type: ignore
|
||||
|
||||
id_a = decoder_a(errors="ignore") # type: IncrementalDecoder
|
||||
id_b = decoder_b(errors="ignore") # type: IncrementalDecoder
|
||||
@@ -245,4 +327,7 @@ def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
|
||||
Determine if two code page are at least 80% similar. IANA_SUPPORTED_SIMILAR dict was generated using
|
||||
the function cp_similarity.
|
||||
"""
|
||||
return iana_name_a in IANA_SUPPORTED_SIMILAR and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
|
||||
return (
|
||||
iana_name_a in IANA_SUPPORTED_SIMILAR
|
||||
and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
|
||||
)
|
||||
|
||||
+2
-2
@@ -2,5 +2,5 @@
|
||||
Expose version
|
||||
"""
|
||||
|
||||
__version__ = "2.0.3"
|
||||
VERSION = __version__.split('.')
|
||||
__version__ = "2.0.7"
|
||||
VERSION = __version__.split(".")
|
||||
|
||||
Vendored
+1
-1
@@ -3,7 +3,7 @@ attrs==21.2.0
|
||||
cached-property==1.5.2
|
||||
cerberus==1.3.4
|
||||
certifi==2021.5.30
|
||||
charset-normalizer==2.0.3
|
||||
charset-normalizer==2.0.7
|
||||
click-didyoumean==0.0.3
|
||||
click==8.0.3
|
||||
colorama==0.4.4
|
||||
|
||||
Reference in New Issue
Block a user