"""
Programmatic access to Glottolog data.
"""
import re
import types
from typing import Union, Optional, TypedDict
import pathlib
import functools
import contextlib
import collections
from collections.abc import Generator
from clldutils.path import walk, git_describe
from clldutils.apilib import API
import pycountry
from termcolor import colored
from tqdm import tqdm
from . import util
from . import languoids as lls
from . import references
from . import config
from .languoids import models
from .iso import get_iso, CheckingISO
__all__ = ['Glottolog']
LanguoidOrCode = Union[str, lls.Languoid]
ISO_CODE_PATTERN = re.compile('[a-z]{3}$')
class TriggerDict(TypedDict):
"""Triggers by bibfile field name."""
inlg: list[util.Trigger]
lgcode: list[util.Trigger]
class Cache(dict):
"""
Since reading languoid metadata from disk is expensive, we provide a mechanism to cache them.
"""
def __init__(self):
super().__init__()
self._lineage = {}
def __bool__(self):
return True
def add(self, directory: pathlib.Path, api: 'Glottolog') -> lls.Languoid:
"""Add a languoid specified by directory in the Glottolog tree to the cache."""
if directory.name not in self:
lang = lls.Languoid.from_dir(directory, nodes=self._lineage, _api=api)
self._lineage[lang.id] = (lang.name, lang.id, lang.level)
self[lang.id] = lang
if lang.iso:
self[lang.iso] = lang
else:
lang = self[directory.name]
return lang
[docs]class Glottolog(API): # pylint: disable=too-many-public-methods
"""
API to access Glottolog data
This class provides (read and write) access to a local copy of the Glottolog data, which can
be obtained as explained in the `README <https://github.com/glottolog/pyglottolog#install>`_
"""
countries = [models.Country(c.alpha_2, c.name) for c in pycountry.countries]
def __init__(self, repos='.', *, cache: bool = False):
"""
:param repos: Path to a copy of `<https://github.com/glottolog/glottolog>`_
:param cache: Indicate whether to cache `Languoid` objects or not. If `True`, the API must \
be used read-only.
"""
API.__init__(self, repos=repos)
#: Absolute path to the copy of the data repository:
self.repos: pathlib.Path = pathlib.Path.cwd() / self.repos
#: Absolute path to the `tree` directory in the repos.
self.tree: pathlib.Path = self.repos / 'languoids' / 'tree'
if not self.tree.exists():
raise ValueError(f'repos dir {self.repos} missing tree dir: {self.tree}')
if not self.repos.joinpath('references').exists():
raise ValueError(f'repos dir {self.repos} missing references subdir')
self.cache: lls.LanguoidMapType = Cache() if cache else None
def __str__(self):
return f'<Glottolog repos {git_describe(self.repos)} at {self.repos}>'
def describe(self) -> str: # pylint: disable=C0116
return git_describe(self.repos) # pragma: no cover
[docs] def references_path(self, *comps: str) -> pathlib.Path:
"""
Path within the `references` directory of the repos.
"""
return self.repos.joinpath('references', *comps)
[docs] def languoids_path(self, *comps) -> pathlib.Path:
"""
Path within the `languoids` directory of the repos.
"""
return self.repos.joinpath('languoids', *comps)
def build_path(self, *comps: str) -> pathlib.Path: # pylint: disable=C0116
build_dir = self.repos.joinpath('build')
if not build_dir.exists():
build_dir.mkdir() # pragma: no cover
return build_dir.joinpath(*comps)
@contextlib.contextmanager
def cache_dir(self, name: str): # pylint: disable=C0116
d = self.build_path(name)
if not d.exists():
d.mkdir()
yield d
def _cfg(self, name, cls=None):
return config.Config.from_ini(
self.path('config', name + '.ini'), object_class=cls or types.SimpleNamespace)
@functools.cached_property
def aes_status(self) -> dict[str, config.AES]:
"""
:rtype: mapping with :class:`config.AES` values.
"""
return self._cfg('aes_status', cls=config.AES)
@functools.cached_property
def aes_sources(self) -> dict[str, config.AESSource]:
"""
:rtype: mapping with :class:`config.AESSource` values
"""
return self._cfg('aes_sources', cls=config.AESSource)
@functools.cached_property
def document_types(self) -> dict[str, config.DocumentType]:
"""
:rtype: mapping with :class:`config.DocumentType` values
"""
return self._cfg('document_types', cls=config.DocumentType)
@functools.cached_property
def med_types(self) -> dict[str, config.MEDType]:
"""
:rtype: mapping with :class:`config.MEDType` values
"""
return self._cfg('med_types', cls=config.MEDType)
@functools.cached_property
def macroareas(self) -> dict[str, config.Macroarea]:
"""
:rtype: mapping with :class:`config.Macroarea` values
"""
return self._cfg('macroareas', cls=config.Macroarea)
@functools.cached_property
def language_types(self) -> dict[str, config.LanguageType]:
"""
:rtype: mapping with :class:`config.LanguageType` values
"""
return self._cfg('language_types', cls=config.LanguageType)
@functools.cached_property
def languoid_levels(self) -> dict[str, config.LanguoidLevel]:
"""
:rtype: mapping with :class:`config.LanguoidLevel` values
"""
return self._cfg('languoid_levels', cls=config.LanguoidLevel)
@functools.cached_property
def editors(self) -> dict[str, config.Editors]:
"""
Metadata about editors of Glottolog
:rtype: mapping with :class:`config.Generic` values
"""
return self._cfg('editors', cls=config.Editors)
@functools.cached_property
def publication(self) -> dict[str, str]:
"""
Metadata about the Glottolog publication
:rtype: mapping with :class:`config.Generic` values
"""
return self._cfg('publication')
@functools.cached_property
def iso(self) -> CheckingISO:
"""
:return: `clldutils.iso_639_3.ISO` instance, fed with the data of the latest \
ISO code table zip found in the `build` directory.
"""
return get_iso(self.build_path())
@functools.cached_property
def _tree_dirs(self):
return list(walk(self.tree, mode='dirs'))
@property
def glottocodes(self) -> models.Glottocodes:
"""
Registry of Glottocodes.
"""
return models.Glottocodes(self.languoids_path('glottocodes.json'))
[docs] def languoid(self, id_: LanguoidOrCode) -> Optional[lls.Languoid]:
"""
Retrieve a languoid specified by language code.
:param id_: Glottocode or ISO code.
"""
if isinstance(id_, lls.Languoid):
return id_
if self.cache and id_ in self.cache:
return self.cache[id_]
if ISO_CODE_PATTERN.match(id_):
for d in self._tree_dirs if self.cache else walk(self.tree, mode='dirs'):
if self.cache:
l_ = self.cache.add(d, self)
else:
l_ = lls.Languoid.from_dir(d, _api=self)
if l_.iso_code == id_:
return l_
else:
for d in self._tree_dirs if self.cache else walk(self.tree, mode='dirs'):
l_ = None
if self.cache:
# If we cache Languoids, we might as well instantiate the ones we traverse:
l_ = self.cache.add(d, self)
if d.name == id_:
if self.cache:
return l_
return lls.Languoid.from_dir(d, _api=self)
return None
[docs] def languoids(
self,
ids: set = None,
maxlevel: Union[int, config.LanguoidLevel, str] = None,
exclude_pseudo_families: bool = False
) -> Generator[lls.Languoid, None, None]:
"""
Yields languoid objects.
:param ids: `set` of Glottocodes to limit the result to. This is useful to increase \
performance, since INI file reading can be skipped for languoids not listed.
:param maxlevel: Numeric maximal nesting depth of languoids, or Languoid.level.
:param exclude_pseudo_families: Flag signaling whether to exclude pseud families, \
i.e. languoids from non-genealogical trees.
"""
is_max_level_int = isinstance(maxlevel, int)
# Non-numeric levels are interpreted as `Languoid.level` descriptors.
if not is_max_level_int:
maxlevel = self.languoid_levels.get(maxlevel or 'dialect')
# Since we traverse the tree topdown, we can cache a mapping of Languoid.id to triples
# (name, id, level) for populating `Languoid.lineage`.
nodes = {}
for d in self._tree_dirs if self.cache else walk(self.tree, mode='dirs'):
if ids is None or d.name in ids:
if self.cache:
lang = self.cache.add(d, self)
else:
lang = lls.Languoid.from_dir(d, nodes=nodes, _api=self)
if (is_max_level_int and len(lang.lineage) <= maxlevel) \
or ((not is_max_level_int) and lang.level <= maxlevel):
if (not exclude_pseudo_families) or not lang.category.startswith('Pseudo'):
yield lang
[docs] def languoids_by_code(self, nodes: Optional[lls.LanguoidMapType] = None) -> lls.LanguoidMapType:
"""
Returns a `dict` mapping the three major language code schemes
(Glottocode, ISO code, and Harald's NOCODE_s) to Languoid objects.
"""
res = {}
for lang in (self.languoids() if nodes is None else nodes.values()):
res[lang.id] = lang
if lang.hid:
res[lang.hid] = lang
if lang.iso:
res[lang.iso] = lang
return res
[docs] def ascii_tree(self, start: LanguoidOrCode, maxlevel=None):
"""
Prints an ASCII representation of the languoid tree starting at `start` to `stdout`.
"""
_ascii_node(
self.languoid(start),
0,
True,
self.languoid_levels.get(maxlevel, maxlevel) if maxlevel else None,
'',
self.languoid_levels)
[docs] def newick_tree(
self,
start: Optional[LanguoidOrCode] = None,
template: str = None,
nodes: Optional[lls.LanguoidMapType] = None,
maxlevel: Union[int, config.LanguoidLevel] = None
) -> str:
"""
Returns the Newick representation of a (set of) Glottolog classification tree(s).
:param start: Root languoid of the tree (or `None` to return the complete classification).
:param template: Python format string accepting the `Languoid` instance as single \
variable named `l`, used to format node labels.
"""
template = template or lls.Languoid._newick_default_template # pylint: disable=W0212
if start:
return self.languoid(start).newick_node(
template=template, nodes=nodes, maxlevel=maxlevel, level=1).newick + ';'
if nodes is None:
nodes = collections.OrderedDict((lang.id, lang) for lang in self.languoids())
trees = []
for lang in nodes.values():
if not lang.lineage and not lang.category.startswith('Pseudo '):
ns = lang.newick_node(
nodes=nodes, template=template, maxlevel=maxlevel, level=1).newick
if lang.level == self.languoid_levels.language:
# An isolate: we wrap it in a pseudo-family with the same name and ID.
fam = lls.Languoid.from_name_id_level(
lang.dir.parent, lang.name, lang.id, 'family', _api=self)
ns = f'({ns}){template.format(l=fam)}:1' # noqa: E741
trees.append(f'{ns};')
return '\n'.join(trees)
@functools.cached_property
def bibfiles(self) -> references.BibFiles:
"""
Access reference data by BibFile.
:rtype: :class:`references.BibFiles`
"""
return references.BibFiles.from_path(self.references_path(), api=self)
def refs_by_languoid(
self,
*bibfiles: Union[references.BibFile, str],
nodes: Optional[lls.LanguoidMapType] = None,
) -> tuple[dict[lls.Glottocode, list[references.Entry]], dict[str, references.Entry]]:
"""
Get references from bibfiles keyed by associated Glottocodes.
"""
if bibfiles:
bibfiles = [
bib if isinstance(bib, references.BibFile) else self.bibfiles[bib]
for bib in bibfiles]
else:
bibfiles = self.bibfiles
all_: dict[str, references.Entry] = {}
languoids_by_code = self.languoids_by_code(
nodes or {lang.id: lang for lang in self.languoids()})
res: dict[lls.Glottocode, list[references.Entry]] = collections.defaultdict(list)
for bib in tqdm(bibfiles):
for entry in bib.iterentries():
all_[entry.id] = entry
for lang in entry.languoids(languoids_by_code)[0]:
res[lang.id].append(entry)
return res, all_
@functools.cached_property
def hhtypes(self): # pylint: disable=C0116
# Note: The file `hhtype.ini` does not exist anymore. This is fixed in HHTypes, when
# calling `config.get_ini`. Only used when compiling monster.bib.
return references.HHTypes(self.references_path('hhtype.ini'))
@functools.cached_property
def triggers(self) -> TriggerDict: # pylint: disable=C0116
res: TriggerDict = {'inlg': [], 'lgcode': []}
for lang in self.languoids():
for type_ in TriggerDict.__annotations__:
if lang.cfg.has_option('triggers', type_):
label = f'{lang.name} [{lang.hid or lang.id}]'
res[type_].extend([util.Trigger(type_, label, text)
for text in lang.cfg.getlist('triggers', type_)])
return res
@functools.cached_property
def macroarea_map(self) -> dict[str, str]:
"""Maps language codes (Glottocode, ISO code, hid) to the first macroarea name."""
res = {}
for lang in self.languoids():
ma = lang.macroareas[0].name if lang.macroareas else ''
res[lang.id] = ma
if lang.iso:
res[lang.iso] = ma
if lang.hid:
res[lang.hid] = ma
return res
@property
def current_editors(self) -> list[config.Editors]: # pylint: disable=C0116
return sorted([e for e in self.editors.values() if e.current], key=lambda e: int(e.ord))
def _ascii_node(n, level, last, maxlevel, prefix, levels): # pylint: disable=R0913,R0917
nlevel = levels.get(n.level)
if maxlevel:
if (isinstance(maxlevel, config.LanguoidLevel) and nlevel > maxlevel) or \
(not isinstance(maxlevel, config.LanguoidLevel) and level > maxlevel):
return
s = '\u2514' if last else '\u251c'
s += '\u2500 '
if not level:
for i, node in enumerate(n.ancestors):
util.sprint('{0}{1}{2} [{3}]', prefix, s if i else '', node.name, node.id)
prefix = ' ' + prefix
nprefix = prefix + (' ' if last else '\u2502 ')
color = 'red' if not level else (
'green' if nlevel == levels.language else (
'blue' if nlevel == levels.dialect else None))
util.sprint(
'{0}{1}{2} [{3}]',
prefix,
s if level else (s if n.ancestors else ''),
colored(n.name, color) if color else n.name,
colored(n.id, color) if color else n.id)
for i, c in enumerate(sorted(n.children, key=lambda nn: nn.name)):
_ascii_node(c, level + 1, i == len(n.children) - 1, maxlevel, nprefix, levels)