"""
Programmatic access to Glottolog data.
"""
import re
import typing
import pathlib
import functools
import contextlib
import collections
import pycldf.util
from csvw import TableGroup, Column
from clldutils.path import walk, git_describe
from clldutils.apilib import API
from clldutils.jsonlib import load
import clldutils.iso_639_3
import pycountry
from termcolor import colored
from tqdm import tqdm
from . import util
from . import languoids as lls
from . import references
from . import config
from .languoids import models
# Only the API class is part of the public interface of this module.
__all__ = ['Glottolog']
# Identifiers consisting of exactly three lowercase ASCII letters (the pattern is applied with
# `match`, i.e. anchored at the start) are looked up as ISO codes rather than as Glottocodes.
ISO_CODE_PATTERN = re.compile('[a-z]{3}$')
class Cache(dict):
    """
    In-memory store of `Languoid` objects, keyed by `Languoid.id` and - where set - by
    `Languoid.iso`.
    """
    def __init__(self):
        super().__init__()
        # Maps `Languoid.id` to `(name, id, level)` triples. The mapping is handed to
        # `Languoid.from_dir` as `nodes` each time a new languoid is instantiated.
        self._lineage = {}

    def __bool__(self):
        # A plain empty `dict` is falsy. Callers test `if api.cache` to find out whether
        # caching is enabled, so an empty cache must still evaluate to `True`.
        return True

    def add(self, directory: pathlib.Path, api) -> lls.Languoid:
        """
        Return the languoid for `directory`, instantiating and registering it if necessary.

        :param directory: Languoid directory; its name is used as lookup key.
        :param api: API instance passed on to `Languoid.from_dir` as `_api`.
        :return: The cached or newly created `Languoid`.
        """
        if directory.name in self:
            return self[directory.name]

        lang = lls.Languoid.from_dir(directory, nodes=self._lineage, _api=api)
        self._lineage[lang.id] = (lang.name, lang.id, lang.level)
        self[lang.id] = lang
        if lang.iso:
            # Make the languoid retrievable by ISO code as well.
            self[lang.iso] = lang
        return lang
class Glottolog(API):
    """
    API to access Glottolog data

    This class provides (read and write) access to a local copy of the Glottolog data, which can
    be obtained as explained in the `README <https://github.com/glottolog/pyglottolog#install>`_
    """
    #: One `models.Country` per entry in `pycountry.countries`, built from alpha-2 code and name.
    countries = [models.Country(c.alpha_2, c.name) for c in pycountry.countries]

    def __init__(self, repos='.', *, cache: bool = False):
        """
        :param repos: Path to a copy of `<https://github.com/glottolog/glottolog>`_
        :param cache: Indicate whether to cache `Languoid` objects or not. If `True`, the API must \
        be used read-only.
        :raises ValueError: If the repository lacks the `languoids/tree` or `references` directory.
        """
        API.__init__(self, repos=repos)
        #: Absolute path to the copy of the data repository:
        self.repos: pathlib.Path = pathlib.Path.cwd() / self.repos
        #: Absolute path to the `tree` directory in the repos.
        self.tree: pathlib.Path = self.repos / 'languoids' / 'tree'
        if not self.tree.exists():
            raise ValueError('repos dir %s missing tree dir: %s' % (self.repos, self.tree))
        if not self.repos.joinpath('references').exists():
            raise ValueError('repos dir %s missing references subdir' % (self.repos,))
        #: `Cache` instance if caching was requested, `None` otherwise.
        self.cache = Cache() if cache else None

    def __str__(self):
        return '<Glottolog repos {0} at {1}>'.format(self.describe(), self.repos)

    def describe(self) -> str:
        """
        Result of `git_describe` for the repository; also used as default version label.
        """
        return git_describe(self.repos)

    def references_path(self, *comps: str) -> pathlib.Path:
        """
        Path within the `references` directory of the repos.
        """
        return self.repos.joinpath('references', *comps)

    def languoids_path(self, *comps: str) -> pathlib.Path:
        """
        Path within the `languoids` directory of the repos.
        """
        return self.repos.joinpath('languoids', *comps)

    def build_path(self, *comps: str) -> pathlib.Path:
        """
        Path within the `build` directory of the repos. The `build` directory is created if it
        does not exist yet.
        """
        build_dir = self.repos.joinpath('build')
        if not build_dir.exists():
            build_dir.mkdir()  # pragma: no cover
        return build_dir.joinpath(*comps)

    @contextlib.contextmanager
    def cache_dir(self, name):
        """
        Context manager yielding the directory `name` within `build`, creating it if necessary.

        Nothing is cleaned up on exit.
        """
        d = self.build_path(name)
        if not d.exists():
            d.mkdir()
        yield d

    def _cfg(self, name, cls=None):
        # Load `config/<name>.ini`, instantiating `cls` (default: `config.Generic`) objects.
        return config.Config.from_ini(
            self.path('config', name + '.ini'), object_class=cls or config.Generic)

    @functools.cached_property
    def aes_status(self) -> typing.Dict[str, config.AES]:
        """
        :rtype: mapping with :class:`config.AES` values.
        """
        return self._cfg('aes_status', cls=config.AES)

    @functools.cached_property
    def aes_sources(self) -> typing.Dict[str, config.AESSource]:
        """
        :rtype: mapping with :class:`config.AESSource` values
        """
        return self._cfg('aes_sources', cls=config.AESSource)

    @functools.cached_property
    def document_types(self) -> typing.Dict[str, config.DocumentType]:
        """
        :rtype: mapping with :class:`config.DocumentType` values
        """
        return self._cfg('document_types', cls=config.DocumentType)

    @functools.cached_property
    def med_types(self) -> typing.Dict[str, config.MEDType]:
        """
        :rtype: mapping with :class:`config.MEDType` values
        """
        return self._cfg('med_types', cls=config.MEDType)

    @functools.cached_property
    def macroareas(self) -> typing.Dict[str, config.Macroarea]:
        """
        :rtype: mapping with :class:`config.Macroarea` values
        """
        return self._cfg('macroareas', cls=config.Macroarea)

    @functools.cached_property
    def language_types(self) -> typing.Dict[str, config.LanguageType]:
        """
        :rtype: mapping with :class:`config.LanguageType` values
        """
        return self._cfg('language_types', cls=config.LanguageType)

    @functools.cached_property
    def languoid_levels(self) -> typing.Dict[str, config.LanguoidLevel]:
        """
        :rtype: mapping with :class:`config.LanguoidLevel` values
        """
        return self._cfg('languoid_levels', cls=config.LanguoidLevel)

    @functools.cached_property
    def editors(self) -> typing.Dict[str, config.Generic]:
        """
        Metadata about editors of Glottolog

        :rtype: mapping with :class:`config.Generic` values
        """
        return self._cfg('editors')

    @functools.cached_property
    def publication(self) -> typing.Dict[str, config.Generic]:
        """
        Metadata about the Glottolog publication

        :rtype: mapping with :class:`config.Generic` values
        """
        return self._cfg('publication')

    @functools.cached_property
    def iso(self) -> clldutils.iso_639_3.ISO:
        """
        :return: `clldutils.iso_639_3.ISO` instance, fed with the data of the latest \
        ISO code table zip found in the `build` directory.
        """
        return util.get_iso(self.build_path())

    @property
    def ftsindex(self) -> pathlib.Path:
        """
        Directory within `build` where the FullTextSearch index is created.
        """
        return self.build_path('whoosh')

    @functools.cached_property
    def _tree_dirs(self):
        # Memoized list of all directories below `tree`; only consulted when caching is enabled.
        return list(walk(self.tree, mode='dirs'))

    def _iter_tree_dirs(self) -> typing.Iterable[pathlib.Path]:
        """
        Directories below `tree`, in the order in which languoids are traversed.

        With caching enabled the memoized directory list is reused; otherwise the tree is walked
        afresh on every call.
        """
        return self._tree_dirs if self.cache else walk(self.tree, mode='dirs')

    @property
    def glottocodes(self) -> models.Glottocodes:
        """
        Registry of Glottocodes.
        """
        return models.Glottocodes(self.languoids_path('glottocodes.json'))

    def languoid(self, id_: typing.Union[str, lls.Languoid]) -> typing.Optional[lls.Languoid]:
        """
        Retrieve a languoid specified by language code.

        :param id_: Glottocode or ISO code. A `Languoid` instance is returned unchanged.
        :return: The matching `Languoid`, or `None` if no languoid matches.
        """
        if isinstance(id_, lls.Languoid):
            return id_

        if self.cache and id_ in self.cache:
            return self.cache[id_]

        if ISO_CODE_PATTERN.match(id_):
            # ISO codes are not encoded in directory names, so each languoid must be
            # instantiated to be able to compare its code.
            for d in self._iter_tree_dirs():
                if self.cache:
                    lang = self.cache.add(d, self)
                else:
                    lang = lls.Languoid.from_dir(d, _api=self)
                if lang.iso_code == id_:
                    return lang
            return None

        for d in self._iter_tree_dirs():
            lang = None
            if self.cache:
                # If we cache Languoids, we might as well instantiate the ones we traverse:
                lang = self.cache.add(d, self)
            if d.name == id_:
                if self.cache:
                    return lang
                return lls.Languoid.from_dir(d, _api=self)
        return None

    def languoids(
            self,
            ids: typing.Optional[set] = None,
            maxlevel: typing.Union[int, config.LanguoidLevel, str] = None,
            exclude_pseudo_families: bool = False
    ) -> typing.Generator[lls.Languoid, None, None]:
        """
        Yields languoid objects.

        :param ids: `set` of Glottocodes to limit the result to. This is useful to increase \
        performance, since INI file reading can be skipped for languoids not listed.
        :param maxlevel: Numeric maximal nesting depth of languoids, or Languoid.level.
        :param exclude_pseudo_families: Flag signaling whether to exclude pseudo-families, \
        i.e. languoids from non-genealogical trees.
        """
        is_max_level_int = isinstance(maxlevel, int)
        # Non-numeric levels are interpreted as `Languoid.level` descriptors.
        if not is_max_level_int:
            maxlevel = self.languoid_levels.get(maxlevel or 'dialect')

        # Since we traverse the tree topdown, we can cache a mapping of Languoid.id to triples
        # (name, id, level) for populating `Languoid.lineage`.
        nodes = {}

        for d in self._iter_tree_dirs():
            if ids is None or d.name in ids:
                if self.cache:
                    lang = self.cache.add(d, self)
                else:
                    lang = lls.Languoid.from_dir(d, nodes=nodes, _api=self)
                if (is_max_level_int and len(lang.lineage) <= maxlevel) \
                        or ((not is_max_level_int) and lang.level <= maxlevel):
                    if (not exclude_pseudo_families) or not lang.category.startswith('Pseudo'):
                        yield lang

    def languoids_by_code(self, nodes=None) -> typing.Dict[str, lls.Languoid]:
        """
        Returns a `dict` mapping the three major language code schemes
        (Glottocode, ISO code, and Harald's NOCODE_s) to Languoid objects.

        :param nodes: Optional mapping whose values are the `Languoid` objects to index. If \
        `None`, all languoids are read via :meth:`languoids`.
        """
        res = {}
        for lang in (self.languoids() if nodes is None else nodes.values()):
            res[lang.id] = lang
            if lang.hid:
                res[lang.hid] = lang
            if lang.iso:
                res[lang.iso] = lang
        return res

    def ascii_tree(self, start: typing.Union[str, lls.Languoid], maxlevel=None):
        """
        Prints an ASCII representation of the languoid tree starting at `start` to `stdout`.

        :param start: Languoid (or code of the languoid) at which to start.
        :param maxlevel: Optional limit; a key of `languoid_levels` is translated to the \
        corresponding level object, any other value is passed on as is.
        """
        _ascii_node(
            self.languoid(start),
            0,
            True,
            self.languoid_levels.get(maxlevel, maxlevel) if maxlevel else None,
            '',
            self.languoid_levels)

    def newick_tree(
            self,
            start: typing.Union[None, str, lls.Languoid] = None,
            template: str = None,
            nodes=None,
            maxlevel: typing.Union[int, config.LanguoidLevel] = None
    ) -> str:
        """
        Returns the Newick representation of a (set of) Glottolog classification tree(s).

        :param start: Root languoid of the tree (or `None` to return the complete classification).
        :param template: Python format string accepting the `Languoid` instance as single \
        variable named `l`, used to format node labels.
        :param nodes: Optional mapping of `Languoid.id` to `Languoid`; computed from \
        :meth:`languoids` if the complete classification is requested and `nodes` is `None`.
        :param maxlevel: Passed on to `Languoid.newick_node`.
        :return: One Newick tree per line, each terminated with `;`.
        """
        template = template or lls.Languoid._newick_default_template
        if start:
            return self.languoid(start).newick_node(
                template=template, nodes=nodes, maxlevel=maxlevel, level=1).newick + ';'
        if nodes is None:
            nodes = collections.OrderedDict((lang.id, lang) for lang in self.languoids())
        trees = []
        for lang in nodes.values():
            # Only top-level languoids of genealogical trees start a tree of their own.
            if not lang.lineage and not lang.category.startswith('Pseudo '):
                ns = lang.newick_node(
                    nodes=nodes, template=template, maxlevel=maxlevel, level=1).newick
                if lang.level == self.languoid_levels.language:
                    # An isolate: we wrap it in a pseudo-family with the same name and ID.
                    fam = lls.Languoid.from_name_id_level(
                        lang.dir.parent, lang.name, lang.id, 'family', _api=self)
                    ns = '({0}){1}:1'.format(ns, template.format(l=fam))  # noqa: E741
                trees.append('{0};'.format(ns))
        return '\n'.join(trees)

    @functools.cached_property
    def bibfiles(self) -> references.BibFiles:
        """
        Access reference data by BibFile.

        :rtype: :class:`references.BibFiles`
        """
        return references.BibFiles.from_path(self.references_path(), api=self)

    def refs_by_languoid(self, *bibfiles, **kw):
        """
        Collect the entries of bibfiles, grouped by the languoids they are linked to.

        :param bibfiles: `references.BibFile` objects or keys into :attr:`bibfiles`; if none \
        are given, all bibfiles are read.
        :param kw: Only the keyword `nodes` is evaluated: an optional mapping of `Languoid.id` \
        to `Languoid` used to resolve language codes.
        :return: Pair `(res, all_)`, with `res` mapping `Languoid.id` to a `list` of entries and \
        `all_` mapping entry ID to entry.
        """
        nodes = kw.get('nodes')
        if bibfiles:
            bibfiles = [
                bib if isinstance(bib, references.BibFile) else self.bibfiles[bib]
                for bib in bibfiles]
        else:
            bibfiles = self.bibfiles

        all_ = {}
        languoids_by_code = self.languoids_by_code(
            nodes or {lang.id: lang for lang in self.languoids()})
        res = collections.defaultdict(list)
        for bib in tqdm(bibfiles):
            for entry in bib.iterentries():
                all_[entry.id] = entry
                # Only the first item returned by `entry.languoids` is used here.
                for lang in entry.languoids(languoids_by_code)[0]:
                    res[lang.id].append(entry)
        return res, all_

    @functools.cached_property
    def hhtypes(self):
        # Note: The file `hhtype.ini` does not exist anymore. This is fixed in HHTypes, when
        # calling `config.get_ini`. Only used when compiling monster.bib.
        return references.HHTypes(self.references_path('hhtype.ini'))

    @functools.cached_property
    def triggers(self):
        """
        `dict` mapping the trigger types `inlg` and `lgcode` to lists of `util.Trigger` objects,
        collected from the `triggers` section of the languoids' INI files.
        """
        res = {'inlg': [], 'lgcode': []}
        for lang in self.languoids():
            for type_ in res:
                if lang.cfg.has_option('triggers', type_):
                    label = '%s [%s]' % (lang.name, lang.hid or lang.id)
                    res[type_].extend([util.Trigger(type_, label, text)
                                       for text in lang.cfg.getlist('triggers', type_)])
        return res

    @functools.cached_property
    def macroarea_map(self):
        """
        `dict` mapping language codes (`id`, `iso` and `hid`) to the name of the first macroarea
        of the languoid, or to the empty string if the languoid has no macroarea.
        """
        res = {}
        for lang in self.languoids():
            ma = lang.macroareas[0].name if lang.macroareas else ''
            res[lang.id] = ma
            if lang.iso:
                res[lang.iso] = ma
            if lang.hid:
                res[lang.hid] = ma
        return res

    @property
    def current_editors(self):
        """
        Editors flagged as `current`, sorted by their numeric `ord` value.
        """
        return sorted([e for e in self.editors.values() if e.current], key=lambda e: int(e.ord))

    def write_languoids_table(self, outdir, version=None):
        """
        Write all languoids to a CSV file, described by a CSVW metadata file.

        :param outdir: Existing directory (`str` or `pathlib.Path`) to write the two files to.
        :param version: Version label; defaults to the result of :meth:`describe`.
        :return: Pair `(md, out)` of paths of the metadata file and of the CSV file.
        :raises ValueError: If no languoid of level language is found in the lineage of a dialect.
        """
        # Accept plain strings as well; the `/` operator below requires a `Path`.
        outdir = pathlib.Path(outdir)
        version = version or self.describe()
        out = outdir / 'glottolog-languoids-{0}.csv'.format(version)
        md = outdir / (out.name + '-metadata.json')
        citation = (
            "{0}. "
            "{1} [Data set]. "
            "Zenodo. https://doi.org/{2}".format(
                ' & '.join([e.name for e in self.current_editors]),
                self.publication.zenodo.title_format.format('(Version {0})'.format(version)),
                self.publication.zenodo.doi,
            ))
        tg = TableGroup.fromvalue({
            "@context": "http://www.w3.org/ns/csvw",
            "dc:version": version,
            "dc:bibliographicCitation": citation,
            "tables": [load(pycldf.util.pkg_path('components', 'LanguageTable-metadata.json'))],
        })
        tg.tables[0].url = out.name
        # Columns added on top of the ones defined by the CLDF LanguageTable component:
        for col in [
            dict(name='LL_Code'),
            dict(name='Classification', separator='/'),
            dict(name='Family_Glottocode'),
            dict(name='Family_Name'),
            dict(name='Language_Glottocode'),
            dict(name='Language_Name'),
            dict(name='Level', datatype=dict(base='string', format='family|language|dialect')),
            dict(name='Status'),
        ]:
            tg.tables[0].tableSchema.columns.append(Column.fromvalue(col))

        langs = []
        for lang in self.languoids():
            lid, lname = None, None
            if lang.level == self.languoid_levels.language:
                lid, lname = lang.id, lang.name
            elif lang.level == self.languoid_levels.dialect:
                # Search the lineage bottom-up for the closest languoid of level language.
                for lname, lid, level in reversed(lang.lineage):
                    if level == self.languoid_levels.language:
                        break
                else:  # pragma: no cover
                    raise ValueError
            langs.append(dict(
                ID=lang.id,
                Name=lang.name,
                Macroarea=lang.macroareas[0].name if lang.macroareas else None,
                Latitude=lang.latitude,
                Longitude=lang.longitude,
                Glottocode=lang.id,
                ISO639P3code=lang.iso,
                LL_Code=lang.identifier.get('multitree'),
                Classification=[c[1] for c in lang.lineage],
                Language_Glottocode=lid,
                Language_Name=lname,
                Family_Name=lang.lineage[0][0] if lang.lineage else None,
                Family_Glottocode=lang.lineage[0][1] if lang.lineage else None,
                Level=lang.level.name,
                Status=lang.endangerment.status.name if lang.endangerment else None,
            ))

        tg.to_file(md)
        tg.tables[0].write(langs, fname=out)
        return md, out
def _ascii_node(n, level, last, maxlevel, prefix, levels):
    """
    Print the line for node `n` and - recursively - the lines for its children.

    :param n: Node to print; `name`, `id`, `level`, `ancestors` and `children` are read.
    :param level: Nesting depth relative to the start node (`0` for the start node itself).
    :param last: Whether `n` is the last one among its siblings.
    :param maxlevel: Falsy for no limit; otherwise either a `config.LanguoidLevel`, compared \
    to the level of `n`, or a value compared to the nesting depth `level`.
    :param prefix: String printed in front of the connector of `n`.
    :param levels: Languoid levels of the API, used to look up and compare `n.level`.
    """
    nlevel = levels.get(n.level)
    if maxlevel:
        if isinstance(maxlevel, config.LanguoidLevel):
            if nlevel > maxlevel:
                return
        elif level > maxlevel:
            return

    # Connector in front of the node label, three characters wide.
    s = '\u2514' if last else '\u251c'
    s += '\u2500 '

    connector = s
    if not level:
        # For the start node, the chain of ancestors is printed first, indenting each step.
        # `ancestors` is read only once, since it may be computed anew on each access.
        ancestors = n.ancestors
        for i, node in enumerate(ancestors):
            util.sprint('{0}{1}{2} [{3}]', prefix, s if i else '', node.name, node.id)
            # Indent by the width of the connector, to keep the columns of the tree aligned.
            prefix = '   ' + prefix
        if not ancestors:
            # A start node without ancestors is printed without connector.
            connector = ''

    # Prefix for the children: both alternatives must be as wide as the connector, otherwise
    # subtrees below last and non-last siblings end up in different columns.
    nprefix = prefix + ('   ' if last else '\u2502  ')

    color = 'red' if not level else (
        'green' if nlevel == levels.language else (
            'blue' if nlevel == levels.dialect else None))

    util.sprint(
        '{0}{1}{2} [{3}]',
        prefix,
        connector,
        colored(n.name, color) if color else n.name,
        colored(n.id, color) if color else n.id)

    # Read and sort the children once, rather than re-reading `n.children` for each child.
    children = sorted(n.children, key=lambda nn: nn.name)
    for i, c in enumerate(children):
        _ascii_node(c, level + 1, i == len(children) - 1, maxlevel, nprefix, levels)