diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 070d0a9..5256b5e 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,5 +1,70 @@ -1.0 (unreleased) - 1.0 will include an api overhaul and remove all deprecations +1.0.0 (unreleased) + + IMPORTANT + + This release has many breaking changes. + + Deprecated legacy code was removed. + + Work has been done to make the API more consistent. + + Several long-standing bugs and inconsistencies were fixed. + + + Backwards Incompatible Changes: + + Remove Deprecated Functions: + ``MetadataParser.get_metadata`` + ``MetadataParser.get_metadatas`` + ``MetadataParser.is_opengraph_minimum`` + ``MetadataParser.metadata`` + ``MetadataParser.metadata_encoding`` + ``MetadataParser.metadata_version`` + ``MetadataParser.soup`` + ``ParsedResult.get_metadata`` + + Remove Deprecated Functionality: + ``MetadataParser.__init__::cached_urlparser`` + no longer accepts `int` to control `cached_urlparser_maxitems` + + Encoder changes + affected functions: + ``decode_html`` + ``encode_ascii`` + ``ParsedResult.default_encoder`` + ``ParsedResult.get_metadatas::encoder`` + ``MetadataParser.__init__::default_encoder`` + previously, encoders accepted one argument, which was documented to + be a string. This would cause issues if the elements were DC, as + that storage uses a dict. The new behavior is to send a first + raw input value that could either be a Dict or String, and a + second value that is a string identifying the storage type. + now they accept two arguments: + Arg 1 is a string or dict + Arg 2 is optional string, identifying the strategy/store + + API Changes + The package was split into namespaces. + ``MetadataParser.__init__`` now validates submitted `strategy` args + + ``MetadataParser.strategy`` now defaults to: `["meta", "page", "og", "dc", "twitter"]` + previously this was: `["og", "dc", "meta", "page", "twitter"]` + + ``ParsedResult.get_metadatas`` will now return a dict or None. 
+ A bug was discovered in which it would return the first matched + elements when there were multiple options + + An invalid strategy will now raise `InvalidStrategy`, a subclass of `ValueError` + + `InvalidDocument` no longer has a .message attribute + + Exceptions now invoke `super().__init__(args)` + + New Functionality + + ```ParsedResult.select_first_match(field, strategy)``` + will return the first match for the given, or default strategy + 0.13.1 diff --git a/README.rst b/README.rst index 668a3bf..03ea75c 100644 --- a/README.rst +++ b/README.rst @@ -7,10 +7,10 @@ Build Status: |build_status| MetadataParser is a Python module for pulling metadata out of web documents. -It requires `BeautifulSoup` for parsing. `Requests` is required for installation -at this time, but not for operation. Additional functionality is automatically -enabled if the `tldextract` project is installed, but can be disabled by -setting an environment variable. +`BeautifulSoup` is required for parsing. +`Requests` is required for fetching remote documents. +`tldextract` is utilized to parse domains, but can be disabled by setting an +environment variable. This project has been used in production for many years, and has successfully parsed billions of documents. @@ -29,7 +29,8 @@ For example: * if the current release is: `0.10.6` * the advised pin is: `metadata_parser<0.11` -PATCH releases will usually be bug fixes and new features that support backwards compatibility with Public Methods. Private Methods are not guaranteed to be +PATCH releases will usually be bug fixes and new features that support backwards +compatibility with Public Methods. Private Methods are not guaranteed to be backwards compatible. MINOR releases are triggered when there is a breaking change to Public Methods. @@ -37,12 +38,10 @@ Once a new MINOR release is triggered, first-party support for the previous MINO release is EOL (end of life). 
PRs for previous releases are welcome, but giving them proper attention is not guaranteed. -The current MAJOR release is `0`. -A `1` MAJOR release is planned, and will have an entirely different structure and API. - Future deprecations will raise warnings. By populating the following environment variable, future deprecations will raise exceptions: + export METADATA_PARSER_FUTURE=1 Installation @@ -74,7 +73,7 @@ Features Logging ======= -This file has extensive logging to help developers pinpoint problems. +This file utilizes extensive logging to help developers pinpoint problems. * ``log.debug`` This log level is mostly used to handle library maintenance and @@ -109,7 +108,8 @@ Optional Integrations * ``tldextract`` This package will attempt to use the package ``tldextract`` for advanced domain - and hostname analysis. If ``tldextract`` is not found, a fallback is used. + and hostname analysis. If ``tldextract`` is not wanted, it can be disabled + with an environment variable. Environment Variables @@ -132,7 +132,7 @@ Notes 1. This package requires BeautifulSoup 4. 2. For speed, it will instantiate a BeautifulSoup parser with lxml, and - fallback to 'none' (the internal pure Python) if it can't load lxml. + fallback to 'None' (the internal pure Python) if it can not load lxml. 3. URL Validation is not RFC compliant, but tries to be "Real World" compliant. It is HIGHLY recommended that you install lxml for usage. 
@@ -145,7 +145,7 @@ Using at least the most recent 3.x versions is strongly recommended The default 'strategy' is to look in this order:: - og,dc,meta,page + meta,page,og,dc,twitter Which stands for the following:: @@ -239,12 +239,12 @@ is extracted from the metadata payload:: >>> import metadata_parser >>> page = metadata_parser.MetadataParser(url="http://www.example.com") - >>> print page.get_metadata_link('image') + >>> print(page.get_metadata_link('image')) This method accepts a kwarg ``allow_encoded_uri`` (default False) which will return the image without further processing:: - >>> print page.get_metadata_link('image', allow_encoded_uri=True) + >>> print(page.get_metadata_link('image', allow_encoded_uri=True)) Similarly, if a url is local:: @@ -252,14 +252,14 @@ Similarly, if a url is local:: The ``get_metadata_link`` method will automatically upgrade it onto the domain:: - >>> print page.get_metadata_link('image') + >>> print(page.get_metadata_link('image')) http://example.com/image.jpg Poorly Constructed Canonical URLs --------------------------------- -Many website publishers implement canonical URLs incorrectly. This package -tries to fix that. +Many website publishers implement canonical URLs incorrectly. +This package tries to fix that. By default ``MetadataParser`` is constructed with ``require_public_netloc=True`` and ``allow_localhosts=True``. @@ -298,17 +298,17 @@ improper canonical url, and remount the local part "/alt-path/to/foo" onto the domain that served the file. 
The vast majority of times this 'behavior' has been encountered, this is the intended canonical:: - print page.get_discrete_url() + print(page.get_discrete_url()) >>> http://example.com/alt-path/to/foo In contrast, versions 0.8.3 and earlier will not catch this situation:: - print page.get_discrete_url() + print(page.get_discrete_url()) >>> http://localhost:8000/alt-path/to/foo In order to preserve the earlier behavior, just submit ``require_public_global=False``:: - print page.get_discrete_url(require_public_global=False) + print(page.get_discrete_url(require_public_global=False)) >>> http://localhost:8000/alt-path/to/foo @@ -340,43 +340,7 @@ content, not just templates/Site-Operators. WARNING ============= -1.0 will be a complete API overhaul. pin your releases to avoid sadness. - - -Version 0.9.19 Breaking Changes -=============================== - -Issue #12 exposed some flaws in the existing package - -1. ``MetadataParser.get_metadatas`` replaces ``MetadataParser.get_metadata`` ----------------------------------------------------------------------------- - -Until version 0.9.19, the recommended way to get metadata was to use -``get_metadata`` which will either return a string (or None). - -Starting with version 0.9.19, the recommended way to get metadata is to use -``get_metadatas`` which will always return a list (or None). - -This change was made because the library incorrectly stored a single metadata -key value when there were duplicates. - -2. The ``ParsedResult`` payload stores mixed content and tracks it's version -==-------------------------------------------------------------------------- - -Many users (including the maintainer) archive the parsed metadata. After -testing a variety of payloads with an all-list format and a mixed format -(string or list), a mixed format had a much smaller payload size with a -negligible performance hit. A new ``_v`` attribute tracks the payload version. 
-In the future, payloads without a ``_v`` attribute will be interpreted as the -pre-versioning format. - -3. ``DublinCore`` payloads might be a dict ------------------------------------------- - -Tests were added to handle dublincore data. An extra attribute may be needed to -properly represent the payload, so always returning a dict with at least a -name+content (and possibly ``lang`` or ``scheme`` is the best approach. - +Please pin your releases. Usage @@ -389,19 +353,19 @@ Until version ``0.9.19``, the recommended way to get metadata was to use >>> import metadata_parser >>> page = metadata_parser.MetadataParser(url="http://www.example.com") - >>> print page.metadata - >>> print page.get_metadatas('title') - >>> print page.get_metadatas('title', strategy=['og',]) - >>> print page.get_metadatas('title', strategy=['page', 'og', 'dc',]) + >>> print(page.metadata) + >>> print(page.get_metadatas('title')) + >>> print(page.get_metadatas('title', strategy=['og',])) + >>> print(page.get_metadatas('title', strategy=['page', 'og', 'dc',])) **From HTML**:: >>> HTML = """""" >>> page = metadata_parser.MetadataParser(html=HTML) - >>> print page.metadata - >>> print page.get_metadatas('title') - >>> print page.get_metadatas('title', strategy=['og',]) - >>> print page.get_metadatas('title', strategy=['page', 'og', 'dc',]) + >>> print(page.metadata) + >>> print(page.get_metadatas('title')) + >>> print(page.get_metadatas('title', strategy=['og',])) + >>> print(page.get_metadatas('title', strategy=['page', 'og', 'dc',])) Malformed Data @@ -428,4 +392,4 @@ when building on Python3, a ``static`` toplevel directory may be needed This library was originally based on Erik River's `opengraph module `_. Something more -aggressive than Erik's module was needed, so this project was started. \ No newline at end of file +aggressive than Erik's module was needed, so this project was started. 
diff --git a/TODO.txt b/TODO.txt new file mode 100644 index 0000000..330769d --- /dev/null +++ b/TODO.txt @@ -0,0 +1,5 @@ +1.0.0 + tests needed for: + select_first_match + try to break it + select different strategies, different data on each \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8648517..68b125b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.black] line-length = 88 -target-version = ['py36'] +target-version = ['py37'] exclude = ''' ( /( diff --git a/pytest.ini b/pytest.ini index db7cd78..6859fae 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,3 @@ [pytest] filterwarnings = - ignore:MetadataParser. - ignore:`ParsedResult.get_metadata` returns a string diff --git a/setup.cfg b/setup.cfg index f65ddae..0fa3306 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,13 +1,16 @@ [flake8] +application_import_names = metadata_parser +import_order_style = appnexus +exclude = .eggs/*, .pytest_cache/*, .tox/*, build/*, dist/*, workspace-demos/* +max_line_length = 88 + # ignore = E402,E501,W503 # E501: line too long # F401: imported but unused # I202: Additional newline in a group of imports per-file-ignores = - setup.py: E501 - src/metadata_parser/__init__.py: E501,I202 + setup.py: + src/metadata_parser/__init__.py: E501 + src/metadata_parser/regex.py: E501 tests/*: E501 tests/_compat.py: F401 -exclude = .eggs/*, .pytest_cache/*, .tox/*, build/*, dist/*, workspace-demos/* -application_import_names = metadata_parser -import_order_style = appnexus diff --git a/setup.py b/setup.py index 3c48d58..6e2d726 100644 --- a/setup.py +++ b/setup.py @@ -32,8 +32,6 @@ "requests-toolbelt>=0.8.0", "typing_extensions", ] -if sys.version_info.major == 2: - requires.append("backports.html") if sys.version_info >= (3, 13): requires.append("legacy-cgi") diff --git a/src/metadata_parser/__init__.py b/src/metadata_parser/__init__.py index d5cdb71..3ab6ee2 100644 --- a/src/metadata_parser/__init__.py +++ b/src/metadata_parser/__init__.py 
@@ -1,15 +1,6 @@ -import _socket # noqa: I100,I201 # peername hack, see below - # stdlib -import cgi # noqa: I100,I201 import collections -import datetime -from html import unescape as html_unescape import logging -import os -import re -import socket # peername hack, see below -import typing from typing import Any from typing import Callable from typing import Dict @@ -19,194 +10,84 @@ from typing import Tuple from typing import TYPE_CHECKING from typing import Union -import unicodedata from urllib.parse import _ResultMixinStr # what happens if you decode from urllib.parse import ParseResult from urllib.parse import ParseResultBytes -from urllib.parse import quote as url_quote -from urllib.parse import unquote as url_unquote from urllib.parse import urlparse -from urllib.parse import urlunparse -import warnings # pypi from bs4 import BeautifulSoup import requests from requests.structures import CaseInsensitiveDict -from requests_toolbelt.utils.deprecated import get_encodings_from_content from typing_extensions import Literal # py38 -from typing_extensions import Protocol # py38 + +# local +from . 
import config +from .exceptions import InvalidDocument +from .exceptions import InvalidStrategy +from .exceptions import NotParsable +from .exceptions import NotParsableFetchError +from .exceptions import NotParsableJson +from .exceptions import NotParsableRedirect +from .exceptions import RedirectDetected +from .regex import RE_ALL_NUMERIC +from .regex import RE_canonical +from .regex import RE_doctype +from .regex import RE_DOMAIN_NAME +from .regex import RE_IPV4_ADDRESS +from .regex import RE_prefix_opengraph +from .regex import RE_prefix_rel_img_src +from .regex import RE_prefix_twitter +from .regex import RE_rfc3986_valid_characters +from .regex import RE_shortlink +from .regex import RE_VALID_NETLOC +from .regex import RE_whitespace +from .requests_extensions import derive_encoding__hook +from .requests_extensions import get_response_peername +from .requests_extensions import response_peername__hook +from .typing import _UrlParserCacheable +from .utils import DummyResponse +from .utils import fix_unicode_url +from .utils import warn_user if TYPE_CHECKING: from bs4 import Tag as _bs4_Tag + from .typing import TYPE_ENCODER + from .typing import TYPE_REQUESTS_TIMEOUT + from .typing import TYPE_URL_FETCH + from .typing import TYPE_URLPARSE + from .typing import TYPES_PEERNAME + from .typing import TYPES_RESPONSE + from .typing import TYPES_STRATEGY if __debug__: # only used for testing. 
turn off in most production env with -o flags import pprint # noqa: F401 -FUTURE_BEHAVIOR = bool(int(os.getenv("METADATA_PARSER_FUTURE", "0"))) # ============================================================================== -__VERSION__ = "0.13.1" - - -# ------------------------------------------------------------------------------ - - -log = logging.getLogger(__name__) - - -def warn_future(message: str) -> None: - warnings.warn(message, FutureWarning, stacklevel=2) - if FUTURE_BEHAVIOR: - raise ValueError(message) - - -def warn_user(message: str) -> None: - warnings.warn(message, UserWarning, stacklevel=2) +__VERSION__ = "1.0.0dev" # ------------------------------------------------------------------------------ -# defaults -DUMMY_URL = os.environ.get( - "METADATA_PARSER__DUMMY_URL", "http://example.com/index.html" -) -ENCODING_FALLBACK = os.environ.get("METADATA_PARSER__ENCODING_FALLBACK", "ISO-8859-1") -TESTING = bool(int(os.environ.get("METADATA_PARSER__TESTING", "0"))) - -""" -# currently unused -MAX_CONNECTIONTIME = int( - os.environ.get("METADATA_PARSER__MAX_CONNECTIONTIME", 20) -) # in seconds -MAX_FILESIZE = int( - os.environ.get("METADATA_PARSER__MAX_FILESIZE", 2 ** 19) -) # bytes; this is .5MB -""" +log = logging.getLogger("metadata_parser") -TYPES_RESPONSE = Union["DummyResponse", requests.Response] -TYPES_PEERNAME = Tuple[str, int] # (ip, port) -TYPE_URL_FETCH = Tuple[str, str, "ResponseHistory"] -TYPE_REQUESTS_TIMEOUT = Optional[ - Union[int, float, Tuple[int, int], Tuple[float, float]] -] # ------------------------------------------------------------------------------ -_DISABLE_TLDEXTRACT = bool( - int(os.environ.get("METADATA_PARSER__DISABLE_TLDEXTRACT", "0")) -) -USE_TLDEXTRACT = None -if not _DISABLE_TLDEXTRACT: - try: - import tldextract - - USE_TLDEXTRACT = True - except ImportError: - log.info( - "tldextract is not available on this system. 
" - "medatadata_parser recommends installing tldextract" - ) - USE_TLDEXTRACT = False -# ------------------------------------------------------------------------------ +USE_TLDEXTRACT = False +if not config.DISABLE_TLDEXTRACT: + import tldextract - -# peername hacks -# only use for these stdlib packages -# eventually will not be needed thanks to upstream changes in `requests` -try: - _compatible_sockets: Tuple = ( - _socket.socket, - socket._socketobject, # type: ignore[attr-defined] - ) -except AttributeError: - _compatible_sockets: Tuple = (_socket.socket,) # type: ignore[no-redef] + USE_TLDEXTRACT = True # ------------------------------------------------------------------------------ -# regex library - -RE_ALL_NUMERIC = re.compile(r"^[\d\.]+$") -RE_bad_title = re.compile( - r"""(?:|<title>)(.*)(?:<?/title>|(?:<)?/title>)""", re.I -) -RE_canonical = re.compile("^canonical$", re.I) -RE_doctype = re.compile(r"^\s*<!DOCTYPE[^>]*>", re.IGNORECASE) -RE_DOMAIN_NAME = re.compile( - r"""(^ - (?: - [A-Z0-9] - (?: - [A-Z0-9-]{0,61} - [A-Z0-9] - )? - \. - )+ - (?: - [A-Z]{2,6}\.? - | - [A-Z0-9-]{2,} - (?<!-)\.?) - $)""", - re.VERBOSE | re.IGNORECASE, -) -RE_IPV4_ADDRESS = re.compile( - r"^(\d{1,3})\.(\d{1,3}).(\d{1,3}).(\d{1,3})$" # grab 4 octets -) -RE_PORT = re.compile(r"^" r"(?P<main>.+)" r":" r"(?P<port>\d+)" r"$", re.IGNORECASE) -RE_prefix_opengraph = re.compile(r"^og") -RE_prefix_rel_img_src = re.compile("^image_src$", re.I) -RE_prefix_twitter = re.compile(r"^twitter") - -# we may need to test general validity of url components -RE_rfc3986_valid_characters = re.compile( - r"""^[a-z0-9\-\.\_\~\:\/\?\#\[\]\@\!\$\&\'\(\)\*\+\,\;\=\%]+$""", re.I -) -r""" -What is valid in the RFC? 
- # don't need escaping - rfc3986_unreserved__noescape = ['a-z', '0-9', ] - - # do need escaping - rfc3986_unreserved__escape = ['-', '.', '_', '~', ] - rfc3986_gen_delims__escape = [":", "/", "?", "#", "[", "]", "@", ] - rfc3986_sub_delims__escape = ["!", "$", "&", "'", "(", ")", "*", "+", ",", ";", "=", ] - rfc3986_pct_encoded__escape = ["%", ] - rfc3986__escape = rfc3986_unreserved__escape + rfc3986_gen_delims__escape + rfc3986_sub_delims__escape + rfc3986_pct_encoded__escape - rfc3986__escaped = re.escape(''.join(rfc3986__escape)) - rfc3986_chars = ''.join(rfc3986_unreserved__noescape) + rfc3986__escaped - print rfc3986_chars - - a-z0-9\-\.\_\~\:\/\?\#\[\]\@\!\$\&\'\(\)\*\+\,\;\=\% -""" - -RE_shortlink = re.compile("^shortlink$", re.I) -RE_whitespace = re.compile(r"\s+") - -# based on DJANGO -# https://github.com/django/django/blob/master/django/core/validators.py -# not testing ipv6 right now, because rules are needed for ensuring they -# are correct -RE_VALID_NETLOC = re.compile( - r"(?:" - r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" - r"|" # ...or ipv4 - # r'(?P<ipv6>\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 - # r'|' - r"(?P<localhost>localhost)" # localhost... - r"|" - r"(?P<domain>([A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}(?<!-)\.?))" # domain... - r"(?P<port>:\d+)?" # optional port - r")", - re.IGNORECASE, -) - -# ------------------------------------------------------------------------------ # globals library @@ -261,183 +142,14 @@ def warn_user(message: str) -> None: ) -# ------------------------------------------------------------------------------ - - -def encode_ascii(text: str) -> str: - """ - helper function to force ascii; some edge-cases have unicode line breaks in titles/etc. 
- """ - if not text: - text = "" - _as_bytes = unicodedata.normalize("NFKD", text).encode("ascii", "ignore") - _as_str = _as_bytes.decode("utf-8", "ignore") - return _as_str - - -def decode_html(text: str) -> str: - """ - helper function to decode text that has both HTML and non-ascii characters - """ - text = encode_ascii(html_unescape(text)) - return text - +STRATEGY_ALL = ["meta", "page", "og", "dc", "twitter"] # ------------------------------------------------------------------------------ -def get_encoding_from_headers(headers: CaseInsensitiveDict) -> Optional[str]: - """ - Returns encodings from given HTTP Header Dict. - - :param headers: dictionary to extract encoding from. - :rtype: str - - `requests.get("http://example.com").headers` should be `requests.structures.CaseInsensitiveDict` - - ---------------------------------------------------------------------------- - - Modified from `requests` version 2.x - - The Requests Library: - - Copyright 2017 Kenneth Reitz - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- """ - content_type = headers.get("content-type") - if not content_type: - return None - content_type, params = cgi.parse_header(content_type) - if "charset" in params: - return params["charset"].strip("'\"") - return None - - # ------------------------------------------------------------------------------ -def get_response_peername(resp: TYPES_RESPONSE) -> Optional[TYPES_PEERNAME]: - """ - used to get the peername (ip+port) data from the request - if a socket is found, caches this onto the request object - - IMPORTANT. this must happen BEFORE any content is consumed. - - `response` is really `requests.models.Response` - - This will UPGRADE the response object to have the following attribute: - - * _mp_peername - """ - if not isinstance(resp, requests.Response) and not isinstance(resp, DummyResponse): - # raise AllowableError("Not a HTTPResponse") - log.debug("Not a supported HTTPResponse | %s", resp) - log.debug("-> received a type of: %s", type(resp)) - return None - - if hasattr(resp, "_mp_peername"): - return resp._mp_peername - - def _get_socket() -> Optional[socket.socket]: - if isinstance(resp, DummyResponse): - return None - i = 0 - while True: - i += 1 - try: - if i == 1: - sock = resp.raw._connection.sock # type: ignore[union-attr] - elif i == 2: - sock = resp.raw._connection.sock.socket # type: ignore[union-attr] - elif i == 3: - sock = resp.raw._fp.fp._sock # type: ignore[union-attr] - elif i == 4: - sock = resp.raw._fp.fp._sock.socket # type: ignore[union-attr] - elif i == 5: - sock = resp.raw._fp.fp.raw._sock # type: ignore[union-attr] - else: - break - if not isinstance(sock, _compatible_sockets): - raise AllowableError() - return sock - except Exception: - pass - return None - - sock = _get_socket() - if sock: - # only cache if we have a sock - # we may want/need to call again - resp._mp_peername = sock.getpeername() # type: ignore [union-attr] - else: - resp._mp_peername = None # type: ignore [union-attr] - return resp._mp_peername # type: ignore 
[union-attr] - - -# ------------------------------------------------------------------------------ - - -def response_peername__hook(resp: TYPES_RESPONSE, *args, **kwargs) -> None: - get_response_peername(resp) - # do not return anything - - -def safe_sample(source: Union[str, bytes]) -> bytes: - if isinstance(source, bytes): - _sample = source[:1024] - else: - # this block can cause an error on PY3 depending on where the data came - # from such as what the source is (from a request vs a document/test) - # thanks, @keyz182 for the PR/investigation https://github.com/jvanasco/metadata_parser/pull/16 - _sample = (source.encode())[:1024] - return _sample - - -def derive_encoding__hook(resp: TYPES_RESPONSE, *args, **kwargs) -> None: - """ - a note about `requests` - - `response.content` is the raw response bytes - `response.text` is `response.content` decoded to the identified codec or - the fallback codec. - - This fallback codec is normally iso-8859-1 (latin-1) which is defined by the - RFC for HTTP as the default when no codec is provided in the headers or - body. This hook exists because users in certain regions may expect the - servers to not follow RFC and for the default encoding to be different. 
- """ - if TYPE_CHECKING: - assert hasattr(resp, "_encoding_fallback") - assert hasattr(resp, "_encoding_content") - assert hasattr(resp, "_encoding_headers") - - resp._encoding_fallback = ENCODING_FALLBACK - # modified version, returns `None` if no charset available - resp._encoding_headers = get_encoding_from_headers(resp.headers) - resp._encoding_content = None - if not resp._encoding_headers and resp.content: - # html5 spec requires a meta-charset in the first 1024 bytes - _sample = safe_sample(resp.content) - resp._encoding_content = get_encodings_from_content(_sample) - if resp._encoding_content: - # it's a list - resp.encoding = resp._encoding_content[0] - else: - resp.encoding = resp._encoding_headers or resp._encoding_fallback - # do not return anything - - # ------------------------------------------------------------------------------ @@ -585,6 +297,33 @@ def is_parsed_valid_relative(parsed: ParseResult) -> bool: return False +def is_url_valid( + url: str, + require_public_netloc: Optional[bool] = None, + allow_localhosts: Optional[bool] = None, + urlparser: "TYPE_URLPARSE" = urlparse, +) -> Union[Literal[False], ParseResult]: + """ + tries to parse a url. if valid returns `ParseResult` + (boolean eval is True); if invalid returns `False` + kwargs: + `require_public_netloc` - + `allow_localhosts` - + `urlparser` - defaults to standard `urlparse`, can be substituted with + a cacheable version. 
+ """ + if url is None: + return False + parsed = urlparser(url) + if is_parsed_valid_url( + parsed, + require_public_netloc=require_public_netloc, + allow_localhosts=allow_localhosts, + ): + return parsed + return False + + def parsed_to_relative( parsed: ParseResult, parsed_fallback: Optional[ParseResult] = None, @@ -613,77 +352,12 @@ def parsed_to_relative( return _path -def fix_unicode_url( - url: str, - encoding: Optional[str] = None, - urlparser: Callable[[str], ParseResult] = urlparse, -) -> str: - """ - some cms systems will put unicode in their canonical url - this is not allowed by rfc. - currently this function will update the PATH but not the kwargs. - perhaps it should. - rfc3986 says that characters should be put into utf8 then percent encoded - - kwargs: - `encoding` - used for python2 encoding - `urlparser` - defaults to standard `urlparse`, can be substituted with - a cacheable version. - """ - parsed = urlparser(url) - if parsed.path in ("", "/"): - # can't do anything - return url - if RE_rfc3986_valid_characters.match(parsed.path): - # again, can't do anything - return url - # okay, we know we have bad items in the path, so try and upgrade! - # turn the namedtuple from urlparse into something we can edit - candidate = [i for i in parsed] - for _idx in [2]: # 2=path, 3=params, 4=queryparams, 5fragment - try: - candidate[_idx] = parsed[_idx] - candidate[_idx] = url_quote(url_unquote(candidate[_idx])) - except Exception as exc: - log.debug("fix_unicode_url failure: %s | %s | %s", url, encoding, exc) - return url - _url = urlunparse(candidate) - return _url - - -def is_url_valid( - url: str, - require_public_netloc: Optional[bool] = None, - allow_localhosts: Optional[bool] = None, - urlparser: Callable[[str], ParseResult] = urlparse, -) -> Union[Literal[False], ParseResult]: - """ - tries to parse a url. 
if valid returns `ParseResult` - (boolean eval is True); if invalid returns `False` - kwargs: - `require_public_netloc` - - `allow_localhosts` - - `urlparser` - defaults to standard `urlparse`, can be substituted with - a cacheable version. - """ - if url is None: - return False - parsed = urlparser(url) - if is_parsed_valid_url( - parsed, - require_public_netloc=require_public_netloc, - allow_localhosts=allow_localhosts, - ): - return parsed - return False - - def url_to_absolute_url( url_test: Optional[str], url_fallback: Optional[str] = None, require_public_netloc: Optional[bool] = None, allow_localhosts: Optional[bool] = None, - urlparser: Callable[[str], ParseResult] = urlparse, + urlparser: "TYPE_URLPARSE" = urlparse, ) -> Optional[str]: """ returns an "absolute url" if we have one. @@ -799,153 +473,35 @@ def url_to_absolute_url( return rval -# ------------------------------------------------------------------------------ - - -class InvalidDocument(Exception): - message: str - - def __init__(self, message: str = ""): - self.message = message - - def __str__(self) -> str: - return "InvalidDocument: %s" % (self.message) - - -class NotParsable(Exception): - raised: Optional[requests.exceptions.RequestException] - code: Optional[int] - metadataParser: Optional["MetadataParser"] - response: Optional[TYPES_RESPONSE] - - def __init__( - self, - message: str = "", - raised: Optional[requests.exceptions.RequestException] = None, - code: Optional[int] = None, - metadataParser: Optional["MetadataParser"] = None, - response: Optional[TYPES_RESPONSE] = None, - ): - self.message = message - self.raised = raised - self.code = code - self.metadataParser = metadataParser - self.response = response - - def __str__(self) -> str: - return "NotParsable: %s | %s | %s" % (self.message, self.code, self.raised) - - -class NotParsableJson(NotParsable): - def __str__(self) -> str: - return "NotParsableJson: %s | %s | %s" % (self.message, self.code, self.raised) - - -class 
NotParsableRedirect(NotParsable): - """Raised if a redirect is detected, but there is no Location header.""" - - def __str__(self) -> str: - return "NotParsableRedirect: %s | %s | %s" % ( - self.message, - self.code, - self.raised, - ) - - -class NotParsableFetchError(NotParsable): - def __str__(self) -> str: - return "NotParsableFetchError: %s | %s | %s" % ( - self.message, - self.code, - self.raised, - ) - - -class AllowableError(Exception): - pass - - -class RedirectDetected(Exception): - """ - Raised if a redirect is detected - Instance properties: - - ``location``: redirect location - ``code``: status code of the response - ``response``: actual response object - """ - - location: str - code: Optional[int] - response: Optional[TYPES_RESPONSE] - metadataParser: Optional["MetadataParser"] - - def __init__( - self, - location: str = "", - code: Optional[int] = None, - response: Optional[TYPES_RESPONSE] = None, - metadataParser: Optional["MetadataParser"] = None, - ): - self.location = location - self.code = code - self.response = response - self.metadataParser = metadataParser - - -# ------------------------------------------------------------------------------ - - -class DummyResponse(object): +def validate_strategy( + strategy: "TYPES_STRATEGY", +) -> List[str]: """ - A DummyResponse is used to ensure compatibility between url fetching - and html data + Used by `MetadataParser.__init__` to validate a strategy on set + Used by ParsedResult to """ - - text: str - url: str - status_code: int - encoding: str - elapsed_seconds: float = 0 - history: List - headers: CaseInsensitiveDict - content: Optional[Union[str, bytes]] = None - default_encoding: str - - def __init__( - self, - text: str = "", - url: str = "", - status_code: int = 200, - encoding: Optional[str] = None, - elapsed_seconds: float = 0, - headers: Optional[CaseInsensitiveDict] = None, - content: Optional[typing.AnyStr] = None, - derive_encoding: Optional[bool] = None, - default_encoding: Optional[str] = 
None, - ): - self.text = text - self.url = url - self.status_code = status_code - self.elapsed = datetime.timedelta(0, elapsed_seconds) - self.headers = headers if headers is not None else CaseInsensitiveDict() - self.history = [] - self.content = content - - # start `encoding` block - if encoding: - self.encoding = encoding - elif derive_encoding: - # only examine first 1024 bytes. in this case chars. utf could be 4x chars - _sample = safe_sample(text) - encodings = get_encodings_from_content(_sample) - if encodings: - self.encoding = encoding = encodings[0] - self.default_encoding = default_encoding or ENCODING_FALLBACK - # second phase cleanup - if not encoding: - self.encoding = self.default_encoding - # end `encoding` block + if not strategy: + raise InvalidStrategy("Missing `strategy`") + if isinstance(strategy, str): + if strategy != "all": + raise InvalidStrategy('If `strategy` is not a `list`, it must be "all".') + strategy = STRATEGY_ALL.copy() + elif isinstance(strategy, list): + _msgs = [] + _invalids = [] + for _candidate in strategy: + if _candidate == "all": + _msgs.append('Submit "all" as a `str`, not in a `list`.') + continue + if _candidate not in STRATEGY_ALL: + _invalids.append(_candidate) + if _invalids: + _msgs.append( + "Invalid strategy: %s." 
% ", ".join(['"%s"' % i for i in _invalids]) + ) + if _msgs: + raise InvalidStrategy(" ".join(_msgs)) + return strategy # ------------------------------------------------------------------------------ @@ -954,7 +510,7 @@ def __init__( class ResponseHistory(object): history: Optional[Iterable] = None - def __init__(self, resp: TYPES_RESPONSE): + def __init__(self, resp: "TYPES_RESPONSE"): """ :param resp: A :class:`requests.Response` object to compute history of :type resp: class:`requests.Response` @@ -988,10 +544,6 @@ def log( ) -class _UrlParserCacheable(Protocol): - urlparse: Callable[[str], ParseResult] - - class UrlParserCacheable(_UrlParserCacheable): """ class for caching calls to urlparse @@ -1001,12 +553,12 @@ class for caching calls to urlparse cache: collections.OrderedDict maxitems: int - urlparser: Callable[[str], ParseResult] + urlparser: "TYPE_URLPARSE" def __init__( self, maxitems: int = 30, - urlparser: Callable[[str], ParseResult] = urlparse, + urlparser: "TYPE_URLPARSE" = urlparse, ): """ :param maxitems: maximum items to cache, default 30 @@ -1042,24 +594,26 @@ class ParsedResult(object): readme/docs are not necessarily installed locally. 
""" + _og_minimum_requirements: List = ["title", "type", "image", "url"] + _version: int = 1 # version tracking + + # unused + # _get_metadata__last_strategy: Optional[str] = None + # twitter_sections: List = ["card", "title", "site", "description"] + + default_encoder: Optional["TYPE_ENCODER"] = None metadata: Dict - soup: Optional[BeautifulSoup] = None response_history: Optional[ResponseHistory] = ( None # only stashing `ResponseHistory` if we have it ) - _version: int = 1 # version tracking - default_encoder: Optional[Callable[[str], str]] = None - og_minimum_requirements: List = ["title", "type", "image", "url"] - twitter_sections: List = ["card", "title", "site", "description"] - strategy: Union[List[str], str] = ["og", "dc", "meta", "page", "twitter"] - - _get_metadata__last_strategy: Optional[str] = None + soup: Optional[BeautifulSoup] = None + strategy: Union[List[str], str] = STRATEGY_ALL def __init__(self): self.metadata = { - "og": {}, - "meta": {}, "dc": {}, + "meta": {}, + "og": {}, "page": {}, "twitter": {}, "_internal": {}, @@ -1107,149 +661,34 @@ def _add_discovered( def _coerce_validate_strategy( self, - strategy: Union[List[str], str, None] = None, - ) -> Union[List, str]: - """normalize a strategy into a valid option""" + strategy: "TYPES_STRATEGY" = None, + ) -> List[str]: + """normalize a strategy into a list of valid options.""" + _strategy = None if strategy: - if isinstance(strategy, str): - if strategy != "all": - raise ValueError("If `strategy` is not a `list`, it must be 'all'.") - elif isinstance(strategy, list): - _invalids = [] - for _candidate in strategy: - if _candidate not in self.strategy: - _invalids.append(_candidate) - if "all" in strategy: - raise ValueError('Submit "all" as a `str`, not in a `list`.') - if _invalids: - raise ValueError("invalid strategy: %s" % _invalids) + if strategy == self.strategy: + _strategy = self.strategy + else: + _strategy = validate_strategy(strategy) else: # use our default list - strategy = 
self.strategy - return strategy - - def get_metadata( - self, - field: str, - strategy: Union[list, str, None] = None, - encoder: Optional[Callable[[str], str]] = None, - ) -> Union[str, Dict[str, Union[str, Dict]], None]: - """ - LEGACY. DEPRECATED. DO NOT USE THIS. - - `get_metadata` - looks for the field in various stores. defaults to the core - strategy, though you may specify a certain item. if you search for - 'all' it will return a dict of all values. - - This is a legacy method and is being deprecated in favor of `get_metadatas` - This method will always return a string for the field value, however it - is possible the field contains multiple elements or even a dict if the - source was dublincore. - - In comparison, `get_metadatas` will always return a list for the values. - - In the case of DC/DublinCore metadata, this will return the first 'simple' - pairing (key/value - without a scheme/language) or the first element if no - simple match exists. - - This function will return different types depending on the input: - - if `strategy` is a single type: - `str` or `None` - - if `strategy` is a list: - `str` or `None`, with `str` being the first match - self._get_metadata__last_strategy will persist the matching strategy - - if `strategy` is "all": - `dict` of {strategy: result} - - :param field: - The field to retrieve - :type field: str - - :param strategy: - Where to search for the metadata. such as 'all' or - iterable like ['og', 'dc', 'meta', 'page', 'twitter', ] - :type strategy: string or list - - :param encoder: - a function, such as `encode_ascii`, to encode values before returning. - a valid `encoder` accepts one(1) arg. - if a `default_encoder` is registered, the string "raw" will disable it. - :type encoder: - function or "raw" - """ - warn_future( - """`ParsedResult.get_metadata` returns a string and is deprecated """ - """in favor of `get_metadatas` which returns a list. 
""" - """This will be removed in the next minor or major release.""" - ) - strategy = self._coerce_validate_strategy(strategy) - self._get_metadata__last_strategy = None - - if encoder is None: - encoder = self.default_encoder - elif encoder == "raw": - encoder = None - - def _lookup(store: str) -> Optional[Union[str, Dict]]: - if field in self.metadata[store]: - val = self.metadata[store][field] - if store == "dc": - # dublincore will be different. it uses dicts by default - # this is a one-element match - if isinstance(val, dict): - val = val["content"] - else: - _opt = None - for _val in val: - if len(_val.keys()) == 1: - _opt = _val["content"] - break - if _opt is None: - _opt = val[0]["content"] - val = _opt - else: - if isinstance(val, list): - val = val[0] - if encoder: - val = encoder(val) - return val - return None - - # `_coerce_validate_strategy` ensured a compliant strategy - if isinstance(strategy, list): - for store in strategy: - if store in self.metadata: - val = _lookup(store) - if val is not None: - self._get_metadata__last_strategy = store - return val - return None - elif strategy == "all": - rval: Dict = {} - for store in self.metadata: - if store == "_v": - continue - if field in self.metadata[store]: - val = _lookup(store) - rval[store] = val - return rval - else: - raise ValueError("unsupported strategy") + _strategy = self.strategy or "all" + if _strategy == "all": + _strategy = STRATEGY_ALL.copy() + if TYPE_CHECKING: + assert isinstance(_strategy, list) + return _strategy def get_metadatas( self, field: str, - strategy: Union[List[str], str, None] = None, - encoder: Optional[Callable[[str], str]] = None, - ) -> Optional[Union[Dict, List]]: + strategy: "TYPES_STRATEGY" = None, + encoder: Optional["TYPE_ENCODER"] = None, + ) -> Optional[Dict[str, Union[Dict, List]]]: """ looks for the field in various stores. defaults to the core strategy, though you may specify a certain item. if you search for - 'all' it will return a dict of all values. 
+ "all" it will return a dict of all values. This method replaced the legacy method `get_metadatas`. This method will always return a list. @@ -1259,18 +698,31 @@ def get_metadatas( :type field: str :param strategy: - Where to search for the metadata. such as 'all' or - iterable like ['og', 'dc', 'meta', 'page', 'twitter', ] + Where to search for the metadata. such as "all" or + iterable like ["meta", "page", "og", "dc", "twitter", ] :type strategy: string or list :param encoder: a function, such as `encode_ascii`, to encode values before returning. - a valid `encoder` accepts one(1) arg. + a valid `encoder` requires one arg and accepts one optional arg:. + + def encode(value: Union[str, dict], store:Optional[str]=None) -> str: + if store == "dc": + return value["content"].lower() if "content" in value else None + return value.lower() if value is not None else None + if a `default_encoder` is registered, the string "raw" will disable it. :type encoder: function or "raw" + + :param dc_simple: + simplify dc elements into just the 'content' text, not dict + :type encoder: + bool """ + # normalize a strategy into a list of valid options. 
strategy = self._coerce_validate_strategy(strategy) + assert isinstance(strategy, list) if encoder is None: encoder = self.default_encoder @@ -1281,35 +733,20 @@ def _lookup(store: str) -> Optional[List]: if field in self.metadata[store]: val = self.metadata[store][field] if not isinstance(val, list): - val = [ - val, - ] + val = [val] if encoder: - val = [encoder(v) for v in val] + val = [encoder(v, store) for v in val] return val return None - # `_coerce_validate_strategy` ensured a compliant strategy - if isinstance(strategy, list): - # returns List or None - for store in strategy: - if store in self.metadata: - val = _lookup(store) - if val is not None: - return val - return None - elif strategy == "all": - # returns Dict or None - rval: Dict = {} - for store in self.metadata: - if store == "_v": - continue - if field in self.metadata[store]: - val = _lookup(store) + # returns List or None + rval: Dict = {} + for store in strategy: + if store in self.metadata: + val = _lookup(store) + if val is not None: rval[store] = val - return rval - else: - raise ValueError("unsupported strategy") + return rval or None def is_opengraph_minimum(self) -> bool: """ @@ -1318,10 +755,41 @@ def is_opengraph_minimum(self) -> bool: return all( [ self.metadata["og"].get(attr, None) - for attr in self.og_minimum_requirements + for attr in self._og_minimum_requirements ] ) + def select_first_match( + self, + field: str, + strategy: "TYPES_STRATEGY" = None, + ) -> Optional[str]: + # default + strategy = strategy or self.strategy + + candidates = self.get_metadatas(field, strategy=strategy) + if not candidates: + return None + + # handle "all" + if strategy == "all": + strategy = validate_strategy(strategy) # convert to ordered list + else: + assert isinstance(strategy, list) + + for _strategy in strategy: + if _strategy in candidates: + first_strategy = _strategy + break + first_value = candidates[first_strategy][0] + if isinstance(first_value, dict): + if first_strategy == "dc": + 
return first_value["content"] + msg = "unknown dict handling for strategy=`%s`" % first_strategy + raise ValueError(msg) + # return None + return first_value + # ------------------------------------------------------------------------------ @@ -1377,14 +845,14 @@ class MetadataParser(object): url: Optional[str] = None url_actual: Optional[str] = None - strategy: Union[List[str], str, None] = None + strategy: "TYPES_STRATEGY" = None LEN_MAX_TITLE: int = 255 only_parse_file_extensions: Optional[List[str]] = None allow_localhosts: Optional[bool] = None require_public_netloc: Optional[bool] = None force_doctype: Optional[bool] = None - requests_timeout: TYPE_REQUESTS_TIMEOUT = None - peername: Optional[TYPES_PEERNAME] = None + requests_timeout: "TYPE_REQUESTS_TIMEOUT" = None + peername: Optional["TYPES_PEERNAME"] = None is_redirect: Optional[bool] = None is_redirect_unique: Optional[bool] = None is_redirect_same_host: Optional[bool] = None @@ -1395,10 +863,10 @@ class MetadataParser(object): requests_session: Optional[requests.Session] = None derive_encoding: Optional[bool] = None default_encoding: Optional[str] = None - default_encoder: Optional[Callable[[str], str]] = None + default_encoder: Optional["TYPE_ENCODER"] = None support_malformed: Optional[bool] = None - urlparse: Callable[[str], ParseResult] + urlparse: "TYPE_URLPARSE" _cached_urlparser: Optional[_UrlParserCacheable] # this has a per-parser default tuple @@ -1409,13 +877,13 @@ class MetadataParser(object): _content_types_parse: Tuple[str, ...] = ("text/html",) _content_types_noparse: Tuple[str, ...] 
= ("application/json",) - response: Optional[TYPES_RESPONSE] + response: Optional["TYPES_RESPONSE"] def __init__( self, url: Optional[str] = None, html: Optional[str] = None, - strategy: Union[List[str], str, None] = None, + strategy: "TYPES_STRATEGY" = None, url_data: Optional[Dict[str, Any]] = None, url_headers: Optional[Dict[str, str]] = None, force_parse: bool = False, @@ -1425,7 +893,7 @@ def __init__( require_public_netloc: bool = True, allow_localhosts: Optional[bool] = None, force_doctype: bool = False, - requests_timeout: TYPE_REQUESTS_TIMEOUT = None, + requests_timeout: "TYPE_REQUESTS_TIMEOUT" = None, raise_on_invalid: bool = False, search_head_only: bool = False, allow_redirects: bool = True, @@ -1435,10 +903,10 @@ def __init__( derive_encoding: bool = True, html_encoding: Optional[str] = None, default_encoding: Optional[str] = None, - default_encoder: Optional[Callable[[str], str]] = None, + default_encoder: Optional["TYPE_ENCODER"] = None, retry_dropped_without_headers: Optional[bool] = None, support_malformed: Optional[bool] = None, - cached_urlparser: Union[bool, int, Callable[[str], ParseResult]] = True, + cached_urlparser: Union[bool, "TYPE_URLPARSE"] = True, cached_urlparser_maxitems: Optional[int] = None, ): """ @@ -1533,9 +1001,6 @@ def __init__( `cached_urlparser` default: True options: True: use a instance of UrlParserCacheable(maxitems=30) - : INT: use a instance of UrlParserCacheable(maxitems=cached_urlparser) - DEPRECATED in v13.0 - instead, set `cached_urlparser=True, cached_urlparser_maxitems=maxitems : None/False - use native urlparse : callable - use as a custom urlparse `cached_urlparser_maxitems` @@ -1552,26 +1017,7 @@ def __init__( raise ValueError( "`cached_urlparser_maxitems` requires `cached_urlparser=True`" ) - if cached_urlparser == 0: - warn_future( - "Supplying `0` to `cached_urlparser` to set maxitems is deprecated. " - "This will be removed in the next major or minor release." - "Supply `cached_urlparser=False` instead." 
- ) - cached_urlparser = False if cached_urlparser: - if isinstance(cached_urlparser, int) and not isinstance( - cached_urlparser, bool - ): - # build a default parser with maxitems - warn_future( - "Supplying an int to `cached_urlparser` to set maxitems is deprecated. " - "This will be removed in the next major or minor release." - "Supply `cached_urlparser=True, cached_urlparser_maxitems=int` instead." - ) - # coerce args for the next block - cached_urlparser_maxitems = cached_urlparser - cached_urlparser = True if cached_urlparser is True: # build a default parser if cached_urlparser_maxitems is not None: @@ -1590,6 +1036,10 @@ def __init__( else: self.urlparse = urlparse if strategy: + # this method is used for setting default strategy + # as such, validate it on set + validate_strategy(strategy) # will raise `InvalidStrategy` + self.strategy = strategy self.parsed_result.strategy = strategy self.url = self.parsed_result.metadata["_internal"]["url"] = url self.url_actual = self.parsed_result.metadata["_internal"]["url_actual"] = url @@ -1621,7 +1071,7 @@ def __init__( # if `html_encoding` was provided as a kwarg, it becomes the encoding self.response = DummyResponse( text=html, - url=(url or DUMMY_URL), + url=(url or config.DUMMY_URL), encoding=html_encoding, derive_encoding=derive_encoding, default_encoding=default_encoding, @@ -1672,75 +1122,6 @@ def deferred_fetch() -> None: # -------------------------------------------------------------------------- - @property - def metadata(self): - # deprecating in 1.0 - warn_future( - "MetadataParser.metadata is deprecated in 1.0; Operate on the parsed result directly." - ) - return self.parsed_result.metadata - - @property - def metadata_version(self): - # deprecating in 1.0 - warn_future( - "MetadataParser.metadata_version is deprecated in 1.0; Operate on the parsed result directly." 
- ) - return self.parsed_result.metadata_version - - @property - def metadata_encoding(self): - # deprecating in 1.0 - warn_future( - "MetadataParser.metadata_encoding is deprecated in 1.0; Operate on the parsed result directly." - ) - return self.parsed_result.metadata_encoding - - @property - def soup(self): - # deprecating in 1.0 - warn_future( - "MetadataParser.soup is deprecated in 1.0; Operate on the parsed result directly." - ) - return self.parsed_result.soup - - def get_metadata( - self, - field: str, - strategy: Union[list, str, None] = None, - encoder: Optional[Callable[[str], str]] = None, - ) -> Union[str, Dict[str, Union[str, Dict]], None]: - # deprecating in 1.0; operate on the result instead - warn_future( - "MetadataParser.get_metadata is deprecated in 1.0; Operate on the parsed result directly." - ) - return self.parsed_result.get_metadata( - field, strategy=strategy, encoder=encoder - ) - - def get_metadatas( - self, - field, - strategy: Union[List[str], str, None] = None, - encoder: Optional[Callable[[str], str]] = None, - ) -> Optional[Union[Dict, List]]: - # deprecating in 1.0; operate on the result instead - warn_future( - "MetadataParser.get_metadatas is deprecated in 1.0; Operate on the parsed result directly." - ) - return self.parsed_result.get_metadatas( - field, strategy=strategy, encoder=encoder - ) - - def is_opengraph_minimum(self) -> bool: - # deprecating in 1.0 - warn_future( - "MetadataParser.is_opengraph_minimum is deprecated in 1.0; Operate on the parsed result directly." 
- ) - return self.parsed_result.is_opengraph_minimum() - - # -------------------------------------------------------------------------- - def deferred_fetch(self): # allows for a deferrable fetch; override in __init__ raise ValueError("no `deferred_fetch` set") @@ -1750,7 +1131,7 @@ def deferred_fetch(self): def _response_encoding(self) -> Optional[str]: if self.response: return self.response.encoding - return self.default_encoding or ENCODING_FALLBACK + return self.default_encoding or config.ENCODING_FALLBACK def fetch_url( self, @@ -1760,13 +1141,13 @@ def fetch_url( force_parse_invalid_content_type: Optional[bool] = None, allow_redirects: Optional[bool] = None, ssl_verify: Optional[bool] = None, - requests_timeout: TYPE_REQUESTS_TIMEOUT = None, + requests_timeout: "TYPE_REQUESTS_TIMEOUT" = None, requests_session: Optional[requests.Session] = None, only_parse_http_ok: Optional[bool] = None, derive_encoding: Optional[bool] = None, default_encoding: Optional[str] = None, retry_dropped_without_headers: Optional[bool] = None, - ) -> TYPE_URL_FETCH: + ) -> "TYPE_URL_FETCH": """ fetches the url and returns a tuple of (html, html_encoding). this was busted out so you could subclass. 
@@ -2355,7 +1736,7 @@ def parse( pass # optimize this away on cpython production servers if __debug__: - if TESTING: + if config.TESTING: pprint.pprint(self.parsed_result.__dict__) def get_url_scheme(self) -> Optional[str]: @@ -2433,13 +1814,16 @@ def get_url_canonical( url_fallback=True allow_unicode_url=True """ - _candidates = self.parsed_result.get_metadatas("canonical", strategy=["page"]) + _candidates = self.parsed_result.get_metadatas( + "canonical", strategy=["page", "meta"] + ) + candidates = _candidates["page"] if _candidates else _candidates # get_metadatas returns a list, so find the first canonical item - _candidates = [c for c in _candidates if c] if _candidates else [] - if not _candidates: + candidates = [c for c in candidates if c] if candidates else [] + if not candidates: return None - canonical = _candidates[0] + canonical = candidates[0] # does the canonical have valid characters? # some websites, even BIG PROFESSIONAL ONES, will put html in here. @@ -2508,12 +1892,16 @@ def get_url_opengraph( url_fallback=None allow_unicode_url=True """ - _candidates = self.parsed_result.get_metadatas("url", strategy=["og"]) + _candidates = self.parsed_result.get_metadatas( + "url", strategy=["og", "page", "meta"] + ) + candidates = _candidates["og"] if _candidates else _candidates + # get_metadatas returns a list, so find the first og item - _candidates = [c for c in _candidates if c] if _candidates else [] - if not _candidates: + candidates = [c for c in candidates if c] if candidates else [] + if not candidates: return None - og = _candidates[0] + og = candidates[0] # does the og have valid characters? # some websites, even BIG PROFESSIONAL ONES, will put html in here. 
@@ -2628,7 +2016,7 @@ def get_discrete_url( def get_metadata_link( self, field: str, - strategy: Union[List[str], str, None] = None, + strategy: "TYPES_STRATEGY" = None, allow_encoded_uri: bool = False, require_public_global: bool = True, ) -> Optional[str]: @@ -2639,7 +2027,7 @@ def get_metadata_link( kwargs: strategy=None - 'all' or List ['og', 'dc', 'meta', 'page', 'twitter', ] + "all" or List ["meta", "page", "og", "dc", "twitter", ] allow_encoded_uri=False require_public_global=True @@ -2648,12 +2036,10 @@ def get_metadata_link( also require the fallback url to be on the public internet and not a localhost value. """ - _candidates = self.parsed_result.get_metadatas(field, strategy=strategy) - _candidates = [c for c in _candidates if c] if _candidates else [] - if not _candidates: - return None # `_value` will be our raw value - _value = _candidates[0] + _value = self.parsed_result.select_first_match(field, strategy=strategy) + if not _value: + return None # `value` will be our clean value # remove whitespace, because some bad blogging platforms add in whitespace by printing elements on multiple lines. d'oh! 
diff --git a/src/metadata_parser/config.py b/src/metadata_parser/config.py new file mode 100644 index 0000000..d16fc2c --- /dev/null +++ b/src/metadata_parser/config.py @@ -0,0 +1,24 @@ +import os + +# ============================================================================== + +# defaults +DISABLE_TLDEXTRACT = bool( + int(os.environ.get("METADATA_PARSER__DISABLE_TLDEXTRACT", "0")) +) +DUMMY_URL = os.environ.get( + "METADATA_PARSER__DUMMY_URL", "http://example.com/index.html" +) +ENCODING_FALLBACK = os.environ.get("METADATA_PARSER__ENCODING_FALLBACK", "ISO-8859-1") +FUTURE_BEHAVIOR = bool(int(os.getenv("METADATA_PARSER_FUTURE", "0"))) +TESTING = bool(int(os.environ.get("METADATA_PARSER__TESTING", "0"))) + +""" +# currently unused +MAX_CONNECTIONTIME = int( + os.environ.get("METADATA_PARSER__MAX_CONNECTIONTIME", 20) +) # in seconds +MAX_FILESIZE = int( + os.environ.get("METADATA_PARSER__MAX_FILESIZE", 2 ** 19) +) # bytes; this is .5MB +""" diff --git a/src/metadata_parser/exceptions.py b/src/metadata_parser/exceptions.py new file mode 100644 index 0000000..da9e4e2 --- /dev/null +++ b/src/metadata_parser/exceptions.py @@ -0,0 +1,105 @@ +# stdlib +from typing import Optional +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import requests + from . 
import MetadataParser + from .typing import TYPES_RESPONSE + +# ============================================================================== + + +class AllowableError(Exception): + pass + + +class InvalidDocument(Exception): + + def __str__(self) -> str: + return "InvalidDocument: %s" % (self.args[0]) + + +class InvalidStrategy(ValueError): + + def __str__(self) -> str: + return "InvalidStrategy: %s" % (self.args[0]) + + +class NotParsable(Exception): + code: Optional[int] + metadataParser: Optional["MetadataParser"] + raised: Optional["requests.exceptions.RequestException"] + response: Optional["TYPES_RESPONSE"] + + def __init__( + self, + message: str = "", + raised: Optional["requests.exceptions.RequestException"] = None, + code: Optional[int] = None, + metadataParser: Optional["MetadataParser"] = None, + response: Optional["TYPES_RESPONSE"] = None, + ): + super().__init__(message, raised, code, metadataParser, response) + self.code = code + self.message = message + self.metadataParser = metadataParser + self.raised = raised + self.response = response + + def __str__(self) -> str: + return "NotParsable: %s | %s | %s" % (self.message, self.code, self.raised) + + +class NotParsableJson(NotParsable): + def __str__(self) -> str: + return "NotParsableJson: %s | %s | %s" % (self.message, self.code, self.raised) + + +class NotParsableRedirect(NotParsable): + """Raised if a redirect is detected, but there is no Location header.""" + + def __str__(self) -> str: + return "NotParsableRedirect: %s | %s | %s" % ( + self.message, + self.code, + self.raised, + ) + + +class NotParsableFetchError(NotParsable): + def __str__(self) -> str: + return "NotParsableFetchError: %s | %s | %s" % ( + self.message, + self.code, + self.raised, + ) + + +class RedirectDetected(Exception): + """ + Raised if a redirect is detected + Instance properties: + + ``location``: redirect location + ``code``: status code of the response + ``response``: actual response object + """ + + code: 
Optional[int] + location: str + metadataParser: Optional["MetadataParser"] + response: Optional["TYPES_RESPONSE"] + + def __init__( + self, + location: str = "", + code: Optional[int] = None, + response: Optional["TYPES_RESPONSE"] = None, + metadataParser: Optional["MetadataParser"] = None, + ): + super().__init__(location, code, response, metadataParser) + self.code = code + self.location = location + self.metadataParser = metadataParser + self.response = response diff --git a/src/metadata_parser/regex.py b/src/metadata_parser/regex.py new file mode 100644 index 0000000..9c04d0f --- /dev/null +++ b/src/metadata_parser/regex.py @@ -0,0 +1,80 @@ +import re + +# ============================================================================== + +# regex library + +RE_ALL_NUMERIC = re.compile(r"^[\d\.]+$") +RE_bad_title = re.compile( + r"""(?:<title>|<title>)(.*)(?:<?/title>|(?:<)?/title>)""", re.I +) +RE_canonical = re.compile("^canonical$", re.I) +RE_doctype = re.compile(r"^\s*<!DOCTYPE[^>]*>", re.IGNORECASE) +RE_DOMAIN_NAME = re.compile( + r"""(^ + (?: + [A-Z0-9] + (?: + [A-Z0-9-]{0,61} + [A-Z0-9] + )? + \. + )+ + (?: + [A-Z]{2,6}\.? + | + [A-Z0-9-]{2,} + (?<!-)\.?) + $)""", + re.VERBOSE | re.IGNORECASE, +) +RE_IPV4_ADDRESS = re.compile( + r"^(\d{1,3})\.(\d{1,3}).(\d{1,3}).(\d{1,3})$" # grab 4 octets +) +RE_PORT = re.compile(r"^" r"(?P<main>.+)" r":" r"(?P<port>\d+)" r"$", re.IGNORECASE) +RE_prefix_opengraph = re.compile(r"^og") +RE_prefix_rel_img_src = re.compile("^image_src$", re.I) +RE_prefix_twitter = re.compile(r"^twitter") + +# we may need to test general validity of url components +RE_rfc3986_valid_characters = re.compile( + r"""^[a-z0-9\-\.\_\~\:\/\?\#\[\]\@\!\$\&\'\(\)\*\+\,\;\=\%]+$""", re.I +) +r""" +What is valid in the RFC? 
+ # don't need escaping + rfc3986_unreserved__noescape = ['a-z', '0-9', ] + + # do need escaping + rfc3986_unreserved__escape = ['-', '.', '_', '~', ] + rfc3986_gen_delims__escape = [":", "/", "?", "#", "[", "]", "@", ] + rfc3986_sub_delims__escape = ["!", "$", "&", "'", "(", ")", "*", "+", ",", ";", "=", ] + rfc3986_pct_encoded__escape = ["%", ] + rfc3986__escape = rfc3986_unreserved__escape + rfc3986_gen_delims__escape + rfc3986_sub_delims__escape + rfc3986_pct_encoded__escape + rfc3986__escaped = re.escape(''.join(rfc3986__escape)) + rfc3986_chars = ''.join(rfc3986_unreserved__noescape) + rfc3986__escaped + print rfc3986_chars + + a-z0-9\-\.\_\~\:\/\?\#\[\]\@\!\$\&\'\(\)\*\+\,\;\=\% +""" + +RE_shortlink = re.compile("^shortlink$", re.I) +RE_whitespace = re.compile(r"\s+") + +# based on DJANGO +# https://github.com/django/django/blob/master/django/core/validators.py +# not testing ipv6 right now, because rules are needed for ensuring they +# are correct +RE_VALID_NETLOC = re.compile( + r"(?:" + r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" + r"|" # ...or ipv4 + # r'(?P<ipv6>\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 + # r'|' + r"(?P<localhost>localhost)" # localhost... + r"|" + r"(?P<domain>([A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}(?<!-)\.?))" # domain... + r"(?P<port>:\d+)?" # optional port + r")", + re.IGNORECASE, +) diff --git a/src/metadata_parser/requests_extensions.py b/src/metadata_parser/requests_extensions.py new file mode 100644 index 0000000..1d06f5b --- /dev/null +++ b/src/metadata_parser/requests_extensions.py @@ -0,0 +1,186 @@ +import _socket # noqa: I201 + +# stdlib +import cgi # noqa: I100 +import logging +import socket +from typing import Optional +from typing import Tuple +from typing import TYPE_CHECKING + +# pypi +import requests +from requests_toolbelt.utils.deprecated import get_encodings_from_content + +# local +from . 
import config +from .exceptions import AllowableError +from .utils import DummyResponse +from .utils import safe_sample + + +if TYPE_CHECKING: + from requests.structures import CaseInsensitiveDict + from .typing import TYPES_PEERNAME + from .typing import TYPES_RESPONSE + + +# ============================================================================== + +log = logging.getLogger("metdata_parser") + +# ------------------------------------------------------------------------------ + + +# peername hacks +# only use for these stdlib packages +# eventually will not be needed thanks to upstream changes in `requests` +try: + _compatible_sockets: Tuple = ( + _socket.socket, + socket._socketobject, # type: ignore[attr-defined] + ) +except AttributeError: + _compatible_sockets: Tuple = (_socket.socket,) # type: ignore[no-redef] + + +def derive_encoding__hook(resp: "TYPES_RESPONSE", *args, **kwargs) -> None: + """ + a note about `requests` + + `response.content` is the raw response bytes + `response.text` is `response.content` decoded to the identified codec or + the fallback codec. + + This fallback codec is normally iso-8859-1 (latin-1) which is defined by the + RFC for HTTP as the default when no codec is provided in the headers or + body. This hook exists because users in certain regions may expect the + servers to not follow RFC and for the default encoding to be different. 
+ """ + if TYPE_CHECKING: + assert hasattr(resp, "_encoding_fallback") + assert hasattr(resp, "_encoding_content") + assert hasattr(resp, "_encoding_headers") + + resp._encoding_fallback = config.ENCODING_FALLBACK + # modified version, returns `None` if no charset available + resp._encoding_headers = get_encoding_from_headers(resp.headers) + resp._encoding_content = None + if not resp._encoding_headers and resp.content: + # html5 spec requires a meta-charset in the first 1024 bytes + _sample = safe_sample(resp.content) + resp._encoding_content = get_encodings_from_content(_sample) + if resp._encoding_content: + # it's a list + resp.encoding = resp._encoding_content[0] + else: + resp.encoding = resp._encoding_headers or resp._encoding_fallback + # do not return anything + + +def get_encoding_from_headers(headers: "CaseInsensitiveDict") -> Optional[str]: + """ + Returns encodings from given HTTP Header Dict. + + :param headers: dictionary to extract encoding from. + :rtype: str + + `requests.get("http://example.com").headers` + should be `requests.structures.CaseInsensitiveDict` + + ---------------------------------------------------------------------------- + + Modified from `requests` version 2.x + + The Requests Library: + + Copyright 2017 Kenneth Reitz + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ """ + content_type = headers.get("content-type") + if not content_type: + return None + content_type, params = cgi.parse_header(content_type) + if "charset" in params: + return params["charset"].strip("'\"") + return None + + +# ------------------------------------------------------------------------------ + + +def get_response_peername(resp: "TYPES_RESPONSE") -> Optional["TYPES_PEERNAME"]: + """ + used to get the peername (ip+port) data from the request + if a socket is found, caches this onto the request object + + IMPORTANT. this must happen BEFORE any content is consumed. + + `response` is really `requests.models.Response` + + This will UPGRADE the response object to have the following attribute: + + * _mp_peername + """ + if not isinstance(resp, requests.Response) and not isinstance(resp, DummyResponse): + # raise AllowableError("Not a HTTPResponse") + log.debug("Not a supported HTTPResponse | %s", resp) + log.debug("-> received a type of: %s", type(resp)) + return None + + if hasattr(resp, "_mp_peername"): + return resp._mp_peername + + def _get_socket() -> Optional[socket.socket]: + if isinstance(resp, DummyResponse): + return None + i = 0 + while True: + i += 1 + try: + if i == 1: + sock = resp.raw._connection.sock # type: ignore[union-attr] + elif i == 2: + sock = resp.raw._connection.sock.socket # type: ignore[union-attr] + elif i == 3: + sock = resp.raw._fp.fp._sock # type: ignore[union-attr] + elif i == 4: + sock = resp.raw._fp.fp._sock.socket # type: ignore[union-attr] + elif i == 5: + sock = resp.raw._fp.fp.raw._sock # type: ignore[union-attr] + else: + break + if not isinstance(sock, _compatible_sockets): + raise AllowableError() + return sock + except Exception: + pass + return None + + sock = _get_socket() + if sock: + # only cache if we have a sock + # we may want/need to call again + resp._mp_peername = sock.getpeername() # type: ignore [union-attr] + else: + resp._mp_peername = None # type: ignore [union-attr] + return resp._mp_peername # 
type: ignore [union-attr] + + +# ------------------------------------------------------------------------------ + + +def response_peername__hook(resp: "TYPES_RESPONSE", *args, **kwargs) -> None: + get_response_peername(resp) + # do not return anything diff --git a/src/metadata_parser/typing.py b/src/metadata_parser/typing.py new file mode 100644 index 0000000..e87d617 --- /dev/null +++ b/src/metadata_parser/typing.py @@ -0,0 +1,36 @@ +# stdlib +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union + +# pypi +from typing_extensions import Protocol # py38 + +if TYPE_CHECKING: + import requests + from urllib.parse import ParseResult + from . import DummyResponse + from . import ResponseHistory + +# ============================================================================== + +# TYPE_ENCODER = Callable[[str, Optional[str]], str] # def encode(value, strategy) +TYPE_ENCODER = Callable[ + [str, Optional[str]], Union[str, Dict] +] # def encode(value, strategy) +TYPE_REQUESTS_TIMEOUT = Optional[ + Union[int, float, Tuple[int, int], Tuple[float, float]] +] +TYPE_URL_FETCH = Tuple[str, str, "ResponseHistory"] +TYPE_URLPARSE = Callable[[str], "ParseResult"] +TYPES_PEERNAME = Tuple[str, int] # (ip, port) +TYPES_RESPONSE = Union["DummyResponse", "requests.Response"] +TYPES_STRATEGY = Union[List[str], str, None] + + +class _UrlParserCacheable(Protocol): + urlparse: TYPE_URLPARSE diff --git a/src/metadata_parser/utils.py b/src/metadata_parser/utils.py new file mode 100644 index 0000000..23490e4 --- /dev/null +++ b/src/metadata_parser/utils.py @@ -0,0 +1,184 @@ +# stdlib +import datetime +from html import unescape as html_unescape +import logging +from typing import AnyStr +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import TYPE_CHECKING +from typing import Union 
+import unicodedata +from urllib.parse import quote as url_quote +from urllib.parse import unquote as url_unquote +from urllib.parse import urlparse +from urllib.parse import urlunparse +import warnings + +# pypi +from requests.structures import CaseInsensitiveDict +from requests_toolbelt.utils.deprecated import get_encodings_from_content + +# local +from . import config +from .regex import RE_rfc3986_valid_characters + + +if TYPE_CHECKING: + from urllib.parse import ParseResult + + +# ============================================================================== + +log = logging.getLogger("metdata_parser") + +# ------------------------------------------------------------------------------ + + +class DummyResponse(object): + """ + A DummyResponse is used to ensure compatibility between url fetching + and html data + """ + + text: str + url: str + status_code: int + encoding: str + elapsed_seconds: float = 0 + history: List + headers: CaseInsensitiveDict + content: Optional[Union[str, bytes]] = None + default_encoding: str + + def __init__( + self, + text: str = "", + url: str = "", + status_code: int = 200, + encoding: Optional[str] = None, + elapsed_seconds: float = 0, + headers: Optional[CaseInsensitiveDict] = None, + content: Optional[AnyStr] = None, + derive_encoding: Optional[bool] = None, + default_encoding: Optional[str] = None, + ): + self.text = text + self.url = url + self.status_code = status_code + self.elapsed = datetime.timedelta(0, elapsed_seconds) + self.headers = headers if headers is not None else CaseInsensitiveDict() + self.history = [] + self.content = content + + # start `encoding` block + if encoding: + self.encoding = encoding + elif derive_encoding: + # only examine first 1024 bytes. in this case chars. 
utf could be 4x chars + _sample = safe_sample(text) + encodings = get_encodings_from_content(_sample) + if encodings: + self.encoding = encoding = encodings[0] + self.default_encoding = default_encoding or config.ENCODING_FALLBACK + # second phase cleanup + if not encoding: + self.encoding = self.default_encoding + # end `encoding` block + + +def decode_html(raw: Union[str, Dict], strategy: Optional[str] = None) -> str: + """ + helper function to decode text that has both HTML and non-ascii characters + """ + if isinstance(raw, dict): + if strategy == "dc": + return decode_html(raw.get("content", "")) + raise ValueError("strategy `%s` not known to support `dict") + text = encode_ascii(html_unescape(raw)) + return text + + +def encode_ascii(raw: Union[str, Dict], strategy: Optional[str] = None) -> str: + """ + helper function to force ascii; + some edge-cases have unicode line breaks in titles/etc. + + reference function for `encoder` + + The first arg is a `raw` value to be encoded, which will either be a str or + dict. The second arg is an (optional) str that identifies the strategy. + + When invoked by `MetadataParser.get_metadatas()` the strategy will always + be sent. + """ + if isinstance(raw, dict): + if strategy == "dc": + return encode_ascii(raw.get("content", "")) + raise ValueError("strategy `%s` not known to support `dict") + if not raw: + raw = "" + _as_bytes = unicodedata.normalize("NFKD", raw).encode("ascii", "ignore") + _as_str = _as_bytes.decode("utf-8", "ignore") + return _as_str + + +def fix_unicode_url( + url: str, + encoding: Optional[str] = None, + urlparser: Callable[[str], "ParseResult"] = urlparse, +) -> str: + """ + some cms systems will put unicode in their canonical url + this is not allowed by rfc. + currently this function will update the PATH but not the kwargs. + perhaps it should. 
+ rfc3986 says that characters should be put into utf8 then percent encoded + + kwargs: + `encoding` - used for python2 encoding + `urlparser` - defaults to standard `urlparse`, can be substituted with + a cacheable version. + """ + parsed = urlparser(url) + if parsed.path in ("", "/"): + # can't do anything + return url + if RE_rfc3986_valid_characters.match(parsed.path): + # again, can't do anything + return url + # okay, we know we have bad items in the path, so try and upgrade! + # turn the namedtuple from urlparse into something we can edit + candidate = [i for i in parsed] + for _idx in [2]: # 2=path, 3=params, 4=queryparams, 5fragment + try: + candidate[_idx] = parsed[_idx] + candidate[_idx] = url_quote(url_unquote(candidate[_idx])) + except Exception as exc: + log.debug("fix_unicode_url failure: %s | %s | %s", url, encoding, exc) + return url + _url = urlunparse(candidate) + return _url + + +def safe_sample(source: Union[str, bytes]) -> bytes: + if isinstance(source, bytes): + _sample = source[:1024] + else: + # this block can cause an error on PY3 depending on where the data came + # from such as what the source is (from a request vs a document/test) + # thanks, @keyz182 for the PR/investigation + # https://github.com/jvanasco/metadata_parser/pull/16 + _sample = (source.encode())[:1024] + return _sample + + +def warn_future(message: str) -> None: + warnings.warn(message, FutureWarning, stacklevel=2) + if config.FUTURE_BEHAVIOR: + raise ValueError(message) + + +def warn_user(message: str) -> None: + warnings.warn(message, UserWarning, stacklevel=2) diff --git a/tests/html_scaffolds/duplicates.html b/tests/html_scaffolds/duplicates.html index b3cf863..20297a4 100644 --- a/tests/html_scaffolds/duplicates.html +++ b/tests/html_scaffolds/duplicates.html @@ -129,7 +129,17 @@ <meta name="google-site-verification" content="123123123"> <meta name="twitter:data1" value="8 min read"> <meta name="google" value="notranslate"> - <meta name="viewport" 
content="width=device-width, initial-scale=1, maximum-scale=1"> + <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"> + + <!-- testing order --> + <meta name="keywords.order" content="meta.keywords.order::1"> + <meta name="keywords.order" content="meta.keywords.order::2"> + <meta property="og:keywords.order" content="meta.property=og:keywords.order::1" /> + <meta property="og:keywords.order" content="meta.property=og:keywords.order::2" /> + <meta name="dc.keywords.order" content="dc:keywords.order::1"/> + <meta name="dc.keywords.order" content="dc:keywords.order::2"/> + <meta name='twitter:keywords.order' content='meta.name=twitter:keywords.order::1' /> + <meta name='twitter:keywords.order' content='meta.name=twitter:keywords.order::2' /> </head> <body> diff --git a/tests/test_document_parsing.py b/tests/test_document_parsing.py index 094822c..e0bd8ff 100644 --- a/tests/test_document_parsing.py +++ b/tests/test_document_parsing.py @@ -1,11 +1,19 @@ # stdlib import os +from typing import Callable from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union import unittest -import warnings # local import metadata_parser +from metadata_parser import MetadataParser +from metadata_parser import urlparse +from metadata_parser.exceptions import InvalidStrategy + # ============================================================================== @@ -148,10 +156,22 @@ } -def encoder_capitalizer(decoded): - if type(decoded) is dict: - return {k.upper(): v.upper() for k, v in decoded.items()} - return decoded.upper() +def encoder_capitalizer( + raw: Union[str, Dict], strategy: Optional[str] = None +) -> Union[str, dict]: + # note, an api compliant encoder will only return str + if isinstance(raw, dict): + return {k.upper(): v.upper() for k, v in raw.items()} + return raw.upper() + + +def encoder_lowercaser( + raw: Union[str, Dict], strategy: Optional[str] = None +) -> 
Union[str, dict]: + # note, an api compliant encoder will only return str + if isinstance(raw, dict): + return {k.lower(): v.lower() for k, v in raw.items()} + return raw.lower() # setup the test_docs with html bodies @@ -216,7 +236,9 @@ def _docs_test(test_names): return errors -def _docs_test_parser(test_names, cached_urlparser, cached_urlparser_maxitems=None): +def _docs_test_parser( + test_names, cached_urlparser, cached_urlparser_maxitems=None +) -> Tuple[metadata_parser.MetadataParser, List]: errors = [] for test in test_names: tests = [] @@ -237,7 +259,7 @@ def _docs_test_parser(test_names, cached_urlparser, cached_urlparser_maxitems=No errors.append([test, "get_discrete_url()", url_expected, url_retrieved]) if not tests: raise ValueError("No tests!") - return errors + return parsed, errors class TestHtmlDocument(unittest.TestCase): @@ -322,10 +344,10 @@ def _make_raw(self, data_option): # create a parsed result, and inject raw data. # data coming through beautifulsoup will be parsed differently parsed = metadata_parser.MetadataParser() - parsed.metadata["meta"]["title"] = self._data[data_option]["raw"] + parsed.parsed_result.metadata["meta"]["title"] = self._data[data_option]["raw"] return parsed - def _make_html(self, data_option, default_encoder=None): + def _make_html(self, data_option, default_encoder: Optional[Callable] = None): # data coming through beautifulsoup is parsed by that library parsed = metadata_parser.MetadataParser( html=self._data[data_option]["html"], @@ -336,50 +358,69 @@ def _make_html(self, data_option, default_encoder=None): def test_unicode_whitespace(self): parsed = self._make_raw("unicode_whitespace") - # title_raw = parsed.get_metadata('title') - title_ascii = parsed.get_metadata("title", encoder=metadata_parser.encode_ascii) - self.assertEqual(title_ascii, self._data["unicode_whitespace"]["ascii"]) + # title_raw = parsed.parsed_result.get_metadatas('title') + _title_ascii = parsed.parsed_result.get_metadatas( + "title", 
encoder=metadata_parser.utils.encode_ascii + ) + title_ascii = _title_ascii["meta"] + self.assertEqual(title_ascii[0], self._data["unicode_whitespace"]["ascii"]) def test_unicode_chars(self): parsed = self._make_raw("unicode_chars") - # title_raw = parsed.get_metadata('title') - title_ascii = parsed.get_metadata("title", encoder=metadata_parser.encode_ascii) - self.assertEqual(title_ascii, self._data["unicode_chars"]["ascii"]) + # title_raw = parsed.parsed_result.get_metadatas('title') + _title_ascii = parsed.parsed_result.get_metadatas( + "title", encoder=metadata_parser.utils.encode_ascii + ) + title_ascii = _title_ascii["meta"] + self.assertEqual(title_ascii[0], self._data["unicode_chars"]["ascii"]) def test_decode_html_encoder(self): parsed = self._make_html("decode_html_encoder") - parsed_description = parsed.get_metadata("description") + _parsed_description = parsed.parsed_result.get_metadatas("description") + parsed_description = _parsed_description["meta"] - decoded_direct = metadata_parser.decode_html(parsed_description) + decoded_direct = metadata_parser.utils.decode_html(parsed_description[0]) self.assertEqual(decoded_direct, self._data["decode_html_encoder"]["decoded"]) - decoded_decoder = parsed.get_metadata( - "description", encoder=metadata_parser.decode_html + _decoded_decoder = parsed.parsed_result.get_metadatas( + "description", encoder=metadata_parser.utils.decode_html + ) + decoded_decoder = _decoded_decoder["meta"] + self.assertEqual( + decoded_decoder[0], self._data["decode_html_encoder"]["decoded"] ) - self.assertEqual(decoded_decoder, self._data["decode_html_encoder"]["decoded"]) def test_default_encoder(self): """ ensure the default decoder is invoked """ parsed_with_default = self._make_html( - "decode_html_encoder", default_encoder=metadata_parser.decode_html + "decode_html_encoder", default_encoder=metadata_parser.utils.decode_html ) parsed_no_default = self._make_html("decode_html_encoder") # does the default_decoder work? 
- decoded_default = parsed_with_default.get_metadata("description") - self.assertEqual(decoded_default, self._data["decode_html_encoder"]["decoded"]) + _decoded_default = parsed_with_default.parsed_result.get_metadatas( + "description" + ) + decoded_default = _decoded_default["meta"] + self.assertEqual( + decoded_default[0], self._data["decode_html_encoder"]["decoded"] + ) # does the no decoder work as expected? - not_decoded = parsed_no_default.get_metadata("description") - self.assertEqual(not_decoded, self._data["decode_html_encoder"]["parsed"]) + _not_decoded = parsed_no_default.parsed_result.get_metadatas("description") + not_decoded = _not_decoded["meta"] + self.assertEqual(not_decoded[0], self._data["decode_html_encoder"]["parsed"]) # can we override the default_decoder to get RAW? - decoded_override = parsed_with_default.get_metadata( + _decoded_override = parsed_with_default.parsed_result.get_metadatas( "description", encoder="raw" ) - self.assertEqual(decoded_override, self._data["decode_html_encoder"]["parsed"]) + decoded_override = _decoded_override["meta"] + self.assertEqual( + decoded_override[0], self._data["decode_html_encoder"]["parsed"] + ) # can we override the default_decoder to get something else? # ensure these 2 aren't equal, otherwise the next bit doesn't really test! 
@@ -387,22 +428,16 @@ def test_default_encoder(self): self._data["decode_html_encoder"]["parsed"], self._data["decode_html_encoder"]["parsed"].upper(), ) - decoded_override = parsed_with_default.get_metadata( - "description", encoder=lambda x: x.upper() + _decoded_override = parsed_with_default.parsed_result.get_metadatas( + "description", encoder=encoder_capitalizer ) + decoded_override = _decoded_override["meta"] self.assertEqual( - decoded_override, self._data["decode_html_encoder"]["parsed"].upper() + decoded_override[0], self._data["decode_html_encoder"]["parsed"].upper() ) -class TestDocumentParsing(unittest.TestCase): - """ - python -m unittest tests.document_parsing.TestDocumentParsing - python -m unittest tests.document_parsing.TestDocumentParsing.test_simple_html - python -m unittest tests.document_parsing.TestDocumentParsing.test_html_urls - python -m unittest tests.document_parsing.TestDocumentParsing.test_complex_html - python -m unittest tests.document_parsing.TestDocumentParsing.test_charsets - """ +class _TestDocumentParsingCore: def _MakeOne(self, filename): """lazy cache of files as needed""" @@ -413,97 +448,203 @@ def _MakeOne(self, filename): ).read() return CACHED_FILESYSTEM_DOCUMENTS[filename] + def _MakeOneParsed(self, **kwargs) -> metadata_parser.MetadataParser: + html = self._MakeOne("duplicates.html") + + mp_kwargs = {} + if "strategy" in kwargs: + mp_kwargs["strategy"] = kwargs["strategy"] + + parsed = metadata_parser.MetadataParser(url=None, html=html, **mp_kwargs) + + # we should be tracking the verison now + self.assertIn("_v", parsed.parsed_result.metadata) + + # it should be the same version + self.assertEqual( + parsed.parsed_result.metadata_version, + metadata_parser.ParsedResult._version, + ) + + # we should be tracking the verison now + self.assertIn("_v", parsed.parsed_result.metadata) + + # it should be the same version + self.assertEqual( + parsed.parsed_result.metadata_version, metadata_parser.ParsedResult._version + ) + 
return parsed + + +class TestDocumentParsing_Exceptions(unittest.TestCase, _TestDocumentParsingCore): + + def test__all_in_list(self): + parsed = self._MakeOneParsed() + # this should error! + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas("canonical", strategy=["all"]) + self.assertEqual( + cm.exception.args[0], + 'Submit "all" as a `str`, not in a `list`.', + ) + + def test__known_as_str(self): + parsed = self._MakeOneParsed() + # this should error! + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas("TestMixedCandidates1a", strategy="dc") + self.assertEqual( + cm.exception.args[0], + 'If `strategy` is not a `list`, it must be "all".', + ) + + def test__unknown_in_list(self): + parsed = self._MakeOneParsed() + # this should error! + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas("canonical", strategy=["unknown"]) + self.assertEqual( + cm.exception.args[0], + 'Invalid strategy: "unknown".', + ) + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas( + "canonical", strategy=["unknown", "unknown-too"] + ) + self.assertEqual( + cm.exception.args[0], + 'Invalid strategy: "unknown", "unknown-too".', + ) + + +class TestDocumentParsing(unittest.TestCase, _TestDocumentParsingCore): + """ + python -m unittest tests.document_parsing.TestDocumentParsing + python -m unittest tests.document_parsing.TestDocumentParsing.test_simple_html + python -m unittest tests.document_parsing.TestDocumentParsing.test_html_urls + python -m unittest tests.document_parsing.TestDocumentParsing.test_complex_html + python -m unittest tests.document_parsing.TestDocumentParsing.test_charsets + """ + def test_simple_html(self): """this tests simple.html to have certain fields""" html = self._MakeOne("simple.html") parsed = metadata_parser.MetadataParser(url=None, html=html) self.assertEqual( - parsed.metadata["meta"]["article:publisher"], + 
parsed.parsed_result.metadata["meta"]["article:publisher"], "https://www.example.com/meta/property=article:publisher", ) - self.assertEqual(parsed.metadata["meta"]["author"], "meta.author") - self.assertEqual(parsed.metadata["meta"]["description"], "meta.description") - self.assertEqual(parsed.metadata["meta"]["keywords"], "meta.keywords") + self.assertEqual(parsed.parsed_result.metadata["meta"]["author"], "meta.author") + self.assertEqual( + parsed.parsed_result.metadata["meta"]["description"], "meta.description" + ) + self.assertEqual( + parsed.parsed_result.metadata["meta"]["keywords"], "meta.keywords" + ) self.assertEqual( - parsed.metadata["meta"]["og:description"], "meta.property=og:description" + parsed.parsed_result.metadata["meta"]["og:description"], + "meta.property=og:description", ) self.assertEqual( - parsed.metadata["meta"]["og:image"], + parsed.parsed_result.metadata["meta"]["og:image"], "https://www.example.com/meta/property=og:image", ) self.assertEqual( - parsed.metadata["meta"]["og:site_name"], "meta.property=og:site_name" + parsed.parsed_result.metadata["meta"]["og:site_name"], + "meta.property=og:site_name", + ) + self.assertEqual( + parsed.parsed_result.metadata["meta"]["og:title"], "meta.property=og:title" + ) + self.assertEqual( + parsed.parsed_result.metadata["meta"]["og:type"], "meta.property=og:type" ) - self.assertEqual(parsed.metadata["meta"]["og:title"], "meta.property=og:title") - self.assertEqual(parsed.metadata["meta"]["og:type"], "meta.property=og:type") self.assertEqual( - parsed.metadata["meta"]["og:url"], + parsed.parsed_result.metadata["meta"]["og:url"], "https://www.example.com/meta/property=og:url", ) self.assertEqual( - parsed.metadata["meta"]["twitter:card"], "meta.name=twitter:card" + parsed.parsed_result.metadata["meta"]["twitter:card"], + "meta.name=twitter:card", ) self.assertEqual( - parsed.metadata["meta"]["twitter:description"], + parsed.parsed_result.metadata["meta"]["twitter:description"], 
"meta.name=twitter:description", ) self.assertEqual( - parsed.metadata["meta"]["twitter:image:src"], + parsed.parsed_result.metadata["meta"]["twitter:image:src"], "https://example.com/meta/name=twitter:image:src", ) self.assertEqual( - parsed.metadata["meta"]["twitter:site"], "meta.name=twitter:site" + parsed.parsed_result.metadata["meta"]["twitter:site"], + "meta.name=twitter:site", ) self.assertEqual( - parsed.metadata["meta"]["twitter:title"], "meta.name=twitter:title" + parsed.parsed_result.metadata["meta"]["twitter:title"], + "meta.name=twitter:title", ) self.assertEqual( - parsed.metadata["meta"]["twitter:url"], + parsed.parsed_result.metadata["meta"]["twitter:url"], "https://example.com/meta/name=twitter:url", ) self.assertEqual( - parsed.metadata["og"]["description"], "meta.property=og:description" + parsed.parsed_result.metadata["og"]["description"], + "meta.property=og:description", ) self.assertEqual( - parsed.metadata["og"]["image"], + parsed.parsed_result.metadata["og"]["image"], "https://www.example.com/meta/property=og:image", ) self.assertEqual( - parsed.metadata["og"]["site_name"], "meta.property=og:site_name" + parsed.parsed_result.metadata["og"]["site_name"], + "meta.property=og:site_name", ) - self.assertEqual(parsed.metadata["og"]["title"], "meta.property=og:title") - self.assertEqual(parsed.metadata["og"]["type"], "meta.property=og:type") self.assertEqual( - parsed.metadata["og"]["url"], "https://www.example.com/meta/property=og:url" + parsed.parsed_result.metadata["og"]["title"], "meta.property=og:title" ) self.assertEqual( - parsed.metadata["page"]["canonical"], + parsed.parsed_result.metadata["og"]["type"], "meta.property=og:type" + ) + self.assertEqual( + parsed.parsed_result.metadata["og"]["url"], + "https://www.example.com/meta/property=og:url", + ) + self.assertEqual( + parsed.parsed_result.metadata["page"]["canonical"], "http://example.com/meta/rel=canonical", ) self.assertEqual( - parsed.metadata["page"]["shortlink"], + 
parsed.parsed_result.metadata["page"]["shortlink"], "http://example.com/meta/rel=shortlink", ) - self.assertEqual(parsed.metadata["page"]["title"], "title") - self.assertEqual(parsed.metadata["twitter"]["card"], "meta.name=twitter:card") + self.assertEqual(parsed.parsed_result.metadata["page"]["title"], "title") self.assertEqual( - parsed.metadata["twitter"]["description"], "meta.name=twitter:description" + parsed.parsed_result.metadata["twitter"]["card"], "meta.name=twitter:card" ) self.assertEqual( - parsed.metadata["twitter"]["image:src"], + parsed.parsed_result.metadata["twitter"]["description"], + "meta.name=twitter:description", + ) + self.assertEqual( + parsed.parsed_result.metadata["twitter"]["image:src"], "https://example.com/meta/name=twitter:image:src", ) - self.assertEqual(parsed.metadata["twitter"]["site"], "meta.name=twitter:site") - self.assertEqual(parsed.metadata["twitter"]["title"], "meta.name=twitter:title") self.assertEqual( - parsed.metadata["twitter"]["url"], + parsed.parsed_result.metadata["twitter"]["site"], "meta.name=twitter:site" + ) + self.assertEqual( + parsed.parsed_result.metadata["twitter"]["title"], "meta.name=twitter:title" + ) + self.assertEqual( + parsed.parsed_result.metadata["twitter"]["url"], "https://example.com/meta/name=twitter:url", ) self.assertEqual( - parsed.metadata["twitter"]["data"], "meta.name=twitter:data||value" + parsed.parsed_result.metadata["twitter"]["data"], + "meta.name=twitter:data||value", ) - self.assertNotIn("label", parsed.metadata["twitter"]) - self.assertEqual(parsed.is_opengraph_minimum(), True) + self.assertNotIn("label", parsed.parsed_result.metadata["twitter"]) + self.assertEqual(parsed.parsed_result.is_opengraph_minimum(), True) def test_html_urls(self): """this tests simple.html to have certain fields""" @@ -539,123 +680,209 @@ def test_encoding_declared(self): assert parsed.response is not None self.assertEqual(parsed.response.encoding, "UTF-8") - def test_complex_html(self): + def 
test_charsets(self): """ - this tests duplicates.html to have certain fields + python -m unittest tests.document_parsing.TestDocumentParsing.test_charsets + """ + a_html = self._MakeOne("charset_a.html") + a_parsed = metadata_parser.MetadataParser(url=None, html=a_html) + self.assertEqual( + a_parsed.parsed_result.metadata["meta"]["content-type"], + "text/html; charset=UTF-8", + ) - this also ensures some legacy behavior is supported + b_html = self._MakeOne("charset_b.html") + b_parsed = metadata_parser.MetadataParser(url=None, html=b_html) + self.assertEqual(b_parsed.parsed_result.metadata["meta"]["charset"], "UTF-8") + + c_html = self._MakeOne("charset_c.html") + c_parsed = metadata_parser.MetadataParser(url=None, html=c_html) + self.assertEqual(c_parsed.parsed_result.metadata["meta"]["charset"], "UTF-8") - such as calling both: - * `parsed.parsed_result.get_metadatas` - * `parsed.get_metadatas` + def test_malformed_twitter(self): """ - html = self._MakeOne("duplicates.html") + this tests simple.html to have certain fields + python -munittest tests.document_parsing.TestDocumentParsing.test_malformed_twitter + """ + html = self._MakeOne("simple.html") + + # the default behavior is to not support malformed + # that means we should consult 'value' for data and 'label' + # in `simple.html`, "label" (incorrectly) uses "content" and "data" uses "label" parsed = metadata_parser.MetadataParser(url=None, html=html) + self.assertEqual( + parsed.parsed_result.metadata["twitter"]["data"], + "meta.name=twitter:data||value", + ) + self.assertNotIn("label", parsed.parsed_result.metadata["twitter"]) + self.assertNotIn("invalid", parsed.parsed_result.metadata["twitter"]) - # this is just a property and should be the same object - self.assertIs(parsed.metadata, parsed.parsed_result.metadata) + # now with `support_malformed` support we will load the label! 
+ parsed2 = metadata_parser.MetadataParser( + url=None, html=html, support_malformed=True + ) + self.assertEqual( + parsed2.parsed_result.metadata["twitter"]["data"], + "meta.name=twitter:data||value", + ) + self.assertEqual( + parsed2.parsed_result.metadata["twitter"]["label"], + "meta.name=twitter:label||content", + ) + self.assertEqual( + parsed2.parsed_result.metadata["twitter"]["invalid"], + "meta.name=twitter:invalid", + ) - # we should be tracking the verison now - self.assertIn("_v", parsed.metadata) + # try it with dupes... + html_dupes = self._MakeOne("duplicates.html") + parsed_dupe = metadata_parser.MetadataParser(url=None, html=html_dupes) + # two items for each of data/label, but label is empty strings + self.assertEqual( + parsed_dupe.parsed_result.metadata["twitter"]["data"], + ["meta.name=twitter:data||value,1", "meta.name=twitter:data||value,2"], + ) + self.assertNotIn("label", parsed.parsed_result.metadata["twitter"]) - # it should be the same version - self.assertEqual(parsed.metadata_version, metadata_parser.ParsedResult._version) + # everyone is happy when metadata is malformed! 
+ parsed_dupe = metadata_parser.MetadataParser( + url=None, html=html_dupes, support_malformed=True + ) self.assertEqual( - parsed.parsed_result.metadata_version, metadata_parser.ParsedResult._version + parsed_dupe.parsed_result.metadata["twitter"]["data"], + ["meta.name=twitter:data||value,1", "meta.name=twitter:data||value,2"], ) + self.assertEqual( + parsed_dupe.parsed_result.metadata["twitter"]["label"], + [ + "meta.name=twitter:label||content,1", + "meta.name=twitter:label||content,2", + ], + ) + + +class TestDocumentParsing_Complex(unittest.TestCase, _TestDocumentParsingCore): + """ + this tests duplicates.html to have certain fields under complex conditions + """ + + def test_og_image(self): + parsed = self._MakeOneParsed() # we have 3 og:image entries in this file _computed_link = parsed.get_metadata_link("image", strategy=["og"]) assert _computed_link == "https://www.example.com/meta/property=og:image" - _all_og_images = parsed.get_metadatas("og:image") + _all_og_images = parsed.parsed_result.get_metadatas("og:image") assert _all_og_images is not None - assert len(_all_og_images) == 3 - assert "https://www.example.com/meta/property=og:image" in _all_og_images + assert isinstance(_all_og_images, dict) + assert "meta" in _all_og_images + + all_og_images = _all_og_images["meta"] + + assert len(all_og_images) == 3 + assert "https://www.example.com/meta/property=og:image" in all_og_images # bs4 cleans up the ampersand internally into an entity, but prints it deserialized by default assert ( "https://www.example.com/meta?property=og:image&duplicate=1" - in _all_og_images + in all_og_images ) assert ( "https://www.example.com/meta?property=og:image&duplicate=2" - in _all_og_images + in all_og_images ) + def test__citation_authors(self): + parsed = self._MakeOneParsed() + # ----- # this is a duplicate element and should be stored in the metadata dict as a list - _citation_authors = [ + citation_authors = [ "citation_author:1", "citation_author:2", 
"citation_author:3", ] # these should be lists - self.assertEqual(parsed.metadata["meta"]["citation_author"], _citation_authors) self.assertEqual( - parsed.parsed_result.get_metadatas("citation_author", ["meta"]), - _citation_authors, + parsed.parsed_result.metadata["meta"]["citation_author"], citation_authors ) + self.assertEqual( - parsed.get_metadatas("citation_author", ["meta"]), _citation_authors + parsed.parsed_result.get_metadatas("citation_author", ["meta"])["meta"], + citation_authors, ) + # this is a string self.assertEqual( - parsed.parsed_result.get_metadata("citation_author", ["meta"]), - _citation_authors[0], - ) - self.assertEqual( - parsed.get_metadata("citation_author", ["meta"]), _citation_authors[0] + parsed.parsed_result.get_metadatas("citation_author", ["meta"])["meta"][0], + citation_authors[0], ) - _meta_authors = ["meta.author:1", "meta.author:2"] + def test__meta_authors(self): + parsed = self._MakeOneParsed() + + meta_authors = ["meta.author:1", "meta.author:2"] + # these should be lists - self.assertEqual(parsed.metadata["meta"]["author"], _meta_authors) + self.assertEqual(parsed.parsed_result.metadata["meta"]["author"], meta_authors) self.assertEqual( - parsed.parsed_result.get_metadatas("author", ["meta"]), _meta_authors + parsed.parsed_result.get_metadatas("author", ["meta"])["meta"], meta_authors ) - self.assertEqual(parsed.get_metadatas("author", ["meta"]), _meta_authors) # this is a string self.assertEqual( - parsed.parsed_result.get_metadata("author", ["meta"]), _meta_authors[0] + parsed.parsed_result.get_metadatas("author", ["meta"])["meta"][0], + meta_authors[0], ) - self.assertEqual(parsed.get_metadata("author", ["meta"]), _meta_authors[0]) - _meta_kws = ["meta.keywords:1", "meta.keywords:2"] + def test__meta_keywords(self): + parsed = self._MakeOneParsed() + + meta_kws = ["meta.keywords:1", "meta.keywords:2"] # these should be lists - self.assertEqual(parsed.metadata["meta"]["keywords"], _meta_kws) self.assertEqual( - 
parsed.parsed_result.get_metadatas("keywords", ["meta"]), _meta_kws + parsed.parsed_result.metadata["meta"]["keywords"], + meta_kws, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("keywords", ["meta"])["meta"], + meta_kws, ) - self.assertEqual(parsed.get_metadatas("keywords", ["meta"]), _meta_kws) # this is a string self.assertEqual( - parsed.parsed_result.get_metadata("keywords", ["meta"]), _meta_kws[0] + parsed.parsed_result.get_metadatas("keywords", ["meta"])["meta"][0], + meta_kws[0], ) - self.assertEqual(parsed.get_metadata("keywords", ["meta"]), _meta_kws[0]) + def test__meta_description(self): + parsed = self._MakeOneParsed() # ----- # this is a single element and should be stored in the metadata dict as a string - _description = "meta.description" + description = "meta.description" # these should be lists self.assertEqual( - parsed.parsed_result.get_metadatas("description", ["meta"]), [_description] + parsed.parsed_result.get_metadatas("description", ["meta"])["meta"], + [description], ) - self.assertEqual(parsed.get_metadatas("description", ["meta"]), [_description]) # this is a string - self.assertEqual(parsed.metadata["meta"]["description"], _description) self.assertEqual( - parsed.parsed_result.get_metadata("description", ["meta"]), _description + parsed.parsed_result.metadata["meta"]["description"], + description, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("description", ["meta"])["meta"][0], + description, ) - self.assertEqual(parsed.get_metadata("description", ["meta"]), _description) + def test__dc__basic(self): + parsed = self._MakeOneParsed() # ----- # dc creator has a language variant # 'dc': {'Creator': [{'content': 'Plato'}, # {'content': 'Platon', 'lang': 'fr'}], - self.assertIn("Creator", parsed.metadata["dc"]) - dc_creator = parsed.metadata["dc"]["Creator"] + self.assertIn("Creator", parsed.parsed_result.metadata["dc"]) + dc_creator = parsed.parsed_result.metadata["dc"]["Creator"] # so this should be a list 
self.assertIs(type(dc_creator), list) # with a length of 2 @@ -669,6 +896,8 @@ def test_complex_html(self): self.assertIn("lang", dc_creator[1]) self.assertEqual(dc_creator[1]["lang"], "fr") + def test__dc__subject(self): + parsed = self._MakeOneParsed() # ----- # dc subject has a scheme variant # 'Subject': [{'content': 'heart attack'}, @@ -690,8 +919,8 @@ def test_complex_html(self): {"content": "Friendship"}, {"content": "158.25", "scheme": "ddc"}, ] - self.assertIn("Subject", parsed.metadata["dc"]) - dc_subject = parsed.metadata["dc"]["Subject"] + self.assertIn("Subject", parsed.parsed_result.metadata["dc"]) + dc_subject = parsed.parsed_result.metadata["dc"]["Subject"] self.assertIs(type(dc_subject), list) self.assertEqual(len(dc_subject), len(dcSubjectsExpected)) for idx, _expected in enumerate(dc_subject): @@ -705,13 +934,20 @@ def test_complex_html(self): for _key in dc_subject[idx].keys(): self.assertEqual(dc_subject[idx][_key], dcSubjectsExpected[idx][_key]) + def test__dc__TestMixedCandidates1(self): + parsed = self._MakeOneParsed() # ----- # dc TestMixedCandidates1 # handle the ordering of results # the raw info tested is the same as the above Subject test... 
dcTestMixedCandidates1aExpected = {"content": "Friendship"} - self.assertIn("TestMixedCandidates1a", parsed.metadata["dc"]) - dc_mixed_candidates = parsed.metadata["dc"]["TestMixedCandidates1a"] + self.assertIn( + "TestMixedCandidates1a", + parsed.parsed_result.metadata["dc"], + ) + dc_mixed_candidates = parsed.parsed_result.metadata["dc"][ + "TestMixedCandidates1a" + ] self.assertIs(type(dc_mixed_candidates), dict) self.assertEqual( len(dc_mixed_candidates.keys()), len(dcTestMixedCandidates1aExpected.keys()) @@ -722,39 +958,43 @@ def test_complex_html(self): ) for _key in dc_mixed_candidates.keys(): self.assertEqual( - dc_mixed_candidates[_key], dcTestMixedCandidates1aExpected[_key] + dc_mixed_candidates[_key], + dcTestMixedCandidates1aExpected[_key], ) - # but we need to test get_metadata and get_metadatas - with self.assertRaises(ValueError) as cm: - parsed.get_metadata("TestMixedCandidates1a", strategy="dc") + + # test get_metadatas + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas("TestMixedCandidates1a", strategy="dc") self.assertEqual( - cm.exception.args[0], "If `strategy` is not a `list`, it must be 'all'." 
+ cm.exception.args[0], + 'If `strategy` is not a `list`, it must be "all".', ) self.assertEqual( - parsed.get_metadata("TestMixedCandidates1a", strategy=["dc"]), "Friendship" + parsed.parsed_result.get_metadatas( + "TestMixedCandidates1a", strategy=["dc"] + )["dc"][0], + {"content": "Friendship"}, ) self.assertEqual( - parsed.get_metadatas("TestMixedCandidates1a", strategy=["dc"]), + parsed.parsed_result.get_metadatas( + "TestMixedCandidates1a", strategy=["dc"] + )["dc"], [dcTestMixedCandidates1aExpected], ) self.assertEqual( - parsed.get_metadata( - "TestMixedCandidates1a", strategy=["dc"], encoder=encoder_capitalizer - ), - "FRIENDSHIP", - ) - self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedCandidates1a", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "FRIENDSHIP"}], ) # 1b dcTestMixedCandidates1bExpected = {"content": "158.25", "scheme": "ddc"} - self.assertIn("TestMixedCandidates1b", parsed.metadata["dc"]) - dc_mixed_candidates = parsed.metadata["dc"]["TestMixedCandidates1b"] + self.assertIn("TestMixedCandidates1b", parsed.parsed_result.metadata["dc"]) + dc_mixed_candidates = parsed.parsed_result.metadata["dc"][ + "TestMixedCandidates1b" + ] self.assertIs(type(dc_mixed_candidates), dict) self.assertEqual( len(dc_mixed_candidates.keys()), len(dcTestMixedCandidates1bExpected.keys()) @@ -767,27 +1007,29 @@ def test_complex_html(self): self.assertEqual( dc_mixed_candidates[_key], dcTestMixedCandidates1bExpected[_key] ) - # but we need to test get_metadata and get_metadatas + + # test get_metadatas self.assertEqual( - parsed.get_metadata("TestMixedCandidates1b", strategy=["dc"]), "158.25" + parsed.parsed_result.get_metadatas( + "TestMixedCandidates1b", strategy=["dc"] + )["dc"][0], + {"content": "158.25", "scheme": "ddc"}, ) self.assertEqual( - parsed.get_metadatas("TestMixedCandidates1b", strategy=["dc"]), + parsed.parsed_result.get_metadatas( + "TestMixedCandidates1b", strategy=["dc"] + )["dc"], 
[dcTestMixedCandidates1bExpected], ) self.assertEqual( - parsed.get_metadata( - "TestMixedCandidates1b", strategy=["dc"], encoder=encoder_capitalizer - ), - "158.25", - ) - self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedCandidates1b", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "158.25", "SCHEME": "DDC"}], ) + def test__dc__TestMixedCandidates2(self): + parsed = self._MakeOneParsed() # ----- # dc TestMixedCandidates2 # handle the ordering of results @@ -796,8 +1038,13 @@ def test_complex_html(self): {"content": "158.25", "scheme": "ddc"}, {"content": "Friendship"}, ] - self.assertIn("TestMixedCandidates2a", parsed.metadata["dc"]) - dc_mixed_candidates = parsed.metadata["dc"]["TestMixedCandidates2a"] + self.assertIn( + "TestMixedCandidates2a", + parsed.parsed_result.metadata["dc"], + ) + dc_mixed_candidates = parsed.parsed_result.metadata["dc"][ + "TestMixedCandidates2a" + ] self.assertIs(type(dc_mixed_candidates), list) self.assertEqual(len(dc_mixed_candidates), len(dcTestMixedCandidates2aExpected)) for idx, _expected in enumerate(dc_mixed_candidates): @@ -815,25 +1062,32 @@ def test_complex_html(self): dc_mixed_candidates[idx][_key], dcTestMixedCandidates2aExpected[idx][_key], ) - # but we need to test get_metadata and get_metadatas + + # test get_metadatas self.assertEqual( - parsed.get_metadata("TestMixedCandidates2a", strategy=["dc"]), "Friendship" + parsed.parsed_result.get_metadatas( + "TestMixedCandidates2a", strategy=["dc"] + )["dc"][0], + {"content": "158.25", "scheme": "ddc"}, ) self.assertEqual( - parsed.get_metadatas("TestMixedCandidates2a", strategy=["dc"]), + parsed.parsed_result.get_metadatas( + "TestMixedCandidates2a", strategy=["dc"] + )["dc"], dcTestMixedCandidates2aExpected, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedCandidates2a", strategy=["dc"], encoder=encoder_capitalizer - ), - "FRIENDSHIP", + )["dc"][0], + {"CONTENT": 
"158.25", "SCHEME": "DDC"}, + {"CONTENT": "FRIENDSHIP"}, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedCandidates2a", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "158.25", "SCHEME": "DDC"}, {"CONTENT": "FRIENDSHIP"}], ) @@ -842,8 +1096,13 @@ def test_complex_html(self): {"content": "Friendship"}, {"content": "158.25", "scheme": "ddc"}, ] - self.assertIn("TestMixedCandidates2b", parsed.metadata["dc"]) - dc_mixed_candidates = parsed.metadata["dc"]["TestMixedCandidates2b"] + self.assertIn( + "TestMixedCandidates2b", + parsed.parsed_result.metadata["dc"], + ) + dc_mixed_candidates = parsed.parsed_result.metadata["dc"][ + "TestMixedCandidates2b" + ] self.assertIs(type(dc_mixed_candidates), list) self.assertEqual(len(dc_mixed_candidates), len(dcTestMixedCandidates2bExpected)) for idx, _expected in enumerate(dc_mixed_candidates): @@ -861,144 +1120,178 @@ def test_complex_html(self): dc_mixed_candidates[idx][_key], dcTestMixedCandidates2bExpected[idx][_key], ) - # but we need to test get_metadata and get_metadatas + + # test get_metadatas self.assertEqual( - parsed.get_metadata("TestMixedCandidates2b", strategy=["dc"]), "Friendship" + parsed.parsed_result.get_metadatas( + "TestMixedCandidates2b", strategy=["dc"] + )["dc"][0], + {"content": "Friendship"}, ) self.assertEqual( - parsed.get_metadatas("TestMixedCandidates2b", strategy=["dc"]), + parsed.parsed_result.get_metadatas( + "TestMixedCandidates2b", strategy=["dc"] + )["dc"], dcTestMixedCandidates2bExpected, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedCandidates2b", strategy=["dc"], encoder=encoder_capitalizer - ), - "FRIENDSHIP", + )["dc"][0], + {"CONTENT": "FRIENDSHIP"}, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedCandidates2b", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "FRIENDSHIP"}, {"CONTENT": "158.25", "SCHEME": 
"DDC"}], ) + def test__TestMixedField0(self): + parsed = self._MakeOneParsed() # ok, mixedfield tests: # TestMixedField0 - self.assertEqual(parsed.get_metadata("TestMixedField0", strategy=["dc"]), None) self.assertEqual( - parsed.get_metadata("TestMixedField0", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField0", strategy=["dc"]), + None, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("TestMixedField0", strategy=["meta"])[ + "meta" + ][0], "meta:TestMixedField0", ) self.assertEqual( - parsed.get_metadata("TestMixedField0", strategy="all"), - {"meta": "meta:TestMixedField0"}, + parsed.parsed_result.get_metadatas("TestMixedField0", strategy="all"), + {"meta": ["meta:TestMixedField0"]}, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy=["dc"], encoder=encoder_capitalizer ), None, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"][0], "META:TESTMIXEDFIELD0", ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy="all", encoder=encoder_capitalizer ), - {"meta": "META:TESTMIXEDFIELD0"}, + {"meta": ["META:TESTMIXEDFIELD0"]}, ) - self.assertEqual(parsed.get_metadatas("TestMixedField0", strategy=["dc"]), None) self.assertEqual( - parsed.get_metadatas("TestMixedField0", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField0", strategy=["dc"]), + None, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("TestMixedField0", strategy=["meta"])[ + "meta" + ], ["meta:TestMixedField0"], ) self.assertEqual( - parsed.get_metadatas("TestMixedField0", strategy="all"), + parsed.parsed_result.get_metadatas("TestMixedField0", strategy="all"), {"meta": ["meta:TestMixedField0"]}, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy=["dc"], 
encoder=encoder_capitalizer ), None, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"], ["META:TESTMIXEDFIELD0"], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField0", strategy="all", encoder=encoder_capitalizer ), {"meta": ["META:TESTMIXEDFIELD0"]}, ) + def test__TestMixedField1(self): + parsed = self._MakeOneParsed() # TestMixedField1 self.assertEqual( - parsed.get_metadata("TestMixedField1", strategy=["dc"]), - "dc:TestMixedField1", + parsed.parsed_result.get_metadatas("TestMixedField1", strategy=["dc"])[ + "dc" + ][0], + {"content": "dc:TestMixedField1"}, ) self.assertEqual( - parsed.get_metadata("TestMixedField1", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField1", strategy=["meta"])[ + "meta" + ][0], "meta:TestMixedField1", ) self.assertEqual( - parsed.get_metadata("TestMixedField1", strategy="all"), - {"meta": "meta:TestMixedField1", "dc": "dc:TestMixedField1"}, + parsed.parsed_result.get_metadatas("TestMixedField1", strategy="all"), + { + "dc": [{"content": "dc:TestMixedField1"}], + "meta": ["meta:TestMixedField1"], + }, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy=["dc"], encoder=encoder_capitalizer - ), - "DC:TESTMIXEDFIELD1", + )["dc"][0], + {"CONTENT": "DC:TESTMIXEDFIELD1"}, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"][0], "META:TESTMIXEDFIELD1", ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy="all", encoder=encoder_capitalizer ), - {"meta": "META:TESTMIXEDFIELD1", "dc": "DC:TESTMIXEDFIELD1"}, + { + "dc": [{"CONTENT": "DC:TESTMIXEDFIELD1"}], + "meta": ["META:TESTMIXEDFIELD1"], + }, ) self.assertEqual( - 
parsed.get_metadatas("TestMixedField1", strategy=["dc"]), + parsed.parsed_result.get_metadatas("TestMixedField1", strategy=["dc"])[ + "dc" + ], [{"content": "dc:TestMixedField1"}], ) self.assertEqual( - parsed.get_metadatas("TestMixedField1", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField1", strategy=["meta"])[ + "meta" + ], ["meta:TestMixedField1"], ) self.assertEqual( - parsed.get_metadatas("TestMixedField1", strategy="all"), + parsed.parsed_result.get_metadatas("TestMixedField1", strategy="all"), { "meta": ["meta:TestMixedField1"], "dc": [{"content": "dc:TestMixedField1"}], }, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "DC:TESTMIXEDFIELD1"}], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"], ["META:TESTMIXEDFIELD1"], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField1", strategy="all", encoder=encoder_capitalizer ), { @@ -1006,50 +1299,74 @@ def test_complex_html(self): "dc": [{"CONTENT": "DC:TESTMIXEDFIELD1"}], }, ) + + def test__TestMixedField2(self): + parsed = self._MakeOneParsed() # TestMixedField2 self.assertEqual( - parsed.get_metadata("TestMixedField2", strategy=["dc"]), - "dc:TestMixedField2", + parsed.parsed_result.get_metadatas("TestMixedField2", strategy=["dc"])[ + "dc" + ][0], + {"content": "dc:TestMixedField2"}, + # {"con[45 chars]dc"}, ) self.assertEqual( - parsed.get_metadata("TestMixedField2", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField2", strategy=["meta"])[ + "meta" + ][0], "meta:TestMixedField2", ) self.assertEqual( - parsed.get_metadata("TestMixedField2", strategy="all"), - {"meta": "meta:TestMixedField2", "dc": "dc:TestMixedField2"}, + parsed.parsed_result.get_metadatas("TestMixedField2", 
strategy="all"), + { + "dc": [ + {"content": "dc:TestMixedField2"}, + {"content": "dc:TestMixedField2.ddc", "scheme": "ddc"}, + ], + "meta": ["meta:TestMixedField2"], + }, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField2", strategy=["dc"], encoder=encoder_capitalizer - ), - "DC:TESTMIXEDFIELD2", + )["dc"][0], + {"CONTENT": "DC:TESTMIXEDFIELD2"}, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField2", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"][0], "META:TESTMIXEDFIELD2", ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField2", strategy="all", encoder=encoder_capitalizer ), - {"meta": "META:TESTMIXEDFIELD2", "dc": "DC:TESTMIXEDFIELD2"}, + { + "dc": [ + {"CONTENT": "DC:TESTMIXEDFIELD2"}, + {"CONTENT": "DC:TESTMIXEDFIELD2.DDC", "SCHEME": "DDC"}, + ], + "meta": ["META:TESTMIXEDFIELD2"], + }, ) self.assertEqual( - parsed.get_metadatas("TestMixedField2", strategy=["dc"]), + parsed.parsed_result.get_metadatas("TestMixedField2", strategy=["dc"])[ + "dc" + ], [ {"content": "dc:TestMixedField2"}, {"content": "dc:TestMixedField2.ddc", "scheme": "ddc"}, ], ) self.assertEqual( - parsed.get_metadatas("TestMixedField2", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField2", strategy=["meta"])[ + "meta" + ], ["meta:TestMixedField2"], ) self.assertEqual( - parsed.get_metadatas("TestMixedField2", strategy="all"), + parsed.parsed_result.get_metadatas("TestMixedField2", strategy="all"), { "meta": ["meta:TestMixedField2"], "dc": [ @@ -1059,22 +1376,22 @@ def test_complex_html(self): }, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField2", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [ {"CONTENT": "DC:TESTMIXEDFIELD2"}, {"CONTENT": "DC:TESTMIXEDFIELD2.DDC", "SCHEME": "DDC"}, ], ) self.assertEqual( - parsed.get_metadatas( + 
parsed.parsed_result.get_metadatas( "TestMixedField2", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"], ["META:TESTMIXEDFIELD2"], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField2", strategy="all", encoder=encoder_capitalizer ), { @@ -1086,66 +1403,82 @@ def test_complex_html(self): }, ) + def test__TestMixedField3(self): + parsed = self._MakeOneParsed() # TestMixedField3 self.assertEqual( - parsed.get_metadata("TestMixedField3", strategy=["dc"]), - "dc:TestMixedField3", + parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["dc"])[ + "dc" + ][0], + {"content": "dc:TestMixedField3"}, ) self.assertEqual( - parsed.get_metadata("TestMixedField3", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["meta"])[ + "meta" + ][0], "meta:TestMixedField3", ) self.assertEqual( - parsed.get_metadata("TestMixedField3", strategy="all"), - {"meta": "meta:TestMixedField3", "dc": "dc:TestMixedField3"}, + parsed.parsed_result.get_metadatas("TestMixedField3", strategy="all"), + { + "dc": [{"content": "dc:TestMixedField3"}], + "meta": ["meta:TestMixedField3"], + }, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy=["dc"], encoder=encoder_capitalizer - ), - "DC:TESTMIXEDFIELD3", + )["dc"][0], + {"CONTENT": "DC:TESTMIXEDFIELD3"}, ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"][0], "META:TESTMIXEDFIELD3", ) self.assertEqual( - parsed.get_metadata( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy="all", encoder=encoder_capitalizer ), - {"meta": "META:TESTMIXEDFIELD3", "dc": "DC:TESTMIXEDFIELD3"}, + { + "dc": [{"CONTENT": "DC:TESTMIXEDFIELD3"}], + "meta": ["META:TESTMIXEDFIELD3"], + }, ) self.assertEqual( - parsed.get_metadatas("TestMixedField3", strategy=["dc"]), + 
parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["dc"])[ + "dc" + ], [{"content": "dc:TestMixedField3"}], ) self.assertEqual( - parsed.get_metadatas("TestMixedField3", strategy=["meta"]), + parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["meta"])[ + "meta" + ], ["meta:TestMixedField3"], ) self.assertEqual( - parsed.get_metadatas("TestMixedField3", strategy="all"), + parsed.parsed_result.get_metadatas("TestMixedField3", strategy="all"), { "meta": ["meta:TestMixedField3"], "dc": [{"content": "dc:TestMixedField3"}], }, ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy=["dc"], encoder=encoder_capitalizer - ), + )["dc"], [{"CONTENT": "DC:TESTMIXEDFIELD3"}], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy=["meta"], encoder=encoder_capitalizer - ), + )["meta"], ["META:TESTMIXEDFIELD3"], ) self.assertEqual( - parsed.get_metadatas( + parsed.parsed_result.get_metadatas( "TestMixedField3", strategy="all", encoder=encoder_capitalizer ), { @@ -1154,37 +1487,63 @@ def test_complex_html(self): }, ) - self.assertEqual(parsed.get_metadata("news_keywords", strategy=["meta"]), "") self.assertEqual( - parsed.get_metadata("auto-publish", strategy=["meta"]), "timely" + parsed.parsed_result.get_metadatas("news_keywords", strategy=["meta"])[ + "meta" + ][0], + "", ) self.assertEqual( - parsed.get_metadata("article:modified_time", strategy=["meta"]), + parsed.parsed_result.get_metadatas("auto-publish", strategy=["meta"])[ + "meta" + ][0], + "timely", + ) + self.assertEqual( + parsed.parsed_result.get_metadatas( + "article:modified_time", strategy=["meta"] + )["meta"][0], "2017-10-11 01:01:01", ) self.assertEqual( - parsed.get_metadata("msapplication-tap-highlight", strategy=["meta"]), "no" + parsed.parsed_result.get_metadatas( + "msapplication-tap-highlight", strategy=["meta"] + )["meta"][0], + "no", ) self.assertEqual( - 
parsed.get_metadata("google-site-verification", strategy=["meta"]), + parsed.parsed_result.get_metadatas( + "google-site-verification", strategy=["meta"] + )["meta"][0], "123123123", ) self.assertEqual( - parsed.get_metadata("twitter:data1", strategy=["meta"]), "8 min read" + parsed.parsed_result.get_metadatas("twitter:data1", strategy=["meta"])[ + "meta" + ][0], + "8 min read", + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("google", strategy=["meta"])["meta"][0], + "notranslate", ) self.assertEqual( - parsed.get_metadata("google", strategy=["meta"]), "notranslate" + parsed.parsed_result.get_metadatas("news_keywords", strategy=["meta"])[ + "meta" + ][0], + "", ) - self.assertEqual(parsed.get_metadata("news_keywords", strategy=["meta"]), "") self.assertEqual( - parsed.get_metadatas("viewport", strategy=["meta"]), + parsed.parsed_result.get_metadatas("viewport", strategy=["meta"])["meta"], [ "width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no", "width=device-width, initial-scale=1, maximum-scale=1", ], ) self.assertEqual( - parsed.get_metadata("thumbnail", strategy=["meta"]), + parsed.parsed_result.get_metadatas("thumbnail", strategy=["meta"])["meta"][ + 0 + ], "https://example.com/path/to/image.jpg", ) self.assertEqual( @@ -1192,169 +1551,341 @@ def test_complex_html(self): "https://example.com/path/to/image.jpg", ) self.assertEqual( - parsed.get_metadata("thumbnail-2", strategy=["meta"]), + parsed.parsed_result.get_metadatas("thumbnail-2", strategy=["meta"])[ + "meta" + ][0], "//example.com/path/to/image.jpg", ) self.assertEqual( parsed.get_metadata_link("thumbnail-2", strategy=["meta"]), None ) self.assertEqual( - parsed.get_metadata("thumbnail-3", strategy=["meta"]), "/path/to/image.jpg" + parsed.parsed_result.get_metadatas("thumbnail-3", strategy=["meta"])[ + "meta" + ][0], + "/path/to/image.jpg", ) self.assertEqual( parsed.get_metadata_link("thumbnail-3", strategy=["meta"]), None ) + def test__canonical(self): + parsed = 
self._MakeOneParsed() # this should error! - with self.assertRaises(ValueError) as cm: - parsed.get_metadatas("canonical", strategy=["all"]) + with self.assertRaises(InvalidStrategy) as cm: + parsed.parsed_result.get_metadatas("canonical", strategy=["all"]) self.assertEqual( - cm.exception.args[0], 'Submit "all" as a `str`, not in a `list`.' + cm.exception.args[0], + 'Submit "all" as a `str`, not in a `list`.', ) # ok, now test the return types # some behavior was changed in the .7 release - # get_metadata - single section + # get_metadatas - single section self.assertEqual( - parsed.get_metadata("canonical", strategy=["page"]), + parsed.parsed_result.get_metadatas("canonical", strategy=["page"])["page"][ + 0 + ], "http://example.com/meta/rel=canonical", ) - self.assertEqual(parsed.get_metadata("canonical", strategy=["meta"]), None) self.assertEqual( - parsed.get_metadata("canonical", strategy="all"), - {"page": "http://example.com/meta/rel=canonical"}, + parsed.parsed_result.get_metadatas("canonical", strategy=["meta"]), + None, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("canonical", strategy="all"), + {"page": ["http://example.com/meta/rel=canonical"]}, ) # get_metadatas - single section self.assertEqual( - parsed.get_metadatas("canonical", strategy=["page"]), - [ - "http://example.com/meta/rel=canonical", - ], + parsed.parsed_result.get_metadatas("canonical", strategy=["page"])["page"], + ["http://example.com/meta/rel=canonical"], ) - self.assertEqual(parsed.get_metadatas("canonical", strategy=["meta"]), None) self.assertEqual( - parsed.get_metadatas("canonical", strategy="all"), - { - "page": [ - "http://example.com/meta/rel=canonical", - ] - }, + parsed.parsed_result.get_metadatas("canonical", strategy=["meta"]), + None, + ) + self.assertEqual( + parsed.parsed_result.get_metadatas("canonical", strategy="all"), + {"page": ["http://example.com/meta/rel=canonical"]}, ) - # get_metadata - multiple section + def test__description(self): + parsed = 
self._MakeOneParsed() + # get_metadatas - multiple section self.assertEqual( - parsed.get_metadata("description", strategy=["meta"]), "meta.description" + parsed.parsed_result.get_metadatas("description", strategy=["meta"])[ + "meta" + ][0], + "meta.description", ) self.assertEqual( - parsed.get_metadata("description", strategy="all"), + parsed.parsed_result.get_metadatas("description", strategy="all"), { - "og": "meta.property=og:description", - "meta": "meta.description", - "twitter": "meta.name=twitter:description", + "og": ["meta.property=og:description"], + "meta": ["meta.description"], + "twitter": ["meta.name=twitter:description"], }, ) # get_metadatas - multiple section self.assertEqual( - parsed.get_metadatas("description", strategy=["meta"]), ["meta.description"] + parsed.parsed_result.get_metadatas("description", strategy=["meta"])[ + "meta" + ], + ["meta.description"], ) self.assertEqual( - parsed.get_metadatas("description", strategy="all"), + parsed.parsed_result.get_metadatas("description", strategy="all"), { - "og": [ - "meta.property=og:description", - ], - "meta": [ - "meta.description", - ], + "og": ["meta.property=og:description"], + "meta": ["meta.description"], "twitter": ["meta.name=twitter:description"], }, ) + def test__keywords(self): + parsed = self._MakeOneParsed() # multiple candidates! 
self.assertEqual( - parsed.get_metadata("keywords", strategy=["meta"]), "meta.keywords:1" + parsed.parsed_result.get_metadatas("keywords", strategy=["meta"])["meta"][ + 0 + ], + "meta.keywords:1", ) self.assertEqual( - parsed.get_metadatas("keywords", strategy=["meta"]), + parsed.parsed_result.get_metadatas("keywords", strategy=["meta"])["meta"], ["meta.keywords:1", "meta.keywords:2"], ) - def test_malformed_twitter(self): + def test_complex_html__encoder(self): """ - this tests simple.html to have certain fields - python -munittest tests.document_parsing.TestDocumentParsing.test_malformed_twitter + pytest tests/test_document_parsing.py::TestDocumentParsing::test_complex_html__encoder """ - html = self._MakeOne("simple.html") - - # the default behavior is to not support malformed - # that means we should consult 'value' for data and 'label' - # in `simple.html`, "label" (incorrectly) uses "content" and "data" uses "label" + html = self._MakeOne("duplicates.html") parsed = metadata_parser.MetadataParser(url=None, html=html) + + # Test a few things with and without encoding + + # Test A1 + self.assertEqual( + parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["meta"]), + {"meta": ["meta:TestMixedField3"]}, + ) self.assertEqual( - parsed.metadata["twitter"]["data"], "meta.name=twitter:data||value" + parsed.parsed_result.get_metadatas( + "TestMixedField3", + strategy=["meta"], + encoder=metadata_parser.utils.encode_ascii, + ), + {"meta": ["meta:TestMixedField3"]}, ) - self.assertNotIn("label", parsed.metadata["twitter"]) - self.assertNotIn("invalid", parsed.metadata["twitter"]) - # now with `support_malformed` support we will load the label! 
- parsed2 = metadata_parser.MetadataParser( - url=None, html=html, support_malformed=True + # Test A2 - dc only + # without an encoder, DC generates a dict + # with the encoder, DC generates a str + self.assertEqual( + parsed.parsed_result.get_metadatas("TestMixedField3", strategy=["dc"]), + {"dc": [{"content": "dc:TestMixedField3"}]}, ) self.assertEqual( - parsed2.metadata["twitter"]["data"], "meta.name=twitter:data||value" + parsed.parsed_result.get_metadatas( + "TestMixedField3", + strategy=["dc"], + encoder=metadata_parser.utils.encode_ascii, + ), + {"dc": ["dc:TestMixedField3"]}, ) + + # Test A3 - dc within all + # without an encoder, DC generates a dict + # with the encoder, DC generates a str self.assertEqual( - parsed2.metadata["twitter"]["label"], "meta.name=twitter:label||content" + parsed.parsed_result.get_metadatas( + "TestMixedField3", + strategy="all", + encoder=metadata_parser.utils.encode_ascii, + ), + { + "meta": ["meta:TestMixedField3"], + "dc": ["dc:TestMixedField3"], + }, ) + + # Test A4 - dc + meta + # without an encoder, DC generates a dict + # with the encoder, DC generates a str self.assertEqual( - parsed2.metadata["twitter"]["invalid"], "meta.name=twitter:invalid" + parsed.parsed_result.get_metadatas( + "TestMixedField3", + strategy=["dc", "meta"], + encoder=metadata_parser.utils.encode_ascii, + ), + { + "meta": ["meta:TestMixedField3"], + "dc": ["dc:TestMixedField3"], + }, ) - # try it with dupes... - html_dupes = self._MakeOne("duplicates.html") - parsed_dupe = metadata_parser.MetadataParser(url=None, html=html_dupes) - # two items for each of data/label, but label is empty strings + +class TestDocumentParsing_SelectFirstMatch(unittest.TestCase, _TestDocumentParsingCore): + + def _test__shared(self, parsed: MetadataParser): + # but the data is still there... 
self.assertEqual( - parsed_dupe.metadata["twitter"]["data"], - ["meta.name=twitter:data||value,1", "meta.name=twitter:data||value,2"], + parsed.parsed_result.get_metadatas("keywords.order", strategy="all"), + { + "dc": [ + {"content": "dc:keywords.order::1"}, + {"content": "dc:keywords.order::2"}, + ], + "meta": [ + "meta.keywords.order::1", + "meta.keywords.order::2", + ], + "og": [ + "meta.property=og:keywords.order::1", + "meta.property=og:keywords.order::2", + ], + "twitter": [ + "meta.name=twitter:keywords.order::1", + "meta.name=twitter:keywords.order::2", + ], + }, ) - self.assertNotIn("label", parsed.metadata["twitter"]) - # everyone is happy when metadata is malformed! - parsed_dupe = metadata_parser.MetadataParser( - url=None, html=html_dupes, support_malformed=True + # all gets meta first + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order", strategy="all"), + "meta.keywords.order::1", ) + + # only look in: meta self.assertEqual( - parsed_dupe.metadata["twitter"]["data"], - ["meta.name=twitter:data||value,1", "meta.name=twitter:data||value,2"], + parsed.parsed_result.select_first_match( + "keywords.order", strategy=["meta"] + ), + "meta.keywords.order::1", ) + # only look in: page self.assertEqual( - parsed_dupe.metadata["twitter"]["label"], - [ - "meta.name=twitter:label||content,1", - "meta.name=twitter:label||content,2", - ], + parsed.parsed_result.select_first_match( + "keywords.order", strategy=["page"] + ), + None, + ) + # only look in: dc + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order", strategy=["dc"]), + "dc:keywords.order::1", + ) + # only look in: og + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order", strategy=["og"]), + "meta.property=og:keywords.order::1", + ) + # only look in: twitter + self.assertEqual( + parsed.parsed_result.select_first_match( + "keywords.order", strategy=["twitter"] + ), + "meta.name=twitter:keywords.order::1", ) - def 
test_charsets(self): - """ - python -m unittest tests.document_parsing.TestDocumentParsing.test_charsets - """ - a_html = self._MakeOne("charset_a.html") - a_parsed = metadata_parser.MetadataParser(url=None, html=a_html) + def test__basic(self): + parsed = self._MakeOneParsed() + self._test__shared(parsed) + + # multiple candidates! self.assertEqual( - a_parsed.metadata["meta"]["content-type"], "text/html; charset=UTF-8" + parsed.parsed_result.get_metadatas("keywords.order"), + { + "dc": [ + {"content": "dc:keywords.order::1"}, + {"content": "dc:keywords.order::2"}, + ], + "meta": [ + "meta.keywords.order::1", + "meta.keywords.order::2", + ], + "og": [ + "meta.property=og:keywords.order::1", + "meta.property=og:keywords.order::2", + ], + "twitter": [ + "meta.name=twitter:keywords.order::1", + "meta.name=twitter:keywords.order::2", + ], + }, ) - b_html = self._MakeOne("charset_b.html") - b_parsed = metadata_parser.MetadataParser(url=None, html=b_html) - self.assertEqual(b_parsed.metadata["meta"]["charset"], "UTF-8") + # default gets meta first + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order"), + "meta.keywords.order::1", + ) - c_html = self._MakeOne("charset_c.html") - c_parsed = metadata_parser.MetadataParser(url=None, html=c_html) - self.assertEqual(c_parsed.metadata["meta"]["charset"], "UTF-8") + def test__all(self): + parsed = self._MakeOneParsed(strategy="all") + self._test__shared(parsed) + + # multiple candidates! 
+ self.assertEqual( + parsed.parsed_result.get_metadatas("keywords.order"), + { + "dc": [ + {"content": "dc:keywords.order::1"}, + {"content": "dc:keywords.order::2"}, + ], + "meta": [ + "meta.keywords.order::1", + "meta.keywords.order::2", + ], + "og": [ + "meta.property=og:keywords.order::1", + "meta.property=og:keywords.order::2", + ], + "twitter": [ + "meta.name=twitter:keywords.order::1", + "meta.name=twitter:keywords.order::2", + ], + }, + ) + + # default gets meta first + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order"), + "meta.keywords.order::1", + ) + + def test__meta(self): + parsed = self._MakeOneParsed(strategy=["meta"]) + self._test__shared(parsed) + + # multiple candidates! + # only shows the meta, because of the init + self.assertEqual( + parsed.parsed_result.get_metadatas("keywords.order"), + {"meta": ["meta.keywords.order::1", "meta.keywords.order::2"]}, + ) + + # default gets meta first + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order"), + "meta.keywords.order::1", + ) + + def test__reversed(self): + parsed = self._MakeOneParsed(strategy=["twitter", "dc", "og", "page", "meta"]) + + self._test__shared(parsed) + + # default gets TWITTER first + self.assertEqual( + parsed.parsed_result.select_first_match("keywords.order"), + "meta.name=twitter:keywords.order::1", + ) class Test_UrlParserCacheable(unittest.TestCase): @@ -1364,7 +1895,7 @@ class Test_UrlParserCacheable(unittest.TestCase): def test__default(self): """MetadataParser()""" - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1377,7 +1908,7 @@ def test__default(self): def test__True(self): """MetadataParser(cached_urlparser=True)""" - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1390,9 +1921,9 @@ def test__True(self): def test__Int_1(self): 
"""MetadataParser(cached_urlparser=1)""" - with warnings.catch_warnings(record=True) as warned: - warnings.simplefilter("always") - errors = _docs_test_parser( + # this should fail + with self.assertRaises(ValueError) as cm: + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1402,50 +1933,26 @@ def test__Int_1(self): ) if errors: raise ValueError(errors) - assert len(warned) >= 1 - _found = False - for w in warned: - if isinstance(w.message, FutureWarning): - if w.message.args[0].startswith( - "Supplying an int to `cached_urlparser` to set maxitems is deprecated." - ): - _found = True - assert ( - "Supply `cached_urlparser=True, cached_urlparser_maxitems=int` instead." - in w.message.args[0] - ) - assert _found is True + assert isinstance(cm.exception, ValueError) + assert cm.exception.args[0] == "`cached_urlparser` must be a callable" def test__Int_0(self): - """MetadataParser(cached_urlparser=1)""" - with warnings.catch_warnings(record=True) as warned: - warnings.simplefilter("always") - errors = _docs_test_parser( - [ - "good-canonical-relative", - "good-canonical-relative_alt", - "good-og-relative_alt", - ], - 0, - ) - if errors: - raise ValueError(errors) - assert len(warned) >= 1 - _found = False - for w in warned: - if isinstance(w.message, FutureWarning): - if w.message.args[0].startswith( - "Supplying `0` to `cached_urlparser` to set maxitems is deprecated." 
- ): - _found = True - assert ( - "Supply `cached_urlparser=False` instead" - in w.message.args[0] - ) - assert _found is True + """MetadataParser(cached_urlparser=0)""" + parsed, errors = _docs_test_parser( + [ + "good-canonical-relative", + "good-canonical-relative_alt", + "good-og-relative_alt", + ], + 0, + ) + if errors: + raise ValueError(errors) + # equivalent to `cached_urlparser=False` + assert parsed.urlparse is urlparse def test__None(self): - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1457,7 +1964,7 @@ def test__None(self): raise ValueError(errors) def test__False(self): - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1471,7 +1978,7 @@ def test__False(self): def test__CustomParser(self): custom_parser_obj = metadata_parser.UrlParserCacheable() custom_parser = custom_parser_obj.urlparse - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1487,7 +1994,7 @@ class Test_UrlParserCacheable_MaxItems(unittest.TestCase): def test__default(self): """MetadataParser()""" - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1501,7 +2008,7 @@ def test__default(self): def test__True(self): # this should fail - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1516,7 +2023,7 @@ def test__True(self): def test__False(self): # this should fail with self.assertRaises(ValueError) as cm: - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1536,7 +2043,7 @@ def test__False(self): def test__Int_1(self): # this should fail with self.assertRaises(ValueError) as cm: - errors = _docs_test_parser( + 
parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1554,9 +2061,10 @@ def test__Int_1(self): ) def test__Int_0(self): + """MetadataParser(cached_urlparser=0)""" # this should fail with self.assertRaises(ValueError) as cm: - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1576,7 +2084,7 @@ def test__Int_0(self): def test__None(self): # this should fail with self.assertRaises(ValueError) as cm: - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt", @@ -1598,7 +2106,7 @@ def test__CustomParser(self): custom_parser_obj = metadata_parser.UrlParserCacheable() custom_parser = custom_parser_obj.urlparse with self.assertRaises(ValueError) as cm: - errors = _docs_test_parser( + parsed, errors = _docs_test_parser( [ "good-canonical-relative", "good-canonical-relative_alt",