# Source code for averell.utils

import json
import logging
import os
import urllib.request
from pathlib import Path
from zipfile import ZipFile

import yaml
from tqdm import tqdm

# Directory containing this module (symlinks resolved); the bundled
# corpora.yaml is looked up relative to it.
BASE_DIR = Path(os.path.dirname(os.path.realpath(__file__)))
# Load the corpora catalogue once, at import time. The rest of the module
# indexes CORPORA_SOURCES by position and reads each entry's "name" and
# "properties" keys.
# NOTE(review): yaml.FullLoader can construct more than plain scalars, lists
# and dicts. That is fine only while corpora.yaml is a trusted, package-owned
# file -- confirm, or consider yaml.safe_load.
with open(BASE_DIR / 'corpora.yaml', 'r') as config_file:
    CORPORA_SOURCES = yaml.load(config_file, Loader=yaml.FullLoader)

# Default download destination: a "corpora" folder under the working
# directory as it is when this module is imported.
DEFAULT_OUTPUT_FOLDER = Path.cwd() / "corpora"

# Brace-wrapped namespace URIs (the "{uri}tag" prefix form).
# NOTE(review): neither constant is referenced in the visible part of this
# file; presumably they are imported by XML/TEI readers elsewhere -- confirm.
TEI_NAMESPACE = "{http://www.tei-c.org/ns/1.0}"
XML_NS = "{http://www.w3.org/XML/1998/namespace}"


def progress_bar(t):
    """Build a ``reporthook``-style callback that drives a tqdm bar.

    Adapted from
    https://gist.github.com/leimao/37ff6e990b3226c2c9670a2cd1e4a6f5

    The tqdm instance stays owned by the caller: remember to ``close()`` it
    (or use it in a ``with`` block) once the transfer is over.

    :param t: tqdm instance; only its ``total`` attribute and ``update``
        method are used
    :return: function ``update_to(b=1, bsize=1, tsize=None)``
    """
    # Block count reported by the previous call, so each call can advance
    # the bar by the difference only.
    blocks_seen = 0

    def update_to(b=1, bsize=1, tsize=None):
        """Advance the wrapped bar.

        :param b: int, optional
            Number of blocks transferred so far [default: 1].
        :param bsize: int, optional
            Size of each block (in tqdm units) [default: 1].
        :param tsize: int, optional
            Total size (in tqdm units). If [default: None] the bar total
            remains unchanged.
        """
        nonlocal blocks_seen
        if tsize is not None:
            t.total = tsize
        new_blocks = b - blocks_seen
        t.update(new_blocks * bsize)
        blocks_seen = b

    return update_to
def download_corpus(url, filename=None):
    """Download the corpus zip file from an external source.

    A tqdm progress bar labelled with the local filename is shown while the
    file is being retrieved.

    :param url: string
        URL of the corpus file
    :param filename: string, optional
        Local name to save the download as. When ``None`` the text after
        the last ``/`` of ``url`` is used.
    :return: string
        Local filename of the corpus
    """
    target = url.rsplit('/', 1)[-1] if filename is None else filename
    with tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1,
              desc=target) as bar:
        hook = progress_bar(bar)
        urllib.request.urlretrieve(url, target, reporthook=hook)
    return target
def uncompress_corpus(filename, save_dir):
    """Extract a corpus zip archive and delete the archive afterwards.

    :param filename: string
        The zip file that is going to be uncompressed
    :param save_dir: string
        The folder where the corpus is going to be uncompressed
    :return: string
        The ``filename`` argument, unchanged (the archive itself has been
        removed from disk by the time this returns)
    """
    with ZipFile(filename, 'r') as archive:
        archive.extractall(save_dir)
    # The archive is only a transport container; drop it once extracted.
    os.remove(filename)
    return filename
def download_corpora(corpus_indices=None, output_folder=DEFAULT_OUTPUT_FOLDER):
    """Download the selected corpora into a local folder.

    A corpus whose target folder already exists is skipped (an info message
    is logged). With no indices an error is logged and nothing is
    downloaded.

    :param corpus_indices: list
        Indexes into CORPORA_SOURCES choosing which corpora are downloaded
    :param output_folder: string
        The folder where the corpora are going to be saved
    :return: list
        One value per corpus actually downloaded, as returned by
        ``uncompress_corpus``
    :raises IndexError: if any index is negative
    """
    if not corpus_indices:
        logging.error("No corpus selected. Nothing will be downloaded")
        return []

    downloaded = []
    for index in tqdm(corpus_indices):
        # Negative values would silently wrap around the list; reject them.
        if index < 0:
            raise IndexError
        slug = CORPORA_SOURCES[index]["properties"]["slug"]
        corpus_dir = Path(output_folder) / slug
        if corpus_dir.exists():
            logging.info(f'Corpus {CORPORA_SOURCES[index]["name"]}'
                         f' already downloaded')
            continue
        source_url = CORPORA_SOURCES[index]["properties"]["url"]
        archive = download_corpus(source_url, f"{slug}.zip")
        downloaded.append(uncompress_corpus(archive, corpus_dir))
    return downloaded
def get_stanza_features(poem_features):
    """Build one flat row per stanza of a poem.

    Each row carries the stanza's own number, text and type together with
    the poem-level title, author, corpus and manually-checked flag.

    :param poem_features: dict
        Poem dictionary
    :return: dict list
        Stanzas dict list
    """
    checked = poem_features['manually_checked']
    poem_author = poem_features['author']
    corpus = poem_features['corpus']
    return [
        {
            'stanza_number': stanza['stanza_number'],
            'manually_checked': checked,
            'poem_title': poem_features['poem_title'],
            'author': poem_author,
            'stanza_text': stanza['stanza_text'],
            'stanza_type': stanza['stanza_type'],
            'corpus': corpus,
        }
        for stanza in poem_features['stanzas']
    ]
def get_line_features(features):
    """Build one flat row per line of a poem.

    Every row is the line's own data merged with the row of the stanza it
    belongs to (stanza values win on key clashes). A line that has a
    non-empty ``words`` entry contributes only its number, text and
    metrical pattern; any other line is copied whole.

    :param features: dict
        Poem dictionary
    :return: dict list
        Lines dict list
    """
    stanza_rows = get_stanza_features(features)
    rows = []
    for raw_stanza, stanza_row in zip(features["stanzas"], stanza_rows):
        for line in raw_stanza["lines"]:
            if line.get("words"):
                # Keep word-level detail out of the line-level row.
                own = {
                    'line_number': line['line_number'],
                    'line_text': line['line_text'],
                    'metrical_pattern': line['metrical_pattern'],
                }
            else:
                own = dict(line)
            rows.append({**own, **stanza_row})
    return rows
def get_word_features(features):
    """Build one flat row per word of a poem.

    Each row holds the word text plus the row of the line it belongs to,
    with ``stanza_text`` removed.

    The parent line row is found by position: ``get_line_features`` yields
    exactly one row per line, walking stanzas and lines in the same order
    as this function, so a running counter always points at the row of the
    line being processed. Indexing by the stored ``line_number`` would only
    be right when that field is a gap-free, poem-wide, 1-based counter
    stored in order.

    :param features: dict
        Poem dictionary
    :return: dict list
        Words dict list
    """
    line_rows = get_line_features(features)
    word_rows = []
    line_position = 0
    for stanza in features["stanzas"]:
        for line in stanza["lines"]:
            inherited = line_rows[line_position]
            line_position += 1
            for word in line["words"]:
                row = {"word_text": word["word_text"]}
                row.update(inherited)
                # Stanza text is too bulky to repeat on every word row.
                row.pop("stanza_text")
                word_rows.append(row)
    return word_rows
def get_syllable_features(features):
    """Build one flat row per syllable of a poem.

    Each row holds the syllable plus the row of the word it belongs to.
    Word rows are matched by position: a counter advances once per word in
    the same stanza/line/word order used by ``get_word_features``.

    :param features: dict
        Poem dictionary
    :return: dict list
        Syllables dict list
    """
    word_rows = get_word_features(features)
    syllable_rows = []
    word_position = 0
    for stanza in features["stanzas"]:
        for line in stanza["lines"]:
            number = int(line["line_number"])
            for word in line["words"]:
                inherited = word_rows[word_position]
                for syllable in word["syllables"]:
                    # Keys from the word row override the two set here.
                    syllable_rows.append({
                        "syllable": syllable,
                        "line_number": number,
                        **inherited,
                    })
                word_position += 1
    return syllable_rows
def filter_features(features, corpus_index, granularity=None):
    """Extract the rows of a poem at the requested granularity.

    :param features: dict
        Poem python dict
    :param corpus_index: int
        Index in CORPORA_SOURCES of the corpus the poem belongs to
    :param granularity: string
        Level to filter the poem (stanza, line, word or syllable)
    :return: list
        List of rows with the poem granularity info; empty when the corpus
        does not list that granularity or the name is not one of the four
        known levels
    """
    supported = CORPORA_SOURCES[corpus_index]["properties"]["granularity"]
    if granularity not in supported:
        return []
    if granularity == "stanza":
        return get_stanza_features(features)
    if granularity == "line":
        return get_line_features(features)
    if granularity == "word":
        return get_word_features(features)
    if granularity == "syllable":
        return get_syllable_features(features)
    return []
def filter_corpus_features(corpus_features, corpus_id, granularity):
    """Collect the granularity rows of every poem in a corpus.

    :param corpus_features: list of dicts
        List of corpus poems python dicts
    :param corpus_id: int
        Corpus id, passed to ``filter_features`` as its corpus index
    :param granularity: string
        Level to filter the poem (stanza, line, word or syllable)
    :return: list
        Rows of all poems, concatenated in poem order
    """
    return [
        row
        for poem in corpus_features
        for row in filter_features(poem, corpus_id, granularity)
    ]
def write_json(poem_dict, filename):
    """Save data as pretty-printed, UTF-8 encoded JSON.

    The output file always ends in ``.json``: the suffix is appended when
    ``filename`` does not already carry it.

    :param poem_dict: dict
        Python dict with poem data
    :param filename: string or path-like
        JSON filename that will be written with the poem data
    """
    suffix = ".json"
    # Accept pathlib.Path (used throughout this module) as well as str.
    filename = os.fspath(filename)
    if not filename.endswith(suffix):
        filename = f"{filename}{suffix}"
    with open(filename, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps accented characters readable on disk.
        json.dump(poem_dict, f, ensure_ascii=False, indent=4)
def read_features(corpus_folder):
    """Load every poem dictionary stored under a corpus folder.

    All ``*.json`` files found recursively below
    ``<corpus_folder>/averell/parser`` are parsed. A relative
    ``corpus_folder`` is resolved against the current working directory.

    :param corpus_folder: Local folder where the corpus is located
    :return: List of python dictionaries with the poems features, sorted
        by their ``poem_title`` value
    """
    features_path = Path.cwd() / Path(corpus_folder) / "averell" / "parser"
    # Read as UTF-8 explicitly: write_json stores non-ASCII text verbatim
    # in UTF-8, which the platform default encoding may not decode.
    features = [
        json.loads(json_file.read_text(encoding="utf-8"))
        for json_file in features_path.rglob("*.json")
    ]
    features.sort(key=lambda poem: poem['poem_title'])
    return features
def pretty_string(text, num_words):
    """Insert a line break after every ``num_words`` words of a text.

    Used to create multiline cells in
    :py:func:`~averell.utils.get_main_corpora_info`. Words are whatever
    ``str.split()`` yields, so runs of whitespace collapse to one space.

    :param text: String to be split
    :param num_words: Number of words to add a line break after
    :return: String with line break every number of words entered
    :rtype: str
    """
    words = text.split()
    rows = []
    for start in range(0, len(words), num_words):
        chunk = words[start:start + num_words]
        rows.append(' '.join(chunk))
    return '\n'.join(rows)
def get_main_corpora_info():
    """Build the table rows describing every corpus in CORPORA_SOURCES.

    Ids are 1-based positions in CORPORA_SOURCES, the same numbering that
    ``get_ids`` accepts. Name, granularity and license cells are wrapped
    with :py:func:`~averell.utils.pretty_string` so they display as
    multiline cells.

    :return: List with one dictionary per corpus holding the info to be
        shown
    :rtype: list
    """
    table = []
    # enumerate gives each entry its own position directly; looking the
    # entry up again with list.index is quadratic and would return the
    # first match for two equal entries.
    for corpus_id, corpus_info in enumerate(CORPORA_SOURCES, start=1):
        props = corpus_info["properties"]
        corpus_name = pretty_string(
            f"{corpus_info['name']} ({props['slug']})", 2
        )
        table.append({
            "id": corpus_id,
            "name": corpus_name,
            "lang": props["language"],
            "size": props["size"],
            "docs": props["doc_quantity"],
            "words": props["word_quantity"],
            "granularity": pretty_string('\n'.join(props["granularity"]), 1),
            "license": pretty_string(props["license"], 1),
        })
    return table
def get_ids(values):
    """Resolve corpus selectors into indices of CORPORA_SOURCES.

    A corpus is selected when its 1-based numeric identifier (as a
    string), its shortcode (slug) or its language code is contained in
    ``values``. If ``"all"`` is contained in ``values`` every corpus is
    selected.

    :param values: Container of selectors, tested with the ``in`` operator
    :return: List of indices in CORPORA_SOURCES
    :rtype: list
    """
    if "all" in values:
        return list(range(len(CORPORA_SOURCES)))

    selected = []
    for position, corpus in enumerate(CORPORA_SOURCES):
        properties = corpus["properties"]
        public_id = str(position + 1)
        if (public_id in values
                or properties["slug"] in values
                or properties["language"] in values):
            selected.append(position)
    return selected