Source code for htrc.volumes

#!/usr/bin/env python
"""
`htrc.volumes`

Contains functions to retrieve volumes from the HTRC Data API. 

The functions in this package will not operate unless they are 
executed from an HTRC Data Capsule in Secure Mode. The module 
`htrc.mock.volumes` contains Patch objects for testing workflows.
"""
from __future__ import print_function
from future import standard_library

standard_library.install_aliases()

from htrc.models import HtrcPage

import http.client
from io import BytesIO, TextIOWrapper
import json
import os.path
import progressbar
import socket
import ssl
from urllib.parse import urlencode
from zipfile import ZipFile  # used to decompress requested zip archives.
from tqdm import tqdm
from htrc.runningheaders import parse_page_structure
from functools import partial
import pandas as pd
from htrc.util import split_items
import htrc.config
import multiprocessing

import logging
from logging import NullHandler

logging.getLogger(__name__).addHandler(NullHandler())


def get_volumes(data_api_config: htrc.config.HtrcDataApiConfig, volume_ids, concat=False, mets=False,
                buffer_size=128):
    """
    Returns volumes from the Data API as a raw zip stream.

    Parameters:
    :data_api_config: The configuration data of the Data API endpoint.
    :volume_ids: A list of volume_ids
    :concat: If True, return a single file per volume. If False, return a single file per page (default).
    :mets: If True, include the METS metadata file for each volume.
    :buffer_size: Number of bytes to read per chunk while streaming the response.
    """
    if not volume_ids:
        raise ValueError("volume_ids is empty.")

    url = data_api_config.epr + "volumes"

    for id in volume_ids:
        if "." not in id or " " in id:
            print("Invalid volume id " + id + ". Please correct this volume id and try again.")

    data = {'volumeIDs': '|'.join(
        [id.replace('+', ':').replace('=', '/') for id in volume_ids])}

    if concat:
        data['concat'] = 'true'

    if mets:
        data['mets'] = 'true'

    # Authorization
    headers = {"Authorization": "Bearer " + data_api_config.token,
               "Content-type": "application/x-www-form-urlencoded"}

    # Create SSL lookup
    # TODO: Fix SSL cert verification
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    # Retrieve the volumes
    httpsConnection = http.client.HTTPSConnection(
        data_api_config.host,
        data_api_config.port,
        context=ctx,
        key_file=data_api_config.key,
        cert_file=data_api_config.cert)

    httpsConnection.request("POST", url, urlencode(data), headers)

    response = httpsConnection.getresponse()

    if response.status == 200:
        body = True
        data = BytesIO()
        bytes_downloaded = 0
        bar = progressbar.ProgressBar(max_value=progressbar.UnknownLength,
                                      widgets=[progressbar.AnimatedMarker(), ' ',
                                               progressbar.DataSize(), ' (',
                                               progressbar.FileTransferSpeed(), ')'])

        while body:
            body = response.read(buffer_size)
            data.write(body)
            bytes_downloaded += len(body)
            bar.update(bytes_downloaded)

        data = data.getvalue()
    else:
        logging.debug("Unable to get volumes")
        logging.debug("Response Code: {}".format(response.status))
        logging.debug("Response: {}".format(response.reason))
        raise EnvironmentError("Unable to get volumes.")

    if httpsConnection is not None:
        httpsConnection.close()

    return data
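
For context, a minimal sketch of calling get_volumes directly. The configuration values (token, host, port, endpoint, certificate paths) and the volume ID below are placeholders, not real values; inside a Data Capsule the configuration would normally come from htrc.config rather than being built by hand.

    # Hypothetical, hand-built config; real values come from htrc.config / the Capsule environment.
    example_config = htrc.config.HtrcDataApiConfig(
        token="example-jwt-token",        # placeholder OAuth2/JWT token
        host="dataapi.example.org",       # placeholder Data API host
        port=443,                         # placeholder Data API port
        epr="/data-api/",                 # placeholder endpoint prefix; get_volumes appends "volumes"
        cert="/path/to/client.pem",       # placeholder client certificate
        key="/path/to/client.key",        # placeholder client key
    )

    zip_bytes = get_volumes(example_config, ["mdp.39015078560078"], concat=True)
    with open("volumes.zip", "wb") as zip_file:
        zip_file.write(zip_bytes)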
def get_pages(data_api_config: htrc.config.HtrcDataApiConfig, page_ids, concat=False, mets=False, buffer_size=128):
    """
    Returns a ZIP file containing specific pages.

    Parameters:
    :data_api_config: The configuration data of the Data API endpoint.
    :page_ids: A list of page_ids
    :concat: If True, return a single file per volume. If False, return a single file per page (default).
    :mets: If True, include the METS metadata file. Cannot be combined with concat for page requests.
    """
    if not page_ids:
        raise ValueError("page_ids is empty.")

    url = data_api_config.epr + "pages"

    for id in page_ids:
        if "." not in id or " " in id:
            print("Invalid volume id " + id + ". Please correct this volume id and try again.")

    data = {'pageIDs': '|'.join(
        [id.replace('+', ':').replace('=', '/') for id in page_ids])}

    if concat and mets:
        print("Cannot set both concat and mets with pages.")
    elif concat:
        data['concat'] = 'true'
    elif mets:
        data['mets'] = 'true'

    # Authorization
    headers = {"Authorization": "Bearer " + data_api_config.token,
               "Content-type": "application/x-www-form-urlencoded"}

    # Create SSL lookup
    # TODO: Fix SSL cert verification
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    # Retrieve the pages
    httpsConnection = http.client.HTTPSConnection(
        data_api_config.host,
        data_api_config.port,
        context=ctx,
        key_file=data_api_config.key,
        cert_file=data_api_config.cert
    )

    httpsConnection.request("POST", url, urlencode(data), headers)

    response = httpsConnection.getresponse()

    if response.status == 200:
        body = True
        data = BytesIO()
        bytes_downloaded = 0
        bar = progressbar.ProgressBar(max_value=progressbar.UnknownLength,
                                      widgets=[progressbar.AnimatedMarker(), ' ',
                                               progressbar.DataSize(), ' (',
                                               progressbar.FileTransferSpeed(), ')'])

        while body:
            body = response.read(buffer_size)
            data.write(body)
            bytes_downloaded += len(body)
            bar.update(bytes_downloaded)

        data = data.getvalue()
    else:
        logging.debug("Unable to get pages")
        logging.debug("Response Code: {}".format(response.status))
        logging.debug("Response: {}".format(response.reason))
        raise EnvironmentError("Unable to get pages.")

    if httpsConnection is not None:
        httpsConnection.close()

    return data
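
A similarly hedged sketch of requesting individual pages. The bracketed page-sequence syntax in the ID is an assumption about the Data API's page-ID format, and example_config reuses the placeholder configuration from the previous sketch; note that concat and mets cannot both be set for page requests.

    # Page IDs pair a volume ID with page sequence numbers; the exact syntax is assumed here.
    example_page_ids = ["mdp.39015078560078[1,2,3]"]

    pages_zip = get_pages(example_config, example_page_ids, concat=False, mets=False)
    with open("pages.zip", "wb") as zip_file:
        zip_file.write(pages_zip)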
def get_oauth2_token(username, password):
    # make sure to set the request content-type as application/x-www-form-urlencoded
    headers = {"Content-type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials",
            "client_secret": password,
            "client_id": username}
    data = urlencode(data)

    # create an SSL context
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    # make sure the request method is POST
    host, port = htrc.config.get_oauth2_host_port()
    oauth2port = htrc.config.get_oauth2_port()
    oauth2EPRurl = htrc.config.get_oauth2_url()
    httpsConnection = http.client.HTTPSConnection(host, oauth2port, context=ctx)
    httpsConnection.request("POST", oauth2EPRurl + "?" + data, "", headers)

    response = httpsConnection.getresponse()

    # if response status is OK
    if response.status == 200:
        data = response.read().decode('utf8')

        jsonData = json.loads(data)
        logging.info("*** JSON: {}".format(jsonData))

        token = jsonData["access_token"]
        logging.info("*** parsed token: {}".format(token))
    else:
        logging.debug("Unable to get token")
        logging.debug("Response Code: {}".format(response.status))
        logging.debug("Response: {}".format(response.reason))
        logging.debug(response.read())
        raise EnvironmentError("Unable to get the token.")

    if httpsConnection is not None:
        httpsConnection.close()

    return token


def grep_error(file_name, output_dir, pattern, txt_index):
    na_volume = []
    if output_dir.endswith("/"):
        file_path = output_dir + file_name
    else:
        file_path = output_dir + "/" + file_name

    if os.path.isfile(file_path):
        for line in open(file_path):
            if pattern in line:
                na_volume.append(line.split()[txt_index])

    return na_volume


def _to_htrc_page(page_file, zip):
    with TextIOWrapper(BytesIO(zip.read(page_file)), encoding='utf-8') as page:
        return HtrcPage([line.rstrip() for line in page.readlines()])


def download_volumes(volume_ids, output_dir, concat=False, mets=False, pages=False,
                     remove_headers_footers=False, hf_window_size=6, hf_min_similarity=0.7,
                     skip_removed_hf=False, parallelism=multiprocessing.cpu_count(), batch_size=250,
                     data_api_config=None):
    if not 0 < parallelism <= multiprocessing.cpu_count():
        raise ValueError("Invalid parallelism level specified")

    remove_hf_fun = partial(
        _remove_headers_footers_and_save,
        concat=concat,
        hf_min_similarity=hf_min_similarity,
        hf_window_size=hf_window_size,
        skip_removed_hf=skip_removed_hf,
        output_dir=output_dir
    )

    volume_ids = list(set(volume_ids))  # ensure unique volume ids
    num_vols = len(volume_ids)

    data_api_config = data_api_config or htrc.config.HtrcDataApiConfig()

    os.makedirs(output_dir, exist_ok=True)

    if all((data_api_config.token, data_api_config.host, data_api_config.port)):
        logging.info("obtained token: %s\n" % data_api_config.token)

        try:
            errors = []
            rights = []

            with tqdm(total=num_vols) as progress, multiprocessing.Pool(processes=parallelism) as pool:
                for ids in split_items(volume_ids, batch_size):
                    if pages:
                        if concat and mets:
                            raise ValueError("Cannot set both concat and mets with pages.")
                        else:
                            data = get_pages(data_api_config, ids, concat and not remove_headers_footers, mets)
                    else:
                        data = get_volumes(data_api_config, ids, concat and not remove_headers_footers, mets)

                    volumes = []

                    with ZipFile(BytesIO(data)) as vols_zip:
                        zip_list = vols_zip.namelist()
                        if 'ERROR.err' in zip_list:
                            errors.append(vols_zip.read('ERROR.err').decode('utf-8'))
                            zip_list.remove('ERROR.err')
                        if 'volume-rights.txt' in zip_list:
                            rights_data = vols_zip.read('volume-rights.txt').decode('utf-8')
                            zip_list.remove('volume-rights.txt')
                            if not rights:
                                rights.append(rights_data)
                            else:
                                # due to the format in which 'volume-rights.txt' is created, we have to skip
                                # the first 4 lines which make up the header of the file, to extract only the
                                # actual volume rights data for accumulation
                                rights.append(''.join(rights_data.splitlines(keepends=True)[4:]))

                        zip_volume_paths = [zip_vol_path for zip_vol_path in zip_list
                                            if zip_vol_path.endswith('/')]
                        num_vols_in_zip = len(zip_volume_paths)

                        if not remove_headers_footers:
                            vols_zip.extractall(output_dir, members=zip_list)
                            progress.update(num_vols_in_zip)
                        else:
                            for zip_vol_path in zip_volume_paths:
                                sorted_vol_zip_page_paths = sorted(
                                    zip_page_path for zip_page_path in zip_list
                                    if zip_page_path.startswith(zip_vol_path) and not zip_page_path.endswith('/'))
                                vol_pages = [_to_htrc_page(page_path, vols_zip)
                                             for page_path in sorted_vol_zip_page_paths]
                                volumes.append((zip_vol_path, sorted_vol_zip_page_paths, vol_pages))

                    del data, vols_zip

                    num_missing = batch_size - num_vols_in_zip if num_vols >= batch_size else num_vols - num_vols_in_zip
                    progress.update(num_missing)  # update progress bar state to include the missing volumes also

                    # `volumes` will be empty if `remove_headers_footers=False` since the ZIP was extracted
                    # without further processing
                    if volumes:
                        for _ in pool.imap_unordered(remove_hf_fun, volumes):
                            progress.update()

            na_volumes_all = []

            if errors:
                with open(os.path.join(output_dir, 'ERROR.err'), 'w') as err_file:
                    err_file.write(''.join(errors))
                na_volumes_error = grep_error('ERROR.err', output_dir, 'KeyNotFoundException', -1)
                na_volumes_all.extend(na_volumes_error)

            if rights:
                with open(os.path.join(output_dir, 'volume-rights.txt'), 'w') as rights_file:
                    rights_file.write(''.join(rights))
                if htrc.config.get_dataapi_access() == "true":
                    na_volumes_rights = grep_error('volume-rights.txt', output_dir, ' 3', 0)
                    na_volumes_all.extend(na_volumes_rights)

            num_na = len(na_volumes_all)

            if num_na > 0:
                with open(os.path.join(output_dir, 'volumes_not_available.txt'), 'w') as volumes_na:
                    volumes_na.write("\n".join(str(item) for item in na_volumes_all))

                if num_na < 100:
                    print("\nThe following volume ids are not available. \n Please check volumes_not_available.txt "
                          "for the complete list. ")
                    print('\n'.join(str(item) for item in na_volumes_all))
                else:
                    print("\nThere are {:,} unavailable volumes.\n Please check volumes_not_available.txt for the "
                          "complete list. \nTo check the validity of volumes in your workset or volume id file go "
                          "to:\n https://analytics.hathitrust.org/validateworkset \n or email us at "
                          "htrc-help@hathitrust.org for assistance.".format(num_na))

        except socket.error:
            raise RuntimeError("HTRC Data API time out. Check your inode usage if downloading a large workset. "
                               "Contact HTRC for further help.")

    else:
        raise RuntimeError("Failed to obtain the JWT token.")


def _remove_headers_footers_and_save(vol_data, concat, hf_min_similarity, hf_window_size, skip_removed_hf,
                                     output_dir):
    zip_vol_path, sorted_vol_zip_page_paths, vol_pages = vol_data
    clean_volid = zip_vol_path[:-1]
    vol_pages = parse_page_structure(vol_pages, window_size=hf_window_size,
                                     min_similarity_ratio=hf_min_similarity)
    pages_body = (page.body for page in vol_pages)

    # save the removed headers/footers for user inspection
    if skip_removed_hf:
        if concat:
            with open(os.path.join(output_dir, clean_volid + '.txt'), 'w', encoding='utf-8') as vol_file:
                vol_file.write('\n'.join(pages_body))
        else:
            vol_path = os.path.join(output_dir, zip_vol_path)
            os.mkdir(vol_path)
            for vol_page_path, page_body in zip(sorted_vol_zip_page_paths, pages_body):
                with open(os.path.join(output_dir, vol_page_path), 'w', encoding='utf-8') as page_file:
                    page_file.write(page_body)
    else:
        if concat:
            with open(os.path.join(output_dir, clean_volid + '.txt'), 'w', encoding='utf-8') as vol_file:
                vol_file.write('\n'.join(pages_body))
        else:
            vol_path = os.path.join(output_dir, zip_vol_path)
            os.mkdir(vol_path)
            for vol_page_path, page_body in zip(sorted_vol_zip_page_paths, pages_body):
                with open(os.path.join(output_dir, vol_page_path), 'w', encoding='utf-8') as page_file:
                    page_file.write(page_body)

        removed_hf = []
        for vol_page_path, vol_page in zip(sorted_vol_zip_page_paths, vol_pages):
            if not (vol_page.has_header or vol_page.has_footer):
                # skip reporting pages that don't have an identified header or footer
                continue
            _, page_name = os.path.split(vol_page_path)
            page_name, _ = os.path.splitext(page_name)
            removed_hf.append({'page': page_name, 'header': vol_page.header, 'footer': vol_page.footer})

        if concat:
            removed_hf_filename = os.path.join(output_dir, clean_volid + '_removed_hf.csv')
        else:
            removed_hf_filename = os.path.join(output_dir, clean_volid, 'removed_hf.csv')

        pd.DataFrame(removed_hf, columns=['page', 'header', 'footer']).to_csv(removed_hf_filename, index=False)


def download(args):
    # extract files
    with open(args.file) as IDfile:
        volumeIDs = [line.strip() for line in IDfile]

    data_api_config = htrc.config.HtrcDataApiConfig(
        token=args.token,
        host=args.datahost,
        port=args.dataport,
        epr=args.dataepr,
        cert=args.datacert,
        key=args.datakey
    )

    return download_volumes(volumeIDs, args.output,
                            remove_headers_footers=args.remove_headers_footers or
                                                   args.remove_headers_footers_and_concat,
                            concat=args.concat or args.remove_headers_footers_and_concat,
                            mets=args.mets,
                            pages=args.pages,
                            hf_window_size=args.window_size,
                            hf_min_similarity=args.min_similarity_ratio,
                            parallelism=args.parallelism,
                            batch_size=args.batch_size,
                            skip_removed_hf=args.skip_removed_hf,
                            data_api_config=data_api_config)