######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Universal charset detector code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 2001
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Mark Pilgrim - port to Python
# Shy Shalom - original C code
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <https://www.gnu.org/licenses/>.
######################### END LICENSE BLOCK #########################
"""
Module containing the UniversalDetector detector class, which is the primary
class a user of ``chardet`` should use.
:author: Mark Pilgrim (initial port to Python)
:author: Shy Shalom (original C code)
:author: Dan Blanchard (major refactoring for 3.0)
:author: Ian Cordasco
"""
import codecs
import logging
import re
from typing import Optional, Union
from .charsetgroupprober import CharSetGroupProber
from .charsetprober import CharSetProber
from .enums import EncodingEra, InputState, LanguageFilter, ProbingState
from .escprober import EscCharSetProber
from .mbcsgroupprober import MBCSGroupProber
from .metadata.charsets import get_charset, is_unicode_encoding
from .resultdict import ResultDict
from .sbcsgroupprober import ISO_WIN_MAP, SBCSGroupProber
from .utf1632prober import UTF1632Prober
[docs]
class UniversalDetector:
"""
The ``UniversalDetector`` class underlies the ``chardet.detect`` function
and coordinates all of the different charset probers.
To get a ``dict`` containing an encoding and its confidence, you can simply
run:
.. code::
u = UniversalDetector()
u.feed(some_bytes)
u.close()
detected = u.result
"""
MINIMUM_THRESHOLD = 0.20
HIGH_BYTE_DETECTOR = re.compile(b"[\x80-\xff]")
ESC_DETECTOR = re.compile(b"(\033|~{)")
# Threshold for "very close" confidence scores where era preference applies
VERY_CLOSE_THRESHOLD = 0.005 # 0.5%
# Map ISO encodings to their Windows equivalents (imported from sbcsgroupprober)
ISO_WIN_MAP = ISO_WIN_MAP
# Based on https://encoding.spec.whatwg.org/#names-and-labels
# Maps legacy encoding names to their modern/superset equivalents.
# Uses Python's canonical codec names (case-insensitive).
LEGACY_MAP = {
"ascii": "Windows-1252", # ASCII is subset of Windows-1252
"euc-kr": "CP949", # EUC-KR extended by CP949 (aka Windows-949)
"iso-8859-1": "Windows-1252", # Latin-1 extended by Windows-1252
"iso-8859-2": "Windows-1250", # Central European
"iso-8859-5": "Windows-1251", # Cyrillic
"iso-8859-6": "Windows-1256", # Arabic
"iso-8859-7": "Windows-1253", # Greek
"iso-8859-8": "Windows-1255", # Hebrew
"iso-8859-9": "Windows-1254", # Turkish
"iso-8859-11": "CP874", # Thai, extended by CP874 (aka Windows-874)
"iso-8859-13": "Windows-1257", # Baltic
"tis-620": "CP874", # Thai, equivalent to Windows-874
}
def __init__(
self,
lang_filter: LanguageFilter = LanguageFilter.ALL,
should_rename_legacy: bool | None = None,
encoding_era: EncodingEra = EncodingEra.MODERN_WEB,
max_bytes: int = 200_000,
) -> None:
self._esc_charset_prober: Optional[EscCharSetProber] = None
self._utf1632_prober: Optional[UTF1632Prober] = None
self._charset_probers: list[CharSetProber] = []
self.result: ResultDict = {
"encoding": None,
"confidence": 0.0,
"language": None,
}
self.done = False
self._got_data = False
self._input_state = InputState.PURE_ASCII
self._last_char = b""
self.lang_filter = lang_filter
self.logger = logging.getLogger(__name__)
if should_rename_legacy is None:
should_rename_legacy = encoding_era == EncodingEra.MODERN_WEB
self.should_rename_legacy = should_rename_legacy
self.encoding_era = encoding_era
self._total_bytes_fed = 0
self.max_bytes = max_bytes
self.reset()
@property
def input_state(self) -> int:
return self._input_state
@property
def has_win_bytes(self) -> bool:
"""Check if Windows-specific bytes were detected by the SBCS prober."""
for prober in self._charset_probers:
if isinstance(prober, SBCSGroupProber):
return prober._has_win_bytes
return False
@property
def charset_probers(self) -> list[CharSetProber]:
return self._charset_probers
@property
def nested_probers(self) -> list[CharSetProber]:
"""Get a flat list of all nested charset probers."""
nested = []
for prober in self._charset_probers:
if isinstance(prober, CharSetGroupProber):
nested.extend(getattr(prober, "probers", []))
else:
nested.append(prober)
return nested
@property
def active_probers(self) -> list[CharSetProber]:
"""Get a flat list of all active (not falsey and not in NOT_ME state) nested charset probers."""
return [prober for prober in self.nested_probers if prober and prober.active]
def _apply_encoding_heuristics(
self, charset_name: str, confidence: float, winning_prober: CharSetProber
) -> tuple[str, float]:
"""
Apply heuristic adjustments to the winning encoding based on:
1. Encoding era preferences (prefer newer/Unicode encodings)
2. Mac/Windows/ISO byte pattern disambiguation
Collects all close-confidence alternatives in a single pass and picks
the best one by era preference and Unicode preference.
Returns: (adjusted_charset_name, adjusted_confidence)
"""
lower_charset_name = charset_name.lower()
winner_charset = get_charset(lower_charset_name)
winner_era = winner_charset.encoding_era.value
winner_is_unicode = is_unicode_encoding(lower_charset_name)
min_confidence = confidence * (1 - self.VERY_CLOSE_THRESHOLD)
# Collect all close-confidence alternatives that would be preferred
best_alt_name = None
best_alt_confidence = confidence
best_alt_era = winner_era
best_alt_is_unicode = winner_is_unicode
for prober in self._charset_probers:
if not prober or not prober.active or prober == winning_prober:
continue
alt_charset_name = (prober.charset_name or "").lower()
if not alt_charset_name:
continue
alt_confidence = prober.get_confidence()
if alt_confidence < min_confidence:
continue
alt_charset = get_charset(alt_charset_name)
alt_era = alt_charset.encoding_era.value
alt_is_unicode = is_unicode_encoding(alt_charset_name)
# Check if this alternative is preferred over the current best
prefer_over_best = False
if alt_era < best_alt_era:
prefer_over_best = True
elif alt_era == best_alt_era and alt_is_unicode and not best_alt_is_unicode:
prefer_over_best = True
if prefer_over_best:
best_alt_name = alt_charset_name
best_alt_confidence = alt_confidence
best_alt_era = alt_era
best_alt_is_unicode = alt_is_unicode
if best_alt_name is not None:
self.logger.debug(
"Era preference: %s (era %s, unicode=%s) preferred over %s",
best_alt_name,
best_alt_era,
best_alt_is_unicode,
charset_name,
)
charset_name = best_alt_name
confidence = best_alt_confidence
return charset_name, confidence
def _get_utf8_prober(self) -> Optional[CharSetProber]:
"""
Get the UTF-8 prober from the charset probers.
Returns None if not found.
"""
for prober in self.nested_probers:
if prober.charset_name and "utf-8" in prober.charset_name.lower():
return prober
return None
[docs]
def reset(self) -> None:
"""
Reset the UniversalDetector and all of its probers back to their
initial states. This is called by ``__init__``, so you only need to
call this directly in between analyses of different documents.
"""
self.result = {"encoding": None, "confidence": 0.0, "language": None}
self.done = False
self._got_data = False
self._input_state = InputState.PURE_ASCII
self._last_char = b""
self._total_bytes_fed = 0
if self._esc_charset_prober:
self._esc_charset_prober.reset()
if self._utf1632_prober:
self._utf1632_prober.reset()
for prober in self._charset_probers:
prober.reset()
[docs]
def feed(self, byte_str: Union[bytes, bytearray]) -> None:
"""
Takes a chunk of a document and feeds it through all of the relevant
charset probers.
After calling ``feed``, you can check the value of the ``done``
attribute to see if you need to continue feeding the
``UniversalDetector`` more data, or if it has made a prediction
(in the ``result`` attribute).
.. note::
You should always call ``close`` when you're done feeding in your
document if ``done`` is not already ``True``.
"""
if self.done:
return
if not byte_str:
return
if not isinstance(byte_str, bytearray):
byte_str = bytearray(byte_str)
# First check for known BOMs, since these are guaranteed to be correct
if not self._got_data:
# If the data starts with BOM, we know it is UTF
if byte_str.startswith(codecs.BOM_UTF8):
# EF BB BF UTF-8 with BOM
self.result = {
"encoding": "UTF-8-SIG",
"confidence": 1.0,
"language": "",
}
elif byte_str.startswith((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE)):
# FF FE 00 00 UTF-32, little-endian BOM
# 00 00 FE FF UTF-32, big-endian BOM
self.result = {"encoding": "UTF-32", "confidence": 1.0, "language": ""}
elif byte_str.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
# FF FE UTF-16, little endian BOM
# FE FF UTF-16, big endian BOM
self.result = {"encoding": "UTF-16", "confidence": 1.0, "language": ""}
else:
# Binary file detection - check for excessive null bytes early
# But UTF-16/32 have null bytes, so check for patterns first
# Check for no-BOM UTF-16/32 patterns (alternating nulls)
# UTF-32LE: XX 00 00 00 pattern (every 4th byte is null)
# UTF-32BE: 00 00 00 XX pattern (first 3 of 4 bytes are null)
# UTF-16LE: XX 00 pattern (every other byte is null in odd positions)
# UTF-16BE: 00 XX pattern (every other byte is null in even positions)
looks_like_utf16_32 = False
# Use larger sample for better pattern detection
sample_size = min(len(byte_str), 200)
if sample_size >= 50:
sample = byte_str[:sample_size]
# Count nulls in even and odd positions (for UTF-16 detection)
even_nulls = sum(
1 for i in range(0, sample_size, 2) if sample[i] == 0
)
odd_nulls = sum(
1 for i in range(1, sample_size, 2) if sample[i] == 0
)
# Check for UTF-32 patterns (more nulls in groups of 4)
# For UTF-32LE: positions 1,2,3 of every 4 bytes might be null
# For UTF-32BE: positions 0,1,2 of every 4 bytes might be null
if sample_size >= 100:
mod1_nulls = sum(
1 for i in range(1, sample_size, 4) if sample[i] == 0
)
mod2_nulls = sum(
1 for i in range(2, sample_size, 4) if sample[i] == 0
)
mod3_nulls = sum(
1 for i in range(3, sample_size, 4) if sample[i] == 0
)
# Strong UTF-32 signal: consistent null pattern in 2+ of the 3 positions
utf32_nulls = [mod1_nulls, mod2_nulls, mod3_nulls]
if sum(n > sample_size // 8 for n in utf32_nulls) >= 2:
looks_like_utf16_32 = True
# UTF-16 detection: significant nulls in even OR odd positions
# Lower threshold: 12% of positions (24 out of 200)
utf16_threshold = sample_size // 16
if even_nulls > utf16_threshold or odd_nulls > utf16_threshold:
looks_like_utf16_32 = True
if not looks_like_utf16_32:
# Sample first 8KB to detect binary files
check_size = min(len(byte_str), 8192)
null_count = byte_str[:check_size].count(0)
if null_count > check_size * 0.1: # >10% null bytes
# Likely a binary file, not text
self.result = {
"encoding": None,
"confidence": 0.0,
"language": "",
}
self.done = True
return
self._got_data = True
if self.result["encoding"] is not None:
self.done = True
return
# If none of those matched and we've only see ASCII so far, check
# for high bytes and escape sequences
if self._input_state == InputState.PURE_ASCII:
if self.HIGH_BYTE_DETECTOR.search(byte_str):
self._input_state = InputState.HIGH_BYTE
elif (
self._input_state == InputState.PURE_ASCII
and self.ESC_DETECTOR.search(self._last_char + byte_str)
):
self._input_state = InputState.ESC_ASCII
self._last_char = byte_str[-1:]
# Track total bytes processed
self._total_bytes_fed += len(byte_str)
# Stop processing after processing enough data
# Don't set done=True here, let close() finalize the result
if self._total_bytes_fed > self.max_bytes:
return
# next we will look to see if it is appears to be either a UTF-16 or
# UTF-32 encoding
if not self._utf1632_prober:
self._utf1632_prober = UTF1632Prober()
if self._utf1632_prober.state == ProbingState.DETECTING:
if self._utf1632_prober.feed(byte_str) == ProbingState.FOUND_IT:
self.result = {
"encoding": self._utf1632_prober.charset_name,
"confidence": self._utf1632_prober.get_confidence(),
"language": "",
}
self.done = True
return
# If we've seen escape sequences, use the EscCharSetProber, which
# uses a simple state machine to check for known escape sequences in
# HZ and ISO-2022 encodings, since those are the only encodings that
# use such sequences.
if self._input_state == InputState.ESC_ASCII:
if not self._esc_charset_prober:
self._esc_charset_prober = EscCharSetProber(self.lang_filter)
if self._esc_charset_prober.feed(byte_str) == ProbingState.FOUND_IT:
self.result = {
"encoding": self._esc_charset_prober.charset_name,
"confidence": self._esc_charset_prober.get_confidence(),
"language": self._esc_charset_prober.language,
}
self.done = True
# If we've seen high bytes (i.e., those with values greater than 127),
# we need to do more complicated checks using all our multi-byte and
# single-byte probers that are left. The single-byte probers
# use character bigram distributions to determine the encoding, whereas
# the multi-byte probers use a combination of character unigram and
# bigram distributions.
elif self._input_state == InputState.HIGH_BYTE:
if not self._charset_probers:
self._charset_probers = [
MBCSGroupProber(
lang_filter=self.lang_filter, encoding_era=self.encoding_era
)
]
# If we're checking non-CJK encodings, use single-byte prober
if self.lang_filter & LanguageFilter.NON_CJK:
self._charset_probers.append(
SBCSGroupProber(
encoding_era=self.encoding_era, lang_filter=self.lang_filter
)
)
for prober in self._charset_probers:
if prober.feed(byte_str) == ProbingState.FOUND_IT:
charset_name = prober.charset_name
# Rename legacy encodings if requested
if self.should_rename_legacy:
charset_name = self.LEGACY_MAP.get(
(charset_name or "").lower(), charset_name
)
self.result = {
"encoding": charset_name,
"confidence": prober.get_confidence(),
"language": prober.language,
}
self.done = True
break
[docs]
def close(self) -> ResultDict:
"""
Stop analyzing the current document and come up with a final
prediction.
:returns: The ``result`` attribute, a ``dict`` with the keys
`encoding`, `confidence`, and `language`.
"""
# Don't bother with checks if we're already done
if self.done:
return self.result
self.done = True
if not self._got_data:
self.logger.debug("no data received!")
# Default to ASCII if it is all we've seen so far
elif self._input_state == InputState.PURE_ASCII:
self.result = {"encoding": "ascii", "confidence": 1.0, "language": ""}
# Check if escape prober found anything
elif self._input_state == InputState.ESC_ASCII:
if self._esc_charset_prober:
charset_name = self._esc_charset_prober.charset_name
if charset_name:
self.result = {
"encoding": charset_name,
"confidence": self._esc_charset_prober.get_confidence(),
"language": self._esc_charset_prober.language,
}
else:
# ESC prober didn't identify a specific encoding
# Since input is pure ASCII + ESC, default to UTF-8
self.result = {
"encoding": "utf-8",
"confidence": 1.0,
"language": "",
}
# If we have seen non-ASCII, return the best that met MINIMUM_THRESHOLD
elif self._input_state == InputState.HIGH_BYTE:
prober_confidence = None
max_prober_confidence = 0.0
max_prober = None
for prober in self._charset_probers:
if not prober:
continue
prober_confidence = prober.get_confidence()
if prober_confidence > max_prober_confidence:
max_prober_confidence = prober_confidence
max_prober = prober
if max_prober and (max_prober_confidence > self.MINIMUM_THRESHOLD):
charset_name = max_prober.charset_name
assert charset_name is not None
lower_charset_name = charset_name.lower()
confidence = max_prober.get_confidence()
# Find the actual winning nested prober (max_prober might be a group prober)
winning_nested_prober = None
for prober in self.nested_probers:
if (
prober
and prober.active
and prober.charset_name
and prober.charset_name.lower() == lower_charset_name
and abs(prober.get_confidence() - confidence) < 0.0001
):
winning_nested_prober = prober
break
# Apply heuristic adjustments in a single pass over active probers
charset_name, confidence = self._apply_encoding_heuristics(
charset_name, confidence, winning_nested_prober or max_prober
)
# Rename legacy encodings with superset encodings if asked
if self.should_rename_legacy:
charset_name = self.LEGACY_MAP.get(
(charset_name or "").lower(), charset_name
)
self.result = {
"encoding": charset_name,
"confidence": confidence,
"language": max_prober.language,
}
else:
# Default to UTF-8 if no encoding met threshold AND UTF-8 prober
# hasn't determined this is NOT UTF-8
# UTF-8 is now the most common encoding on the web and a superset of ASCII
utf8_prober = self._get_utf8_prober()
if utf8_prober and utf8_prober.active:
# UTF-8 prober didn't rule it out, so default to UTF-8
self.result = {
"encoding": utf8_prober.charset_name,
"confidence": utf8_prober.get_confidence(),
"language": utf8_prober.language,
}
else:
# UTF-8 was ruled out, return None
self.result = {
"encoding": None,
"confidence": 0.0,
"language": None,
}
# Log all prober confidences if none met MINIMUM_THRESHOLD
if self.logger.getEffectiveLevel() <= logging.DEBUG:
if self.result["encoding"] is None:
self.logger.debug("no probers hit minimum threshold")
for prober in self.nested_probers:
if not prober:
continue
self.logger.debug(
"%s %s confidence = %s",
prober.charset_name,
prober.language,
prober.get_confidence(),
)
return self.result