# bibfiles.py - ordered collection of bibfiles with load/save api
"""
Functionality to manipulate bibfiles and their entries.
"""
import logging
import re
import math
from typing import Union, TYPE_CHECKING, Optional, Any, Callable
import pathlib
import datetime
import functools
import collections
from collections.abc import Generator
import dataclasses
import unicodedata
from clldutils.path import memorymapped
from clldutils.source import Source
from clldutils.text import split_text
from clldutils.inifile import INI
from . import bibtex
from . import util
from ..config import MEDType
from ..util import PathType
from .bibfiles_db import Database
if TYPE_CHECKING: # pragma: no cover
from pyglottolog import Glottolog
__all__ = ['BibFiles', 'BibFile', 'Entry']
#: Default file name of the SQLite database built by `BibFiles.to_sqlite`.
BIBFILES = 'bibfiles.sqlite3'

#: Fallback document types (each name mapped to itself), used by `Entry` when no API object
#: is available. The insertion order matters: `Entry.weight` ranks by position in this mapping.
DOCTYPES = {
    doctype: doctype
    for doctype in (
        'grammar',
        'grammar_sketch',
        'dictionary',
        'specific_feature',
        'phonology',
        'text',
        'new_testament',
        'wordlist',
        'comparative',
        'minimal',
        'socling',
        'dialectology',
        'overview',
        'ethnographic',
        'bibliographical',
        'unknown',
    )
}

#: A 4-digit year in square brackets, optionally followed by `-<digits>`, e.g. `[1987-1990]`.
PREF_YEAR_PATTERN = re.compile(r'\[(?P<year>[12][0-9]{3})(-[0-9]+)?]')

#: The first 4-digit number starting with 1 or 2.
YEAR_PATTERN = re.compile(r'(?P<year>[12][0-9]{3})')
class BibFiles(list):
    """Ordered collection of `BibFile` objects accessible by filename or index."""

    @classmethod
    def from_path(cls, path: PathType, api: Optional['Glottolog'] = None) -> 'BibFiles':
        """Load the BibTeX files `<path>/bibtex/*.bib` which are listed in `<path>/BIBFILES.ini`.

        :param path: Directory containing `BIBFILES.ini` and the `bibtex` sub-directory.
        :param api: Optional API object, handed on to each `BibFile`.
        """
        root = pathlib.Path(path)
        config = INI.from_file(root / 'BIBFILES.ini', interpolation=None)
        return cls(cls._iterbibfiles(config, root / 'bibtex', api=api))

    @staticmethod
    def _iterbibfiles(
            ini: INI,
            bibtex_path: pathlib.Path,
            api: Optional['Glottolog'] = None,
    ) -> Generator['BibFile', None, None]:
        """Yield a `BibFile` for each INI section whose name ends in `.bib`.

        The options of the section are passed as keyword arguments to `BibFile`.

        :raises ValueError: if a section names a file missing from `bibtex_path`.
        """
        for section in ini.sections():
            if not section.endswith('.bib'):
                continue
            bibfile_path = bibtex_path / section
            if not bibfile_path.exists():  # pragma: no cover
                raise ValueError('invalid bibtex file referenced in BIBFILES.ini')
            yield BibFile(fname=bibfile_path, api=api, **ini[section])

    def __init__(self, bibfiles):
        super().__init__(bibfiles)
        # File name -> bibfile lookup, built once from the initial content of the list.
        self._map = {bibfile.fname.name: bibfile for bibfile in self}

    def __getitem__(self, index_or_filename: Union[int, str]) -> Union['BibFile', 'Entry']:
        """Retrieve a bibfile by index or filename, or an entry by qualified key.

        :param index_or_filename: Either an `int` index, or a bibfile name (with or without \
        the `.bib` suffix), or a provider-qualified BibTeX key in the form `<prov>:<key>`.
        :return: A `BibFile` instance, or an `Entry` instance.
        """
        if not isinstance(index_or_filename, str):
            # Anything which is not a string is handled by plain list indexing.
            return super().__getitem__(index_or_filename)

        # Only the first colon separates provider and key; keys may contain colons themselves.
        stem, colon, key = index_or_filename.partition(':')
        if colon:
            return self._map[f'{stem}.bib'][key]

        if index_or_filename.endswith('.bib'):
            filename = index_or_filename
        else:
            filename = index_or_filename + '.bib'
        return self._map[filename]

    def to_sqlite(self, filepath=BIBFILES, verbose=False) -> Database:
        """Return a database with the bibfiles loaded."""
        return Database.from_bibfiles(self, filepath, verbose=verbose)

    def roundtrip_all(self) -> list[None]:
        """Load and save all bibfiles with the current settings."""
        return [bibfile.roundtrip() for bibfile in self]
@dataclasses.dataclass
class BibFile:  # pylint: disable=R0902
    """
    Represents a BibTeX file, storing a provider's bibliography, providing easy access to its
    records.
    """
    fname: pathlib.Path  #: Path of the BibTeX file
    name: Optional[str] = None  #: Short name of the bibliography
    title: Optional[str] = None  #: Title of the bibliography
    description: Optional[str] = None  #: The provenance of the bibliography
    abbr: Optional[str] = None
    encoding: str = 'utf-8'  #: Encoding used when reading and writing the file
    normalize: str = 'NFC'  #: Passed as `normalize` to `bibtex.save`
    sortkey: Optional[str] = None  #: Passed as `sortkey` to `bibtex.save`
    priority: int = 0
    url: Optional[str] = None  #: URL pointing to the source of the bibliography
    curation: Optional[str] = None  #: Curation policy for the bibliography at Glottolog
    api: Any = None  #: Handed on to each `Entry` created from this bibfile

    def __post_init__(self):
        # Values may arrive as strings (e.g. from an INI section), so coerce them here.
        self.priority = int(self.priority)
        if self.sortkey is not None and self.sortkey.lower() == 'none':
            self.sortkey = None

    @property
    def id(self) -> str:  # pylint: disable=C0116
        """The file name without suffix, used as provider prefix of qualified keys."""
        return self.fname.stem

    def __getitem__(self, item: str) -> 'Entry':
        """
        Look up a single entry without parsing the whole file.

        :param item: BibTeX citation key of an entry, optionally prefixed with `<id>:`
        :raises KeyError: if no matching `Entry` is contained in the `BibFile`
        """
        if item.startswith(self.id + ':'):
            item = item.split(':', 1)[1]

        text = None
        with memorymapped(self.fname) as string:
            # Find the start of the record, i.e. "@<type>{<key>" followed by whitespace or comma.
            m = re.search(
                b'@[A-Za-z]+{' + re.escape(item.encode(self.encoding)) + rb'[\s,]', string)
            if m:
                # The record extends up to the next line starting with "@", or to end of file.
                next_ = string.find(b'\n@', m.end())
                text = string[m.start():next_] if next_ >= 0 else string[m.start():]

        if text:
            for k, (t, f) in bibtex.iterentries_from_text(text, encoding=self.encoding):
                return Entry(k, t, f, self, self.api)
        raise KeyError(item)

    def visit(self, visitor: Optional[Callable[['Entry'], bool]] = None):
        """Visit the entries of the bibfile, possibly manipulating them in place.

        :param visitor: Callable invoked with each `Entry`; an entry is dropped from the saved \
        file if (and only if) the callable returns `True`.
        """
        entries = collections.OrderedDict()
        for entry in self.iterentries():
            if visitor is None or visitor(entry) is not True:
                entries[entry.key] = (entry.type, entry.fields)
        self.save(entries)

    @property
    def size(self) -> int:
        """Size of the file in bytes."""
        return self.fname.stat().st_size

    @property
    def mtime(self) -> datetime.datetime:
        """Modification time."""
        return datetime.datetime.fromtimestamp(self.fname.stat().st_mtime)

    def iterentries(self) -> Generator['Entry', None, None]:  # pylint: disable=C0116
        """Yield an `Entry` for each record in the file."""
        for k, (t, f) in bibtex.iterentries(filename=self.fname, encoding=self.encoding):
            yield Entry(k, t, f, self, self.api)

    def keys(self) -> list[str]:
        """List of provider-qualified keys of the bibfile"""
        return [f'{self.id}:{e.key}' for e in self.iterentries()]

    @property
    def glottolog_ref_id_map(self) -> dict[str, str]:
        """Maps bibkey to glottolog_ref_id value."""
        return {
            e.key: e.fields['glottolog_ref_id'] for e in self.iterentries()
            if 'glottolog_ref_id' in e.fields}

    def update(self, fname: PathType, log: Optional[logging.Logger] = None, keep_old=False):
        """Update the bibfile with the data from fname.

        :param fname: Path of the BibTeX file with the new data.
        :param log: Optional logger to report the number of new entries to.
        :param keep_old: Whether to keep entries of the current file which are missing in `fname`.
        """
        entries, new = collections.OrderedDict(), 0
        if keep_old:
            for k, (t, f) in bibtex.iterentries(filename=self.fname, encoding=self.encoding):
                entries[k] = (t, f)

        # Must be read before saving, since `save` overwrites the current file.
        ref_id_map = self.glottolog_ref_id_map
        for key, (type_, fields) in bibtex.iterentries(fname, self.encoding):
            if key in ref_id_map and 'glottolog_ref_id' not in fields:
                # Carry over the ID already assigned to the entry with the same key.
                fields['glottolog_ref_id'] = ref_id_map[key]
            else:
                # NOTE(review): this also counts known keys which bring their own
                # glottolog_ref_id - confirm whether that is the intended meaning of "new".
                new += 1
            entries[key] = (type_, fields)
        self.save(entries)
        if log:  # pragma: no cover
            log.info('%s new entries', new)

    def load(self, preserve_order=None):
        """Return entries as bibkey -> (entrytype, fields) dict.

        :param preserve_order: Defaults to `True` if the bibfile has no `sortkey`.
        """
        if preserve_order is None:
            preserve_order = self.sortkey is None
        return bibtex.load(self.fname, preserve_order, encoding=self.encoding)

    def save(self, entries):
        """Write bibkey -> (entrytype, fields) map to file."""
        bibtex.save(
            entries,
            filename=self.fname,
            sortkey=self.sortkey,
            encoding=self.encoding,
            normalize=self.normalize)

    def __str__(self):
        return f'<{self.__class__.__name__} {self.fname.name}>'

    def check(self, log: logging.Logger) -> tuple[int, str]:
        """Run checks and report the result.

        :return: Pair (number of entries, verdict string).
        """
        entries = self.load()  # bare BibTeX syntax
        invalid = bibtex.check(filename=self.fname)  # names/macros etc.
        verdict = f'({invalid} invalid)' if invalid else 'OK'
        method = log.warning if invalid else log.info
        method('%s %d %s', self, len(entries), verdict)
        return len(entries), verdict

    def roundtrip(self):  # pylint: disable=C0116
        """Print the bibfile, then load and save it with the current settings."""
        print(self)
        self.save(self.load())

    def show_characters(self, include_plain=False):
        """Display character-frequencies (excluding printable ASCII).

        :param include_plain: Whether to list printable ASCII characters as well.
        """
        with self.fname.open(encoding=self.encoding) as fd:
            hist = collections.Counter(fd.read())
        # Printable ASCII is the range 0x20 (space) to 0x7e (tilde); control characters
        # below 0x20 must be reported.
        table = '\n'.join(
            f"{n:d}\t{c!r:<9}\t{c}\t{unicodedata.name(c, '')}"
            for c, n in hist.most_common()
            if include_plain or not 0x20 <= ord(c) <= 0x7e)
        print(table)
@functools.total_ordering
@dataclasses.dataclass
class Entry:
    """
    Represents an entry in a `BibFile`, i.e. a bibliographical record.

    .. note::

        `Entry` instances are orderable. The ordering is the one used to compute MEDs, i.e.

        - grammars are "better" than other document types,
        - more pages is "better" than less,
        - more recent is "better" than old.

    .. code-block:: python

        >>> g = pyglottolog.Glottolog()
        >>> g.bibfiles['hh:g:MacDonell:Sanskrit'] > g.bibfiles['hh:hv:Weijnen:Nederlandse']
        True
        >>> refs = g.refs_by_languoid(g.bibfiles['hh'])
        >>> sorted(refs[0]['stan1295'])[-1].med_type.name
        'long grammar'
    """
    key: str
    type: str  #: BibTeX entry type
    fields: dict  #: The metadata of the record
    bib: BibFile
    api: Optional['Glottolog'] = None

    # FIXME: add method to apply triggers!  # pylint: disable=fixme

    # Alternatives: 8-character code (4 alphanumerics + 4 digits), 3 lowercase letters,
    # or a `NOCODE_...` name.
    lgcode_regex = r'[a-z0-9]{4}[0-9]{4}|[a-z]{3}|NOCODE_[A-Z][^\s\]]+'
    lgcode_in_brackets_pattern = re.compile(r"\[(" + lgcode_regex + r")]")
    recomma = re.compile(r"[,/]\s?")
    # The alternation must be grouped, otherwise "$" only anchors the last alternative and
    # any string starting with three lowercase letters would pass as a code.
    lgcode_pattern = re.compile(r"(?:" + lgcode_regex + r")$")

    def __eq__(self, other):
        if not isinstance(other, Entry):
            return NotImplemented
        return self.weight == other.weight

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        if not isinstance(other, Entry):
            return NotImplemented
        return self.weight < other.weight

    @property
    def _defined_doctypes(self):
        """Ordered doctype mapping: from the API's `hhtypes` if available, else `DOCTYPES`."""
        if self.api:
            return collections.OrderedDict((hht.id, hht.id) for hht in self.api.hhtypes)
        return DOCTYPES

    @functools.cached_property
    def weight(self) -> tuple[int, int, int, str]:
        """The weight which determines ordering when computing MEDs.

        :return: Tuple (negated doctype rank, pages per doctype and language, year, entry ID).
        """
        doctypes = self._defined_doctypes
        assigned = self.doctypes(doctypes)[0]

        # Entries without a recognized doctype rank behind all defined doctypes.
        index, doctype = len(doctypes), None
        if assigned:
            # Only the best, i.e. first, doctype of the entry counts.
            index = list(doctypes.values()).index(assigned[0])
            doctype = getattr(assigned[0], 'id', assigned[0])

        # the number of pages is divided by number of doctypes times number of described languages
        divisor = (len(assigned) or 1) * (len(self.lgcodes(self.fields.get('lgcode', ''))) or 1)
        pages = int(math.ceil(float(self.pages_int or 0) / divisor))

        if doctype == 'grammar' and pages >= 300:
            # Long grammars rank before everything else.
            index = -1

        return -index, pages, self.year_int or 0, self.id

    @functools.cached_property
    def med_type(self) -> Optional[MEDType]:
        """
        The entry's type on the MED scale.

        :return: A member of `api.med_types`, or `None` if the entry has no API object.
        """
        if not self.api:
            return None  # pragma: no cover

        doctypes = list(self._defined_doctypes.keys())
        index = -self.weight[0]
        if index == -1:
            return self.api.med_types.long_grammar
        if 'dictionary' in doctypes and index < doctypes.index('dictionary'):
            return self.api.med_types.get(doctypes[index])
        if 'wordlist' in doctypes and index < doctypes.index('wordlist'):
            return self.api.med_types.phonology_or_text
        return self.api.med_types.wordlist_or_less

    @functools.cached_property
    def year_int(self) -> Optional[int]:
        """Year as number if possible."""
        year = self.fields.get('year')
        if year:
            # prefer years in brackets over the first 4-digit number.
            for pattern in (PREF_YEAR_PATTERN, YEAR_PATTERN):
                match = pattern.search(year)
                if match:
                    return int(match.group('year'))
        return None

    @functools.cached_property
    def pages_int(self) -> Optional[int]:
        """Number of pages as int.

        Taken from the `numberofpages` field if this is an integer below `util.MAX_PAGE`,
        otherwise computed from the `pages` field.
        """
        if self.fields.get('numberofpages'):
            try:
                pages = int(self.fields.get('numberofpages').strip())
                if pages < util.MAX_PAGE:
                    return pages
            except ValueError:
                pass
        if self.fields.get('pages'):
            return util.compute_pages(self.fields['pages'])[2]
        return None

    @functools.cached_property
    def publisher_and_address(self) -> tuple[Optional[str], Optional[str]]:
        """Publisher and address values.

        A `publisher` value of the form `<address>: <publisher>` is split, unless the address
        part conflicts with a non-empty `address` field.
        """
        p = self.fields.get('publisher')
        if p and ':' in p:
            address, publisher = [s.strip() for s in p.split(':', 1)]
            if (not self.fields.get('address')) or self.fields['address'] == address:
                return publisher, address
        return p, self.fields.get('address')

    def __str__(self):
        """Return the BibTeX representation of the entry."""
        chunks = [f"@{self.type}{{{self.key}"]
        chunks.extend(
            f",\n {k} = {{{v.strip() if hasattr(v, 'strip') else v}}}"
            for k, v in bibtex.fieldorder.itersorted(self.fields))
        chunks.append('\n}\n' if self.fields else ',\n}\n')
        return ''.join(chunks)

    def text(self) -> str:
        """Return the text linearization of the entry."""
        return Source(self.type, self.key, _check_id=False, **self.fields).text()

    @property
    def id(self) -> str:
        """
        The qualified entry ID, including the provider prefix.
        """
        return f'{self.bib.id}:{self.key}'

    @classmethod
    def lgcodes(cls, string: Optional[str]) -> list[str]:
        """Parse language codes from a string.

        Codes are either given in square brackets anywhere in the string, or the whole string
        is a list of codes separated by comma or slash.

        :return: The codes found; an empty list for `None` or if any list item is not a code.
        """
        if string is None:
            return []
        codes = cls.lgcode_in_brackets_pattern.findall(string)
        if not codes:
            # ... or as comma separated list of identifiers.
            parts = [p.strip() for p in cls.recomma.split(string)]
            codes = [p for p in parts if cls.lgcode_pattern.match(p)]
            if len(codes) != len(parts):
                # All or nothing: free text mixed with code-like items yields no codes.
                codes = []
        return codes

    @staticmethod
    def parse_ca(s: Optional[str]) -> Optional[str]:
        """Read a trigger expression from a field value.

        :return: The quoted text following `computerized assignment from`, or `None`.
        """
        if s:
            match = re.search(r'computerized assignment from "(?P<trigger>[^"]+)"', s)
            if match:
                return match.group('trigger')
        return None

    def languoids(self, langs_by_codes: dict) -> tuple[list, Optional[str]]:
        """
        Expand the language codes mentioned in a reference's "lgcode" field to `Languoid` objects.

        :param langs_by_codes: Mapping of language codes to the objects to return.
        :return: Pair (list of values of `langs_by_codes` for the known codes, trigger \
        expression parsed from the field or `None`).
        """
        lgcode = self.fields.get('lgcode')
        res = [
            langs_by_codes[code] for code in self.lgcodes(lgcode) if code in langs_by_codes]
        return res, self.parse_ca(lgcode)

    def doctypes(self, hhtypes) -> tuple[list, Optional[str]]:
        """Ordered doctypes assigned to this entry.

        :param hhtypes: `OrderedDict` mapping doctype names to doctypes
        :return: Pair (`list` of values of `hhtypes` which apply to the entry, ordered by \
        occurrence in `hhtypes`, trigger expression parsed from the "hhtype" field or `None`).
        """
        res = set()
        if 'hhtype' in self.fields:
            for ss in split_text(self.fields['hhtype'], separators=',;'):
                # Strip trailing annotations in parentheses.
                ss = ss.split('(')[0].strip()
                if ss in hhtypes:
                    res.add(ss)
        return [v for k, v in hhtypes.items() if k in res], self.parse_ca(self.fields.get('hhtype'))