API Reference

dataverse_utils

Generalized dataverse utilities. Note that import dataverse_utils is the equivalent of import dataverse_utils.dataverse_utils

DvGeneralUploadError

Bases: Exception

Raised on non-200 URL response

Source code in src/dataverse_utils/dataverse_utils.py
class DvGeneralUploadError(Exception):
    '''
    Raised on non-200 URL response
    '''

Md5Error

Bases: Exception

Raised on md5 mismatch

Source code in src/dataverse_utils/dataverse_utils.py
class Md5Error(Exception):
    '''
    Raised on md5 mismatch
    '''

check_lock(dv_url, study, apikey)

Checks study lock status; returns True if locked.

Parameters:
  • dv_url (str) –

    URL of Dataverse installation

  • study (str) –

    Persistent ID of study

  • apikey (str) –

    API key for user

Source code in src/dataverse_utils/dataverse_utils.py
def check_lock(dv_url, study, apikey) -> bool:
    '''
    Checks study lock status; returns True if locked.

    Parameters
    ----------
    dv_url : str
        URL of Dataverse installation

    study: str
        Persistent ID of study

    apikey : str
        API key for user
    '''
    dv_url, headers, params = _make_info(dv_url, study, apikey)
    lock_status = requests.get(f'{dv_url}/api/datasets/:persistentId/locks',
                               headers=headers,
                               params=params, timeout=300)
    lock_status.raise_for_status()
    data = lock_status.json().get('data')
    if data:
        LOGGER.warning('Study %s has been locked', study)
        LOGGER.warning('Lock info:\n%s', lock_status.json())
        return True
    return False
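
Example

A minimal usage sketch; the installation URL, persistent ID and API key below are placeholders, not real values:

from dataverse_utils.dataverse_utils import check_lock

DV_URL = 'https://dataverse.example.edu'   # placeholder installation
PID = 'doi:10.5072/FK2/XXXXXX'             # placeholder persistent ID
APIKEY = 'xxxx-xxxx-xxxx'                  # placeholder API key

if check_lock(DV_URL, PID, APIKEY):
    print(f'{PID} is currently locked')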

dump_tsv(start_dir, filename, in_list=None, **kwargs)

Dumps output of make_tsv manifest to a file.

Parameters:
  • start_dir (str) –

    Path to start directory

  • filename (str) –

    Path to the output TSV file

  • in_list (list, default: None ) –

    List of files for which to create manifest entries. Will default to recursive directory crawl

  • **kwargs (dict, default: {} ) –

    Other parameters

  • def_tag (str) –

    Default Dataverse tag (eg, Data, Documentation, etc). Separate tags with an easily splittable character: eg. (‘Data, 2016’)

  • inc_header (bool) –

    Include header for tsv.

  • quotype (int) –

    integer value or csv quote type. Acceptable values: csv.QUOTE_MINIMAL / 0, csv.QUOTE_ALL / 1, csv.QUOTE_NONNUMERIC / 2, csv.QUOTE_NONE / 3

Source code in src/dataverse_utils/dataverse_utils.py
def dump_tsv(start_dir, filename, in_list=None,
             **kwargs):
    '''
    Dumps output of make_tsv manifest to a file.

    Parameters
    ----------
    start_dir : str
        Path to start directory

    in_list : list
        List of files for which to create manifest entries. Will
        default to recursive directory crawl

    **kwargs : dict
        Other parameters

    Other parameters
    ----------------
    def_tag : str, optional, default='Data'
        Default Dataverse tag (eg, Data, Documentation, etc).
        Separate tags with an easily splittable character:
        eg. ('Data, 2016')

    inc_header : bool, optional, default=True
        Include header for tsv.

    quotype : int, optional, default=csv.QUOTE_MINIMAL
        integer value or csv quote type.
        Acceptable values:
        * csv.QUOTE_MINIMAL / 0
        * csv.QUOTE_ALL / 1
        * csv.QUOTE_NONNUMERIC / 2
        * csv.QUOTE_NONE / 3
    '''

    def_tag = kwargs.get('def_tag', 'Data')
    inc_header = kwargs.get('inc_header', True)
    mime = kwargs.get('mime', False)
    path = kwargs.get('path', False)
    quotype = kwargs.get('quotype', csv.QUOTE_MINIMAL)

    dumper = make_tsv(start_dir, in_list, def_tag, inc_header, mime, quotype, path=path)
    with open(filename, 'w', newline='', encoding='utf-8') as tsvfile:
        tsvfile.write(dumper)
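
Example

A short sketch that writes a manifest for everything under ./Data; the directory, tag and output file name are arbitrary examples:

from dataverse_utils.dataverse_utils import dump_tsv

# Recursively crawl ./Data and write a tab-separated manifest
dump_tsv('./Data', 'manifest.tsv', def_tag='Data, 2016', inc_header=True)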

file_path(fpath, trunc='')

Create relative file path from full path string

Parameters:
  • fpath (str) –

    File location (ie, complete path)

  • trunc (str, default: '' ) –

    Leftmost portion of path to remove

Notes
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp/')
'Data/2011'
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp')
'Data/2011'
Source code in src/dataverse_utils/dataverse_utils.py
def file_path(fpath, trunc='') -> str:
    '''
    Create relative file path from full path string

    Parameters
    ----------
    fpath : str
        File location (ie, complete path)

    trunc : str
        Leftmost portion of path to remove

    Notes
    -----
    ```
    >>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp/')
    'Data/2011'
    >>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp')
    'Data/2011'
    ```
    '''
    if trunc and not trunc.endswith(os.sep):
        trunc += os.sep

    path = os.path.dirname(fpath)
    try:
        if fpath.find(trunc) == -1:
            dirlabel = os.path.relpath(os.path.split(path)[0])
        else:
            dirlabel = os.path.relpath(path[path.find(trunc)+len(trunc):])

        if dirlabel == '.':
            dirlabel = ''
        return dirlabel

    except ValueError:
        return ''

force_notab_unlock(study, dv_url, fid, apikey, try_uningest=True)

Forcibly unlocks and uningests to prevent tabular file processing. Required if mime and filename spoofing is not sufficient.

Returns 0 if unlocked, file id if locked (and then unlocked).

Parameters:
  • study (str) –

    Persistent identifier of study

  • dv_url (str) –

    URL to base Dataverse installation

  • fid (str) –

    File ID for file object

  • apikey (str) –

    API key for user

  • try_uningest (bool, default: True ) –

    Try to uningest the file that was locked. Default: True

Source code in src/dataverse_utils/dataverse_utils.py
def force_notab_unlock(study, dv_url, fid, apikey, try_uningest=True) -> int:
    '''
    Forcibly unlocks and uningests
    to prevent tabular file processing. Required if mime and filename
    spoofing is not sufficient.

    Returns 0 if unlocked, file id if locked (and then unlocked).

    Parameters
    ----------
    study : str
        Persistent identifier of study

    dv_url : str
        URL to base Dataverse installation

    fid : str
        File ID for file object

    apikey : str
        API key for user

    try_uningest : bool
        Try to uningest the file that was locked.
        Default: True
    '''
    dv_url, headers, params = _make_info(dv_url, study, apikey)
    force_unlock = requests.delete(f'{dv_url}/api/datasets/:persistentId/locks',
                                   params=params, headers=headers,
                                   timeout=300)
    LOGGER.warning('Lock removed for %s', study)
    LOGGER.warning('Lock status:\n %s', force_unlock.json())
    if try_uningest:
        uningest_file(dv_url, fid, apikey, study)
        return int(fid)
    return 0
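
Example

A hedged sketch; the identifiers below are placeholders, and the uningest step requires a superuser API key:

from dataverse_utils.dataverse_utils import force_notab_unlock

fid = force_notab_unlock('doi:10.5072/FK2/XXXXXX',        # study PID (placeholder)
                         'https://dataverse.example.edu', # installation (placeholder)
                         '1234',                          # file ID (placeholder)
                         'xxxx-xxxx-xxxx')                # API key (placeholder)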

make_tsv(start_dir, in_list=None, def_tag='Data', inc_header=True, mime=False, quotype=csv.QUOTE_MINIMAL, **kwargs)

Recurses the tree for files and produces tsv output with headers ‘file’, ‘description’, ‘tags’.

The ‘description’ is the filename without an extension.

Returns tsv as string.

Parameters:
  • start_dir (str) –

    Path to start directory

  • in_list (list, default: None ) –

    Input file list. Defaults to recursive walk of current directory.

  • def_tag (str, default: 'Data' ) –

    Default Dataverse tag (eg, Data, Documentation, etc) Separate tags with a comma: eg. (‘Data, 2016’)

  • inc_header (bool, default: True ) –

    Include header row

  • mime (bool, default: False ) –

    Include automatically determined mimetype

  • quotype (int, default: csv.QUOTE_MINIMAL ) –

    integer value or csv quote type. Default = csv.QUOTE_MINIMAL. Acceptable values: csv.QUOTE_MINIMAL / 0, csv.QUOTE_ALL / 1, csv.QUOTE_NONNUMERIC / 2, csv.QUOTE_NONE / 3

  • **kwargs (dict, default: {} ) –

    Other parameters

  • path (bool) –

    If true include a ‘path’ field so that you can type in a custom path instead of actually structuring your data

Source code in src/dataverse_utils/dataverse_utils.py
def make_tsv(start_dir, in_list=None, def_tag='Data',
             inc_header=True,
             mime=False,
             quotype=csv.QUOTE_MINIMAL,
             **kwargs) -> str:
    # pylint: disable=too-many-positional-arguments
    # pylint: disable=too-many-arguments
    '''
    Recurses the tree for files and produces tsv output with
    headers 'file', 'description', 'tags'.

    The 'description' is the filename without an extension.

    Returns tsv as string.

    Parameters
    ----------
    start_dir : str
        Path to start directory

    in_list : list
        Input file list. Defaults to recursive walk of current directory.

    def_tag : str
        Default Dataverse tag (eg, Data, Documentation, etc)
        Separate tags with a comma:
        eg. ('Data, 2016')

    inc_header : bool
        Include header row

    mime : bool
        Include automatically determined mimetype

    quotype: int
        integer value or csv quote type.
        Default = csv.QUOTE_MINIMAL
        Acceptable values:
        csv.QUOTE_MINIMAL / 0
        csv.QUOTE_ALL / 1
        csv.QUOTE_NONNUMERIC / 2
        csv.QUOTE_NONE / 3

    **kwargs : dict
        Other parameters

    Other parameters
    ----------------
    path : bool
        If true include a 'path' field so that you can type
        in a custom path instead of actually structuring
        your data

    '''
    if start_dir.endswith(os.sep):
        #start_dir += os.sep
        start_dir = start_dir[:-1]
    if not in_list:
        in_list = [f'{x[0]}{os.sep}{y}'
                   for x in os.walk(start_dir)
                   for y in x[2]
                   if not y.startswith('.')]
    if isinstance(in_list, set):
        in_list=list(in_list)
    in_list.sort()
    def_tag = ", ".join([x.strip() for x in def_tag.split(',')])
    headers = ['file', 'description', 'tags']
    if mime:
        headers.append('mimetype')
    if kwargs.get('path'):
        headers.insert(1, 'path')
    outf = io.StringIO(newline='')
    tsv_writer = csv.DictWriter(outf, delimiter='\t',
                                quoting=quotype,
                                fieldnames=headers,
                                extrasaction='ignore')
    if inc_header:
        tsv_writer.writeheader()
    for row in in_list:
        #the columns
        r = {}
        r['file'] = row
        r['description'] = os.path.splitext(os.path.basename(row))[0]
        r['mimetype'] = mimetypes.guess_type(row)[0]
        r['tags'] = def_tag
        r['path'] =  ''
        tsv_writer.writerow(r)
    outf.seek(0)
    outfile = outf.read()
    outf.close()

    return outfile
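
Example

A minimal sketch producing a manifest string with a mimetype column and an editable ‘path’ column; ./Data is a placeholder directory:

from dataverse_utils.dataverse_utils import make_tsv

manifest = make_tsv('./Data', def_tag='Data', mime=True, path=True)
print(manifest)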

restrict_file(**kwargs)

Restrict file in Dataverse study.

Parameters:
  • **kwargs (dict, default: {} ) –
  • pid (str) –

    file persistent ID

  • fid (str) –

    file database ID

  • dv ((str, required)) –

    url to base Dataverse installation eg: ‘https://abacus.library.ubc.ca’

  • apikey ((str, required)) –

    API key for user

  • rest (bool) –

    On True, restrict. Default True

Notes

One of pid or fid is required

Source code in src/dataverse_utils/dataverse_utils.py
def restrict_file(**kwargs):
    '''
    Restrict file in Dataverse study.

    Parameters
    ----------
    **kwargs : dict

    Other parameters
    ----------------
    pid : str, optional
        file persistent ID

    fid : str, optional
        file database ID

    dv : str, required
        url to base Dataverse installation
        eg: 'https://abacus.library.ubc.ca'

    apikey : str, required
        API key for user

    rest : bool
        On True, restrict. Default True

    Notes
    --------
    One of `pid` or `fid` is **required**
    '''
    headers = {'X-Dataverse-key': kwargs['apikey']}
    headers.update(dataverse_utils.UAHEADER)
    #Requires a true/false *string* for the API.
    if kwargs.get('rest', True):
        rest = 'true'
    else:
        rest= 'false'
    if kwargs.get('pid'):
        params={'persistentId':kwargs['pid']}
        rest = requests.put(f'{kwargs["dv"]}/api/files/:persistentId/restrict',
                            headers=headers,
                            params=params,
                            data=rest,
                            timeout=300)
    elif kwargs.get('fid'):
        rest = requests.put(f'{kwargs["dv"]}/api/files/{kwargs["fid"]}/restrict',
                            headers=headers, data=rest, timeout=300)
    else:
        LOGGER.error('No file ID/PID supplied for file restriction')
        raise KeyError('One of persistentId (pid) or database ID'
                       '(fid) is required for file restriction')
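
Example

A minimal sketch restricting a single file by database ID (pid could be supplied instead of fid); the values shown are placeholders:

from dataverse_utils.dataverse_utils import restrict_file

restrict_file(fid='1234',
              dv='https://dataverse.example.edu',
              apikey='xxxx-xxxx-xxxx',
              rest=True)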

script_ver_stmt(name)

Returns a formatted version statement for any script

Parameters:
  • name (str) –

    Name of utility to join to create version statement. Normally %prog from argparse.

Source code in src/dataverse_utils/__init__.py
def script_ver_stmt(name:str)->str:
    '''
    Returns a formatted version statement for any script

    Parameters
    ----------
    name : str
        Name of utility to join to create version statement. Normally %prog from argparse.

    '''
    key = pathlib.Path(name).stem
    if not SCRIPT_VERSIONS.get(key):
        return f'dataverse_utils: v{__version__}'

    return (f"{key} v{'.'.join(map(str, SCRIPT_VERSIONS[key]))} / "
            f'dataverse_utils v{__version__}')
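
Example

A small sketch wiring the version statement into an argparse --version flag; the program name is an arbitrary example:

import argparse

from dataverse_utils import script_ver_stmt

parser = argparse.ArgumentParser(prog='dv_example')
parser.add_argument('--version', action='version',
                    version=script_ver_stmt(parser.prog))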

uningest_file(dv_url, fid, apikey, study='n/a')

Tries to uningest a file that has been ingested. Requires superuser API key.

Parameters:
  • dv_url (str) –

    URL to base Dataverse installation

  • fid (int or str) –

    File ID of file to uningest

  • apikey (str) –

    API key for superuser

  • study (str, default: 'n/a' ) –

    Optional handle parameter for log messages

Source code in src/dataverse_utils/dataverse_utils.py
def uningest_file(dv_url, fid, apikey, study='n/a'):
    '''
    Tries to uningest a file that has been ingested.
    Requires superuser API key.

    Parameters
    ----------
    dv_url : str
        URL to base Dataverse installation

    fid : int or str
        File ID of file to uningest

    apikey : str
        API key for superuser

    study : str, optional
        Optional handle parameter for log messages
    '''
    dv_url, headers, params = _make_info(dv_url, fid, apikey)
    fid = params['persistentId']
    #TODONE: Awaiting answer from Harvard on how to remove progress bar
    #for uploaded tab files that squeak through.
    #Answer: you can't!
    try:
        uningest = requests.post(f'{dv_url}/api/files/{fid}/uningest',
                                 headers=headers,
                                 timeout=300)
        LOGGER.warning('Ingest halted for file %s for fileID %s', fid, study)
        uningest.raise_for_status()
    except requests.exceptions.HTTPError:
        LOGGER.error('Uningestion error: %s', uningest.reason)
        print(uningest.reason)
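
Example

A hedged sketch; the identifiers are placeholders and a superuser API key is required:

from dataverse_utils.dataverse_utils import uningest_file

uningest_file('https://dataverse.example.edu', 1234,
              'xxxx-superuser-key', study='doi:10.5072/FK2/XXXXXX')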

upload_file(fpath, hdl, **kwargs)

Uploads file to Dataverse study and sets file metadata and tags.

Parameters:
  • fpath (str) –

    file location (ie, complete path)

  • hdl (str) –

    Dataverse persistent ID for study (handle or DOI)

  • **kwargs (dict, default: {} ) –

    Other parameters

  • dv ((str, required)) –

    URL to base Dataverse installation eg: ‘https://abacus.library.ubc.ca’

  • apikey ((str, required)) –

    API key for user

  • descr (str) –

    File description

  • md5 (str) –

    md5sum for file checking

  • tags (list) –

    list of text file tags. Eg [‘Data’, ‘June 2020’]

  • dirlabel (str) –

    Unix style relative pathname for Dataverse file path: eg: path/to/file/

  • nowait (bool) –

    Force a file unlock and uningest instead of waiting for processing to finish

  • trunc (str) –

    Leftmost portion of path to remove

  • rest (bool) –

    Restrict file. Defaults to false unless True supplied

  • mimetype (str) –

    Mimetype of file. Useful if using File Previewers. Mimetype for zip files (application/zip) will be ignored to circumvent Dataverse’s automatic unzipping function.

  • label (str) –

    If included in kwargs, this value will be used for the label

  • timeout (int) –

    Timeout in seconds

  • override (bool) –

    Ignore NOTAB (ie, NOTAB = [])

Source code in src/dataverse_utils/dataverse_utils.py
def upload_file(fpath, hdl, **kwargs):
    '''
    Uploads file to Dataverse study and sets file metadata and tags.

    Parameters
    ----------
    fpath : str
        file location (ie, complete path)

    hdl : str
        Dataverse persistent ID for study (handle or DOI)

    **kwargs : dict
        Other parameters

    Other parameters
    ----------------
    dv : str, required
        URL to base Dataverse installation
        eg: 'https://abacus.library.ubc.ca'

    apikey : str, required
        API key for user

    descr : str, optional
        File description

    md5 : str, optional
        md5sum for file checking

    tags : list, optional
        list of text file tags. Eg ['Data', 'June 2020']

    dirlabel : str, optional
        Unix style relative pathname for Dataverse
        file path: eg: path/to/file/

    nowait : bool, optional
        Force a file unlock and uningest instead of waiting for processing
        to finish

    trunc : str, optional
        Leftmost portion of path to remove

    rest : bool, optional
        Restrict file. Defaults to false unless True supplied

    mimetype : str, optional
        Mimetype of file. Useful if using File Previewers. Mimetype for zip files
        (application/zip) will be ignored to circumvent Dataverse's automatic
        unzipping function.

    label : str, optional
        If included in kwargs, this value will be used for the label

    timeout : int, optional
        Timeout in seconds

    override : bool, optional
        Ignore NOTAB (ie, NOTAB = [])
    '''
    #Why are SPSS files getting processed anyway?
    #Does SPSS detection happen *after* upload
    #Does the file need to be renamed post hoc?
    #I don't think this can be fixed. Goddamitsomuch.
    dvurl = kwargs['dv'].strip('\\ /')
    if os.path.splitext(fpath)[1].lower() in NOTAB and not kwargs.get('override'):
        file_name_clean = os.path.basename(fpath)
        #file_name = os.path.basename(fpath) + '.NOPROCESS'
        # using .NOPROCESS doesn't seem to work?
        file_name = os.path.basename(fpath) + '.NOPROCESS'
    else:
        file_name = os.path.basename(fpath)
        file_name_clean = file_name
    #My workstation python on Windows produces null for isos for some reason
    if mimetypes.guess_type('test.iso') == (None, None):
        mimetypes.add_type('application/x-iso9660-image', '.iso')
    mime = mimetypes.guess_type(fpath)[0]
    if kwargs.get('mimetype'):
        mime = kwargs['mimetype']
    if file_name.endswith('.NOPROCESS') or mime == 'application/zip':
        mime = 'application/octet-stream'

    #create file metadata in nice, simple, chunks
    dv4_meta = {'label' : kwargs.get('label', file_name_clean),
                'description' : kwargs.get('descr', ''),
                'directoryLabel': kwargs.get('dirlabel', ''),
                'categories': kwargs.get('tags', []),
                'mimetype' : mime}
    fpath = os.path.abspath(fpath)
    fields = {'file': (file_name, open(fpath, 'rb'), mime)}#pylint: disable=consider-using-with
    fields.update({'jsonData' : json.dumps(dv4_meta)})
    multi = MultipartEncoder(fields=fields) # use multipart streaming for large files
    headers = {'X-Dataverse-key' : kwargs.get('apikey'),
               'Content-type' : multi.content_type}
    headers.update(dataverse_utils.UAHEADER)
    params = {'persistentId' : hdl}

    LOGGER.info('Uploading %s to %s', fpath, hdl)
    upload = requests.post(f"{dvurl}/api/datasets/:persistentId/add",
                           params=params, headers=headers, data=multi,
                           timeout=kwargs.get('timeout',1000))
    try:
        print(upload.json())
    except json.decoder.JSONDecodeError:
        #This can happen when Glassfish crashes
        LOGGER.critical(upload.text)
        print(upload.text)
        err = ('It\'s possible Glassfish may have crashed. '
               'Check server logs for anomalies')
        LOGGER.exception(err)
        print(err)
        raise
    #SPSS files still process despite spoof, so there's
    #a forcible unlock check
    fid = upload.json()['data']['files'][0]['dataFile']['id']
    print(f'FID: {fid}')
    if kwargs.get('nowait') and check_lock(dvurl, hdl, kwargs['apikey']):
        force_notab_unlock(hdl, dvurl, fid, kwargs['apikey'])
    else:
        while check_lock(dvurl, hdl, kwargs['apikey']):
            time.sleep(10)

    if upload.status_code != 200:
        LOGGER.critical('Upload failure: %s', (upload.status_code, upload.reason))
        raise DvGeneralUploadError(f'\nReason: {(upload.status_code, upload.reason)}'
                                   f'\n{upload.text}')

    if kwargs.get('md5'):
        if upload.json()['data']['files'][0]['dataFile']['md5'] != kwargs.get('md5'):
            LOGGER.warning('md5sum mismatch on %s', fpath)
            raise Md5Error('md5sum mismatch')

    restrict_file(fid=fid, dv=dvurl, apikey=kwargs.get('apikey'),
                  rest=kwargs.get('rest', False))
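
Example

A sketch of a single-file upload with a description, tags and a Dataverse directory label; the path, DOI, URL and key are placeholders:

from dataverse_utils.dataverse_utils import upload_file

upload_file('/home/user/Data/ASCII/file.csv',
            'doi:10.5072/FK2/XXXXXX',
            dv='https://dataverse.example.edu',
            apikey='xxxx-xxxx-xxxx',
            descr='ASCII data file',
            tags=['Data', 'June 2020'],
            dirlabel='Data/ASCII')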

upload_from_tsv(fil, hdl, **kwargs)

Utility for bulk uploading. Assumes fil is formatted as tsv with headers ‘file’, ‘description’, ‘tags’.

‘tags’ field will be split on commas.

Parameters:
  • fil

    Open file object or io stream (eg, io.StringIO)

  • hdl (str) –

    Dataverse persistent ID for study (handle or DOI)

  • **kwargs (dict, default: {} ) –

    Other parameters

  • trunc (str) –

    Leftmost portion of Dataverse study file path to remove. eg: trunc =’/home/user/’ if the tsv field is ‘/home/user/Data/ASCII’ would set the path for that line of the tsv to ‘Data/ASCII’. Defaults to None.

  • dv ((str, required)) –

    url to base Dataverse installation eg: ‘https://abacus.library.ubc.ca’

  • apikey ((str, required)) –

    API key for user

  • rest (bool) –

    On True, restrict access. Default False

Source code in src/dataverse_utils/dataverse_utils.py
def upload_from_tsv(fil, hdl, **kwargs):
    '''
    Utility for bulk uploading. Assumes fil is formatted
    as tsv with headers 'file', 'description', 'tags'.

    'tags' field will be split on commas.

    Parameters
    ----------
    fil
        Open file object or io stream (eg, io.StringIO)

    hdl : str
        Dataverse persistent ID for study (handle or DOI)

    **kwargs : dict
        Other parameters

    Other parameters
    ----------------
    trunc : str
        Leftmost portion of Dataverse study file path to remove.
        eg: trunc ='/home/user/' if the tsv field is
        '/home/user/Data/ASCII'
        would set the path for that line of the tsv to 'Data/ASCII'.
        Defaults to None.

    dv : str, required
        url to base Dataverse installation
        eg: 'https://abacus.library.ubc.ca'

    apikey : str, required
        API key for user

    rest : bool, optional
        On True, restrict access. Default False
        '''
    #reader = csv.reader(fil, delimiter='\t', quotechar='"')
    #new, optional mimetype column allows using GeoJSONS.
    #Read the headers from the file first before using DictReader
    headers = fil.readline().strip('\n\r').split('\t')#Goddamn it Windows
    fil.seek(0)
    reader = csv.DictReader(fil, fieldnames=headers, quotechar='"', delimiter='\t')
    #See API call for "Adding File Metadata"
    for num, row in enumerate(reader):
        if num == 0:
            continue
        #dirlabel = file_path(row[0], './')
        if row.get('path'):
            #Explicit separate path because that way you can organize
            #on upload
            dirlabel = row.get('path')
        else:
            dirlabel = file_path(row['file'], kwargs.get('trunc', ''))
        tags = row['tags'].split(',')
        tags = [x.strip() for x in tags]
        descr = row['description']
        mimetype = row.get('mimetype')
        params = {'dv' : kwargs.get('dv'),
                  'tags' : tags,
                  'descr' : descr,
                  'dirlabel' : dirlabel,
                  'apikey' : kwargs.get('apikey'),
                  'md5' : kwargs.get('md5', ''),
                  'rest': kwargs.get('rest', False)}
        if mimetype:
            params['mimetype'] = mimetype
        #So that you can pass everything all at once, params
        #is merged onto kwargs. This is for easier upgradability
        kwargs.update(params)
        upload_file(row['file'], hdl, **kwargs)
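
Example

A sketch of a bulk upload driven by a manifest produced with dump_tsv/make_tsv; the file name, DOI, URL and key are placeholders:

from dataverse_utils.dataverse_utils import upload_from_tsv

with open('manifest.tsv', encoding='utf-8', newline='') as fil:
    upload_from_tsv(fil, 'doi:10.5072/FK2/XXXXXX',
                    dv='https://dataverse.example.edu',
                    apikey='xxxx-xxxx-xxxx',
                    trunc='/home/user/')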

dataverse_utils.dvdata

Dataverse studies and files

File

Bases: dict

Class representing a file on a Dataverse instance

Source code in src/dataverse_utils/dvdata.py
class File(dict):
    '''
    Class representing a file on a Dataverse instance
    '''
    def __init__(self, url:str, key:str,
                 **kwargs):
        '''
        Dataverse file object

        Parameters
        ----------
        url : str
            Base URL to host Dataverse instance

        key : str
            Dataverse API key with downloader privileges

        **kwargs : dict
            Other parameters

        Notes
        -----
        To initialize correctly, pass a value from Study['file_info'].

        Eg: `File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])`

        Not to be confused with the FileAnalysis object in `dataverse_utils.collections`.

        '''
        self['url'] = url
        self.__key = key
        self['downloaded'] = False
        self['downloaded_file_name'] = None
        self['downloaded_checksum'] = None
        self['verified'] = None
        #self['dv_file_metadata'] = None
        #    if not self['dv_file_metadata']:
        #        self['dv_file_metadata'] = self._get_file_metadata()
        for keey, val in kwargs.items():
            self[keey] = val
        self['timeout'] = kwargs.get('timeout', TIMEOUT)

    def download_file(self):
        '''
        Downloads the file to a temporary location. Data will be in the ORIGINAL format,
        not Dataverse-processed TSVs
        '''
        if not self['downloaded'] or not os.path.exists(self.get('downloaded_file_name', '')):

            headers = {'X-Dataverse-key': self.__key}
            headers.update(UAHEADER)
            try:
                #curl "$SERVER_URL/api/access/datafile/:persistentId/?persistentId=$PERSISTENT_ID"
                dwnld = requests.get(self['url']+'/api/access/datafile/'+
                                                str(self['dataFile']['id']),
                                     headers=headers,
                                     params = {'format':'original'},
                                     timeout=self['timeout'])
                with tempfile.NamedTemporaryFile(delete=False) as fil:
                    self['downloaded_file_name'] = fil.name
                    fil.write(dwnld.content)
                self['downloaded'] = True
                return True

            except requests.exceptions.HTTPError as err:
                LOGGER.exception(err)
                LOGGER.exception(traceback.format_exc())
                self['downloaded'] = False
                return False
        return None

    def del_tempfile(self):
        '''
        Delete tempfile if it exists
        '''
        if os.path.exists(self['downloaded_file_name']):
            os.remove(self['downloaded_file_name'])
            self['downloaded'] = False
            self['downloaded_file_name'] = None
            self['verified'] = None

    def produce_digest(self, prot: str = 'md5', blocksize: int = 2**16) -> str:
        '''
        Returns hex digest for object

        Parameters
        ----------
        prot : str, optional, default='md5'
            Hash type. Supported hashes: 'sha1', 'sha224', 'sha256',
            'sha384', 'sha512', 'blake2b', 'blake2s', 'md5'.
            Default: 'md5'

        blocksize : int, optional, default=2**16
            Read block size in bytes
        '''
        if not self['downloaded_file_name']:
            return None
        ok_hash = {'sha1' : hashlib.sha1(),
                   'sha224' : hashlib.sha224(),
                   'sha256' : hashlib.sha256(),
                   'sha384' : hashlib.sha384(),
                   'sha512' : hashlib.sha512(),
                   'blake2b' : hashlib.blake2b(),
                   'blake2s' : hashlib.blake2s(),
                   'md5': hashlib.md5()}
        with open(self['downloaded_file_name'], 'rb') as _fobj:
            try:
                _hash = ok_hash[prot]
            except (UnboundLocalError, KeyError) as err:
                message = ('Unsupported hash type. Valid values are '
                           f'{list(ok_hash)}.' )
                LOGGER.exception(err)
                LOGGER.exception(message)
                LOGGER.exception(traceback.format_exc())
                raise

            fblock = _fobj.read(blocksize)
            while fblock:
                _hash.update(fblock)
                fblock = _fobj.read(blocksize)
            return _hash.hexdigest()

    def verify(self)->None:
        '''
        Compares actual checksum with stated checksum
        '''
        if not self.get('downloaded_file_name') or not self.get('downloaded'):
            LOGGER.error('File has not been downloaded')
            self['verified'] = None
            self['downloaded_checksum'] = None
            return None
        _hash = self.produce_digest(self['dataFile']['checksum']['type'].lower())
        if _hash == self['dataFile']['checksum']['value']:
            self['verified'] = True
            self['downloaded_checksum'] = _hash
            return True
        LOGGER.error('Checksum mismatch in %s', self.get('label'))
        self['verified'] = False
        self['downloaded_checksum'] = _hash
        return False
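
Example

A hedged sketch of the download/verify cycle described in the Notes above; it builds the File from a Study instance, and the URL, key and DOI are placeholders:

from dataverse_utils.dvdata import File, Study

study = Study('doi:10.5072/FK2/XXXXXX',
              'https://dataverse.example.edu', 'xxxx-xxxx-xxxx')
fil = File('https://dataverse.example.edu', 'xxxx-xxxx-xxxx',
           **study['file_info'][0])
fil.download_file()   # fetches the ORIGINAL format to a temp file
fil.verify()          # compares stated and actual checksums
print(fil['verified'], fil['downloaded_checksum'])
fil.del_tempfile()    # clean up the temporary copy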

__init__(url, key, **kwargs)

Dataverse file object

Parameters:
  • url (str) –

    Base URL to host Dataverse instance

  • key (str) –

    Dataverse API key with downloader privileges

  • **kwargs (dict, default: {} ) –

    Other parameters

Notes

To initialize correctly, pass a value from Study[‘file_info’].

Eg: File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])

Not to be confused with the FileAnalysis object in dataverse_utils.collections.

Source code in src/dataverse_utils/dvdata.py
def __init__(self, url:str, key:str,
             **kwargs):
    '''
    Dataverse file object

    Parameters
    ----------
    url : str
        Base URL to host Dataverse instance

    key : str
        Dataverse API key with downloader privileges

    **kwargs : dict
        Other parameters

    Notes
    -----
    To initialize correctly, pass a value from Study['file_info'].

    Eg: `File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])`

    Not to be confused with the FileAnalysis object in `dataverse_utils.collections`.

    '''
    self['url'] = url
    self.__key = key
    self['downloaded'] = False
    self['downloaded_file_name'] = None
    self['downloaded_checksum'] = None
    self['verified'] = None
    #self['dv_file_metadata'] = None
    #    if not self['dv_file_metadata']:
    #        self['dv_file_metadata'] = self._get_file_metadata()
    for keey, val in kwargs.items():
        self[keey] = val
    self['timeout'] = kwargs.get('timeout', TIMEOUT)

del_tempfile()

Delete tempfile if it exists

Source code in src/dataverse_utils/dvdata.py
def del_tempfile(self):
    '''
    Delete tempfile if it exists
    '''
    if os.path.exists(self['downloaded_file_name']):
        os.remove(self['downloaded_file_name'])
        self['downloaded'] = False
        self['downloaded_file_name'] = None
        self['verified'] = None

download_file()

Downloads the file to a temporary location. Data will be in the ORIGINAL format, not Dataverse-processed TSVs

Source code in src/dataverse_utils/dvdata.py
def download_file(self):
    '''
    Downloads the file to a temporary location. Data will be in the ORIGINAL format,
    not Dataverse-processed TSVs
    '''
    if not self['downloaded'] or not os.path.exists(self.get('downloaded_file_name', '')):

        headers = {'X-Dataverse-key': self.__key}
        headers.update(UAHEADER)
        try:
            #curl "$SERVER_URL/api/access/datafile/:persistentId/?persistentId=$PERSISTENT_ID"
            dwnld = requests.get(self['url']+'/api/access/datafile/'+
                                            str(self['dataFile']['id']),
                                 headers=headers,
                                 params = {'format':'original'},
                                 timeout=self['timeout'])
            with tempfile.NamedTemporaryFile(delete=False) as fil:
                self['downloaded_file_name'] = fil.name
                fil.write(dwnld.content)
            self['downloaded'] = True
            return True

        except requests.exceptions.HTTPError as err:
            LOGGER.exception(err)
            LOGGER.exception(traceback.format_exc())
            self['downloaded'] = False
            return False
    return None

produce_digest(prot='md5', blocksize=2 ** 16)

Returns hex digest for object

Parameters:
  • prot (str, default: 'md5' ) –

    Hash type. Supported hashes: ‘sha1’, ‘sha224’, ‘sha256’, ‘sha384’, ‘sha512’, ‘blake2b’, ‘blake2s’, ‘md5’. Default: ‘md5’

  • blocksize (int, default: 2**16 ) –

    Read block size in bytes

Source code in src/dataverse_utils/dvdata.py
def produce_digest(self, prot: str = 'md5', blocksize: int = 2**16) -> str:
    '''
    Returns hex digest for object

    Parameters
    ----------
    prot : str, optional, default='md5'
        Hash type. Supported hashes: 'sha1', 'sha224', 'sha256',
        'sha384', 'sha512', 'blake2b', 'blake2s', 'md5'.
        Default: 'md5'

    blocksize : int, optional, default=2**16
        Read block size in bytes
    '''
    if not self['downloaded_file_name']:
        return None
    ok_hash = {'sha1' : hashlib.sha1(),
               'sha224' : hashlib.sha224(),
               'sha256' : hashlib.sha256(),
               'sha384' : hashlib.sha384(),
               'sha512' : hashlib.sha512(),
               'blake2b' : hashlib.blake2b(),
               'blake2s' : hashlib.blake2s(),
               'md5': hashlib.md5()}
    with open(self['downloaded_file_name'], 'rb') as _fobj:
        try:
            _hash = ok_hash[prot]
        except (UnboundLocalError, KeyError) as err:
            message = ('Unsupported hash type. Valid values are '
                       f'{list(ok_hash)}.' )
            LOGGER.exception(err)
            LOGGER.exception(message)
            LOGGER.exception(traceback.format_exc())
            raise

        fblock = _fobj.read(blocksize)
        while fblock:
            _hash.update(fblock)
            fblock = _fobj.read(blocksize)
        return _hash.hexdigest()

verify()

Compares actual checksum with stated checksum

Source code in src/dataverse_utils/dvdata.py
def verify(self)->None:
    '''
    Compares actual checksum with stated checksum
    '''
    if not self.get('downloaded_file_name') or not self.get('downloaded'):
        LOGGER.error('File has not been downloaded')
        self['verified'] = None
        self['downloaded_checksum'] = None
        return None
    _hash = self.produce_digest(self['dataFile']['checksum']['type'].lower())
    if _hash == self['dataFile']['checksum']['value']:
        self['verified'] = True
        self['downloaded_checksum'] = _hash
        return True
    LOGGER.error('Checksum mismatch in %s', self.get('label'))
    self['verified'] = False
    self['downloaded_checksum'] = _hash
    return False

FileInfo

Bases: dict

An object representing all of a dataverse study’s files. Easily parseable as a dict.

Source code in src/dataverse_utils/dvdata.py
class FileInfo(dict):
    '''

    An object representing all of a dataverse study's files.
    Easily parseable as a dict.

    '''
    #Should this be incorporated into the above class? Probably.
    def __init__(self, **kwargs)->None:
        '''
        Initialize a FileInfo object

        Parameters
        ----------
        **kwargs : dict
            Keyword arguments as below

        Other parameters
        ----------------
        url : str, required
            Base URL of dataverse installation

        pid : str, required
            Handle or DOI of study

        apikey : str, optional
            Dataverse API key; required for DRAFT or restricted material.
            Or if the platform policy requires an API key.

        timeout : int, optional
            Optional timeout in seconds
        '''
        self.kwargs = kwargs
        self['version_list'] = []
        self.dv = None
        self._get_json()
        self._get_all_files()
        self['headers'] = list(self[self['current_version']][0].keys())

    def _get_json(self) -> None:
        '''
        Get study file json
        '''
        try:
            headers={'X-Dataverse-key' : self.kwargs.get('apikey')}
            headers.update(UAHEADER)
            params = {'persistentId': self.kwargs['pid']}
            self.dv = requests.get(f'{self.kwargs["url"]}/api/datasets/:persistentId/versions',
                                   params=params,
                                   timeout=self.kwargs.get('timeout', 100),
                                   headers=headers)
            self.dv.raise_for_status()
        except (requests.exceptions.RequestException,
                requests.exceptions.ConnectionError,
                requests.exceptions.HTTPError,
                requests.exceptions.TooManyRedirects,
                requests.exceptions.ConnectTimeout,
                requests.exceptions.ReadTimeout,
                requests.exceptions.Timeout,
                requests.exceptions.JSONDecodeError,
                requests.exceptions.InvalidSchema) as err:

            err.add_note(f'Connection error: {"\n".join((str(x) for x in err.args))}')
            msg = '\n'.join(getattr(err, '__notes__', []))
            LOGGER.critical(msg)
            raise err

    def _get_all_files(self):
        '''
        Iterates over self.dv_json()['data']. to produce a list of files
        in self['files']
        '''
        try:
            for num, version in enumerate(self.dv.json()['data']):
                self._get_version_files(version, current=num)

        except AttributeError as err:
            err.add_note('No JSON present')
            #LOGGER.exception('FileInfo AttributeError: %s', err)
            #LOGGER.exception(traceback.format_exc())
            raise err

        except KeyError as err:
            err.add_note(f'JSON parsing error: {err}')
            err.add_note('Offending JSON:')
            err.add_note(f'{self.dv.json()}')
            msg = '\n'.join(getattr(err, '__notes__', []))
            LOGGER.exception('FileInfo KeyError: %s', msg)
            #LOGGER.exception(traceback.format_exc())
            raise err

    def _get_version_files(self, flist: list, current=1)->None:
        '''
        Set version number and assign file info a version key

        Parameters
        ----------
        flist : list
            list of file metadata for a particular version

        current: int, optional, default=1
            Value of zero represents most current version

        '''
        if flist['versionState'] == 'DRAFT':
            ver_info='DRAFT'
        else:
            ver_info = f"{flist['versionNumber']}.{flist['versionMinorNumber']}"
        if current == 0:
            self['current_version'] = ver_info
        self['version_list'].append(ver_info)
        self[ver_info] = []
        for fil in flist['files']:
            self[ver_info].append(self._get_file_info(fil,
                                                     ver_info=ver_info,
                                                     state_info=flist['versionState']))

    def _get_file_info(self, file:dict, **kwargs)->dict:
        '''
        Returns a dict of required info from a chunk of dataverse study
        version metadata

        Parameters
        ----------
        file : dict
            The dict containing one file's metadata

        **kwargs : dict
            Keyword arguments

        version_info: str
            Version info string

        state_info : str
            Publication state
        '''
        # headers = ['file', 'description', 'pidURL','downloadURL', 'version', 'state']
        file_name = file['dataFile'].get('originalFileName', file['label'])
        filepath = pathlib.Path(file.get('directoryLabel', ''), file_name)
        description = file.get('description', '')
        try:
            pid_url = file['dataFile']['pidURL']
        except KeyError:
            pid_url = f'{self.kwargs["url"]}/file.xhtml?fileId={file["dataFile"]["id"]}'
        fid = file['dataFile']['id']
        download_url = f'{self.kwargs["url"]}/api/access/datafile/{fid}?format=original'
        out = {'file': str(filepath).strip(),
               'description': description.strip(),
               'pid_url': pid_url, 'download_url':download_url,
               'version': kwargs['ver_info'],
               'state' : kwargs['state_info']}
        return out
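
Example

A minimal sketch listing the files of the current study version; the URL and PID are placeholders, and apikey is only needed for drafts or restricted material:

from dataverse_utils.dvdata import FileInfo

info = FileInfo(url='https://dataverse.example.edu',
                pid='doi:10.5072/FK2/XXXXXX')
for entry in info[info['current_version']]:
    print(entry['file'], entry['state'])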

__init__(**kwargs)

Initialize a FileInfo object

Parameters:
  • **kwargs (dict, default: {} ) –

    Keyword arguments as below

  • url ((str, required)) –

    Base URL of dataverse installation

  • pid ((str, required)) –

    Handle or DOI of study

  • apikey (str) –

    Dataverse API key; required for DRAFT or restricted material. Or if the platform policy requires an API key.

  • timeout (int) –

    Optional timeout in seconds

Source code in src/dataverse_utils/dvdata.py
def __init__(self, **kwargs)->None:
    '''
    Initialize a FileInfo object

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments as below

    Other parameters
    ----------------
    url : str, required
        Base URL of dataverse installation

    pid : str, required
        Handle or DOI of study

    apikey : str, optional
        Dataverse API key; required for DRAFT or restricted material.
        Or if the platform policy requires an API key.

    timeout : int, optional
        Optional timeout in seconds
    '''
    self.kwargs = kwargs
    self['version_list'] = []
    self.dv = None
    self._get_json()
    self._get_all_files()
    self['headers'] = list(self[self['current_version']][0].keys())

Study

Bases: dict

Dataverse record. Dataverse study records are pure metadata so this is represented with a dictionary.

Source code in src/dataverse_utils/dvdata.py
class Study(dict): #pylint:  disable=too-few-public-methods
    '''
    Dataverse record. Dataverse study records are pure metadata so this
    is represented with a dictionary.
    '''
    def __init__(self, pid: str,
                 url:str, key:str,
                 **kwargs):
        '''
        Initialize a Study object

        Parameters
        ----------
        pid : str
            Record persistent identifier: hdl or doi

        url : str
            Base URL to host Dataverse instance

        key : str
            Dataverse API key with downloader privileges

        **kwargs : dict
            Keyword arguments

        Other parameters
        ----------------
        timeout : int
            Request timeout in seconds
        '''
        self['pid'] = pid
        self['url'] = url
        self.__key = key
        self['orig_json'] = None
        self['timeout'] = kwargs.get('timeout',TIMEOUT)
        if not self['orig_json']:
            self['orig_json'] = self._orig_json()
        self['upload_json'] = self._upload_json
        self['file_info'] = self['orig_json']['files']
        self['file_ids'] = [x['dataFile'].get('id') for x in self['orig_json']['files']]
        self['file_persistentIds'] = self._get_file_pids()
        self['source_version'] = Study.get_version(url)
        self['target_version'] = None
        if not self['target_version']:
            self['target_version'] = Study.get_version(url)

    @classmethod
    def get_version(cls, url:str, timeout:int=100)->float:
        '''
        Returns a float representing a Dataverse version number.
        Floating point value composed of:
        float(f'{major_version}.{minor_version:03d}{patch:03d}')
        ie, version 5.9.2 would be 5.009002

        Parameters
        ----------
        url : str
            URL of base Dataverse instance. eg: 'https://abacus.library.ubc.ca'
        timeout : int, default=100
            Request timeout in seconds
        '''
        ver = requests.get(f'{url}/api/info/version',
                           headers=UAHEADER,
                           #headers = {'X-Dataverse-key' : key},
                           timeout = timeout)
        try:
            ver.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            LOGGER.error('Error getting version for %s', url)
            LOGGER.exception(exc)
            LOGGER.exception(traceback.format_exc())
            raise requests.exceptions.HTTPError
        #Scholars Portal version is formatted as v5.13.9-SP, so. . .
        verf = ver.json()['data']['version'].strip('v ').split('.')
        verf = [x.split('-')[0] for x in verf]
        verf =[int(b)/10**(3*a) for a,b in enumerate(verf)]
        #it's 3*a in case for some reason we hit, say v5.99.99 and there's more before v6.
        verf = sum(verf)
        return verf

    def set_version(self, url:str, timeout:int=100)->None:
        '''
        Sets self['target_version'] to appropriate integer value *AND*
        formats self['upload_json'] to correct JSON format

        Parameters
        ----------
        url : str
            URL of *target* Dataverse instance

        timeout : int, optional, default=100
            request timeout in seconds
        '''
        self['target_version'] = Study.get_version(url, timeout)
        # Now fix the metadata to work with various versions
        if self['target_version'] >= 5.010:
            self.fix_licence()
        if self['target_version'] >= 5.013:
            self.production_location()

    def _orig_json(self) -> dict:
        '''
        Latest study version record JSON. Retrieved from
        Dataverse installation so an internet connection
        is required.
        '''
        #curl -H "X-Dataverse-key:$API_TOKEN" /
        #$SERVER_URL/api/datasets/:persistentId/?persistentId=$PERSISTENT_IDENTIFIER
        headers = {'X-Dataverse-key' : self.__key}
        headers.update(UAHEADER)
        getjson = requests.get(self['url']+'/api/datasets/:persistentId',
                               headers=headers,
                               params = {'persistentId': self['pid']},
                               timeout = self['timeout'])
        getjson.raise_for_status()
        return getjson.json()['data']['latestVersion']

    def __add_email(self, upjson):
        '''
        Adds contact information if it's not there. Fills with dummy data

        Parameters
        ----------
        upjson : dict
            Metadata
        '''
        #pylint: disable=possibly-used-before-assignment
        for n, v in enumerate((upjson['datasetVersion']
                              ['metadataBlocks']['citation']['fields'])):
            if v['typeName'] == 'datasetContact':
                contact_no = n
        for _x in (upjson['datasetVersion']['metadataBlocks']
                  ['citation']['fields'][contact_no]['value']):
            if not _x.get('datasetContactEmail'):
                _x['datasetContactEmail'] = {'typeName':'datasetContactEmail',
                                              'multiple': False,
                                              'typeClass':'primitive',
                                              'value': 'suppressed_value@test.invalid'}
        return upjson

    @property
    def _upload_json(self)->dict:
        '''
        A Dataverse JSON record with PIDs and other information stripped
        suitable for upload as a new Dataverse study record.
        '''
        upj = {'datasetVersion': {'license': self['orig_json']['license'],
                                     'termsOfUse': self['orig_json'].get('termsOfUse',''),
                                     'metadataBlocks': self['orig_json']['metadataBlocks']
                                     }
                  }
        return self.__add_email(upj)

    @property
    def _oldupload_json(self)->dict:
        '''
        A Dataverse JSON record with PIDs and other information stripped
        suitable for upload as a new Dataverse study record.
        '''
        return {'datasetVersion': {'license': self['orig_json']['license'],
                                     'termsOfUse': self['orig_json'].get('termsOfUse',''),
                                     'metadataBlocks': self['orig_json']['metadataBlocks']
                                     }
                  }

    def _get_file_pids(self)->list:
        '''
        Returns a list of file ids representing the file
        objects in dataverse record
        '''
        pids = [x['dataFile'].get('persistentId') for x in self['orig_json']['files']]
        if not all(pids):
            return None
        return pids

    ######
    #JSON metdata fixes for different versions
    ######
    def fix_licence(self)->None:
        '''
        Replaces non-standard licence with None

        Notes
        -----
        With Dataverse v5.10+, a licence type of 'NONE' is now forbidden.
        Now, as per <https://guides.dataverse.org/en/5.14/api/sword.html\
        ?highlight=invalid%20license>,
        non-standard licences may be replaced with None.

        This function edits the same Study object *in place*, so returns nothing.
        '''
        if self['upload_json']['datasetVersion']['license'] == 'NONE':
            self['upload_json']['datasetVersion']['license'] = None

        if not self['upload_json']['datasetVersion']['termsOfUse']:
            #This shouldn't happen, but UBC has datasets from the early 1970s
            self['upload_json']['datasetVersion']['termsOfUse'] = 'Not available'

    def production_location(self)->None:
        '''
        Changes "multiple" to True where typeName == 'productionPlace' in
        Study['upload_json']. Changes are done *in place*.

        Notes
        -----
        Multiple production places came into effect with Dataverse v5.13
        '''
        #{'typeName': 'productionPlace', 'multiple': True, 'typeClass': 'primitive',
        #'value': ['Vancouver, BC', 'Ottawa, ON']}

        # get index
        indy = None
        for ind, val in enumerate(self['upload_json']['datasetVersion']\
                                      ['metadataBlocks']['citation']['fields']):
            if val['typeName'] == 'productionPlace':
                indy = ind
                break

        if indy and not self['upload_json']['datasetVersion']['metadataBlocks']\
                ['citation']['fields'][indy]['multiple']:
            self['upload_json']['datasetVersion']['metadataBlocks']\
                ['citation']['fields'][indy]['multiple'] = True
            self['upload_json']['datasetVersion']['metadataBlocks']\
                ['citation']['fields'][indy]['value'] = [self['upload_json']['datasetVersion']\
                                                         ['metadataBlocks']['citation']\
                                                         ['fields'][indy]['value']]
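
Example

A hedged sketch of preparing a study record for transfer to another installation; the URLs, key and DOI are placeholders:

from dataverse_utils.dvdata import Study

stud = Study('doi:10.5072/FK2/XXXXXX',
             'https://source.example.edu', 'xxxx-xxxx-xxxx')
stud.set_version('https://target.example.edu')  # adjusts upload_json for the target version
print(stud['source_version'], stud['target_version'])
# stud['upload_json'] is now suitable for creating a new study record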

__add_email(upjson)

Adds contact information if it’s not there. Fills with dummy data

Parameters:
  • upjson (dict) –

    Metadata

Source code in src/dataverse_utils/dvdata.py
def __add_email(self, upjson):
    '''
    Adds contact information if it's not there. Fills with dummy data

    Parameters
    ----------
    upjson : dict
        Metadata
    '''
    #pylint: disable=possibly-used-before-assignment
    for n, v in enumerate((upjson['datasetVersion']
                          ['metadataBlocks']['citation']['fields'])):
        if v['typeName'] == 'datasetContact':
            contact_no = n
    for _x in (upjson['datasetVersion']['metadataBlocks']
              ['citation']['fields'][contact_no]['value']):
        if not _x.get('datasetContactEmail'):
            _x['datasetContactEmail'] = {'typeName':'datasetContactEmail',
                                          'multiple': False,
                                          'typeClass':'primitive',
                                          'value': 'suppressed_value@test.invalid'}
    return upjson

__init__(pid, url, key, **kwargs)

Initialize a Study object

Parameters:
  • pid (str) –

    Record persistent identifier: hdl or doi

  • url (str) –

    Base URL to host Dataverse instance

  • key (str) –

    Dataverse API key with downloader privileges

  • **kwargs (dict, default: {} ) –

    Keyword arguments

  • timeout (int) –

    Request timeout in seconds

Source code in src/dataverse_utils/dvdata.py
def __init__(self, pid: str,
             url:str, key:str,
             **kwargs):
    '''
    Initialize a Study object

    Parameters
    ----------
    pid : str
        Record persistent identifier: hdl or doi

    url : str
        Base URL to host Dataverse instance

    key : str
        Dataverse API key with downloader privileges

    **kwargs : dict
        Keyword arguments

    Other parameters
    ----------------
    timeout : int
        Request timeout in seconds
    '''
    self['pid'] = pid
    self['url'] = url
    self.__key = key
    self['orig_json'] = None
    self['timeout'] = kwargs.get('timeout',TIMEOUT)
    if not self['orig_json']:
        self['orig_json'] = self._orig_json()
    self['upload_json'] = self._upload_json
    self['file_info'] = self['orig_json']['files']
    self['file_ids'] = [x['dataFile'].get('id') for x in self['orig_json']['files']]
    self['file_persistentIds'] = self._get_file_pids()
    self['source_version'] = Study.get_version(url)
    self['target_version'] = None
    if not self['target_version']:
        self['target_version'] = Study.get_version(url)

fix_licence()

Replaces non-standard licence with None

Notes

With Dataverse v5.10+, a licence type of ‘NONE’ is now forbidden. Now, as per https://guides.dataverse.org/en/5.14/api/sword.html?highlight=invalid%20license, non-standard licences may be replaced with None.

This function edits the same Study object in place, so returns nothing.

Source code in src/dataverse_utils/dvdata.py
def fix_licence(self)->None:
    '''
    Replaces non-standard licence with None

    Notes
    -----
    With Dataverse v5.10+, a licence type of 'NONE' is now forbidden.
    Now, as per <https://guides.dataverse.org/en/5.14/api/sword.html\
    ?highlight=invalid%20license>,
    non-standard licences may be replaced with None.

    This function edits the same Study object *in place*, so returns nothing.
    '''
    if self['upload_json']['datasetVersion']['license'] == 'NONE':
        self['upload_json']['datasetVersion']['license'] = None

    if not self['upload_json']['datasetVersion']['termsOfUse']:
        #This shouldn't happen, but UBC has datasets from the early 1970s
        self['upload_json']['datasetVersion']['termsOfUse'] = 'Not available'

get_version(url, timeout=100) classmethod

Returns a float representing a Dataverse version number. Floating point value composed of: float(f’{major_version}.{minor_version:03d}{patch:03d}’); ie, version 5.9.2 would be 5.009002.

Parameters:
  • url (str) –

    URL of base Dataverse instance. eg: ‘https://abacus.library.ubc.ca’

  • timeout (int, default: 100 ) –

    Request timeout in seconds

Source code in src/dataverse_utils/dvdata.py
@classmethod
def get_version(cls, url:str, timeout:int=100)->float:
    '''
    Returns a float representing a Dataverse version number.
    Floating point value composed of:
    float(f'{major_version}.{minor_version:03d}{patch:03d}')
    ie, version 5.9.2 would be 5.009002

    Parameters
    ----------
    url : str
        URL of base Dataverse instance. eg: 'https://abacus.library.ubc.ca'
    timeout : int, default=100
        Request timeout in seconds
    '''
    ver = requests.get(f'{url}/api/info/version',
                       headers=UAHEADER,
                       #headers = {'X-Dataverse-key' : key},
                       timeout = timeout)
    try:
        ver.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        LOGGER.error('Error getting version for %s', url)
        LOGGER.exception(exc)
        LOGGER.exception(traceback.format_exc())
        raise requests.exceptions.HTTPError
    #Scholars Portal version is formatted as v5.13.9-SP, so. . .
    verf = ver.json()['data']['version'].strip('v ').split('.')
    verf = [x.split('-')[0] for x in verf]
    verf =[int(b)/10**(3*a) for a,b in enumerate(verf)]
    #it's 3*a in case for some reason we hit, say v5.99.99 and there's more before v6.
    verf = sum(verf)
    return verf

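A worked example of the encoding, mirroring the arithmetic the method performs on the parsed version string:

# 'v5.9.2' -> [5, 9, 2] -> 5/1 + 9/1000 + 2/1000000
parts = [int(p) for p in '5.9.2'.split('.')]
version = sum(part / 10**(3 * idx) for idx, part in enumerate(parts))
print(version)  # 5.009002
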
production_location()

Changes “multiple” to True where typeName == ‘productionPlace’ in Study[‘upload_json’]. Changes are done in place.

Notes

Multiple production places came into effect with Dataverse v5.13

Source code in src/dataverse_utils/dvdata.py
def production_location(self)->None:
    '''
    Changes "multiple" to True where typeName == 'productionPlace' in
    Study['upload_json'] Changes are done
    *in place*.

    Notes
    -----
    Multiple production places came into effect with Dataverse v5.13
    '''
    #{'typeName': 'productionPlace', 'multiple': True, 'typeClass': 'primitive',
    #'value': ['Vancouver, BC', 'Ottawa, ON']}

    # get index
    indy = None
    for ind, val in enumerate(self['upload_json']['datasetVersion']\
                                  ['metadataBlocks']['citation']['fields']):
        if val['typeName'] == 'productionPlace':
            indy = ind
            break

    if indy and not self['upload_json']['datasetVersion']['metadataBlocks']\
            ['citation']['fields'][indy]['multiple']:
        self['upload_json']['datasetVersion']['metadataBlocks']\
            ['citation']['fields'][indy]['multiple'] = True
        self['upload_json']['datasetVersion']['metadataBlocks']\
            ['citation']['fields'][indy]['value'] = [self['upload_json']['datasetVersion']\
                                                     ['metadataBlocks']['citation']\
                                                     ['fields'][indy]['value']]

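In other words, a single-valued productionPlace field is rewritten into the multiple-value form. A sketch of the transformation (abbreviated field dicts, not library output):

# Before
field = {'typeName': 'productionPlace', 'multiple': False,
         'typeClass': 'primitive', 'value': 'Vancouver, BC'}

# After production_location(): 'multiple' is True and the value is wrapped in a list
field = {'typeName': 'productionPlace', 'multiple': True,
         'typeClass': 'primitive', 'value': ['Vancouver, BC']}
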
set_version(url, timeout=100)

Sets self[‘target_version’] to the appropriate version value AND reformats self[‘upload_json’] so that it is compatible with that version.

Parameters:
  • url (str) –

    URL of target Dataverse instance

  • timeout (int, default: 100 ) –

    request timeout in seconds

Source code in src/dataverse_utils/dvdata.py
def set_version(self, url:str, timeout:int=100)->None:
    '''
    Sets self['target_version'] to appropriate integer value *AND*
    formats self['upload_json'] to correct JSON format

    Parameters
    ----------
    url : str
        URL of *target* Dataverse instance

    timeout : int, optional, default=100
        request timeout in seconds
    '''
    self['target_version'] = Study.get_version(url, timeout)
    # Now fix the metadata to work with various versions
    if self['target_version'] >= 5.010:
        self.fix_licence()
    if self['target_version'] >= 5.013:
        self.production_location()

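A typical migration sequence, continuing the Study construction example above (the target URL is a placeholder):

# study was built against the source instance (see the constructor example above)
study.set_version('https://target.dataverse.example', timeout=100)
# For targets at v5.10+ the licence has been normalised by fix_licence();
# for v5.13+ productionPlace has been converted by production_location()
upload_json = study['upload_json']
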
dataverse_utils.ldc

Creates Dataverse JSON from a Linguistic Data Consortium website page.

Ldc

Bases: Serializer

An LDC item (eg, LDC2021T01)

Source code in src/dataverse_utils/ldc.py
class Ldc(ds.Serializer):#pylint: disable=too-many-instance-attributes
    '''
    An LDC item (eg, LDC2021T01)
    '''
    #pylint: disable=super-init-not-called, arguments-differ
    def __init__(self, ldc, cert=None):
        '''
        Returns a dict with keys created from an LDC catalogue web
        page.

        Parameters
        ----------
        ldc : str
            Linguistic Consortium Catalogue Number (eg. 'LDC2015T05'.
            This is what forms the last part of the LDC catalogue URL.

        cert : str, optional, default=None
            Path to certificate chain; LDC has had a problem
            with intermediate certificates, so you can
            download the chain with a browser and supply a
            path to the .pem with this parameter
        '''
        self.ldc = ldc.strip().upper()
        self.ldcHtml = None
        self._ldcJson = None
        self._dryadJson = None
        self._dvJson = None
        self.cert = cert
        self.session = requests.Session()
        self.session.mount('https://',
                           HTTPAdapter(max_retries=ds.constants.RETRY_STRATEGY))
        if self.cert:
            self.cert = os.path.expanduser(self.cert)
        self.__fixdesc = None

    @property
    def ldcJson(self):
        '''
        Returns a JSON based on the LDC web page scraping
        '''
        if not self._ldcJson:
            self._ldcJson = self.make_ldc_json()
        return self._ldcJson

    @property
    def dryadJson(self):
        '''
        LDC metadata in Dryad JSON format
        '''
        if not self._dryadJson:
            self._dryadJson = self.make_dryad_json()
        return self._dryadJson

    @property
    def dvJson(self):
        '''
        LDC metadata in Dataverse JSON format
        '''
        #return False
        if not self._dvJson:
            self._dvJson = self.make_dv_json()
        return self._dvJson

    @property
    def embargo(self)->bool:
        '''
        Boolean indicating embargo status
        '''
        return False

    @property
    def fileJson(self):
        '''
        Returns False: No attached files possible at LDC
        '''
        return False

    @property
    def files(self):
        '''
        Returns None. No files possible
        '''
        return None

    @property
    def oversize(self, maxsize=None):
        '''
        Make sure file is not too big for the Dataverse instance

        Parameters
        ----------
        maxsize : int, optional, default=None
            Maximum size in bytes
        '''
        #pylint: disable=property-with-parameters
        if not maxsize:
            maxsize = ds.constants.MAX_UPLOAD

    @property
    def id(self):
        '''
        Returns LDC ID
        '''
        return self.ldc

    def fetch_record(self, timeout=45):
        '''
        Downloads record from LDC website

        Parameters
        ----------
        timeout : int, optional, default=45
            Request timeout in seconds

        '''
        interim = self.session.get(f'https://catalog.ldc.upenn.edu/{self.ldc}',
                                   verify=self.cert, timeout=timeout)
        interim.raise_for_status()
        self.ldcHtml = interim.text

    def make_ldc_json(self):
        '''
        Returns a dict with keys created from an LDC catalogue web
        page.
        '''
        if not self.ldcHtml:
            self.fetch_record()
        soup = bs(self.ldcHtml, 'html.parser')
        #Should data just look in the *first* table? Specifically tbody?
        #Is it always the first? I assume yes.
        tbody = soup.find('tbody')#new
        data = [x.text.strip() for x in tbody.find_all('td')]#new
        #data = [x.text.strip() for x in soup.find_all('td')]#original
        LDC_dict = {data[x][:data[x].find('\n')].strip(): data[x+1].strip()
                    for x in range(0, len(data), 2)}
        #Related Works appears to have an extra 'Hide' at the end
        if LDC_dict.get('Related Works:'):
            LDC_dict['Related Works'] = (x.strip() for x in LDC_dict['Related Works:'].split('\n'))
            del LDC_dict['Related Works:'] #remove the renamed key
        LDC_dict['Linguistic Data Consortium'] = LDC_dict['LDC Catalog No.']
        del LDC_dict['LDC Catalog No.']#This key must be renamed for consistency
        LDC_dict['Author(s)'] = [x.strip() for x in LDC_dict['Author(s)'].split(',')]
        #Other metadata probably has HTML in it, so we keep as much as possible
        other_meta = soup.find_all('div')
        alldesc = [x for x in other_meta if x.attrs.get('itemprop') == 'description']
        #sometimes they format pages oddly and we can use this for a
        #quick and dirty fix
        self.__fixdesc = copy.deepcopy(alldesc)
        #sections use h3, so split on these
        #24 Jan 23 Apparently, this is all done manually so some of them sometime use h4.
        #Because reasons.
        #was:
        #alldesc = str(alldesc).split('<h3>')
        #is now
        alldesc = str(alldesc).replace('h4>', 'h3>').split('<h3>')
        for i in range(1, len(alldesc)):
            alldesc[i] = '<h3>' + alldesc[i]
        #first one is not actually useful, so discard it
        alldesc.pop(0)


        #So far, so good. At this point the relative links need fixing
        #and tables need to be converted to pre.
        for desc in alldesc:
            #It's already strings; replace relative links first
            desc = desc.replace('../../../', 'https://catalog.ldc.upenn.edu/')
            subsoup = bs(desc, 'html.parser')
            key = subsoup.h3.text.strip()
            #don't need the h3 tags anymore
            subsoup.find('h3').extract()
            # Convert tables to <pre>
            for tab in subsoup.find_all('table'):
                content = str(tab)
                #convert to markdown
                content = markdownify.markdownify(content)
                tab.name = 'pre'
                tab.string = content #There is not much documentation on the
                                     #difference between tab.string and tab.content
            #That was relatively easy
            LDC_dict[key] = str(subsoup)
        LDC_dict['Introduction'] = LDC_dict.get('Introduction',
                                                self.__no_intro())
        #LDC puts http in front of their DOI identifier
        if LDC_dict.get('DOI'):
            LDC_dict['DOI'] = LDC_dict['DOI'].replace('https://doi.org/', '')
        return LDC_dict

    def __no_intro(self)->str:
        '''
        Makes an introduction even if they forgot to include the word "Introduction"
        '''
        #self.__fixdesc is set in make_ldc_json
        intro = [x for x in self.__fixdesc if
                 self.__fixdesc[0]['itemprop']=='description'][0]
        while intro.find('div'): #nested?, not cleaning properly
            intro.find('div').unwrap() # remove the div tag
        intro = str(intro)
        #Normally, there's an <h3>Introduction</h3> but sometimes there's not
        #Assumes that the first section up to "<h" is an intro.
        #You know what they say about assuming
        intro = intro[:intro.find('<h')]
        start = intro.find('<div')
        if start != -1:
            end = intro.find('>',start)+1
            intro = intro.replace(intro[start:end], '').strip()
        return intro

    @staticmethod
    def name_parser(name):
        '''
        Returns lastName/firstName JSON snippet from a name

        Parameters
        ----------
        name : str
            A name

        Notes
        -----
        Can't be 100% accurate, because names can be split in many ways. However, as they
        say, 80% is good enough.
        '''
        names = name.split(' ')
        return {'lastName': names[-1], 'firstName': ' '.join(names[:-1]), 'affiliation':''}

    def make_dryad_json(self, ldc=None):
        '''
        Creates a Dryad-style dict from an LDC dictionary

        Parameters
        ----------
        ldc : dict, optional, default=self.ldcJson
            Dictionary containing LDC data. Defaults to self.ldcJson
        '''
        if not ldc:
            ldc = self.ldcJson
        print(ldc)
        dryad = {}
        dryad['title'] = ldc['Item Name']
        dryad['authors'] = [Ldc.name_parser(x) for x in ldc['Author(s)']]
        abstract = ('<p><b>Introduction</b></p>'
                    f"<p>{ldc['Introduction']}</p>"
                    '<p><b>Data</b></p>'
                    f"<p>{ldc['Data']}</p>")
        if ldc.get('Acknowledgement'):
            abstract += ('<p><b>Acknowledgement</b></p>'
                         f"<p>{ldc['Acknowledgement']}</p>")
        dryad['abstract'] = abstract
        dryad['keywords'] = ['Linguistics']

        #Dataverse accepts only ISO formatted date

        try:
            releaseDate = time.strptime(ldc['Release Date'], '%B %d, %Y')
            releaseDate = time.strftime('%Y-%m-%d', releaseDate)
        except KeyError:
            #Older surveys don't have a release date field
            #so it must be created from the record number
            if self.ldc[3] == '9':
                releaseDate = '19' + self.ldc[3:5]
        dryad['lastModificationDate'] = releaseDate
        dryad['publicationDate'] = releaseDate

        return dryad


    def _make_note(self, ldc=None)->str:
        '''
        Creates a generalized HTML notes field from a bunch of
        LDC fields that don't fit into dataverse

        Parameters
        ----------
        ldc : dict, optional, default=self.ldcJson
            Dictionary containing LDC data
        '''
        if not ldc:
            ldc = self.ldcJson
        note_fields = ['DCMI Type(s)',
                       'Sample Type',
                       'Sample Rate',
                       'Application(s)',
                       'Language(s)',
                       'Language ID(s)']
        outhtml = []
        for note in note_fields:
            if ldc.get(note):
                data = ldc[note].split(',')
                data = [x.strip() for x in data]
                data = ', '.join(data)
                if note != 'Language ID(s)':
                    data = data[0].capitalize() + data[1:]
                    #data = [x.capitalize() for x in data]
                outhtml.append(f'{note}: {data}')
        outhtml.append(f'Metadata automatically created from '
                       f'<a href="https://catalog.ldc.upenn.edu/{self.ldc}">'
                       f'https://catalog.ldc.upenn.edu/{self.ldc}</a> '
                       f'[{time.strftime("%d %b %Y", time.localtime())}]')
        return '<br />'.join(outhtml)

    @staticmethod
    def find_block_index(dvjson, key):
        '''
        Finds the index number of an item in Dataverse's idiotic JSON list

        Parameters
        ----------
        dvjson : dict
            Dataverse JSON

        key : str
            key for which to find list index
        '''
        for num, item in enumerate(dvjson['datasetVersion']
                                   ['metadataBlocks']['citation']['fields']):
            if item['typeName'] == key:
                return num
        return None

    def make_dv_json(self, ldc=None):#pylint: disable=too-many-locals, too-many-statements
        '''
        Returns complete Dataverse JSON

        Parameters
        ----------
        ldc : dict, optional, default=self.ldcJson
            LDC dictionary.
        '''
        if not ldc:
            ldc = self.ldcJson

        dvjson = super().dvJson.copy()

        #ID Numbers
        otherid = super()._typeclass('otherId', True, 'compound')
        ids = []
        for item in ['Linguistic Data Consortium', 'ISBN', 'ISLRN', 'DOI']:
            if ldc.get(item):
                out = {}
                agency = super()._convert_generic(inJson={item:item},
                                                  dryField=item,
                                                  dvField='otherIdAgency')
                value = super()._convert_generic(inJson={item:ldc[item]},
                                                 dryField=item,
                                                 dvField='otherIdValue')
                out.update(agency)
                out.update(value)
                ids.append(out)
        otherid['value'] = ids
        dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(otherid)

        #Producer and publisher
        prod = super()._typeclass('producer', True, 'compound')
        p_name = super()._convert_generic(inJson={'producerName': 'Linguistic Data Consortium'},
                                          dryField='producerName',
                                          dvField='producerName')
        p_affil = super()._convert_generic(inJson={'producerAffiliation':
                                                   'University of Pennsylvania'},
                                           dryField='producerName',
                                           dvField='producerName')
        p_url = super()._convert_generic(inJson={'producerURL': 'https://www.ldc.upenn.edu/'},
                                         dryField='producerURL',
                                         dvField='producerURL')
        p_name.update(p_affil)
        p_name.update(p_url)
        prod['value'] = [p_name]
        dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(prod)

        #Kind of data
        kind = super()._typeclass('kindOfData', True, 'primitive')
        kind['value'] = 'Linguistic data'

        #Series
        series = super()._typeclass('series', False, 'compound')
        s_name = super()._convert_generic(inJson={'seriesName': 'LDC'},
                                          dryField='seriesName',
                                          dvField='seriesName')
        s_info = super()._convert_generic(inJson={'seriesInformation':
                                                  'Linguistic Data Consortium'},
                                          dryField='seriesInformation',
                                          dvField='seriesInformation')
        s_name.update(s_info)
        series['value'] = s_name #not a list
        dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)

        #Data sources
        series = super()._typeclass('dataSources', True, 'primitive')
        data_sources = ldc['Data Source(s)'].split(',')
        data_sources = [x.strip().capitalize() for x in data_sources]
        series['value'] = data_sources
        dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)

        #Fix keyword labels that are hardcoded for Dryad
        #There should be only one keyword block
        keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']['metadataBlocks']
                                                      ['citation']['fields'])
                                                        if y.get('typeName') == 'keyword'][0]
        key_pos = [x for x, y in enumerate(keyword_field[1]['value'])
                   if y['keywordVocabulary']['value'] == 'Dryad'][0]
        dvjson['datasetVersion']['metadataBlocks']['citation']\
                ['fields'][keyword_field[0]]['value'][key_pos]\
                ['keywordVocabulary']['value'] = 'Linguistic Data Consortium'

        #The first keyword field is hardcoded in by dryad2dataverse.serializer
        #So I think it needs to be deleted
        keyword_field = [(x, y) for x, y in
                         enumerate(dvjson['datasetVersion']['metadataBlocks']['citation']['fields'])
                         if y.get('typeName') == 'otherId'][0] #ibid
        del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]

        #Notes
        note_index = Ldc.find_block_index(dvjson, 'notesText')
        if note_index:
            dvjson['datasetVersion']['metadataBlocks']['citation']\
                ['fields'][note_index]['value'] = self._make_note()
        else:
            notes = super()._typeclass('notesText', False, 'primitive')
            notes['value'] = self._make_note()
            dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(notes)

        #Deletes unused "publication" fields: rewrite to make it a function call.
        keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']
                                                      ['metadataBlocks']['citation']['fields'])
                         if y.get('typeName') == 'publication'][0] #ibid
        del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]

        #And now the licence:
        dvjson['datasetVersion']['license'] = LIC_NAME
        dvjson['datasetVersion']['termsOfUse'] = LICENCE
        return dvjson

    def upload_metadata(self, **kwargs) -> dict:
        '''
        Uploads metadata to dataverse. Returns json from
        connection attempt.

        Parameters
        ----------
        **kwargs : dict
            Parameters

        Other parameters
        ----------------
        url : str
            base url to Dataverse installation

        key : str
            api key

        dv : str
            Dataverse to which it is being uploaded
        '''
        url = kwargs['url'].strip('/')
        key = kwargs['key']
        dv = kwargs['dv']
        json = kwargs.get('json', self.dvJson)
        headers = {'X-Dataverse-key':key}
        headers.update(UAHEADER)
        try:
            upload = self.session.post(f'{url}/api/dataverses/{dv}/datasets',
                                       headers=headers,
                                       json=json)
            upload.raise_for_status()
            return upload.json()
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError):
            print(upload.text)
            raise

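A minimal end-to-end sketch (the Dataverse URL, API key, and collection alias are placeholders):

from dataverse_utils.ldc import Ldc

ldc = Ldc('LDC2021T01')
dv_metadata = ldc.dvJson  # scrapes the catalogue page and builds Dataverse JSON
result = ldc.upload_metadata(url='https://demo.dataverse.org',
                             key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
                             dv='ldc')
print(result)  # JSON response from the dataset creation call
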
dryadJson property

LDC metadata in Dryad JSON format

dvJson property

LDC metadata in Dataverse JSON format

embargo property

Boolean indicating embargo status

fileJson property

Returns False: No attached files possible at LDC

files property

Returns None. No files possible

id property

Returns LDC ID

ldcJson property

Returns a JSON based on the LDC web page scraping

oversize property

Make sure file is not too big for the Dataverse instance

Parameters:
  • maxsize (int, default: None ) –

    Maximum size in bytes

__init__(ldc, cert=None)

Returns a dict with keys created from an LDC catalogue web page.

Parameters:
  • ldc (str) –

    Linguistic Data Consortium Catalogue Number (eg. ‘LDC2015T05’); this is what forms the last part of the LDC catalogue URL.

  • cert (str, default: None ) –

    Path to certificate chain; LDC has had a problem with intermediate certificates, so you can download the chain with a browser and supply a path to the .pem with this parameter

Source code in src/dataverse_utils/ldc.py
def __init__(self, ldc, cert=None):
    '''
    Returns a dict with keys created from an LDC catalogue web
    page.

    Parameters
    ----------
    ldc : str
        Linguistic Consortium Catalogue Number (eg. 'LDC2015T05'.
        This is what forms the last part of the LDC catalogue URL.

    cert : str, optional, default=None
        Path to certificate chain; LDC has had a problem
        with intermediate certificates, so you can
        download the chain with a browser and supply a
        path to the .pem with this parameter
    '''
    self.ldc = ldc.strip().upper()
    self.ldcHtml = None
    self._ldcJson = None
    self._dryadJson = None
    self._dvJson = None
    self.cert = cert
    self.session = requests.Session()
    self.session.mount('https://',
                       HTTPAdapter(max_retries=ds.constants.RETRY_STRATEGY))
    if self.cert:
        self.cert = os.path.expanduser(self.cert)
    self.__fixdesc = None

__no_intro()

Makes an introduction even if they forgot to include the word “Introduction”

Source code in src/dataverse_utils/ldc.py
def __no_intro(self)->str:
    '''
    Makes an introduction even if they forgot to include the word "Introduction"
    '''
    #self.__fixdesc is set in make_ldc_json
    intro = [x for x in self.__fixdesc if
             self.__fixdesc[0]['itemprop']=='description'][0]
    while intro.find('div'): #nested?, not cleaning properly
        intro.find('div').unwrap() # remove the div tag
    intro = str(intro)
    #Normally, there's an <h3>Introduction</h3> but sometimes there's not
    #Assumes that the first section up to "<h" is an intro.
    #You know what they say about assuming
    intro = intro[:intro.find('<h')]
    start = intro.find('<div')
    if start != -1:
        end = intro.find('>',start)+1
        intro = intro.replace(intro[start:end], '').strip()
    return intro

fetch_record(timeout=45)

Downloads record from LDC website

Parameters:
  • timeout (int, default: 45 ) –

    Request timeout in seconds

Source code in src/dataverse_utils/ldc.py
def fetch_record(self, timeout=45):
    '''
    Downloads record from LDC website

    Parameters
    ----------
    timeout : int, optional, default=45
        Request timeout in seconds

    '''
    interim = self.session.get(f'https://catalog.ldc.upenn.edu/{self.ldc}',
                               verify=self.cert, timeout=timeout)
    interim.raise_for_status()
    self.ldcHtml = interim.text

find_block_index(dvjson, key) staticmethod

Finds the index number of an item in Dataverse’s idiotic JSON list

Parameters:
  • dvjson (dict) –

    Dataverse JSON

  • key (str) –

    key for which to find list index

Source code in src/dataverse_utils/ldc.py
@staticmethod
def find_block_index(dvjson, key):
    '''
    Finds the index number of an item in Dataverse's idiotic JSON list

    Parameters
    ----------
    dvjson : dict
        Dataverse JSON

    key : str
        key for which to find list index
    '''
    for num, item in enumerate(dvjson['datasetVersion']
                               ['metadataBlocks']['citation']['fields']):
        if item['typeName'] == key:
            return num
    return None

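For example, with an abbreviated Dataverse JSON structure:

dvjson = {'datasetVersion': {'metadataBlocks': {'citation': {'fields': [
    {'typeName': 'title', 'value': 'Example'},
    {'typeName': 'notesText', 'value': 'A note'},
]}}}}
Ldc.find_block_index(dvjson, 'notesText')  # 1
Ldc.find_block_index(dvjson, 'subject')    # None
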
make_dryad_json(ldc=None)

Creates a Dryad-style dict from an LDC dictionary

Parameters:
  • ldc (dict, default: self.ldcJson ) –

    Dictionary containing LDC data. Defaults to self.ldcJson

Source code in src/dataverse_utils/ldc.py
def make_dryad_json(self, ldc=None):
    '''
    Creates a Dryad-style dict from an LDC dictionary

    Parameters
    ----------
    ldc : dict, optional, default=self.ldcJson
        Dictionary containing LDC data. Defaults to self.ldcJson
    '''
    if not ldc:
        ldc = self.ldcJson
    print(ldc)
    dryad = {}
    dryad['title'] = ldc['Item Name']
    dryad['authors'] = [Ldc.name_parser(x) for x in ldc['Author(s)']]
    abstract = ('<p><b>Introduction</b></p>'
                f"<p>{ldc['Introduction']}</p>"
                '<p><b>Data</b></p>'
                f"<p>{ldc['Data']}</p>")
    if ldc.get('Acknowledgement'):
        abstract += ('<p><b>Acknowledgement</b></p>'
                     f"<p>{ldc['Acknowledgement']}</p>")
    dryad['abstract'] = abstract
    dryad['keywords'] = ['Linguistics']

    #Dataverse accepts only ISO formatted date

    try:
        releaseDate = time.strptime(ldc['Release Date'], '%B %d, %Y')
        releaseDate = time.strftime('%Y-%m-%d', releaseDate)
    except KeyError:
        #Older surveys don't have a release date field
        #so it must be created from the record number
        if self.ldc[3] == '9':
            releaseDate = '19' + self.ldc[3:5]
    dryad['lastModificationDate'] = releaseDate
    dryad['publicationDate'] = releaseDate

    return dryad

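The returned dictionary has a small, fixed set of keys; a sketch of its shape with placeholder values:

dryad = {'title': 'Example Corpus',
         'authors': [{'lastName': 'Lovelace', 'firstName': 'Ada', 'affiliation': ''}],
         'abstract': '<p><b>Introduction</b></p><p>...</p><p><b>Data</b></p><p>...</p>',
         'keywords': ['Linguistics'],
         'lastModificationDate': '2021-01-15',
         'publicationDate': '2021-01-15'}
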
make_dv_json(ldc=None)

Returns complete Dataverse JSON

Parameters:
  • ldc (dict, default: self.ldcJson ) –

    LDC dictionary.

Source code in src/dataverse_utils/ldc.py
def make_dv_json(self, ldc=None):#pylint: disable=too-many-locals, too-many-statements
    '''
    Returns complete Dataverse JSON

    Parameters
    ----------
    ldc : dict, optional, default=self.ldcJson
        LDC dictionary.
    '''
    if not ldc:
        ldc = self.ldcJson

    dvjson = super().dvJson.copy()

    #ID Numbers
    otherid = super()._typeclass('otherId', True, 'compound')
    ids = []
    for item in ['Linguistic Data Consortium', 'ISBN', 'ISLRN', 'DOI']:
        if ldc.get(item):
            out = {}
            agency = super()._convert_generic(inJson={item:item},
                                              dryField=item,
                                              dvField='otherIdAgency')
            value = super()._convert_generic(inJson={item:ldc[item]},
                                             dryField=item,
                                             dvField='otherIdValue')
            out.update(agency)
            out.update(value)
            ids.append(out)
    otherid['value'] = ids
    dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(otherid)

    #Producer and publisher
    prod = super()._typeclass('producer', True, 'compound')
    p_name = super()._convert_generic(inJson={'producerName': 'Linguistic Data Consortium'},
                                      dryField='producerName',
                                      dvField='producerName')
    p_affil = super()._convert_generic(inJson={'producerAffiliation':
                                               'University of Pennsylvania'},
                                       dryField='producerName',
                                       dvField='producerName')
    p_url = super()._convert_generic(inJson={'producerURL': 'https://www.ldc.upenn.edu/'},
                                     dryField='producerURL',
                                     dvField='producerURL')
    p_name.update(p_affil)
    p_name.update(p_url)
    prod['value'] = [p_name]
    dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(prod)

    #Kind of data
    kind = super()._typeclass('kindOfData', True, 'primitive')
    kind['value'] = 'Linguistic data'

    #Series
    series = super()._typeclass('series', False, 'compound')
    s_name = super()._convert_generic(inJson={'seriesName': 'LDC'},
                                      dryField='seriesName',
                                      dvField='seriesName')
    s_info = super()._convert_generic(inJson={'seriesInformation':
                                              'Linguistic Data Consortium'},
                                      dryField='seriesInformation',
                                      dvField='seriesInformation')
    s_name.update(s_info)
    series['value'] = s_name #not a list
    dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)

    #Data sources
    series = super()._typeclass('dataSources', True, 'primitive')
    data_sources = ldc['Data Source(s)'].split(',')
    data_sources = [x.strip().capitalize() for x in data_sources]
    series['value'] = data_sources
    dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)

    #Fix keyword labels that are hardcoded for Dryad
    #There should be only one keyword block
    keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']['metadataBlocks']
                                                  ['citation']['fields'])
                                                    if y.get('typeName') == 'keyword'][0]
    key_pos = [x for x, y in enumerate(keyword_field[1]['value'])
               if y['keywordVocabulary']['value'] == 'Dryad'][0]
    dvjson['datasetVersion']['metadataBlocks']['citation']\
            ['fields'][keyword_field[0]]['value'][key_pos]\
            ['keywordVocabulary']['value'] = 'Linguistic Data Consortium'

    #The first keyword field is hardcoded in by dryad2dataverse.serializer
    #So I think it needs to be deleted
    keyword_field = [(x, y) for x, y in
                     enumerate(dvjson['datasetVersion']['metadataBlocks']['citation']['fields'])
                     if y.get('typeName') == 'otherId'][0] #ibid
    del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]

    #Notes
    note_index = Ldc.find_block_index(dvjson, 'notesText')
    if note_index:
        dvjson['datasetVersion']['metadataBlocks']['citation']\
            ['fields'][note_index]['value'] = self._make_note()
    else:
        notes = super()._typeclass('notesText', False, 'primitive')
        notes['value'] = self._make_note()
        dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(notes)

    #Deletes unused "publication" fields: rewrite to make it a function call.
    keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']
                                                  ['metadataBlocks']['citation']['fields'])
                     if y.get('typeName') == 'publication'][0] #ibid
    del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]

    #And now the licence:
    dvjson['datasetVersion']['license'] = LIC_NAME
    dvjson['datasetVersion']['termsOfUse'] = LICENCE
    return dvjson

make_ldc_json()

Returns a dict with keys created from an LDC catalogue web page.

Source code in src/dataverse_utils/ldc.py
def make_ldc_json(self):
    '''
    Returns a dict with keys created from an LDC catalogue web
    page.
    '''
    if not self.ldcHtml:
        self.fetch_record()
    soup = bs(self.ldcHtml, 'html.parser')
    #Should data just look in the *first* table? Specifically tbody?
    #Is it always the first? I assume yes.
    tbody = soup.find('tbody')#new
    data = [x.text.strip() for x in tbody.find_all('td')]#new
    #data = [x.text.strip() for x in soup.find_all('td')]#original
    LDC_dict = {data[x][:data[x].find('\n')].strip(): data[x+1].strip()
                for x in range(0, len(data), 2)}
    #Related Works appears to have an extra 'Hide' at the end
    if LDC_dict.get('Related Works:'):
        LDC_dict['Related Works'] = (x.strip() for x in LDC_dict['Related Works:'].split('\n'))
        del LDC_dict['Related Works:'] #remove the renamed key
    LDC_dict['Linguistic Data Consortium'] = LDC_dict['LDC Catalog No.']
    del LDC_dict['LDC Catalog No.']#This key must be renamed for consistency
    LDC_dict['Author(s)'] = [x.strip() for x in LDC_dict['Author(s)'].split(',')]
    #Other metadata probably has HTML in it, so we keep as much as possible
    other_meta = soup.find_all('div')
    alldesc = [x for x in other_meta if x.attrs.get('itemprop') == 'description']
    #sometimes they format pages oddly and we can use this for a
    #quick and dirty fix
    self.__fixdesc = copy.deepcopy(alldesc)
    #sections use h3, so split on these
    #24 Jan 23 Apparently, this is all done manually so some of them sometime use h4.
    #Because reasons.
    #was:
    #alldesc = str(alldesc).split('<h3>')
    #is now
    alldesc = str(alldesc).replace('h4>', 'h3>').split('<h3>')
    for i in range(1, len(alldesc)):
        alldesc[i] = '<h3>' + alldesc[i]
    #first one is not actually useful, so discard it
    alldesc.pop(0)


    #So far, so good. At this point the relative links need fixing
    #and tables need to be converted to pre.
    for desc in alldesc:
        #It's already strings; replace relative links first
        desc = desc.replace('../../../', 'https://catalog.ldc.upenn.edu/')
        subsoup = bs(desc, 'html.parser')
        key = subsoup.h3.text.strip()
        #don't need the h3 tags anymore
        subsoup.find('h3').extract()
        # Convert tables to <pre>
        for tab in subsoup.find_all('table'):
            content = str(tab)
            #convert to markdown
            content = markdownify.markdownify(content)
            tab.name = 'pre'
            tab.string = content #There is not much documentation on the
                                 #difference between tab.string and tab.content
        #That was relatively easy
        LDC_dict[key] = str(subsoup)
    LDC_dict['Introduction'] = LDC_dict.get('Introduction',
                                            self.__no_intro())
    #LDC puts http in front of their DOI identifier
    if LDC_dict.get('DOI'):
        LDC_dict['DOI'] = LDC_dict['DOI'].replace('https://doi.org/', '')
    return LDC_dict

name_parser(name) staticmethod

Returns lastName/firstName JSON snippet from a name

Parameters:
  • name (str) –

    A name

Notes

Can’t be 100% accurate, because names can be split in many ways. However, as they say, 80% is good enough.

Source code in src/dataverse_utils/ldc.py
@staticmethod
def name_parser(name):
    '''
    Returns lastName/firstName JSON snippet from a name

    Parameters
    ----------
    name : str
        A name

    Notes
    -----
    Can't be 100% accurate, because names can be split in many ways. However, as they
    say, 80% is good enough.
    '''
    names = name.split(' ')
    return {'lastName': names[-1], 'firstName': ' '.join(names[:-1]), 'affiliation':''}

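For example, the output follows directly from the split-on-last-space rule:

Ldc.name_parser('Ada Lovelace')
# {'lastName': 'Lovelace', 'firstName': 'Ada', 'affiliation': ''}
Ldc.name_parser('John van der Berg')
# {'lastName': 'Berg', 'firstName': 'John van der', 'affiliation': ''}
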
upload_metadata(**kwargs)

Uploads metadata to dataverse. Returns json from connection attempt.

Parameters:
  • **kwargs (dict, default: {} ) –

    Parameters

  • url (str) –

    base url to Dataverse installation

  • key (str) –

    api key

  • dv (str) –

    Dataverse to which it is being uploaded

Source code in src/dataverse_utils/ldc.py
def upload_metadata(self, **kwargs) -> dict:
    '''
    Uploads metadata to dataverse. Returns json from
    connection attempt.

    Parameters
    ----------
    **kwargs : dict
        Parameters

    Other parameters
    ----------------
    url : str
        base url to Dataverse installation

    key : str
        api key

    dv : str
        Dataverse to which it is being uploaded
    '''
    url = kwargs['url'].strip('/')
    key = kwargs['key']
    dv = kwargs['dv']
    json = kwargs.get('json', self.dvJson)
    headers = {'X-Dataverse-key':key}
    headers.update(UAHEADER)
    try:
        upload = self.session.post(f'{url}/api/dataverses/{dv}/datasets',
                                   headers=headers,
                                   json=json)
        upload.raise_for_status()
        return upload.json()
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError):
        print(upload.text)
        raise

dataverse_utils.collections

Utilities for recursively analysing a Dataverse collection.

DvCollection

Metadata for an entire dataverse collection, recursively.

Source code in src/dataverse_utils/collections.py
class DvCollection:
    '''
    Metadata for an *entire* dataverse collection, recursively.
    '''
    #pylint: disable=too-many-instance-attributes
    def __init__(self, url:str, coll:str, key=None, **kwargs):
        '''
        All you need to start recursively crawling.

        Parameters
        ----------
        coll : str
            short collection name or id number
        url : str
            base URL of Dataverse collection.
            eg: https://borealisdata.ca
                borealisdata.ca
        key : str
            API key (optional, only use if you want to see hidden material)

        **kwargs: dict
            Other parameters

        Other parameters
        ----------------
        timeout : int
            retry timeout in seconds
        '''
        self.coll = coll
        self.url = self.__clean_url(url)
        self.headers = None
        self.__key = key
        if self.__key:
            self.headers = {'X-Dataverse-key': self.__key}
            self.headers.update(UAHEADER)
        else:
            self.headers = UAHEADER.copy()
        if not kwargs.get('retry'):
            self.retry_strategy = RETRY
        else:
            self.retry_strategy = kwargs['retry']
        self.session = requests.Session()
        self.session.mount('https://',
                           requests.adapters.HTTPAdapter(max_retries=self.retry_strategy))
        self.collections = None
        self.studies = None

    def __clean_url(self, badurl:str):
        '''
        Sanitize URL, return properly formatted HTTP string.

        Parameters
        ----------
        badurl: str
            URL string

        '''
        clean = badurl.strip().strip('/')
        if not clean.startswith('https://'):
            clean = f'https://{clean}'
        return clean

    def __get_shortname(self, dvid):
        '''
        Get collection short name.
        '''
        shortname = self.session.get(f'{self.url}/api/dataverses/{dvid}', headers=self.headers)
        shortname.raise_for_status()
        return shortname.json()['data']['alias']

    def get_collections(self, coll:str=None, output=None, **kwargs)->list:#pylint: disable=unused-argument
        '''
        Get a [recursive] listing of all dataverses in a collection.

        Parameters
        ----------
        coll : str, optional, default=None
            Collection short name or id
        output : list, optional, default=[]
            output list to append to
        **kwargs : dict
            Other keyword arguments

        '''
        if not output:
            output = []
        if not coll:
            coll = self.coll
        x = self.session.get(f'{self.url}/api/dataverses/{coll}/contents',
                                 headers=self.headers)
        data = x.json().get('data')
        #---
        #Because it's possible that permissions errors can cause API read errors,
        #we have this insane way of checking errors.
        #I have no idea what kind of errors would be raised, so it catches
        #Exception broadly, which is bad. But what can you do?
        dvs =[]
        for _ in data:
            if _['type'] == 'dataverse':
                try:
                    out=self.__get_shortname(_['id'])
                    dvs.append((_['title'], out))
                except Exception as e:

                    obscure_error = f'''
                                        An error has occurred where a collection can be
                                        identified by ID but its name cannot be determined.
                                        This is (normally) caused by a configuration error where
                                        administrator permissions are not correctly inherited by
                                        the child collection.

                                        Please check with the system administrator to determine
                                        any exact issues.

                                        Problematic collection id number: {_.get("id",
                                        "not available")}'''
                    print(50*'-')
                    print(textwrap.dedent(obscure_error))
                    print(e)
                    LOGGER.error(textwrap.fill(textwrap.dedent(obscure_error).strip()))
                    traceback.print_exc()
                    print(50*'-')
                    raise e
        #---
        if not dvs:
            dvs = []
        output.extend(dvs)
        for dv in dvs:
            LOGGER.debug('%s/api/dataverses/%s/contents', self.url, dv[1])
            LOGGER.debug('recursive')
            self.get_collections(dv[1], output)
        self.collections = output
        return output

    def get_studies(self, root:str=None):
        '''
        return [(pid, title)..(pid_n, title_n)] of a collection.

        Parameters
        ----------
        root : str
            Short name or id of *top* level of tree. Default self.coll
        '''
        all_studies = []
        if not root:
            root=self.coll
        all_studies = self.get_collection_listing(root)
        #collections = self.get_collections(root, self.url)
        collections = self.get_collections(root)
        for collection in collections:
            all_studies.extend(self.get_collection_listing(collection[1]))
        self.studies = all_studies
        return all_studies

    def get_collection_listing(self, coll_id):
        '''
        Return a listing of studies in a collection, with pid.

        Parameters
        ----------
        coll_id : str
            Short name or id of a dataverse collection
        '''
        cl = self.session.get(f'{self.url}/api/dataverses/{coll_id}/contents',
                                  headers=self.headers)
        cl.raise_for_status()
        pids = [f"{z['protocol']}:{z['authority']}/{z['identifier']}"
                for z in cl.json()['data'] if z['type'] == 'dataset']
        out = [(self.get_study_info(pid), pid) for pid in pids]
        for _ in out:
            _[0].update({'pid': _[1]})
        return [x[0] for x in out]

    def get_study_info(self, pid):
        '''
        Returns a StudyMetadata object with complete metadata for a study.

        Parameters
        ----------
        pid : str
            Persistent ID of a Dataverse study
        '''
        meta = self.session.get(f'{self.url}/api/datasets/:persistentId',
                            params={'persistentId': pid},
                            headers=self.headers)
        meta.raise_for_status()
        LOGGER.debug(pid)
        return StudyMetadata(study_meta=meta.json(), key=self.__key, url=self.url)

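A minimal crawling sketch (the installation URL and collection alias are placeholders):

from dataverse_utils.collections import DvCollection

coll = DvCollection('https://demo.dataverse.org', 'mycollection')
subcollections = coll.get_collections()  # [(title, alias), ...] for every nested dataverse
studies = coll.get_studies()             # StudyMetadata records, each with a 'pid' key added
for study in studies:
    print(study['pid'])
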
__clean_url(badurl)

Sanitize URL, return properly formatted HTTP string.

Parameters:
  • badurl (str) –

    URL string

Source code in src/dataverse_utils/collections.py
def __clean_url(self, badurl:str):
    '''
    Sanitize URL, return properly formatted HTTP string.

    Parameters
    ----------
    badurl: str
        URL string

    '''
    clean = badurl.strip().strip('/')
    if not clean.startswith('https://'):
        clean = f'https://{clean}'
    return clean

__get_shortname(dvid)

Get collection short name.

Source code in src/dataverse_utils/collections.py
def __get_shortname(self, dvid):
    '''
    Get collection short name.
    '''
    shortname = self.session.get(f'{self.url}/api/dataverses/{dvid}', headers=self.headers)
    shortname.raise_for_status()
    return shortname.json()['data']['alias']

__init__(url, coll, key=None, **kwargs)

All you need to start recursively crawling.

Parameters:
  • coll (str) –

    short collection name or id number

  • url (str) –

    base URL of Dataverse collection, eg: https://borealisdata.ca or borealisdata.ca

  • key (str, default: None ) –

    API key (optional, only use if you want to see hidden material)

  • **kwargs

    Other parameters

  • timeout (int) –

    retry timeout in seconds

Source code in src/dataverse_utils/collections.py
def __init__(self, url:str, coll:str, key=None, **kwargs):
    '''
    All you need to start recursively crawling.

    Parameters
    ----------
    coll : str
        short collection name or id number
    url : str
        base URL of Dataverse collection.
        eg: https://borealisdata.ca
            borealisdata.ca
    key : str
        API key (optional, only use if you want to see hidden material)

    **kwargs: dict
        Other parameters

    Other parameters
    ----------------
    timeout : int
        retry timeout in seconds
    '''
    self.coll = coll
    self.url = self.__clean_url(url)
    self.headers = None
    self.__key = key
    if self.__key:
        self.headers = {'X-Dataverse-key': self.__key}
        self.headers.update(UAHEADER)
    else:
        self.headers = UAHEADER.copy()
    if not kwargs.get('retry'):
        self.retry_strategy = RETRY
    else:
        self.retry_strategy = kwargs['retry']
    self.session = requests.Session()
    self.session.mount('https://',
                       requests.adapters.HTTPAdapter(max_retries=self.retry_strategy))
    self.collections = None
    self.studies = None

get_collection_listing(coll_id)

Return a listing of studies in a collection, with pid.

Parameters:
  • coll_id (str) –

    Short name or id of a dataverse collection

Source code in src/dataverse_utils/collections.py
def get_collection_listing(self, coll_id):
    '''
    Return a listing of studies in a collection, with pid.

    Parameters
    ----------
    coll_id : str
        Short name or id of a dataverse collection
    '''
    cl = self.session.get(f'{self.url}/api/dataverses/{coll_id}/contents',
                              headers=self.headers)
    cl.raise_for_status()
    pids = [f"{z['protocol']}:{z['authority']}/{z['identifier']}"
            for z in cl.json()['data'] if z['type'] == 'dataset']
    out = [(self.get_study_info(pid), pid) for pid in pids]
    for _ in out:
        _[0].update({'pid': _[1]})
    return [x[0] for x in out]

get_collections(coll=None, output=None, **kwargs)

Get a [recursive] listing of all dataverses in a collection.

Parameters:
  • coll (str, default: None ) –

    Collection short name or id

  • output (list, default: [] ) –

    output list to append to

  • **kwargs (dict, default: {} ) –

    Other keyword arguments

Source code in src/dataverse_utils/collections.py
def get_collections(self, coll:str=None, output=None, **kwargs)->list:#pylint: disable=unused-argument
    '''
    Get a [recursive] listing of all dataverses in a collection.

    Parameters
    ----------
    coll : str, optional, default=None
        Collection short name or id
    output : list, optional, default=[]
        output list to append to
    **kwargs : dict
        Other keyword arguments

    '''
    if not output:
        output = []
    if not coll:
        coll = self.coll
    x = self.session.get(f'{self.url}/api/dataverses/{coll}/contents',
                             headers=self.headers)
    data = x.json().get('data')
    #---
    #Because it's possible that permissions errors can cause API read errors,
    #we have this insane way of checking errors.
    #I have no idea what kind of errors would be raised, so it catches
    #Exception broadly, which is bad. But what can you do?
    dvs =[]
    for _ in data:
        if _['type'] == 'dataverse':
            try:
                out=self.__get_shortname(_['id'])
                dvs.append((_['title'], out))
            except Exception as e:

                obscure_error = f'''
                                    An error has occurred where a collection can be
                                    identified by ID but its name cannot be determined.
                                    This is (normally) caused by a configuration error where
                                    administrator permissions are not correctly inherited by
                                    the child collection.

                                    Please check with the system administrator to determine
                                    any exact issues.

                                    Problematic collection id number: {_.get("id",
                                    "not available")}'''
                print(50*'-')
                print(textwrap.dedent(obscure_error))
                print(e)
                LOGGER.error(textwrap.fill(textwrap.dedent(obscure_error).strip()))
                traceback.print_exc()
                print(50*'-')
                raise e
    #---
    if not dvs:
        dvs = []
    output.extend(dvs)
    for dv in dvs:
        LOGGER.debug('%s/api/dataverses/%s/contents', self.url, dv[1])
        LOGGER.debug('recursive')
        self.get_collections(dv[1], output)
    self.collections = output
    return output

get_studies(root=None)

return [(pid, title)..(pid_n, title_n)] of a collection.

Parameters:
  • root (str, default: None ) –

    Short name or id of top level of tree. Default self.coll

Source code in src/dataverse_utils/collections.py
def get_studies(self, root:str=None):
    '''
    return [(pid, title)..(pid_n, title_n)] of a collection.

    Parameters
    ----------
    root : str
        Short name or id of *top* level of tree. Default self.coll
    '''
    all_studies = []
    if not root:
        root=self.coll
    all_studies = self.get_collection_listing(root)
    #collections = self.get_collections(root, self.url)
    collections = self.get_collections(root)
    for collection in collections:
        all_studies.extend(self.get_collection_listing(collection[1]))
    self.studies = all_studies
    return all_studies

get_study_info(pid)

Returns a StudyMetadata object with complete metadata for a study.

Parameters:
  • pid (str) –

    Persistent ID of a Dataverse study

Source code in src/dataverse_utils/collections.py
def get_study_info(self, pid):
    '''
    Returns a StudyMetadata object with complete metadata for a study.

    Parameters
    ----------
    pid : str
        Persistent ID of a Dataverse study
    '''
    meta = self.session.get(f'{self.url}/api/datasets/:persistentId',
                        params={'persistentId': pid},
                        headers=self.headers)
    meta.raise_for_status()
    LOGGER.debug(pid)
    return StudyMetadata(study_meta=meta.json(), key=self.__key, url=self.url)

FileAnalysis

Bases: dict

Download and analyze a file from a dataverse installation and produce useful metadata.

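A minimal usage sketch (the path, URL, key, and persistent ID are placeholders; keyword arguments are those documented in __init__ below):

from dataverse_utils.collections import FileAnalysis

# Local file: analysed in place, nothing is downloaded
local_info = FileAnalysis(local='/path/to/survey.sav')

# Remote file: downloaded to a temporary file, then analysed
remote_info = FileAnalysis(url='https://demo.dataverse.org',
                           key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
                           pid='doi:10.5072/FK2/EXAMPLE/1')
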
Source code in src/dataverse_utils/collections.py
class FileAnalysis(dict):
    '''
    Download and analyze a file from a dataverse installation and
    produce useful metadata.
    '''

    def __init__(self, **kwargs):
        '''
        Initialize the object.

        Parameters
        ----------
        **kwargs : dict
            Keyword arguments

        Other parameters
        ----------------
        local : str
            Path to local file

        url : str
            URL of Dataverse instance

        key : str
            API key for downloading

        id : int
            Integer file id

        pid : str
            Persistent ID of file

        filename : str
            File name (original)

        filesize_bytes : int
            File size in bytes

        Notes
        -----
        Either `local` must be supplied, or `url`, `key` and at least one of
        `id` or `pid` must be supplied

        '''

        #self.url = self.__clean_url(url)
        self.headers = UAHEADER.copy()
        self.kwargs = kwargs
        if self.kwargs.get('key'):
            self.headers.update({'X-Dataverse-key':self.kwargs['key']})
        self.local = None
        if not self.__sufficient:
            err = ('Insufficient required arguments. '
                   'Include (url, key, '
                   '(pid or id)) or (local) keyword parameters.')
            raise TypeError(err)
        self.tempfile = None
        self.session = requests.Session()
        self.session.mount('https://',
                           requests.adapters.HTTPAdapter(max_retries=RETRY))
        self.checkable = {'.sav': self.stat_file_metadata,
                          '.sas7bdat': self.stat_file_metadata,
                          '.dta': self.stat_file_metadata,
                          '.csv': self.generic_metadata,
                          '.tsv': self.generic_metadata,
                          '.rdata': self.generic_metadata,
                          '.rda': self.generic_metadata}
        self.filename = None #get it later
        self.enhance()

    def __del__(self):
        '''
        Cleanup old temporary files on object deletion.
        '''
        self.session.close()
        del self.tempfile

    def __sufficient(self)->bool:
        '''
        Checks if sufficient information is supplied for intialization, with
        local files taking preference over remote.
        '''
        if self.kwargs.get('local'):
            return True
        if (self.kwargs['url'] and self.kwargs['key']
           and (self.kwargs.get('pid') or self.kwargs.get('id'))):
            return True
        return False

    def __clean_url(self, badurl:str)->str:
        '''
        Sanitize URL. Ensures ssl and no trailing slash.

        Parameters
        ----------
        badurl: str
            URL
        '''
        clean = badurl.strip().strip('/')
        if not clean.startswith('https://'):
            clean = f'https://{clean}'
        return clean

    def __get_filename(self, head:dict)->typing.Union[str, None]:
        '''
        Extracts the original file name from the download response headers;
        the 'filename' keyword argument, if supplied, takes precedence.

        Parameters
        ----------
        head : dict
            Header from GET request

        Returns
        -------
        The file name as a string, or None if it cannot be determined
        '''
        fname = head.get('content-type')
        if fname:
            if 'name=' in fname:
                start = head['content-type'].find('name=')+5
                end = head['content-type'].find(';', start)
                if end != -1:
                    fname = head['content-type'][start:end].strip('"')
                else:
                    fname = head['content-type'][start:].strip('"')
        fname = self.kwargs.get('filename', fname)
        return fname

    @property
    def __whichfile(self):
        '''
        Returns the location of the path being analyzed.
        '''
        return self.tempfile.name if self.tempfile else self.local

    def __check(self):
        '''
        Determines if this is one of the filetypes which supports extra metadata.
        '''
        if pathlib.Path(self.filename).suffix.lower() in self.checkable:
            return True
        return False

    def download(self, block_size:int=1024, force=False, local=None)-> None:
        '''
        Download the file to a temporary location for analysis.

        Parameters
        ----------
        block_size : int
            Streaming block size
        force : bool
            Download even if not a file that is checkable
        local : str
            Path to local file
        '''
        # pylint: disable=consider-using-with
        self.tempfile = tempfile.NamedTemporaryFile(delete=True,
                                                    delete_on_close=False)
        if local:
            self.local = local
            self.filename = local
            self.tempfile.close()
            del self.tempfile #to erase it
            self.tempfile = None
            return
        start = datetime.datetime.now()
        params = {'format':'original'}
        url = self.__clean_url(self.kwargs['url'])
        if self.kwargs.get('pid'):
            params.update({'persistentId':self.kwargs['pid']})
            data = self.session.get(f'{url}/api/access/datafile/:persistentId',
                                    headers=self.headers,
                                    params=params,
                                    stream=True)
        else:
            data = self.session.get(f'{url}/api/access/datafile/{self.kwargs["id"]}',
                                    headers=self.headers,
                                    params=params,
                                    stream=True)
        data.raise_for_status()
        finish = datetime.datetime.now()
        self.filename = self.__get_filename(data.headers)
        LOGGER.info('Downloaded header for %s. Elapsed time: %s',
                    self.filename, finish-start)
        if self.__check() or force:
            filesize = self.kwargs.get('filesize_bytes',
                                       data.headers.get('content-length', 9e9))
            filesize = int(filesize) # comes out as string from header
            with tqdm.tqdm(total=filesize, unit='B', unit_scale=True, desc=self.filename) as t:
                for _ in data.iter_content(block_size):
                    self.tempfile.file.write(_)
                    t.update(len(_))
            self.tempfile.close()

    def enhance(self):
        '''
        Convenience function for downloading and creating extra metadata,
        ie, "enhancing" the metadata. Use this instead of going through the
        steps manually.
        '''
        self.download(local=self.kwargs.get('local'))
        do_it = pathlib.Path(self.filename).suffix.lower()
        if do_it in self.checkable:
            self.checkable[do_it](ext=do_it)

    def stat_file_metadata(self, ext:str)->dict:
        '''
        Produces metadata from SAS, SPSS and Stata files.

        Parameters
        ----------
        ext : str
            File extension of statistical package file. Include the '.'. Eg. '.sav'
        '''
        matcher = {'.sav': pyreadstat.read_sav,
                   '.dta': pyreadstat.read_dta,
                   '.sas7bdat': pyreadstat.read_sas7bdat}
        if not self.filename or ext not in matcher:
            return
        #whichfile = self.tempfile.name if self.tempfile else self.local
        statdata, meta = matcher[ext](self.__whichfile)
        outmeta = {}
        outmeta['variables'] = {_:{} for _ in meta.column_names_to_labels}

        for k, v in meta.column_names_to_labels.items():
            outmeta['variables'][k]['Variable label'] = v
        for k, v in meta.original_variable_types.items():
            outmeta['variables'][k]['Variable type'] = v
        for k, v in meta.variable_to_label.items():
            outmeta['variables'][k]['Value labels'] = meta.value_labels.get(v, '')
        outmeta['encoding'] = meta.file_encoding
        for dt in statdata.columns:
            desc = {k:str(v) for k, v in dict(statdata[dt].describe()).items()}
            outmeta['variables'][dt].update(desc)
        self.update(outmeta)
        return


    def generic_metadata(self, ext)->None:
        '''
        Make metadata for a [ct]sv file and RData. Updates
        self.

        Parameters
        ----------
        ext : str
            extension ('.csv', '.tsv', '.rda' or '.rdata')
        '''
        #if ext == '.tsv':
        #    data = pd.read_csv(self.__whichfile, sep='\t')
        #else:
        #    data = pd.read_csv(self.__whichfile)

        lookuptable ={'.tsv': {'func': pd.read_csv,
                                'kwargs' : {'sep':'\t'}},
                        '.csv': {'func' : pd.read_csv},
                        '.rda': {'func' : pyreadr.read_r},
                       '.rdata':{'func' : pyreadr.read_r}}
        data = lookuptable[ext]['func'](self.__whichfile,
                                              **lookuptable[ext].get('kwargs', {}))
        if ext  in ['.rda', '.rdata']:
            data = data[None] #why pyreadr why
        outmeta = {}
        outmeta['variables'] = {_:{} for _ in data.columns}
        for dt in data.columns:
            outmeta['variables'][dt]['Variable type'] = str(data[dt].dtype)
            # Make something from nothing
            desc = {k:str(v) for k, v in dict(data[dt].describe()).items()}
            outmeta['variables'][dt].update(desc)
        self.update(outmeta)

    @property
    def md(self):
        '''
        Create Markdown text out of a FileAnalysis object.
        '''
        out = io.StringIO()
        indent = '\u00A0' # &nbsp;
        if not self.get('variables'):
            return None
        for k, v in self.items():
            if k != 'variables':
                out.write(f'**{k.capitalize()}** : {v}  \n')
        for k, v in self.get('variables',{}).items():
            out.write(f"**{k}**: {v.get('Variable label', 'Description N/A')}  \n")
            for kk, vv, in v.items():
                if kk == 'Variable label':
                    continue
                if not isinstance(vv, dict):
                    out.write(f'**{kk.capitalize()}**: {vv}  \n')
                else:
                    out.write(f'**{kk.capitalize()}**:  \n')
                    for kkk, vvv in vv.items():
                        #this one only originally
                        out.write(f'{4*indent}{kkk}: {vvv}  \n')
            out.write('\n')

        out.seek(0)
        return out.read()

__whichfile property

Returns the location of the path being analyzed.

md property

Create Markdown text out of a FileAnalysis object.

__check()

Determines if this is one of the filetypes which supports extra metadata.

Source code in src/dataverse_utils/collections.py
def __check(self):
    '''
    Determines if this is one of the filetypes which supports extra metadata.
    '''
    if pathlib.Path(self.filename).suffix.lower() in self.checkable:
        return True
    return False

__clean_url(badurl)

Sanitize URL. Ensures ssl and no trailing slash.

Parameters:
  • badurl (str) –

    URL

Source code in src/dataverse_utils/collections.py
def __clean_url(self, badurl:str)->str:
    '''
    Sanitize URL. Ensures ssl and no trailing slash.

    Parameters
    ----------
    badurl: str
        URL
    '''
    clean = badurl.strip().strip('/')
    if not clean.startswith('https://'):
        clean = f'https://{clean}'
    return clean

__del__()

Cleanup old temporary files on object deletion.

Source code in src/dataverse_utils/collections.py
def __del__(self):
    '''
    Cleanup old temporary files on object deletion.
    '''
    self.session.close()
    del self.tempfile

__get_filename(head)

Determines the name of the file from the response headers (or the filename keyword argument) so that it can be checked further.

Parameters:
  • head (dict) –

    Header from GET request

Returns:
  • The file name as a string, or None if it cannot be determined
Source code in src/dataverse_utils/collections.py
def __get_filename(self, head:dict)->typing.Union[str, None]:
    '''
    Determines the name of the file from the response headers (or the
    filename keyword argument) so that it can be checked further.

    Parameters
    ----------
    head : dict
        Header from GET request

    Returns
    -------
    The file name as a string, or None if it cannot be determined
    '''
    fname = head.get('content-type')
    if fname:
        if 'name=' in fname:
            start = head['content-type'].find('name=')+5
            end = head['content-type'].find(';', start)
            if end != -1:
                fname = head['content-type'][start:end].strip('"')
            else:
                fname = head['content-type'][start:].strip('"')
    fname = self.kwargs.get('filename', fname)
    return fname

__init__(**kwargs)

Initialize the object.

Parameters:
  • **kwargs (dict, default: {} ) –

    Keyword arguments

  • local (str) –

    Path to local file

  • url (str) –

    URL of Dataverse instance

  • key (str) –

    API key for downloading

  • id (int) –

    Integer file id

  • pid (str) –

    Persistent ID of file

  • filename (str) –

    File name (original)

  • filesize_bytes (int) –

    File size in bytes

Notes

Either local must be supplied, or url, key and at least one of id or pid must be supplied

Source code in src/dataverse_utils/collections.py
def __init__(self, **kwargs):
    '''
    Initialize the object.

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments

    Other parameters
    ----------------
    local : str
        Path to local file

    url : str
        URL of Dataverse instance

    key : str
        API key for downloading

    id : int
        Integer file id

    pid : str
        Persistent ID of file

    filename : str
        File name (original)

    filesize_bytes : int
        File size in bytes

    Notes
    -----
    Either `local` must be supplied, or `url`, `key` and at least one of
    `id` or `pid` must be supplied

    '''

    #self.url = self.__clean_url(url)
    self.headers = UAHEADER.copy()
    self.kwargs = kwargs
    if self.kwargs.get('key'):
        self.headers.update({'X-Dataverse-key':self.kwargs['key']})
    self.local = None
    if not self.__sufficient():
        err = ('Insufficient required arguments. '
               'Include (url, key, '
               '(pid or id)) or (local) keyword parameters.')
        raise TypeError(err)
    self.tempfile = None
    self.session = requests.Session()
    self.session.mount('https://',
                       requests.adapters.HTTPAdapter(max_retries=RETRY))
    self.checkable = {'.sav': self.stat_file_metadata,
                      '.sas7bdat': self.stat_file_metadata,
                      '.dta': self.stat_file_metadata,
                      '.csv': self.generic_metadata,
                      '.tsv': self.generic_metadata,
                      '.rdata': self.generic_metadata,
                      '.rda': self.generic_metadata}
    self.filename = None #get it later
    self.enhance()
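
The following is a minimal usage sketch, not taken from the package itself: the instance URL, API key and file id are placeholders, and it assumes (as the code above suggests) that FileAnalysis behaves like a dict whose md property renders the extracted metadata.

# Illustrative sketch only; URL, key and identifiers are placeholders.
from dataverse_utils.collections import FileAnalysis

# Remote file: downloaded by database id (or pid) and analyzed on construction.
remote = FileAnalysis(url='https://dataverse.example.edu',
                      key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
                      id=12345)
print(remote.md)              # Markdown data dictionary, or None if nothing was extracted

# Local file: nothing is downloaded; metadata is read straight from disk.
local = FileAnalysis(local='/path/to/survey.sav')
print(local.get('encoding'))  # file encoding reported by pyreadstat, if available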

__sufficient()

Checks if sufficient information is supplied for initialization, with local files taking preference over remote.

Source code in src/dataverse_utils/collections.py
def __sufficient(self)->bool:
    '''
    Checks if sufficient information is supplied for initialization, with
    local files taking preference over remote.
    '''
    if self.kwargs.get('local'):
        return True
    if (self.kwargs.get('url') and self.kwargs.get('key')
       and (self.kwargs.get('pid') or self.kwargs.get('id'))):
        return True
    return False

download(block_size=1024, force=False, local=None)

Download the file to a temporary location for analysis.


Parameters:
  • block_size (int, default: 1024 ) –

    Streaming block size

  • force (bool, default: False ) –

    Download even if not a file that is checkable

  • local (str, default: None ) –

    Path to local file

Source code in src/dataverse_utils/collections.py
def download(self, block_size:int=1024, force=False, local=None)-> None:
    '''
    Download the file to a temporary location for analysis.

    Parameters
    ----------
    block_size : int
        Streaming block size
    force : bool
        Download even if not a file that is checkable
    local : str
        Path to local file
    '''
    # pylint: disable=consider-using-with
    self.tempfile = tempfile.NamedTemporaryFile(delete=True,
                                                delete_on_close=False)
    if local:
        self.local = local
        self.filename = local
        self.tempfile.close()
        del self.tempfile #to erase it
        self.tempfile = None
        return
    start = datetime.datetime.now()
    params = {'format':'original'}
    url = self.__clean_url(self.kwargs['url'])
    if self.kwargs.get('pid'):
        params.update({'persistentId':self.kwargs['pid']})
        data = self.session.get(f'{url}/api/access/datafile/:persistentId',
                                headers=self.headers,
                                params=params,
                                stream=True)
    else:
        data = self.session.get(f'{url}/api/access/datafile/{self.kwargs["id"]}',
                                headers=self.headers,
                                params=params,
                                stream=True)
    data.raise_for_status()
    finish = datetime.datetime.now()
    self.filename = self.__get_filename(data.headers)
    LOGGER.info('Downloaded header for %s. Elapsed time: %s',
                self.filename, finish-start)
    if self.__check() or force:
        filesize = self.kwargs.get('filesize_bytes',
                                   data.headers.get('content-length', 9e9))
        filesize = int(filesize) # comes out as string from header
        with tqdm.tqdm(total=filesize, unit='B', unit_scale=True, desc=self.filename) as t:
            for _ in data.iter_content(block_size):
                self.tempfile.file.write(_)
                t.update(len(_))
        self.tempfile.close()

enhance()

Convenience function for downloading and creating extra metadata, ie, “enhancing” the metadata. Use this instead of going through the steps manually.

Source code in src/dataverse_utils/collections.py
def enhance(self):
    '''
    Convenience function for downloading and creating extra metadata,
    ie, "enhancing" the metadata. Use this instead of going through the
    steps manually.
    '''
    self.download(local=self.kwargs.get('local'))
    do_it = pathlib.Path(self.filename).suffix.lower()
    if do_it in self.checkable:
        self.checkable[do_it](ext=do_it)

generic_metadata(ext)

Make metadata for a [ct]sv file and RData. Updates self.

Parameters:
  • ext (str) –

    extension (‘.csv’, ‘.tsv’, ‘.rda’ or ‘.rdata’)

Source code in src/dataverse_utils/collections.py
def generic_metadata(self, ext)->None:
    '''
    Make metadata for a [ct]sv file and RData. Updates
    self.

    Parameters
    ----------
    ext : str
        extension ('.csv', '.tsv', '.rda' or '.rdata')
    '''
    #if ext == '.tsv':
    #    data = pd.read_csv(self.__whichfile, sep='\t')
    #else:
    #    data = pd.read_csv(self.__whichfile)

    lookuptable ={'.tsv': {'func': pd.read_csv,
                            'kwargs' : {'sep':'\t'}},
                    '.csv': {'func' : pd.read_csv},
                    '.rda': {'func' : pyreadr.read_r},
                   '.rdata':{'func' : pyreadr.read_r}}
    data = lookuptable[ext]['func'](self.__whichfile,
                                          **lookuptable[ext].get('kwargs', {}))
    if ext  in ['.rda', '.rdata']:
        data = data[None] #why pyreadr why
    outmeta = {}
    outmeta['variables'] = {_:{} for _ in data.columns}
    for dt in data.columns:
        outmeta['variables'][dt]['Variable type'] = str(data[dt].dtype)
        # Make something from nothing
        desc = {k:str(v) for k, v in dict(data[dt].describe()).items()}
        outmeta['variables'][dt].update(desc)
    self.update(outmeta)

stat_file_metadata(ext)

Produces metadata from SAS, SPSS and Stata files.

Parameters:
  • ext (str) –

    File extension of statistical package file. Include the ‘.’. Eg. ‘.sav’

Source code in src/dataverse_utils/collections.py
def stat_file_metadata(self, ext:str)->dict:
    '''
    Produces metadata from SAS, SPSS and Stata files.

    Parameters
    ----------
    ext : str
        File extension of statistical package file. Include the '.'. Eg. '.sav'
    '''
    matcher = {'.sav': pyreadstat.read_sav,
               '.dta': pyreadstat.read_dta,
               '.sas7bdat': pyreadstat.read_sas7bdat}
    if not self.filename or ext not in matcher:
        return
    #whichfile = self.tempfile.name if self.tempfile else self.local
    statdata, meta = matcher[ext](self.__whichfile)
    outmeta = {}
    outmeta['variables'] = {_:{} for _ in meta.column_names_to_labels}

    for k, v in meta.column_names_to_labels.items():
        outmeta['variables'][k]['Variable label'] = v
    for k, v in meta.original_variable_types.items():
        outmeta['variables'][k]['Variable type'] = v
    for k, v in meta.variable_to_label.items():
        outmeta['variables'][k]['Value labels'] = meta.value_labels.get(v, '')
    outmeta['encoding'] = meta.file_encoding
    for dt in statdata.columns:
        desc = {k:str(v) for k, v in dict(statdata[dt].describe()).items()}
        outmeta['variables'][dt].update(desc)
    self.update(outmeta)
    return
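
For orientation, the dictionary that stat_file_metadata merges into the FileAnalysis object has roughly the shape sketched below. This is a hedged illustration: the variable name, labels and summary statistics are invented, and the exact describe() keys depend on pandas and on the variable's type.

# Hypothetical shape of the metadata merged into self:
{
    'encoding': 'UTF-8',
    'variables': {
        'age': {
            'Variable label': 'Age of respondent',
            'Variable type': 'F8.0',
            'Value labels': '',                    # or a dict of code -> label
            'count': '100', 'mean': '42.1',        # stringified pandas describe() output
            'std': '11.3', 'min': '18.0', 'max': '87.0',
        },
        # ...one entry per variable in the file
    },
}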

MetadataError

Bases: Exception

MetadataError

Source code in src/dataverse_utils/collections.py
class MetadataError(Exception):
    '''
    MetadataError
    '''

ReadmeCreator

Make formatted README documents out of a StudyMetadata object.

Source code in src/dataverse_utils/collections.py
class ReadmeCreator:
    '''
    Make formatted README documents out of a StudyMetadata object.
    '''
    def __init__(self, study_metadata_obj: StudyMetadata, **kwargs):
        '''
        Send in StudyMetadata dict to create a nicely formatted README document

        Parameters
        ----------
        study_metadata_obj : StudyMetadata
            A study metadata object

        **kwargs : dict
            Keyword arguments

        Other parameters
        ----------------
        url : str
            The base URL for a Dataverse instance

        pid : typing.Union[str, int]
            The persistent identifier of a file or a file id

        key : str
            A valid API key for performing operations on Dataverse studies

        local : str
            Path to the top level directory which holds study files.
            If present, the Readme creator will try to create extended data from
            local files instead of downloading.

        Notes
        -----
        Either `local` must be supplied, or `url`, `pid` and `key` must be supplied
        '''
        self.meta = study_metadata_obj
        self.kwargs = kwargs

        warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)
        #These values are the first part of the keys that need
        #concatenation to make them more legible.
        self.concat = ['author', 'datasetContact','otherId', 'keyword', 'topic', 'publication',
                       'producer', 'production', 'distributor', 'series', 'software',
                       'dsDescription', 'grant', 'contributor']

    def __html_to_md(self, inval:str)->str:
        '''
        Convert any HTML to markdown, or as much as possible.

        Parameters
        ----------
        inval : str
            HTML string to convert
        '''
        if isinstance(inval, str):
            #markdownify kwargs are here:
            #https://github.com/matthewwithanm/python-markdownify
            return markdownify.markdownify(inval)
        return str(inval)

    def make_md_heads(self, inkey:str)->str:
        '''
        Make markdown H2 headings for selected sections, currently title, description,
        licence and terms of use.

        Parameters
        ----------
        inkey : str
            Section heading
        '''
        section_heads = {'Title':'## ',
                        'Description':'**Description**\n\n',
                        'Licence': '### Licence\n\n',
                        'Terms of Use': '### Terms of Use\n\n'}
        if inkey in section_heads:
            return section_heads[inkey]
        multi = [self.rename_field(_) for _ in  self.concat]
        if inkey in multi:
            if inkey not in ['Series', 'Software', 'Production']:
                return f'{inkey}(s):  \n'
            return f'{inkey}:  \n'
        return f'{inkey}: '

    @property
    def file_metadata_md(self)->str:
        '''
        Produce pretty markdown for file metadata. Outputs
        markdown text string.
        '''
        fmeta = []
        for fil in self.meta.files:
            fileout = {}
            fileout['File'] = fil['filename']
            for k, v in fil.items():
                fileout[k.capitalize().replace('_',' ').replace('Pid', 'Persistent Identifier')] = v
            fileout['Message digest'] = f'{fileout["Chk type"]}: {fileout["Chk digest"]}'
            for rem in ['Chk type', 'Chk digest', 'Id', 'Has tab file', 'Study pid',
                        'File label', 'Filename']:
                del fileout[rem]
            #not everyone has a pid for the file
            if not fileout.get('Persistent Identifier'):
                del fileout['Persistent Identifier']
            # Should I only have remote material here? What about
            # local files?
            if self.kwargs.get('local'):
                #TODO, if local
                fpath = pathlib.Path(self.kwargs['local'])
                #And from here you have to walk the tree to get the file in fil['filename']
                #One day I will do this
            elif self.meta.kwargs.get('url'): # Should this be optional? ie,
                                              # and self.kwargs.get('download') or summat
                d_dict = FileAnalysis(url=self.meta.kwargs['url'],
                                      key=self.meta.kwargs.get('key'),
                                      **fil).md
                #I test here
                #d_dict = FileAnalysis(local='tmp/eics_2023_pumf_v1.sav').md
                if d_dict:
                    fileout['Data Dictionary'] = d_dict

            fmeta.append(fileout)
        #----- original
        #outtmp = []
        #for li in fmeta:
        #    outtmp.append('  \n'.join(f'{k}: {v}' for k, v in li.items()))
        #return '\n\n'.join(outtmp)
        #-------
        outtmp = []
        for li in fmeta:
            o2 = []
            for k, v in li.items():
                if k == 'Data Dictionary':
                    o2.append(f'### {k} for {li["File"]}  \n{v}')
                else:
                    o2.append(f'{k}: {v}')
            outtmp.append('  \n'.join(o2))
        outtmp = '\n\n'.join(outtmp)
        return outtmp

    @property
    def readme_md(self)->str:
        '''
        Generate a Markdown text string (ie, the entire README) for an
        entire StudyMetadata object.
        '''
        metatmp = self.meta.copy()
        neworder = self.reorder_fields(metatmp)
        addme = self.concatenator(metatmp)
        metatmp.update(addme)
        out = {_:None for _ in neworder} # A new dictionary with the correct order
        for k, v in metatmp.items():
            out[k]=v
        #Now remove keys that should be gone
        for rem in self.concat:
            out = {k:v for k,v in out.items()
                       if not (k.startswith(rem) and len(k) > len(rem))}
        fout = {self.rename_field(k): self.__fix_relation_type(self.__html_to_md(v))
                for k, v in out.items()}
        #cludgy geometry hack is best hack
        if self.bbox():
            fout.update(self.bbox())
            delme = [_ for _ in fout if _.endswith('tude')]
            for _ in delme:
                del fout[_]

        outstr =  '\n\n'.join(f'{self.make_md_heads(k)}{v}' for k, v in fout.items())
        outstr += '\n\n## File information\n\n'
        outstr += self.file_metadata_md

        return outstr

    def bbox(self)->dict:
        '''
        Produce sane bounding boxes from Dataverse metadata.
        Note that older versions of Dataverse used North and South *longitude*.

        Outputs a dict with bounding boxes concatenated into a single line
        with each coordinate suffixed by its direction (eg: '42.97 E'), with coordinates
        separated by commas and multiple boxes separated by semi-colons.
        '''
        #Yes, northLongitude, etc. Blame Harvard.
        bbox_order =['westLongitude',
                     'southLongitude',
                     'southLatitude',
                     'eastLongitude',
                     'northLongitude',
                     'northLatitude']

        geog_me = {_: self.meta[_].split(';')
                   for _ in bbox_order if self.meta.get(_)}# Checking for existence causes problems
        if not geog_me: #Sometimes there is no bounding box
            return {}
        bbox = {k: [f'{v} {k[0].capitalize()}'.strip()
                  for v in geog_me[k]] for k in bbox_order if geog_me.get(k)}
        boxes =  self.max_zip(*bbox.values())
        boxes = [', '.join(_) for _ in boxes]
        boxes = [f'({_})' for _ in boxes]
        return {'Bounding box(es)': '; '.join(boxes)}

    def __fix_relation_type(self, badstr:str)->str:
        '''
        For some reason, Dataverse puts camelCase values in the 'values' field
        for publication relation. This will make it more readable.

        Parameters
        ----------
        badstr : str
            Input string; problematic values will be fixed, all others returned as-is.
        '''
        fixthese = ['IsCitedBy', 'IsSupplementTo', 'IsSupplementedBy', 'IsReferencedBy']
        for val in fixthese:
            badstr=badstr.replace(val, self.rename_field(val))
        return badstr

    def reorder_fields(self, indict:dict)->list:
        '''
        Create a list which contains a list of keys in the right (corrected) order.
        This ensures that concatenated fields are inserted into the right place
        and not at the end of the dictionary, keeping the structure
        of Dataverse metadata intact while concatenating values that need
        combining.

        Parameters
        ----------
        indict : dict
            Metadata dictionary
        '''
        fieldlist = list(indict)
        for val in self.concat:
            pts = [n for n, x in enumerate(fieldlist) if x.startswith(val)]
            if pts:
                ins_point = min(pts)
                fieldlist.insert(ins_point, val)
        #Geography fields are a special case yay.
        #westLongitude is the first one
        if 'westLongitude' in fieldlist:
            ins_here = fieldlist.index('westLongitude')
            fieldlist.insert(ins_here, 'Bounding box(es)')
        return fieldlist

    def rename_field(self, instr:str)->str:
        '''
        Split and capitalize camelCase fields as required.
        eg: keywordValue -> Keyword Value
        eg: termsOfUse -> Terms of Use

        Parameters
        ----------
        instr : str
            Camel case string to split into words and capitalize.
        '''
        noncap = ['A', 'Of', 'The']

        wordsp = ''.join(map(lambda x: x if x not in string.ascii_uppercase
                             else f' {x}', list(instr)))
        wordsp = wordsp.split(' ')
        #wordsp[0] = wordsp[0].capitalize()
        #wordsp = ' '.join(map(lambda x: x if x not in noncap else x.lower(), wordsp))
        wordsp = list(map(lambda x: x if x not in noncap else x.lower(), wordsp))
        wordsp[0] = wordsp[0].capitalize()
        wordsp = ' '.join(wordsp)
        #because they can't even use camelCaseConsistently
        #Also pluralization of concatenated fields
        fixthese ={'U R L': 'URL',
                   'U R I': 'URI',
                   'I D':
                   'ID',
                   'Ds': '',
                   'Country':'Country(ies)',
                   'State':'State(s)',
                   'City':'City(ies)',
                   'Geographic Unit':'Geographic unit(s)'}
        for k, v in fixthese.items():
            wordsp = wordsp.replace(k, v)
        return wordsp.strip()

    def concatenator(self, meta:dict)->dict:
        '''
        Produce a concatenated dictionary with the key being just the prefix.
        For fields like author[whatever], etc, where there are multiple
        *components* of similar metadata held in completely separated
        fields.

        Parameters
        ----------
        meta : dict
            Input metadata
        '''
        #The keys are the first part of the fields that need concatenation
        concat = {_:[] for _ in self.concat}

        for k, v in meta.items():
            for fk in concat:
                if k.startswith(fk):
                    if v:
                        if concat[fk]:
                            concat[fk].append(v.split(';'))
                        else:
                            concat[fk] = [v.split(';')]


        outdict = {}
        for ke, va in concat.items():
            if va:
                interim = self.max_zip(*va)
                interim = [' - '.join([y.strip() for y in _ if y]) for _ in interim ]
                #interim = '; '.join(interim) # Should it be newline?
                #interim = '  \n'.join(interim) # Should it be newline?
                interim= '<br/>'.join(interim)# Markdownify strips internal spaces
                #if ke.startswith('keyw'):
                outdict[ke] = interim
        return outdict

    def max_zip(self, *args):
        '''
        Like built-in zip, only uses the *maximum* length and appends None if not found
        instead of stopping at the shortest iterable.

        Parameters
        ----------
        *args : iterable
            Any iterable
        '''
        length = max(map(len, args))
        outlist=[]
        for n in range(length):
            vals = []
            for arg in args:
                try:
                    vals.append(arg[n])
                except IndexError:
                    vals.append(None)
            outlist.append(vals)
        return outlist

    def write_pdf(self, dest:str)->None:
        '''
        Make the PDF of a README and save it to a file.

        Parameters
        ----------
        dest : str
            Destination of file, optionally including path.
            eg: /Users/foo/study/README.pdf or
            ~/tmp/README_I_AM_METADATA.pdf
        '''
        dest = pathlib.Path(dest).expanduser().absolute()
        output = markdown_pdf.MarkdownPdf(toc_level=1)
        content = markdown_pdf.Section(self.readme_md, toc=False)
        output.add_section(content)
        output.save(dest)

    def write_md(self, dest:str)->None:
        '''
        Write Markdown text of the complete documentation to a file.

        Parameters
        ----------
        dest : str
            Destination of file, optionally including path.
            eg: /Users/foo/study/README.md or
            ~/tmp/README_I_AM_METADATA.md
        '''
        dest = pathlib.Path(dest).expanduser().absolute()
        with open(file=dest, mode='w', encoding='utf-8') as f:
            f.write(self.readme_md)
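
An end-to-end sketch is shown below. It is illustrative only: the URL, persistent identifier and key are placeholders, and it relies on the fact that file_metadata_md reuses the url/key passed to the StudyMetadata object when it analyzes individual files.

# Illustrative sketch; URL, pid and key are placeholders.
from dataverse_utils.collections import ReadmeCreator, StudyMetadata

meta = StudyMetadata(url='https://dataverse.example.edu',
                     pid='doi:10.5072/FK2/AAAAAA',
                     key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx')
readme = ReadmeCreator(meta)
print(readme.readme_md)              # the full README as Markdown text
readme.write_md('~/tmp/README.md')   # ...or write it to disk
readme.write_pdf('~/tmp/README.pdf') # ...or render it as a PDF via markdown_pdf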

file_metadata_md property

Produce pretty markdown for file metadata. Outputs markdown text string.

readme_md property

Generate a Markdown text string (ie, the entire README) for an entire StudyMetadata object.

__fix_relation_type(badstr)

For some reason, Dataverse puts camelCase values in the ‘values’ field for publication relation. This will make it more readable.

Parameters:
  • badstr (str) –

    Input string; problematic values will be fixed, all others returned as-is.

Source code in src/dataverse_utils/collections.py
def __fix_relation_type(self, badstr:str)->str:
    '''
    For some reason, Dataverse puts camelCase values in the 'values' field
    for publication relation. This will make it more readable.

    Parameters
    ----------
    badstr : str
        Input string; problematic values will be fixed, all others returned as-is.
    '''
    fixthese = ['IsCitedBy', 'IsSupplementTo', 'IsSupplementedBy', 'IsReferencedBy']
    for val in fixthese:
        badstr=badstr.replace(val, self.rename_field(val))
    return badstr

__html_to_md(inval)

Convert any HTML to markdown, or as much as possible.

Parameters:
  • inval (str) –

    HTML string to convert

Source code in src/dataverse_utils/collections.py
def __html_to_md(self, inval:str)->str:
    '''
    Convert any HTML to markdown, or as much as possible.

    Parameters
    ----------
    inval : str
        HTML string to convert
    '''
    if isinstance(inval, str):
        #markdownify kwargs are here:
        #https://github.com/matthewwithanm/python-markdownify
        return markdownify.markdownify(inval)
    return str(inval)

__init__(study_metadata_obj, **kwargs)

Send in a StudyMetadata dict to create a nicely formatted README document.

Parameters:
  • study_metadata_obj (StudyMetadata) –

    A study metadata object

  • **kwargs (dict, default: {} ) –

    Keyword arguments

  • url (str) –

    The base URL for a Dataverse instance

  • pid (Union[str, int]) –

    The persistent identifier of a file or a file id

  • key (str) –

    A valid API key for performing operations on Dataverse studies

  • local (str) –

    Path to the top level directory which holds study files. If present, the Readme creator will try to create extended data from local files instead of downloading.

Notes

Either local must be supplied, or url, pid and key must be supplied

Source code in src/dataverse_utils/collections.py
def __init__(self, study_metadata_obj: StudyMetadata, **kwargs):
    '''
    Send in StudyMetadata dict to create a nicely formatted README document

    Parameters
    ----------
    study_metadata_obj : StudyMetadata
        A study metadata object

    **kwargs : dict
        Keyword arguments

    Other parameters
    ----------------
    url : str
        The base URL for a Dataverse instance

    pid : typing.Union[str, int]
        The persistent identifier of a file or a file id

    key : str
        A valid API key for performing operations on Dataverse studies

    local : str
        Path to the top level directory which holds study files.
        If present, the Readme creator will try to create extended data from
        local files instead of downloading.

    Notes
    -----
    Either `local` must be supplied, or `url`, `pid` and `key` must be supplied
    '''
    self.meta = study_metadata_obj
    self.kwargs = kwargs

    warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)
    #These values are the first part of the keys that need
    #concatenation to make them more legible.
    self.concat = ['author', 'datasetContact','otherId', 'keyword', 'topic', 'publication',
                   'producer', 'production', 'distributor', 'series', 'software',
                   'dsDescription', 'grant', 'contributor']

bbox()

Produce sane bounding boxes from Dataverse metadata. Note that older versions of Dataverse used North and South longitude.

Outputs a dict with bounding boxes concatenated into a single line with each coordinate suffixed by its direction (eg: ‘42.97 E’), with coordinates separated by commas and multiple boxes separated by semi-colons.

Source code in src/dataverse_utils/collections.py
def bbox(self)->dict:
    '''
    Produce sane bounding boxes from Dataverse metadata.
    Note that older versions of Dataverse used North and South *longitude*.

    Outputs a dict with bounding boxes concatenated into a single line
    with each coordinate suffixed by its direction (eg: '42.97 E'), with coordinates
    separated by commas and multiple boxes separated by semi-colons.
    '''
    #Yes, northLongitude, etc. Blame Harvard.
    bbox_order =['westLongitude',
                 'southLongitude',
                 'southLatitude',
                 'eastLongitude',
                 'northLongitude',
                 'northLatitude']

    geog_me = {_: self.meta[_].split(';')
               for _ in bbox_order if self.meta.get(_)}# Checking for existence causes problems
    if not geog_me: #Sometimes there is no bounding box
        return {}
    bbox = {k: [f'{v} {k[0].capitalize()}'.strip()
              for v in geog_me[k]] for k in bbox_order if geog_me.get(k)}
    boxes =  self.max_zip(*bbox.values())
    boxes = [', '.join(_) for _ in boxes]
    boxes = [f'({_})' for _ in boxes]
    return {'Bounding box(es)': '; '.join(boxes)}
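
A worked illustration with invented coordinates, using a plain dict in place of a StudyMetadata object:

from dataverse_utils.collections import ReadmeCreator

box = {'westLongitude': '-123.3', 'southLongitude': '49.0',
       'eastLongitude': '-122.2', 'northLongitude': '49.4'}
rc = ReadmeCreator(box)  # a plain dict stands in for StudyMetadata here
rc.bbox()
# -> {'Bounding box(es)': '(-123.3 W, 49.0 S, -122.2 E, 49.4 N)'}
# Each value is suffixed with the capitalized first letter of its key,
# then the coordinates for each box are joined with commas.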

concatenator(meta)

Produce a concatenated dictionary with the key being just the prefix. For fields like author[whatever], etc, where there are multiple components of similar metadata held in completely separated fields.

Parameters:
  • meta (dict) –

    Input metadata

Source code in src/dataverse_utils/collections.py
def concatenator(self, meta:dict)->dict:
    '''
    Produce a concatenated dictionary with the key being just the prefix.
    For fields like author[whatever], etc, where there are multiple
    *components* of similar metadata held in completely separated
    fields.

    Parameters
    ----------
    meta : dict
        Input metadata
    '''
    #The keys are the first part of the fields that need concatenation
    concat = {_:[] for _ in self.concat}

    for k, v in meta.items():
        for fk in concat:
            if k.startswith(fk):
                if v:
                    if concat[fk]:
                        concat[fk].append(v.split(';'))
                    else:
                        concat[fk] = [v.split(';')]


    outdict = {}
    for ke, va in concat.items():
        if va:
            interim = self.max_zip(*va)
            interim = [' - '.join([y.strip() for y in _ if y]) for _ in interim ]
            #interim = '; '.join(interim) # Should it be newline?
            #interim = '  \n'.join(interim) # Should it be newline?
            interim= '<br/>'.join(interim)# Markdownify strips internal spaces
            #if ke.startswith('keyw'):
            outdict[ke] = interim
    return outdict
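
For example (names invented), compound author fields that arrive as separate semicolon-delimited strings are recombined entry by entry:

from dataverse_utils.collections import ReadmeCreator

rc = ReadmeCreator({})  # an empty dict stands in for StudyMetadata here
flat = {'authorName': 'Smith, Jane; Doe, John',
        'authorAffiliation': 'University A; University B'}
rc.concatenator(flat)
# -> {'author': 'Smith, Jane - University A<br/>Doe, John - University B'}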

make_md_heads(inkey)

Make markdown H2 headings for selected sections, currently title, description, licence and terms of use.

Parameters:
  • inkey (str) –

    Section heading

Source code in src/dataverse_utils/collections.py
def make_md_heads(self, inkey:str)->str:
    '''
    Make markdown H2 headings for selected sections, currently title, description,
    licence and terms of use.

    Parameters
    ----------
    inkey : str
        Section heading
    '''
    section_heads = {'Title':'## ',
                    'Description':'**Description**\n\n',
                    'Licence': '### Licence\n\n',
                    'Terms of Use': '### Terms of Use\n\n'}
    if inkey in section_heads:
        return section_heads[inkey]
    multi = [self.rename_field(_) for _ in  self.concat]
    if inkey in multi:
        if inkey not in ['Series', 'Software', 'Production']:
            return f'{inkey}(s):  \n'
        return f'{inkey}:  \n'
    return f'{inkey}: '
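
A few illustrative inputs and the prefixes they produce (the section names follow rename_field output; a bare dict stands in for StudyMetadata):

from dataverse_utils.collections import ReadmeCreator

rc = ReadmeCreator({})
rc.make_md_heads('Title')    # -> '## '             (the title becomes an H2 heading)
rc.make_md_heads('Author')   # -> 'Author(s):  \n'  (concatenated field, pluralized)
rc.make_md_heads('Series')   # -> 'Series:  \n'     (concatenated but not pluralized)
rc.make_md_heads('Subject')  # -> 'Subject: '       (ordinary single-value field)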

max_zip(*args)

Like built-in zip, only uses the maximum length and appends None if not found instead of stopping at the shortest iterable.

Parameters:
  • *args (iterable, default: () ) –

    Any iterable

Source code in src/dataverse_utils/collections.py
def max_zip(self, *args):
    '''
    Like built-in zip, only uses the *maximum* length and appends None if not found
    instead of stopping at the shortest iterable.

    Parameters
    ----------
    *args : iterable
        Any iterable
    '''
    length = max(map(len, args))
    outlist=[]
    for n in range(length):
        vals = []
        for arg in args:
            try:
                vals.append(arg[n])
            except IndexError:
                vals.append(None)
        outlist.append(vals)
    return outlist
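
A quick illustration of the padding behaviour, again with a bare dict standing in for StudyMetadata:

from dataverse_utils.collections import ReadmeCreator

rc = ReadmeCreator({})
list(zip(['a', 'b', 'c'], [1, 2]))   # -> [('a', 1), ('b', 2)]               (shortest wins)
rc.max_zip(['a', 'b', 'c'], [1, 2])  # -> [['a', 1], ['b', 2], ['c', None]]  (padded out)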

rename_field(instr)

Split and capitalize camelCase fields as required, e.g. keywordValue -> Keyword Value, termsOfUse -> Terms of Use.

Parameters:
  • instr (str) –

    Camel case string to split into words and capitalize.

Source code in src/dataverse_utils/collections.py
def rename_field(self, instr:str)->str:
    '''
    Split and capitalize camelCase fields as required.
    eg: keywordValue -> Keyword Value
    eg: termsOfUse -> Terms of Use

    Parameters
    ----------
    instr : str
        Camel case string to split into words and capitalize.
    '''
    noncap = ['A', 'Of', 'The']

    wordsp = ''.join(map(lambda x: x if x not in string.ascii_uppercase
                         else f' {x}', list(instr)))
    wordsp = wordsp.split(' ')
    #wordsp[0] = wordsp[0].capitalize()
    #wordsp = ' '.join(map(lambda x: x if x not in noncap else x.lower(), wordsp))
    wordsp = list(map(lambda x: x if x not in noncap else x.lower(), wordsp))
    wordsp[0] = wordsp[0].capitalize()
    wordsp = ' '.join(wordsp)
    #because they can't even use camelCaseConsistently
    #Also pluralization of concatenated fields
    fixthese ={'U R L': 'URL',
               'U R I': 'URI',
               'I D':
               'ID',
               'Ds': '',
               'Country':'Country(ies)',
               'State':'State(s)',
               'City':'City(ies)',
               'Geographic Unit':'Geographic unit(s)'}
    for k, v in fixthese.items():
        wordsp = wordsp.replace(k, v)
    return wordsp.strip()

reorder_fields(indict)

Create a list which contains a list of keys in the right (corrected) order. This ensures that concatenated fields are inserted into the right place and not at the end of the dictionary, keeping the structure of Dataverse metadata intact while concatenating values that need combining.

Parameters:
  • indict (dict) –

    Metadata dictionary

Source code in src/dataverse_utils/collections.py
def reorder_fields(self, indict:dict)->list:
    '''
    Create a list which contains a list of keys in the right (corrected) order.
    This ensures that concatenated fields are inserted into the right place
    and not at the end of the dictionary, keeping the structure
    of Dataverse metadata intact while concatenating values that need
    combining.

    Parameters
    ----------
    indict : dict
        Metadata dictionary
    '''
    fieldlist = list(indict)
    for val in self.concat:
        pts = [n for n, x in enumerate(fieldlist) if x.startswith(val)]
        if pts:
            ins_point = min(pts)
            fieldlist.insert(ins_point, val)
    #Geography fields are a special case yay.
    #westLongitude is the first one
    if 'westLongitude' in fieldlist:
        ins_here = fieldlist.index('westLongitude')
        fieldlist.insert(ins_here, 'Bounding box(es)')
    return fieldlist
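
For instance (keys invented), a concatenated prefix such as author is inserted immediately before its first component so that the merged value keeps its place:

from dataverse_utils.collections import ReadmeCreator

rc = ReadmeCreator({})  # a bare dict stands in for StudyMetadata here
flat = {'title': 'T', 'authorName': 'A', 'authorAffiliation': 'U', 'subject': 'S'}
rc.reorder_fields(flat)
# -> ['title', 'author', 'authorName', 'authorAffiliation', 'subject']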

write_md(dest)

Write Markdown text of the complete documentation to a file.

Parameters:
  • dest (str) –

    Destination of file, optionally including path. eg: /Users/foo/study/README.md or ~/tmp/README_I_AM_METADATA.md

Source code in src/dataverse_utils/collections.py
def write_md(self, dest:str)->None:
    '''
    Write Markdown text of the complete documentation to a file.

    Parameters
    ----------
    dest : str
        Destination of file, optionally including path.
        eg: /Users/foo/study/README.md or
        ~/tmp/README_I_AM_METADATA.md
    '''
    dest = pathlib.Path(dest).expanduser().absolute()
    with open(file=dest, mode='w', encoding='utf-8') as f:
        f.write(self.readme_md)

write_pdf(dest)

Make the PDF of a README and save it to a file.

Parameters:
  • dest (str) –

    Destination of file, optionally including path. eg: /Users/foo/study/README.pdf or ~/tmp/README_I_AM_METADATA.pdf

Source code in src/dataverse_utils/collections.py
def write_pdf(self, dest:str)->None:
    '''
    Make the PDF of a README and save it to a file.

    Parameters
    ----------
    dest : str
        Destination of file, optionally including path.
        eg: /Users/foo/study/README.pdf or
        ~/tmp/README_I_AM_METADATA.pdf
    '''
    dest = pathlib.Path(dest).expanduser().absolute()
    output = markdown_pdf.MarkdownPdf(toc_level=1)
    content = markdown_pdf.Section(self.readme_md, toc=False)
    output.add_section(content)
    output.save(dest)

StudyMetadata

Bases: dict

The metadata container for a single study.

Source code in src/dataverse_utils/collections.py
class StudyMetadata(dict):
    '''
    The metadata container for a single study.
    '''
    def __init__(self, **kwargs):
        '''
        Initialize a StudyMetadata object.

        Parameters
        ----------
        **kwargs: dict
            At least some of the following


        Other parameters
        ----------------
        study_meta : dict, optional
            The dataverse study metadata JSON

        url :  str, optional
            Base URL to dataverse instance

        pid : str, optional
            Persistent ID of a study

        key : str
            Dataverse instance API key (needed for unpublished studies)

        Notes
        -----
        Either `study_meta` is required OR `pid` and `url`. `key` _may_ be required
        if either a draft study is being accessed or the Dataverse installation
        requires API keys for all requests.
        '''
        self.kwargs = kwargs
        self.study_meta  = kwargs.get('study_meta')
        self.url = kwargs.get('url')
        self.pid = kwargs.get('pid')
        self.headers = UAHEADER.copy()
        if not (('study_meta' in kwargs) or ('url' in kwargs and 'pid' in kwargs)):
            raise TypeError('At least one of a URL/pid combo (url, pid) (and possibly key) or '
            'study metadata json (study_meta) is required.')
        if not self.study_meta:
            self.study_meta = self.__obtain_metadata()
        try:
            self.extract_metadata()
        except KeyError as e:
            raise MetadataError(f'Unable to parse study metadata. Do you need an API key?\n'
                           f'{e} key not found.\n'
                           f'Offending JSON: {self.study_meta}') from e
        self.__files = None

    def __obtain_metadata(self):
        '''
        Obtain study metadata as required.
        '''
        if self.kwargs.get('key'):
            self.headers.update({'X-Dataverse-key':self.kwargs['key']})
        params = {'persistentId': self.pid}
        self.session = requests.Session()
        self.session.mount('https://',
                           requests.adapters.HTTPAdapter(max_retries=RETRY))
        self.url = self.url.strip('/')
        if not self.url.startswith('https://'):
            self.url = f'https://{self.url}'
        data = self.session.get(f'{self.url}/api/datasets/:persistentId',
                                headers=self.headers, params=params)
        return data.json()

    def __has_metadata(self)->bool:
        '''
        Returns a boolean indicating whether there *is* study metadata.
        Deaccessioned items are notable for their lack of any indication
        that they are deaccessioned. However, they lack the "latestVersion" key,
        which serves as a proxy. Ideally.
        '''
        #try:
        #    t = self.study_meta['data']
        #    del t #OMG This is so dumb
        #except KeyError as e:
        #    raise e

        if not self.study_meta.get('data'):
            raise KeyError('data')

        testfields = ['id', 'identifier', 'authority', 'latestVersion']
        if all(self.study_meta['data'].get(_) for _ in testfields):
            return True
        return False

    def extract_metadata(self):
        '''
        Convenience function for parsing the study metadata of the latest version.

        Results are written to self, accessible as a dictionary.
        '''
        if not self.__has_metadata():
            return


        for v in self.study_meta['data']['latestVersion']['metadataBlocks'].values():
            for field in v['fields']:
                self.extract_field_metadata(field)
        self.__extract_licence_info()
        self.__version()
        #['data']['latestVersion']['versionNumber']
        #['data']['latestVersion']['versionMinorNumber']

    def extract_field_metadata(self, field):
        '''
        Extract the metadata from a single field and make it into a human-readable dict.
        Output updates self.
        '''
        #pylint: disable=too-many-branches, too-many-nested-blocks
        #typeClass: compound = dict, primitive = string
        #multiple: false= one thing, true=list
        # so typeClass:compound AND multiple:true = a list of dicts.
        # also, typeClass can be "controlledVocabulary" because reasons.
        #is this crap recursive or is one level enough?
        #[[x['typeName'], x['typeClass'], x['multiple']] for x in citation['fields']]
        # {('primitive', False), ('compound', True), ('compound', False),
        # ('primitive', True), ('controlledVocabulary', True)}
        if not field['multiple']:
            if field['typeClass']=='primitive':
                self.update({field['typeName']: field['value']})
            if field['typeClass'] == 'compound':
                for v2 in field['value']:
                    self.extract_field_metadata(field['value'][v2])
        if field['multiple']:
            if field['typeClass'] == 'compound':
                #produce a list of similar values concatenated
                for v3 in field['value']:
                    interim = {}
                    for insane_dict in field['value']:
                        for v3 in insane_dict.values():
                            if interim.get(v3['typeName']):
                                interim.update({v3['typeName']:
                                                interim[v3['typeName']]+ [v3['value']]})
                            else:
                                #sometimes value is None because reasons.
                                interim[v3['typeName']] = [v3.get('value', [] )]
                            LOGGER.debug(interim)
                for k9, v9 in interim.items():
                    self.update({k9: '; '.join(v9)})

            if field['typeClass'] == 'primitive':
                self.update({field['typeName'] :  '; '.join(field['value'])})

        if field['typeClass'] == 'controlledVocabulary':
            if isinstance(field['value'], list):
                self.update({field['typeName'] : '; '.join(field['value'])})
            else:
                self.update({field['typeName'] : field['value']})
        # And that should cover every option!

    @property
    def files(self)->list:
        '''
        Return a list of dicts with file metadata.
        '''
        if not self.__files:
            self.__extract_files()
        return self.__files

    def __extract_files(self):
        '''
        Extract file level metadata, and write to self.__files.
        '''
        #Note: ALL other dict values for this object are single values,
        #but files would (usually) be an arbitrary number of files.
        #That bothers me on an intellectual level. Therefore, it will be attribute.
        #Iterate over StudyMetadata.files if you want to know the contents
        if not self.__files:
            outie = []
            for v in self.study_meta['data']['latestVersion']['files']:
                innie = {}
                fpath = v.get('directoryLabel', '').strip('/')
                innie['filename'] = v['dataFile'].get('originalFileName', v['dataFile']['filename'])
                #innie['full_path'] = '/'.join([fpath, innie['filename']])
                #In case it's pathless, drop any leading slash, because
                #'' is not the same as None, and None can't be joined.
                innie['filename'] = '/'.join([fpath, innie['filename']]).strip('/')
                innie['file_label'] = v.get('label')
                innie['description'] = v.get('description')
                innie['filesize_bytes'] = v['dataFile'].get('originalFileSize',
                                                             v['dataFile']['filesize'])
                innie['chk_type'] = v['dataFile']['checksum']['type']
                innie['chk_digest'] =v['dataFile']['checksum']['value']
                innie['id'] = v['dataFile']['id']
                innie['pid'] = v['dataFile'].get('persistentId')
                innie['has_tab_file'] = v['dataFile'].get('tabularData', False)
                innie['study_pid'] = (f"{self.study_meta['data']['protocol']}:"
                                     f"{self.study_meta['data']['authority']}/"
                                     f"{self.study_meta['data']['identifier']}")
                innie['tags'] = ', '.join(v.get('categories', []))
                if not innie['tags']:
                    del innie['tags']#tagless
                #innie['path'] = v.get('directoryLabel', '')
                outie.append(innie)
            self.__files = outie

    def __extract_licence_info(self):
        '''
        Extract all the licence information fields and add them
        to self['licence'] *if present*.
        '''
        lic_fields = ('termsOfUse',
                      'confidentialityDeclaration',
                      'specialPermissions',
                      'restrictions',
                      'citationRequirements',
                      'depositorRequirements', 'conditions',
                      'disclaimer',
                      'dataAccessPlace',
                      'originalArchive',
                      'availabilityStatus',
                      'contactForAccess',
                      'sizeOfCollection',
                      'studyCompletion',
                      'fileAccessRequest')
        for field in self.study_meta['data']['latestVersion']:
            if field in lic_fields:
                self[field] = self.study_meta['data']['latestVersion'][field]
        common_lic = self.study_meta['data']['latestVersion'].get('license')
        if isinstance(common_lic, str) and common_lic != 'NONE':
            self['licence'] = common_lic
        elif isinstance(common_lic, dict):
            self['licence'] = self.study_meta['data']['latestVersion']['license'].get('name')
            link = self.study_meta['data']['latestVersion']['license'].get('uri')
            if link:
                self['licenceLink'] = link

    def __version(self):
        '''
        Obtain the current version and add it to self['studyVersion'].
        '''
        if self.study_meta['data']['latestVersion']['versionState'] == 'RELEASED':
            self['studyVersion'] = (f"{self.study_meta['data']['latestVersion']['versionNumber']}."
                           f"{self.study_meta['data']['latestVersion']['versionMinorNumber']}")
            return
        self['studyVersion'] = self.study_meta['data']['latestVersion']['versionState']
        return
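
A minimal usage sketch follows; the URL and persistent identifier are placeholders, and the printed field names ('title', 'studyVersion') are assumptions about typical citation metadata. An API key is only needed for drafts or key-protected installations.

# Illustrative sketch only.
from dataverse_utils.collections import StudyMetadata

study = StudyMetadata(url='https://dataverse.example.edu',
                      pid='doi:10.5072/FK2/AAAAAA')
print(study.get('title'))         # flattened fields are ordinary dict keys
print(study.get('studyVersion'))  # e.g. '1.0', or the version state for drafts
for f in study.files:             # per-file metadata as a list of dicts
    print(f['filename'], f['chk_type'], f['chk_digest'])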

files property

Return a list of dicts with file metadata.

__extract_files()

Extract file level metadata, and write to self.__files.

Source code in src/dataverse_utils/collections.py
def __extract_files(self):
    '''
    Extract file level metadata, and write to self.__files.
    '''
    #Note: ALL other dict values for this object are single values,
    #but files would (usually) be an arbitrary number of files.
    #That bothers me on an intellectual level. Therefore, it will be attribute.
    #Iterate over StudyMetadata.files if you want to know the contents
    if not self.__files:
        outie = []
        for v in self.study_meta['data']['latestVersion']['files']:
            innie = {}
            fpath = v.get('directoryLabel', '').strip('/')
            innie['filename'] = v['dataFile'].get('originalFileName', v['dataFile']['filename'])
            #innie['full_path'] = '/'.join([fpath, innie['filename']])
            #In case it's pathless, drop any leading slash, because
            #'' is not the same as None, and None can't be joined.
            innie['filename'] = '/'.join([fpath, innie['filename']]).strip('/')
            innie['file_label'] = v.get('label')
            innie['description'] = v.get('description')
            innie['filesize_bytes'] = v['dataFile'].get('originalFileSize',
                                                         v['dataFile']['filesize'])
            innie['chk_type'] = v['dataFile']['checksum']['type']
            innie['chk_digest'] =v['dataFile']['checksum']['value']
            innie['id'] = v['dataFile']['id']
            innie['pid'] = v['dataFile'].get('persistentId')
            innie['has_tab_file'] = v['dataFile'].get('tabularData', False)
            innie['study_pid'] = (f"{self.study_meta['data']['protocol']}:"
                                 f"{self.study_meta['data']['authority']}/"
                                 f"{self.study_meta['data']['identifier']}")
            innie['tags'] = ', '.join(v.get('categories', []))
            if not innie['tags']:
                del innie['tags']#tagless
            #innie['path'] = v.get('directoryLabel', '')
            outie.append(innie)
        self.__files = outie

__extract_licence_info()

Extract all the licence information fields and add them to self[‘licence’] if present.

Source code in src/dataverse_utils/collections.py
def __extract_licence_info(self):
    '''
    Extract all the licence-related fields and add them to the object
    *if present*; a standard licence is stored under self['licence'].
    '''
    lic_fields = ('termsOfUse',
                  'confidentialityDeclaration',
                  'specialPermissions',
                  'restrictions',
                  'citationRequirements',
                  'depositorRequirements', 'conditions',
                  'disclaimer',
                  'dataAccessPlace',
                  'originalArchive',
                  'availabilityStatus',
                  'contactForAccess',
                  'sizeOfCollection',
                  'studyCompletion',
                  'fileAccessRequest')
    for field in self.study_meta['data']['latestVersion']:
        if field in lic_fields:
            self[field] = self.study_meta['data']['latestVersion'][field]
    common_lic = self.study_meta['data']['latestVersion'].get('license')
    if isinstance(common_lic, str) and common_lic != 'NONE':
        self['licence'] = common_lic
    elif isinstance(common_lic, dict):
        self['licence'] = self.study_meta['data']['latestVersion']['license'].get('name')
        link = self.study_meta['data']['latestVersion']['license'].get('uri')
        if link:
            self['licenceLink'] = link

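A brief sketch of how the extracted values surface on the object (here meta is an already-constructed StudyMetadata instance; which keys exist depends on the study):

licence = meta.get('licence')       # e.g. 'CC0 1.0' when a standard licence is set
link = meta.get('licenceLink')      # present only when the licence carries a URI
terms = meta.get('termsOfUse')      # custom terms, when set instead of a standard licence
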
__has_metadata()

Returns a boolean indicating whether there is study metadata. Deaccessioned items carry no explicit indication that they have been deaccessioned, but they do lack the “latestVersion” key, which serves as a proxy.

Source code in src/dataverse_utils/collections.py
def __has_metadata(self) -> bool:
    '''
    Returns a boolean indicating whether there *is* study metadata.
    Deaccessioned items carry no explicit indication that they have been
    deaccessioned, but they do lack the "latestVersion" key, which
    serves as a proxy.
    '''

    if not self.study_meta.get('data'):
        raise KeyError('data')

    testfields = ['id', 'identifier', 'authority', 'latestVersion']
    if all(self.study_meta['data'].get(_) for _ in testfields):
        return True
    return False

__init__(**kwargs)

Initialize a StudyMetadata object.

Parameters:
  • **kwargs

    At least some of the following

  • study_meta (dict) –

    The dataverse study metadata JSON

  • url (str) –

    Base URL to dataverse instance

  • pid (str) –

    Persistent ID of a study

  • key (str) –

    Dataverse instance API key (needed for unpublished studies)

Notes

Either study_meta is required OR pid and url. key may be required if either a draft study is being accessed or the Dataverse installation requires API keys for all requests.

Source code in src/dataverse_utils/collections.py
def __init__(self, **kwargs):
    '''
    Initialize a StudyMetadata object.

    Parameters
    ----------
    **kwargs: dict
        At least some of the following


    Other parameters
    ----------------
    study_meta : dict, optional
        The dataverse study metadata JSON

    url : str, optional
        Base URL to dataverse instance

    pid : str, optional
        Persistent ID of a study

    key : str, optional
        Dataverse instance API key (needed for unpublished studies)

    Notes
    -----
    Either `study_meta` is required OR `pid` and `url`. `key` _may_ be required
    if either a draft study is being accessed or the Dataverse installation
    requires API keys for all requests.
    '''
    self.kwargs = kwargs
    self.study_meta = kwargs.get('study_meta')
    self.url = kwargs.get('url')
    self.pid = kwargs.get('pid')
    self.headers = UAHEADER.copy()
    if not (('study_meta' in kwargs) or ('url' in kwargs and 'pid' in kwargs)):
        raise TypeError('At least one of a URL/pid combo (url, pid) (and possibly key) or '
                        'study metadata json (study_meta) is required.')
    if not self.study_meta:
        self.study_meta = self.__obtain_metadata()
    try:
        self.extract_metadata()
    except KeyError as e:
        raise MetadataError(f'Unable to parse study metadata. Do you need an API key?\n'
                       f'{e} key not found.\n'
                       f'Offending JSON: {self.study_meta}') from e
    self.__files = None

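A sketch of the two construction modes described in the Notes above (the URL, PID, key, and file path are placeholders):

import json

from dataverse_utils.collections import StudyMetadata

# 1. Let the object fetch its own metadata; 'key' is only needed for drafts
#    or for installations that require an API key on every request.
meta = StudyMetadata(url='https://demo.dataverse.org',
                     pid='doi:10.5072/FK2/XXXXXX',
                     key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx')

# 2. Reuse metadata JSON you have already downloaded.
with open('study_metadata.json', encoding='utf-8') as fileobj:
    meta = StudyMetadata(study_meta=json.load(fileobj))
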
__obtain_metadata()

Obtain study metadata as required.

Source code in src/dataverse_utils/collections.py
def __obtain_metadata(self):
    '''
    Obtain study metadata as required.
    '''
    if self.kwargs.get('key'):
        self.headers.update({'X-Dataverse-key':self.kwargs['key']})
    params = {'persistentId': self.pid}
    self.session = requests.Session()
    self.session.mount('https://',
                       requests.adapters.HTTPAdapter(max_retries=RETRY))
    self.url = self.url.strip('/')
    if not self.url.startswith('https://'):
        self.url = f'https://{self.url}'
    data = self.session.get(f'{self.url}/api/datasets/:persistentId',
                            headers=self.headers, params=params, timeout=300)
    return data.json()

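For reference, a minimal standalone sketch of the request this method makes (the URL, PID, and key are placeholders; the X-Dataverse-key header is only sent when a key is supplied):

import requests

headers = {'X-Dataverse-key': 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'}
params = {'persistentId': 'doi:10.5072/FK2/XXXXXX'}
resp = requests.get('https://demo.dataverse.org/api/datasets/:persistentId',
                    headers=headers, params=params, timeout=300)
study_meta = resp.json()
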
__version()

Obtain the current version and add it to self[‘studyVersion’].

Source code in src/dataverse_utils/collections.py
def __version(self):
    '''
    Obtain the current version and add it to self['studyVersion'].
    '''
    if self.study_meta['data']['latestVersion']['versionState'] == 'RELEASED':
        self['studyVersion'] = (f"{self.study_meta['data']['latestVersion']['versionNumber']}."
                       f"{self.study_meta['data']['latestVersion']['versionMinorNumber']}")
        return
    self['studyVersion'] = self.study_meta['data']['latestVersion']['versionState']
    return

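After extraction, self['studyVersion'] is a plain string (meta is an already-extracted StudyMetadata instance):

meta['studyVersion']   # e.g. '2.1' for a released study, or 'DRAFT' otherwise
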
extract_field_metadata(field)

Extract the metadata from a single field and make it into a human-readable dict. Output updates self.

Source code in src/dataverse_utils/collections.py
def extract_field_metadata(self, field):
    '''
    Extract the metadata from a single field and make it into a human-readable dict.
    Output updates self.
    '''
    #pylint: disable=too-many-branches, too-many-nested-blocks
    #typeClass: compound = dict, primitive = string
    #multiple: false = one thing, true = list
    #so typeClass:compound AND multiple:true = a list of dicts.
    #typeClass can also be 'controlledVocabulary'.
    #Observed (typeClass, multiple) combinations, e.g. via:
    #[[x['typeName'], x['typeClass'], x['multiple']] for x in citation['fields']]
    # {('primitive', False), ('compound', True), ('compound', False),
    # ('primitive', True), ('controlledVocabulary', True)}
    if not field['multiple']:
        if field['typeClass']=='primitive':
            self.update({field['typeName']: field['value']})
        if field['typeClass'] == 'compound':
            for v2 in field['value']:
                self.extract_field_metadata(field['value'][v2])
    if field['multiple']:
        if field['typeClass'] == 'compound':
            #produce a list of similar values concatenated
            interim = {}
            for compound_value in field['value']:
                for v3 in compound_value.values():
                    if interim.get(v3['typeName']):
                        interim.update({v3['typeName']:
                                        interim[v3['typeName']] + [v3.get('value', '')]})
                    else:
                        #sometimes 'value' is missing, so default to an empty string
                        interim[v3['typeName']] = [v3.get('value', '')]
                    LOGGER.debug(interim)
            for k9, v9 in interim.items():
                self.update({k9: '; '.join(v9)})

        if field['typeClass'] == 'primitive':
            self.update({field['typeName']: '; '.join(field['value'])})

    if field['typeClass'] == 'controlledVocabulary':
        if isinstance(field['value'], list):
            self.update({field['typeName']: '; '.join(field['value'])})
        else:
            self.update({field['typeName']: field['value']})

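A sketch of how a compound, multiple field is flattened (the field dict mirrors the shape of the Dataverse native JSON; the names and values are purely illustrative):

field = {'typeName': 'author', 'multiple': True, 'typeClass': 'compound',
         'value': [{'authorName': {'typeName': 'authorName', 'multiple': False,
                                   'typeClass': 'primitive', 'value': 'Doe, Jane'},
                    'authorAffiliation': {'typeName': 'authorAffiliation', 'multiple': False,
                                          'typeClass': 'primitive',
                                          'value': 'Example University'}},
                   {'authorName': {'typeName': 'authorName', 'multiple': False,
                                   'typeClass': 'primitive', 'value': 'Roe, Richard'}}]}
meta.extract_field_metadata(field)
meta['authorName']          # 'Doe, Jane; Roe, Richard'
meta['authorAffiliation']   # 'Example University'
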
extract_metadata()

Convenience function for parsing the study metadata of the latest version.

Results are written to self, accessible as a dictionary.

Source code in src/dataverse_utils/collections.py
def extract_metadata(self):
    '''
    Convenience function for parsing the study metadata of the latest version.

    Results are written to self, accessible as a dictionary.
    '''
    if not self.__has_metadata():
        return


    for v in self.study_meta['data']['latestVersion']['metadataBlocks'].values():
        for field in v['fields']:
            self.extract_field_metadata(field)
    self.__extract_licence_info()
    self.__version()
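
Once extract_metadata() has run (it is called from __init__), the flattened study fields read like ordinary dictionary entries. The key names below are standard Dataverse citation-block typeNames and may differ between installations:

# 'meta' is a StudyMetadata instance constructed as shown under __init__ above.
print(meta.get('title'))
print(meta.get('dsDescriptionValue'))   # compound sub-fields are flattened to top level
print(meta.get('licence'), meta.get('studyVersion'))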