# Source code for pyglottolog.references.bibfiles

# bibfiles.py - ordered collection of bibfiles with load/save api
"""
Functionality to manipulate bibfiles and their entries.
"""
import logging
import re
import math
from typing import Union, TYPE_CHECKING, Optional, Any, Callable
import pathlib
import datetime
import functools
import collections
from collections.abc import Generator
import dataclasses
import unicodedata

from clldutils.path import memorymapped
from clldutils.source import Source
from clldutils.text import split_text
from clldutils.inifile import INI

from . import bibtex
from . import util
from ..config import MEDType
from ..util import PathType
from .bibfiles_db import Database

if TYPE_CHECKING:  # pragma: no cover
    from pyglottolog import Glottolog

__all__ = ['BibFiles', 'BibFile', 'Entry']

# Default filename of the SQLite database built from the bibfiles.
BIBFILES = 'bibfiles.sqlite3'

# Fallback document types, each mapped to itself. Dictionary order is significant:
# it is the order in which the names are listed here.
DOCTYPES = {
    name: name
    for name in '''
        grammar
        grammar_sketch
        dictionary
        specific_feature
        phonology
        text
        new_testament
        wordlist
        comparative
        minimal
        socling
        dialectology
        overview
        ethnographic
        bibliographical
        unknown
    '''.split()
}

# A four-digit year in square brackets, optionally followed by "-<digits>" before the "]".
PREF_YEAR_PATTERN = re.compile(r'\[(?P<year>[12][0-9]{3})(-[0-9]+)?]')
# Any four-digit number starting with 1 or 2.
YEAR_PATTERN = re.compile(r'(?P<year>[12][0-9]{3})')


class BibFiles(list):
    """Ordered collection of `BibFile` objects accessible by filename or index."""

    def __init__(self, bibfiles):
        super().__init__(bibfiles)
        # Filename -> bibfile lookup used for string-keyed access in `__getitem__`.
        self._map = {bibfile.fname.name: bibfile for bibfile in self}

    @classmethod
    def from_path(cls, path: PathType, api: Optional['Glottolog'] = None) -> 'BibFiles':
        """BibTeX files from `<path>/bibtex/*.bib` if listed in `<path>/BIBFILES.ini`.

        :param path: Directory containing `BIBFILES.ini` and the `bibtex` sub-directory.
        :param api: Optional API object which is handed on to each `BibFile`.
        """
        root = pathlib.Path(path)
        settings = INI.from_file(root / 'BIBFILES.ini', interpolation=None)
        return cls(cls._iterbibfiles(settings, root / 'bibtex', api=api))

    @staticmethod
    def _iterbibfiles(
            ini: INI,
            bibtex_path: pathlib.Path,
            api: Optional['Glottolog'] = None,
    ) -> Generator['BibFile', None, None]:
        """Yield a `BibFile` for each section of `ini` whose name ends in `.bib`.

        The options of the section are passed as keyword arguments to `BibFile`.

        :raises ValueError: if a listed file does not exist below `bibtex_path`.
        """
        for section in ini.sections():
            if not section.endswith('.bib'):
                continue
            bibpath = bibtex_path / section
            if not bibpath.exists():  # pragma: no cover
                raise ValueError('invalid bibtex file referenced in BIBFILES.ini')
            yield BibFile(fname=bibpath, api=api, **ini[section])

    def __getitem__(self, index_or_filename: Union[int, str]) -> Union['BibFile', 'Entry']:
        """Retrieve a bibfile by index or filename or an entry by qualified key.

        :param index_or_filename: Either an `int` index, or a bibfile name (with or without \
        the `.bib` suffix), or a provider-qualified BibTeX key in the form `<prov>:<key>`.
        :return: A `BibFile` instance, or an `Entry` instance.
        """
        if not isinstance(index_or_filename, str):
            # Anything that is not a string is handled like plain list indexing.
            return super().__getitem__(index_or_filename)

        if ':' in index_or_filename:
            # Only the first colon separates provider and key; keys may contain colons.
            provider, _, bibkey = index_or_filename.partition(':')
            return self._map[f'{provider}.bib'][bibkey]

        filename = index_or_filename
        if not filename.endswith('.bib'):
            filename = f'{filename}.bib'
        return self._map[filename]

    def to_sqlite(self, filepath=BIBFILES, verbose=False) -> Database:
        """Return a database with the bibfiles loaded."""
        return Database.from_bibfiles(self, filepath, verbose=verbose)

    def roundtrip_all(self) -> list[None]:
        """Load and save all bibfiles with the current settings."""
        results = []
        for bibfile in self:
            results.append(bibfile.roundtrip())
        return results
@dataclasses.dataclass
class BibFile:  # pylint: disable=R0902
    """
    Represents a BibTeX file, storing a provider's bibliography, providing easy access to its
    records.

    String values for `priority` and `sortkey` are normalized in `__post_init__`: `priority`
    is converted with `int()`, and a `sortkey` spelled "none" (in any case) becomes `None`.
    """
    fname: pathlib.Path
    name: Optional[str] = None  #: Short name of the bibliography
    title: Optional[str] = None  #: Title of the bibliography
    description: Optional[str] = None  #: The provenance of the bibliography
    abbr: Optional[str] = None
    encoding: str = 'utf-8'
    normalize: str = 'NFC'
    sortkey: Optional[str] = None
    priority: int = 0
    url: Optional[str] = None  #: URL pointing to the source of the bibliography
    curation: Optional[str] = None  #: Curation policy for the bibliography at Glottolog
    api: Any = None

    def __post_init__(self):
        self.priority = int(self.priority)
        if self.sortkey is not None and self.sortkey.lower() == 'none':
            self.sortkey = None

    @property
    def id(self) -> str:  # pylint: disable=C0116
        """The filename without its suffix; used as prefix of qualified entry keys."""
        return self.fname.stem

    def __getitem__(self, item: str) -> 'Entry':
        """
        Look up a single entry by scanning the file for its `@type{key` header.

        :param item: BibTeX citation key of an entry, optionally prefixed with `<id>:`
        :raises KeyError: if no matching `Entry` is contained in the `BibFile`
        """
        if item.startswith(self.id + ':'):
            item = item.split(':', 1)[1]

        text = None
        with memorymapped(self.fname) as string:
            m = re.search(
                b'@[A-Za-z]+{' + re.escape(item.encode(self.encoding)) + rb'[\s,]', string)
            if m:
                # The record ends where the next one starts, or at the end of the file.
                next_ = string.find(b'\n@', m.end())
                text = string[m.start():next_] if next_ >= 0 else string[m.start():]

        if text:
            # Only the first parsed entry is of interest.
            for k, (t, f) in bibtex.iterentries_from_text(text, encoding=self.encoding):
                return Entry(k, t, f, self, self.api)
        raise KeyError(item)

    def visit(self, visitor: Optional[Callable[['Entry'], bool]] = None):
        """Visit the entries of the bibfile, possibly manipulating them in place.

        :param visitor: Callable invoked with each `Entry`. Entries for which it returns \
        exactly `True` are dropped; all others are written back with their current `type` \
        and `fields`.
        """
        entries = collections.OrderedDict()
        for entry in self.iterentries():
            if visitor is None or visitor(entry) is not True:
                entries[entry.key] = (entry.type, entry.fields)
        self.save(entries)

    @property
    def size(self) -> int:
        """Size of the file in bytes."""
        return self.fname.stat().st_size

    @property
    def mtime(self) -> datetime.datetime:
        """Modification time."""
        return datetime.datetime.fromtimestamp(self.fname.stat().st_mtime)

    def iterentries(self) -> Generator['Entry', None, None]:  # pylint: disable=C0116
        """Yield an `Entry` for each record read from the file."""
        for k, (t, f) in bibtex.iterentries(filename=self.fname, encoding=self.encoding):
            yield Entry(k, t, f, self, self.api)

    def keys(self) -> list[str]:
        """List of provider-qualified keys of the bibfile"""
        return [f'{self.id}:{e.key}' for e in self.iterentries()]

    @property
    def glottolog_ref_id_map(self) -> dict[str, str]:
        """Maps bibkey to glottolog_ref_id value."""
        return {
            e.key: e.fields['glottolog_ref_id']
            for e in self.iterentries() if 'glottolog_ref_id' in e.fields}

    def update(self, fname: PathType, log: Optional[logging.Logger] = None, keep_old=False):
        """Update the bibfile with the data from fname.

        :param fname: BibTeX file with the new data; read with this bibfile's encoding.
        :param log: If given, the number of entries counted as new is logged.
        :param keep_old: If true, existing entries are kept unless overwritten by an entry \
        with the same key from `fname`.
        """
        entries, new = collections.OrderedDict(), 0
        if keep_old:
            for k, (t, f) in bibtex.iterentries(filename=self.fname, encoding=self.encoding):
                entries[k] = (t, f)

        # Must be read before `save` overwrites the file.
        ref_id_map = self.glottolog_ref_id_map
        for key, (type_, fields) in bibtex.iterentries(fname, self.encoding):
            if key in ref_id_map and 'glottolog_ref_id' not in fields:
                # Carry over the ID already assigned to this key.
                fields['glottolog_ref_id'] = ref_id_map[key]
            else:
                # NOTE(review): this also counts known keys whose incoming record already
                # carries a glottolog_ref_id - confirm that this is intended.
                new += 1
            entries[key] = (type_, fields)
        self.save(entries)
        if log:  # pragma: no cover
            log.info('%s new entries', new)

    def load(self, preserve_order=None):
        """Return entries as bibkey -> (entrytype, fields) dict.

        :param preserve_order: Defaults to `True` if the bibfile has no `sortkey`.
        """
        if preserve_order is None:
            preserve_order = self.sortkey is None
        return bibtex.load(self.fname, preserve_order, encoding=self.encoding)

    def save(self, entries):
        """Write bibkey -> (entrytype, fields) map to file."""
        bibtex.save(
            entries,
            filename=self.fname,
            sortkey=self.sortkey,
            encoding=self.encoding,
            normalize=self.normalize)

    def __str__(self):
        return f'<{self.__class__.__name__} {self.fname.name}>'

    def check(self, log: logging.Logger) -> tuple[int, str]:
        """Run checks and report the result.

        :return: Pair (number of entries, verdict), where verdict is `'OK'` or \
        `'(<n> invalid)'`.
        """
        entries = self.load()  # bare BibTeX syntax
        invalid = bibtex.check(filename=self.fname)  # names/macros etc.
        verdict = f'({invalid} invalid)' if invalid else 'OK'
        method = log.warning if invalid else log.info
        method('%s %d %s', self, len(entries), verdict)
        return len(entries), verdict

    def roundtrip(self):  # pylint: disable=C0116
        """Print the bibfile, then load it and save it again."""
        print(self)
        self.save(self.load())

    def show_characters(self, include_plain=False):
        """Display character-frequencies (excluding printable ASCII).

        :param include_plain: If true, printable ASCII characters are listed as well.
        """
        with self.fname.open(encoding=self.encoding) as fd:
            text = fd.read()
        hist = collections.Counter(text)
        # Printable ASCII is the range 32 (space) to 126 (tilde). A lower bound of 20 would
        # also hide the control characters 20-31, e.g. ESC.
        table = '\n'.join(
            '%d\t%-9r\t%s\t%s' % (n, c, c, unicodedata.name(c, ''))  # pylint: disable=C0209
            for c, n in hist.most_common()
            if include_plain or not 32 <= ord(c) <= 126)
        print(table)
@functools.total_ordering
@dataclasses.dataclass
class Entry:
    """
    Represents an entry in a `BibFile`, i.e. a bibliographical record.

    .. note::

        `Entry` instances are orderable. The ordering is the one used to compute MEDs, i.e.

        - grammars are "better" than other document types,
        - more pages is "better" than less,
        - more recent is "better" than old.

        Equality is defined via `weight` as well, not via the dataclass fields.

    .. code-block:: python

        >>> g = pyglottolog.Glottolog()
        >>> g.bibfiles['hh:g:MacDonell:Sanskrit'] > g.bibfiles['hh:hv:Weijnen:Nederlandse']
        True
        >>> refs = g.refs_by_languoid(g.bibfiles['hh'])
        >>> sorted(refs[0]['stan1295'])[-1].med_type.name
        'long grammar'
    """
    key: str
    type: str  #: BibTeX entry type
    fields: dict  #: The metadata of the record
    bib: BibFile
    api: Optional['Glottolog'] = None

    # FIXME: add method to apply triggers!  # pylint: disable=fixme

    lgcode_regex = r'[a-z0-9]{4}[0-9]{4}|[a-z]{3}|NOCODE_[A-Z][^\s\]]+'
    lgcode_in_brackets_pattern = re.compile(r"\[(" + lgcode_regex + r")]")
    recomma = re.compile(r"[,/]\s?")
    # The alternation must be grouped, otherwise "$" only anchors the last alternative and
    # any token merely starting with three lowercase letters would be accepted.
    lgcode_pattern = re.compile(r"(?:" + lgcode_regex + r")$")

    def __eq__(self, other):
        if not isinstance(other, Entry):
            return NotImplemented
        return self.weight == other.weight

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        if not isinstance(other, Entry):
            return NotImplemented
        return self.weight < other.weight

    @property
    def _defined_doctypes(self):
        """Doctypes taken from `api.hhtypes` if an API is available, else `DOCTYPES`."""
        if self.api:
            return collections.OrderedDict((hht.id, hht.id) for hht in self.api.hhtypes)
        return DOCTYPES

    @functools.cached_property
    def weight(self) -> tuple[int, int, int, str]:
        """The weight which determines ordering when computing MEDs.

        :return: Tuple (negated doctype rank, weighted number of pages, year or 0, entry ID).
        """
        doctypes = self._defined_doctypes
        assigned = self.doctypes(doctypes)[0]

        # Entries without a known doctype rank behind all defined doctypes.
        index, doctype = len(doctypes), None
        if assigned:
            # Only the first (i.e. highest ranked) doctype counts.
            index = list(doctypes.values()).index(assigned[0])
            doctype = getattr(assigned[0], 'id', assigned[0])

        # the number of pages is divided by number of doctypes times number of described languages
        pages = int(math.ceil(
            float(self.pages_int or 0) / (
                (len(assigned) or 1) * (len(self.lgcodes(self.fields.get('lgcode', ''))) or 1))))

        if doctype == 'grammar' and pages >= 300:
            # Marker for long grammars, see `med_type`.
            index = -1

        return -index, pages, self.year_int or 0, self.id

    @functools.cached_property
    def med_type(self) -> Optional[MEDType]:
        """
        The entry's type on the MED scale.
        """
        if self.api:
            doctypes = list(self._defined_doctypes.keys())
            index = -self.weight[0]
            if index == -1:
                return self.api.med_types.long_grammar
            if 'dictionary' in doctypes and index < doctypes.index('dictionary'):
                return self.api.med_types.get(doctypes[index])
            if 'wordlist' in doctypes and index < doctypes.index('wordlist'):
                return self.api.med_types.phonology_or_text
            return self.api.med_types.wordlist_or_less
        return None  # pragma: no cover

    @functools.cached_property
    def year_int(self) -> Optional[int]:
        """Year as number if possible."""
        year = self.fields.get('year')
        if year:
            # prefer years in brackets over the first 4-digit number.
            match = PREF_YEAR_PATTERN.search(year) or YEAR_PATTERN.search(year)
            if match:
                return int(match.group('year'))
        return None

    @functools.cached_property
    def pages_int(self) -> Optional[int]:
        """Number of pages as int.

        The `numberofpages` field is used if it holds an integer below `util.MAX_PAGE`;
        otherwise the number is computed from the `pages` field, if present.
        """
        if self.fields.get('numberofpages'):
            try:
                pages = int(self.fields.get('numberofpages').strip())
                if pages < util.MAX_PAGE:
                    return pages
            except ValueError:
                # Not a plain number: fall through to the `pages` field.
                pass

        if self.fields.get('pages'):
            return util.compute_pages(self.fields['pages'])[2]
        return None

    @functools.cached_property
    def publisher_and_address(self) -> tuple[Optional[str], Optional[str]]:
        """Publisher and address values.

        A publisher of the form "address: publisher" is split, unless the entry has a
        differing `address` field.
        """
        p = self.fields.get('publisher')
        if p and ':' in p:
            address, publisher = [s.strip() for s in p.split(':', 1)]
            if (not self.fields.get('address')) or self.fields['address'] == address:
                return publisher, address
        return p, self.fields.get('address')

    def __str__(self):
        """Return the BibTeX representation of the entry."""
        chunks = [f"@{self.type}{{{self.key}"]
        # NOTE(review): the whitespace after "\n" in the literal below is reproduced as it
        # appears in the reviewed source - confirm the intended indent width.
        for k, v in bibtex.fieldorder.itersorted(self.fields):
            chunks.append(f",\n {k} = {{{v.strip() if hasattr(v, 'strip') else v}}}")
        chunks.append('\n}\n' if self.fields else ',\n}\n')
        return ''.join(chunks)

    def text(self) -> str:
        """Return the text linearization of the entry."""
        return Source(self.type, self.key, _check_id=False, **self.fields).text()

    @property
    def id(self) -> str:
        """
        The qualified entry ID, including the provider prefix.
        """
        return f'{self.bib.id}:{self.key}'

    @classmethod
    def lgcodes(cls, string) -> list[str]:
        """Parse language codes from a string.

        Codes are either given in square brackets, or the whole string is a list of codes
        separated by comma or slash. In the latter case an empty list is returned unless
        every item is a code.
        """
        if string is None:
            return []

        codes = cls.lgcode_in_brackets_pattern.findall(string)
        if not codes:
            # ... or as comma separated list of identifiers.
            parts = [p.strip() for p in cls.recomma.split(string)]
            codes = [p for p in parts if cls.lgcode_pattern.match(p)]
            if len(codes) != len(parts):
                codes = []
        return codes

    @staticmethod
    def parse_ca(s: str) -> Optional[str]:
        """Read a trigger expression from a field value."""
        if s:
            match = re.search('computerized assignment from "(?P<trigger>[^\"]+)"', s)
            if match:
                return match.group('trigger')
        return None

    def languoids(self, langs_by_codes: dict) -> tuple[list, Optional[str]]:
        """
        Expand the language codes mentioned in a reference's "lgcode" field to `Languoid`
        objects.

        :param langs_by_codes: Mapping of language codes to the objects to return.
        :return: Pair (list of values of `langs_by_codes` for the known codes, trigger \
        expression parsed from the "lgcode" field or `None`).
        """
        res = []
        if 'lgcode' in self.fields:
            res = [
                langs_by_codes[code]
                for code in self.lgcodes(self.fields['lgcode'])
                if code in langs_by_codes]
        return res, self.parse_ca(self.fields.get('lgcode'))

    def doctypes(self, hhtypes) -> tuple[list, Optional[str]]:
        """Ordered doctypes assigned to this entry.

        :param hhtypes: `OrderedDict` mapping doctype names to doctypes
        :return: Pair (`list` of values of `hhtypes` which apply to the entry, ordered by \
        occurrence in `hhtypes`; trigger expression parsed from the "hhtype" field or `None`).
        """
        res = set()
        if 'hhtype' in self.fields:
            for ss in split_text(self.fields['hhtype'], separators=',;'):
                # Drop parenthesized remarks following the doctype name.
                ss = ss.split('(')[0].strip()
                if ss in hhtypes:
                    res.add(ss)
        return [v for k, v in hhtypes.items() if k in res], self.parse_ca(self.fields.get('hhtype'))