API Reference¶
dataverse_utils
¶
Generalized dataverse utilities. Note that `import dataverse_utils` is the equivalent of `import dataverse_utils.dataverse_utils`.
DvGeneralUploadError
¶
Bases: Exception
Raised on non-200 URL response
Source code in src/dataverse_utils/dataverse_utils.py
class DvGeneralUploadError(Exception):
'''
Raised on non-200 URL response
'''
Md5Error
¶
Bases: Exception
Raised on md5 mismatch
Source code in src/dataverse_utils/dataverse_utils.py
class Md5Error(Exception):
'''
Raised on md5 mismatch
'''
check_lock(dv_url, study, apikey)
¶
Checks study lock status; returns True if locked.
Source code in src/dataverse_utils/dataverse_utils.py
def check_lock(dv_url, study, apikey) -> bool:
'''
Checks study lock status; returns True if locked.
Parameters
----------
dv_url : str
URL of Dataverse installation
study: str
Persistent ID of study
apikey : str
API key for user
'''
dv_url, headers, params = _make_info(dv_url, study, apikey)
lock_status = requests.get(f'{dv_url}/api/datasets/:persistentId/locks',
headers=headers,
params=params, timeout=300)
lock_status.raise_for_status()
data = lock_status.json().get('data')
if data:
LOGGER.warning('Study %s has been locked', study)
LOGGER.warning('Lock info:\n%s', lock_status.json())
return True
return False
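A minimal lock-check sketch; the installation URL, persistent identifier, and API key below are placeholders.
```python
from dataverse_utils.dataverse_utils import check_lock

# Placeholder URL, study PID and API key
if check_lock('https://abacus.library.ubc.ca',
              'hdl:11272.1/AB2/EXAMPLE',
              'xxxx-xxxx-xxxx'):
    print('Study is locked; wait or force an unlock before continuing')
```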
dump_tsv(start_dir, filename, in_list=None, **kwargs)
¶
Dumps output of make_tsv manifest to a file.
Source code in src/dataverse_utils/dataverse_utils.py
def dump_tsv(start_dir, filename, in_list=None,
**kwargs):
'''
Dumps output of make_tsv manifest to a file.
Parameters
----------
start_dir : str
Path to start directory
in_list : list
List of files for which to create manifest entries. Will
default to recursive directory crawl
**kwargs : dict
Other parameters
Other parameters
----------------
def_tag : str, optional, default='Data'
Default Dataverse tag (eg, Data, Documentation, etc).
Separate tags with an easily splittable character:
eg. ('Data, 2016')
inc_header : bool, optional, default=True
Include header for tsv.
quotype : int, optional, default=csv.QUOTE_MINIMAL
integer value or csv quote type.
Acceptable values:
* csv.QUOTE_MINIMAL / 0
* csv.QUOTE_ALL / 1
* csv.QUOTE_NONNUMERIC / 2
* csv.QUOTE_NONE / 3
'''
def_tag= kwargs.get('def_tag', 'Data')
inc_header =kwargs.get('inc_header', True)
mime = kwargs.get('mime', False)
path = kwargs.get('path', False)
quotype = kwargs.get('quotype', csv.QUOTE_MINIMAL)
dumper = make_tsv(start_dir, in_list, def_tag, inc_header, mime, quotype, path=path)
with open(filename, 'w', newline='', encoding='utf-8') as tsvfile:
tsvfile.write(dumper)
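For illustration, a short sketch that writes a manifest for a local directory; the directory name, output file, and tags are assumed values.
```python
from dataverse_utils.dataverse_utils import dump_tsv

# Crawl ./Data recursively and write a tab-separated manifest
dump_tsv('Data', 'manifest.tsv',
         def_tag='Data, 2016',  # default tags, comma separated
         inc_header=True)
```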
file_path(fpath, trunc='')
¶
Create relative file path from full path string
Notes
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp/')
'Data/2011'
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp')
'Data/2011'
Source code in src/dataverse_utils/dataverse_utils.py
def file_path(fpath, trunc='') -> str:
'''
Create relative file path from full path string
Parameters
----------
fpath : str
File location (ie, complete path)
trunc : str
Leftmost portion of path to remove
Notes
-----
```
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp/')
'Data/2011'
>>> file_path('/tmp/Data/2011/excelfile.xlsx', '/tmp')
'Data/2011'
```
'''
if trunc and not trunc.endswith(os.sep):
trunc += os.sep
path = os.path.dirname(fpath)
try:
if fpath.find(trunc) == -1:
dirlabel = os.path.relpath(os.path.split(path)[0])
dirlabel = os.path.relpath(path[path.find(trunc)+len(trunc):])
if dirlabel == '.':
dirlabel = ''
return dirlabel
except ValueError:
return ''
force_notab_unlock(study, dv_url, fid, apikey, try_uningest=True)
¶
Forcibly unlocks and uningests to prevent tabular file processing. Required if mime and filename spoofing is not sufficient.
Returns 0 if unlocked, file id if locked (and then unlocked).
Source code in src/dataverse_utils/dataverse_utils.py
def force_notab_unlock(study, dv_url, fid, apikey, try_uningest=True) -> int:
'''
Forcibly unlocks and uningests
to prevent tabular file processing. Required if mime and filename
spoofing is not sufficient.
Returns 0 if unlocked, file id if locked (and then unlocked).
Parameters
----------
study : str
Persistent identifier of study
dv_url : str
URL to base Dataverse installation
fid : str
File ID for file object
apikey : str
API key for user
try_uningest : bool
Try to uningest the file that was locked.
Default: True
'''
dv_url, headers, params = _make_info(dv_url, study, apikey)
force_unlock = requests.delete(f'{dv_url}/api/datasets/:persistentId/locks',
params=params, headers=headers,
timeout=300)
LOGGER.warning('Lock removed for %s', study)
LOGGER.warning('Lock status:\n %s', force_unlock.json())
if try_uningest:
uningest_file(dv_url, fid, apikey, study)
return int(fid)
return 0
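A hedged sketch of the check-then-unlock pattern that upload_file uses internally; every identifier below is a placeholder.
```python
from dataverse_utils.dataverse_utils import check_lock, force_notab_unlock

DV_URL = 'https://abacus.library.ubc.ca'   # placeholder installation
PID = 'hdl:11272.1/AB2/EXAMPLE'            # placeholder study PID
APIKEY = 'xxxx-xxxx-xxxx'                  # placeholder API key

if check_lock(DV_URL, PID, APIKEY):
    # '12345' stands in for the database ID of the file that triggered ingest
    force_notab_unlock(PID, DV_URL, '12345', APIKEY)
```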
make_tsv(start_dir, in_list=None, def_tag='Data', inc_header=True, mime=False, quotype=csv.QUOTE_MINIMAL, **kwargs)
¶
Recurses the tree for files and produces tsv output with headers ‘file’, ‘description’, ‘tags’.
The ‘description’ is the filename without an extension.
Returns tsv as string.
Source code in src/dataverse_utils/dataverse_utils.py
def make_tsv(start_dir, in_list=None, def_tag='Data',
inc_header=True,
mime=False,
quotype=csv.QUOTE_MINIMAL,
**kwargs) -> str:
# pylint: disable=too-many-positional-arguments
# pylint: disable=too-many-arguments
'''
Recurses the tree for files and produces tsv output
with headers 'file', 'description', 'tags'.
The 'description' is the filename without an extension.
Returns tsv as string.
Parameters
----------
start_dir : str
Path to start directory
in_list : list
Input file list. Defaults to recursive walk of current directory.
def_tag : str
Default Dataverse tag (eg, Data, Documentation, etc)
Separate tags with a comma:
eg. ('Data, 2016')
inc_header : bool
Include header row
mime : bool
Include automatically determined mimetype
quotype: int
integer value or csv quote type.
Default = csv.QUOTE_MINIMAL
Acceptable values:
csv.QUOTE_MINIMAL / 0
csv.QUOTE_ALL / 1
csv.QUOTE_NONNUMERIC / 2
csv.QUOTE_NONE / 3
**kwargs : dict
Other parameters
Other parameters
----------------
path : bool
If true include a 'path' field so that you can type
in a custom path instead of actually structuring
your data
'''
if start_dir.endswith(os.sep):
#start_dir += os.sep
start_dir = start_dir[:-1]
if not in_list:
in_list = [f'{x[0]}{os.sep}{y}'
for x in os.walk(start_dir)
for y in x[2]
if not y.startswith('.')]
if isinstance(in_list, set):
in_list=list(in_list)
in_list.sort()
def_tag = ", ".join([x.strip() for x in def_tag.split(',')])
headers = ['file', 'description', 'tags']
if mime:
headers.append('mimetype')
if kwargs.get('path'):
headers.insert(1, 'path')
outf = io.StringIO(newline='')
tsv_writer = csv.DictWriter(outf, delimiter='\t',
quoting=quotype,
fieldnames=headers,
extrasaction='ignore')
if inc_header:
tsv_writer.writeheader()
for row in in_list:
#the columns
r = {}
r['file'] = row
r['description'] = os.path.splitext(os.path.basename(row))[0]
r['mimetype'] = mimetypes.guess_type(row)[0]
r['tags'] = def_tag
r['path'] = ''
tsv_writer.writerow(r)
outf.seek(0)
outfile = outf.read()
outf.close()
return outfile
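A small sketch of building the manifest in memory instead of writing it to disk; the start directory and tags are assumed.
```python
import csv
from dataverse_utils.dataverse_utils import make_tsv

manifest = make_tsv('Data', def_tag='Data, 2016',
                    inc_header=True, mime=True,
                    quotype=csv.QUOTE_MINIMAL)
# With mime=True the header row is: file, description, tags, mimetype
print(manifest.splitlines()[0])
```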
restrict_file(**kwargs)
¶
Restrict file in Dataverse study.
Notes
One of pid or fid is required
Source code in src/dataverse_utils/dataverse_utils.py
def restrict_file(**kwargs):
'''
Restrict file in Dataverse study.
Parameters
----------
**kwargs : dict
Other parameters
----------------
pid : str, optional
file persistent ID
fid : str, optional
file database ID
dv : str, required
url to base Dataverse installation
eg: 'https://abacus.library.ubc.ca'
apikey : str, required
API key for user
rest : bool
On True, restrict. Default True
Notes
--------
One of `pid` or `fid` is **required**
'''
headers = {'X-Dataverse-key': kwargs['apikey']}
headers.update(dataverse_utils.UAHEADER)
#Requires a true/false *string* for the API.
if kwargs.get('rest', True):
rest = 'true'
else:
rest= 'false'
if kwargs.get('pid'):
params={'persistentId':kwargs['pid']}
rest = requests.put(f'{kwargs["dv"]}/api/files/:persistentId/restrict',
headers=headers,
params=params,
data=rest,
timeout=300)
elif kwargs.get('fid'):
rest = requests.put(f'{kwargs["dv"]}/api/files/{kwargs["fid"]}/restrict',
headers=headers, data=rest, timeout=300)
else:
LOGGER.error('No file ID/PID supplied for file restriction')
raise KeyError('One of persistentId (pid) or database ID'
'(fid) is required for file restriction')
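A minimal restriction sketch using the database ID; the dv, apikey, and fid values are placeholders.
```python
from dataverse_utils.dataverse_utils import restrict_file

restrict_file(fid='12345',                          # placeholder database ID
              dv='https://abacus.library.ubc.ca',   # placeholder installation
              apikey='xxxx-xxxx-xxxx',
              rest=True)   # rest=False sends 'false' (unrestrict) to the API
```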
script_ver_stmt(name)
¶
Returns a formatted version statement for any script
Source code in src/dataverse_utils/__init__.py
def script_ver_stmt(name:str)->str:
'''
Returns a formatted version statement for any script
Parameters
----------
name : str
Name of utility to join to create version statement. Normally %prog from argparse.
'''
key = pathlib.Path(name).stem
if not SCRIPT_VERSIONS.get(key):
return f'dataverse_utils: v{__version__}'
return (f"{key} v{'.'.join(map(str, SCRIPT_VERSIONS[key]))} / "
f'dataverse_utils v{__version__}')
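A sketch of the typical argparse wiring; the parser description is arbitrary.
```python
import argparse
from dataverse_utils import script_ver_stmt

parser = argparse.ArgumentParser(description='Example utility')
# parser.prog is the running script's name, which script_ver_stmt stems
parser.add_argument('--version', action='version',
                    version=script_ver_stmt(parser.prog))
```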
uningest_file(dv_url, fid, apikey, study='n/a')
¶
Tries to uningest a file that has been ingested. Requires superuser API key.
Source code in src/dataverse_utils/dataverse_utils.py
def uningest_file(dv_url, fid, apikey, study='n/a'):
'''
Tries to uningest a file that has been ingested.
Requires superuser API key.
Parameters
----------
dv_url : str
URL to base Dataverse installation
fid : int or str
File ID of file to uningest
apikey : str
API key for superuser
study : str, optional
Optional handle parameter for log messages
'''
dv_url, headers, params = _make_info(dv_url, fid, apikey)
fid = params['persistentId']
#TODONE: Awaiting answer from Harvard on how to remove progress bar
#for uploaded tab files that squeak through.
#Answer: you can't!
try:
uningest = requests.post(f'{dv_url}/api/files/{fid}/uningest',
headers=headers,
timeout=300)
LOGGER.warning('Ingest halted for file %s for fileID %s', fid, study)
uningest.raise_for_status()
except requests.exceptions.HTTPError:
LOGGER.error('Uningestion error: %s', uningest.reason)
print(uningest.reason)
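A one-call sketch; note that this requires a superuser key, and every value below is a placeholder.
```python
from dataverse_utils.dataverse_utils import uningest_file

uningest_file('https://abacus.library.ubc.ca',  # placeholder installation
              12345,                            # placeholder file ID
              'superuser-api-key',              # placeholder superuser key
              study='hdl:11272.1/AB2/EXAMPLE')  # only used for log messages
```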
upload_file(fpath, hdl, **kwargs)
¶
Uploads file to Dataverse study and sets file metadata and tags.
Source code in src/dataverse_utils/dataverse_utils.py
def upload_file(fpath, hdl, **kwargs):
'''
Uploads file to Dataverse study and sets file metadata and tags.
Parameters
----------
fpath : str
file location (ie, complete path)
hdl : str
Dataverse persistent ID for study (handle or DOI)
**kwargs : dict
Other parameters
Other parameters
----------------
dv : str, required
URL to base Dataverse installation
eg: 'https://abacus.library.ubc.ca'
apikey : str, required
API key for user
descr : str, optional
File description
md5 : str, optional
md5sum for file checking
tags : list, optional
list of text file tags. Eg ['Data', 'June 2020']
dirlabel : str, optional
Unix style relative pathname for Dataverse
file path: eg: path/to/file/
nowait : bool, optional
Force a file unlock and uningest instead of waiting for processing
to finish
trunc : str, optional
Leftmost portion of path to remove
rest : bool, optional
Restrict file. Defaults to false unless True supplied
mimetype : str, optional
Mimetype of file. Useful if using File Previewers. Mimetype for zip files
(application/zip) will be ignored to circumvent Dataverse's automatic
unzipping function.
label : str, optional
If included in kwargs, this value will be used for the label
timeout : int, optional
Timeout in seconds
override : bool, optional
Ignore NOTAB (ie, NOTAB = [])
'''
#Why are SPSS files getting processed anyway?
#Does SPSS detection happen *after* upload
#Does the file need to be renamed post hoc?
#I don't think this can be fixed. Goddamitsomuch.
dvurl = kwargs['dv'].strip('\\ /')
if os.path.splitext(fpath)[1].lower() in NOTAB and not kwargs.get('override'):
file_name_clean = os.path.basename(fpath)
#file_name = os.path.basename(fpath) + '.NOPROCESS'
# using .NOPROCESS doesn't seem to work?
file_name = os.path.basename(fpath) + '.NOPROCESS'
else:
file_name = os.path.basename(fpath)
file_name_clean = file_name
#My workstation python on Windows produces null for isos for some reason
if mimetypes.guess_type('test.iso') == (None, None):
mimetypes.add_type('application/x-iso9660-image', '.iso')
mime = mimetypes.guess_type(fpath)[0]
if kwargs.get('mimetype'):
mime = kwargs['mimetype']
if file_name.endswith('.NOPROCESS') or mime == 'application/zip':
mime = 'application/octet-stream'
#create file metadata in nice, simple, chunks
dv4_meta = {'label' : kwargs.get('label', file_name_clean),
'description' : kwargs.get('descr', ''),
'directoryLabel': kwargs.get('dirlabel', ''),
'categories': kwargs.get('tags', []),
'mimetype' : mime}
fpath = os.path.abspath(fpath)
fields = {'file': (file_name, open(fpath, 'rb'), mime)}#pylint: disable=consider-using-with
fields.update({'jsonData' : json.dumps(dv4_meta)})
multi = MultipartEncoder(fields=fields) # use multipart streaming for large files
headers = {'X-Dataverse-key' : kwargs.get('apikey'),
'Content-type' : multi.content_type}
headers.update(dataverse_utils.UAHEADER)
params = {'persistentId' : hdl}
LOGGER.info('Uploading %s to %s', fpath, hdl)
upload = requests.post(f"{dvurl}/api/datasets/:persistentId/add",
params=params, headers=headers, data=multi,
timeout=kwargs.get('timeout',1000))
try:
print(upload.json())
except json.decoder.JSONDecodeError:
#This can happen when Glassfish crashes
LOGGER.critical(upload.text)
print(upload.text)
err = ('It\'s possible Glassfish may have crashed. '
'Check server logs for anomalies')
LOGGER.exception(err)
print(err)
raise
#SPSS files still process despite spoof, so there's
#a forcible unlock check
fid = upload.json()['data']['files'][0]['dataFile']['id']
print(f'FID: {fid}')
if kwargs.get('nowait') and check_lock(dvurl, hdl, kwargs['apikey']):
force_notab_unlock(hdl, dvurl, fid, kwargs['apikey'])
else:
while check_lock(dvurl, hdl, kwargs['apikey']):
time.sleep(10)
if upload.status_code != 200:
LOGGER.critical('Upload failure: %s', (upload.status_code, upload.reason))
raise DvGeneralUploadError(f'\nReason: {(upload.status_code, upload.reason)}'
f'\n{upload.text}')
if kwargs.get('md5'):
if upload.json()['data']['files'][0]['dataFile']['md5'] != kwargs.get('md5'):
LOGGER.warning('md5sum mismatch on %s', fpath)
raise Md5Error('md5sum mismatch')
restrict_file(fid=fid, dv=dvurl, apikey=kwargs.get('apikey'),
rest=kwargs.get('rest', False))
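A hedged single-file upload sketch; the path, persistent ID, URL, and key are assumptions, and the optional arguments shown are a typical subset.
```python
from dataverse_utils.dataverse_utils import upload_file

upload_file('Data/2016/survey.csv',            # placeholder local file
            'hdl:11272.1/AB2/EXAMPLE',         # placeholder study PID
            dv='https://abacus.library.ubc.ca',
            apikey='xxxx-xxxx-xxxx',
            descr='Survey microdata',
            tags=['Data', '2016'],
            dirlabel='Data/2016',
            rest=False)
```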
upload_from_tsv(fil, hdl, **kwargs)
¶
Utility for bulk uploading. Assumes fil is formatted as tsv with headers ‘file’, ‘description’, ‘tags’.
‘tags’ field will be split on commas.
Source code in src/dataverse_utils/dataverse_utils.py
def upload_from_tsv(fil, hdl, **kwargs):
'''
Utility for bulk uploading. Assumes fil is formatted
as tsv with headers 'file', 'description', 'tags'.
'tags' field will be split on commas.
Parameters
----------
fil
Open file object or io.IOStream()
hdl : str
Dataverse persistent ID for study (handle or DOI)
**kwargs : dict
Other parameters
Other parameters
----------------
trunc : str
Leftmost portion of Dataverse study file path to remove.
eg: trunc ='/home/user/' if the tsv field is
'/home/user/Data/ASCII'
would set the path for that line of the tsv to 'Data/ASCII'.
Defaults to None.
dv : str, required
url to base Dataverse installation
eg: 'https://abacus.library.ubc.ca'
apikey : str, required
API key for user
rest : bool, optional
On True, restrict access. Default False
'''
#reader = csv.reader(fil, delimiter='\t', quotechar='"')
#new, optional mimetype column allows using GeoJSONS.
#Read the headers from the file first before using DictReader
headers = fil.readline().strip('\n\r').split('\t')#Goddamn it Windows
fil.seek(0)
reader = csv.DictReader(fil, fieldnames=headers, quotechar='"', delimiter='\t')
#See API call for "Adding File Metadata"
for num, row in enumerate(reader):
if num == 0:
continue
#dirlabel = file_path(row[0], './')
if row.get('path'):
#Explicit separate path because that way you can organize
#on upload
dirlabel = row.get('path')
else:
dirlabel = file_path(row['file'], kwargs.get('trunc', ''))
tags = row['tags'].split(',')
tags = [x.strip() for x in tags]
descr = row['description']
mimetype = row.get('mimetype')
params = {'dv' : kwargs.get('dv'),
'tags' : tags,
'descr' : descr,
'dirlabel' : dirlabel,
'apikey' : kwargs.get('apikey'),
'md5' : kwargs.get('md5', ''),
'rest': kwargs.get('rest', False)}
if mimetype:
params['mimetype'] = mimetype
#So that you can pass everything all at once, params
#is merged onto kwargs. This is for easier upgradability
kwargs.update(params)
upload_file(row['file'], hdl, **kwargs)
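A bulk-upload sketch that pairs with the dump_tsv output above; all identifiers are placeholders.
```python
from dataverse_utils.dataverse_utils import upload_from_tsv

with open('manifest.tsv', encoding='utf-8', newline='') as fil:
    upload_from_tsv(fil,
                    'hdl:11272.1/AB2/EXAMPLE',           # placeholder study PID
                    dv='https://abacus.library.ubc.ca',  # placeholder installation
                    apikey='xxxx-xxxx-xxxx',
                    trunc='Data')  # strip the leading 'Data' from file paths
```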
dataverse_utils.dvdata
¶
Dataverse studies and files
File
¶
Bases: dict
Class representing a file on a Dataverse instance
Source code in src/dataverse_utils/dvdata.py
class File(dict):
'''
Class representing a file on a Dataverse instance
'''
def __init__(self, url:str, key:str,
**kwargs):
'''
Dataverse file object
Parameters
----------
url : str
Base URL to host Dataverse instance
key : str
Dataverse API key with downloader privileges
**kwargs : dict
Other parameters
Notes
-----
To initialize correctly, pass a value from Study['file_info'].
Eg: `File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])`
Not to be confused with the FileAnalysis object in `dataverse_utils.collections`.
'''
self['url'] = url
self.__key = key
self['downloaded'] = False
self['downloaded_file_name'] = None
self['downloaded_checksum'] = None
self['verified'] = None
#self['dv_file_metadata'] = None
# if not self['dv_file_metadata']:
# self['dv_file_metadata'] = self._get_file_metadata()
for keey, val in kwargs.items():
self[keey] = val
self['timeout'] = kwargs.get('timeout', TIMEOUT)
def download_file(self):
'''
Downloads the file to a temporary location. Data will be in the ORIGINAL format,
not Dataverse-processed TSVs
'''
if not self['downloaded'] or not os.path.exists(self.get('downloaded_file_name', '')):
headers = {'X-Dataverse-key': self.__key}
headers.update(UAHEADER)
try:
#curl "$SERVER_URL/api/access/datafile/:persistentId/?persistentId=$PERSISTENT_ID"
dwnld = requests.get(self['url']+'/api/access/datafile/'+
str(self['dataFile']['id']),
headers=headers,
params = {'format':'original'},
timeout=self['timeout'])
with tempfile.NamedTemporaryFile(delete=False) as fil:
self['downloaded_file_name'] = fil.name
fil.write(dwnld.content)
self['downloaded'] = True
return True
except requests.exceptions.HTTPError as err:
LOGGER.exception(err)
LOGGER.exception(traceback.format_exc())
self['downloaded'] = False
return False
return None
def del_tempfile(self):
'''
Delete tempfile if it exists
'''
if os.path.exists(self['downloaded_file_name']):
os.remove(self['downloaded_file_name'])
self['downloaded'] = False
self['downloaded_file_name'] = None
self['verified'] = None
def produce_digest(self, prot: str = 'md5', blocksize: int = 2**16) -> str:
'''
Returns hex digest for object
Parameters
----------
prot : str, optional, default='md5'
Hash type. Supported hashes: 'sha1', 'sha224', 'sha256',
'sha384', 'sha512', 'blake2b', 'blake2s', 'md5'.
Default: 'md5'
blocksize : int, optional, default=2**16
Read block size in bytes
'''
if not self['downloaded_file_name']:
return None
ok_hash = {'sha1' : hashlib.sha1(),
'sha224' : hashlib.sha224(),
'sha256' : hashlib.sha256(),
'sha384' : hashlib.sha384(),
'sha512' : hashlib.sha512(),
'blake2b' : hashlib.blake2b(),
'blake2s' : hashlib.blake2s(),
'md5': hashlib.md5()}
with open(self['downloaded_file_name'], 'rb') as _fobj:
try:
_hash = ok_hash[prot]
except (UnboundLocalError, KeyError) as err:
message = ('Unsupported hash type. Valid values are '
f'{list(ok_hash)}.' )
LOGGER.exception(err)
LOGGER.exception(message)
LOGGER.exception(traceback.format_exc())
raise
fblock = _fobj.read(blocksize)
while fblock:
_hash.update(fblock)
fblock = _fobj.read(blocksize)
return _hash.hexdigest()
def verify(self)->None:
'''
Compares actual checksum with stated checksum
'''
if not self.get('downloaded_file_name') or not self.get('downloaded'):
LOGGER.error('File has not been downloaded')
self['verified'] = None
self['downloaded_checksum'] = None
return None
_hash = self.produce_digest(self['dataFile']['checksum']['type'].lower())
if _hash == self['dataFile']['checksum']['value']:
self['verified'] = True
self['downloaded_checksum'] = _hash
return True
LOGGER.error('Checksum mismatch in %s', self.get('label'))
self['verified'] = False
self['downloaded_checksum'] = _hash
return False
__init__(url, key, **kwargs)
¶
Dataverse file object
Notes
To initialize correctly, pass a value from Study[‘file_info’].
Eg: File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])
Not to be confused with the FileAnalysis object in dataverse_utils.collections.
Source code in src/dataverse_utils/dvdata.py
def __init__(self, url:str, key:str,
**kwargs):
'''
Dataverse file object
Parameters
----------
url : str
Base URL to host Dataverse instance
key : str
Dataverse API key with downloader privileges
**kwargs : dict
Other parameters
Notes
-----
To initialize correctly, pass a value from Study['file_info'].
Eg: `File('https://test.invalid', 'ABC123', **Study_instance['file_info'][0])`
Not to be confused with the FileAnalysis object in `dataverse_utils.collections`.
'''
self['url'] = url
self.__key = key
self['downloaded'] = False
self['downloaded_file_name'] = None
self['downloaded_checksum'] = None
self['verified'] = None
#self['dv_file_metadata'] = None
# if not self['dv_file_metadata']:
# self['dv_file_metadata'] = self._get_file_metadata()
for keey, val in kwargs.items():
self[keey] = val
self['timeout'] = kwargs.get('timeout', TIMEOUT)
del_tempfile()
¶
Delete tempfile if it exists
Source code in src/dataverse_utils/dvdata.py
def del_tempfile(self):
'''
Delete tempfile if it exists
'''
if os.path.exists(self['downloaded_file_name']):
os.remove(self['downloaded_file_name'])
self['downloaded'] = False
self['downloaded_file_name'] = None
self['verified'] = None
download_file()
¶
Downloads the file to a temporary location. Data will be in the ORIGINAL format, not Dataverse-processed TSVs
Source code in src/dataverse_utils/dvdata.py
def download_file(self):
'''
Downloads the file to a temporary location. Data will be in the ORIGINAL format,
not Dataverse-processed TSVs
'''
if not self['downloaded'] or not os.path.exists(self.get('downloaded_file_name', '')):
headers = {'X-Dataverse-key': self.__key}
headers.update(UAHEADER)
try:
#curl "$SERVER_URL/api/access/datafile/:persistentId/?persistentId=$PERSISTENT_ID"
dwnld = requests.get(self['url']+'/api/access/datafile/'+
str(self['dataFile']['id']),
headers=headers,
params = {'format':'original'},
timeout=self['timeout'])
with tempfile.NamedTemporaryFile(delete=False) as fil:
self['downloaded_file_name'] = fil.name
fil.write(dwnld.content)
self['downloaded'] = True
return True
except requests.exceptions.HTTPError as err:
LOGGER.exception(err)
LOGGER.exception(traceback.format_exc())
self['downloaded'] = False
return False
return None
produce_digest(prot='md5', blocksize=2 ** 16)
¶
Returns hex digest for object
Source code in src/dataverse_utils/dvdata.py
def produce_digest(self, prot: str = 'md5', blocksize: int = 2**16) -> str:
'''
Returns hex digest for object
Parameters
----------
prot : str, optional, default='md5'
Hash type. Supported hashes: 'sha1', 'sha224', 'sha256',
'sha384', 'sha512', 'blake2b', 'blake2s', 'md5'.
Default: 'md5'
blocksize : int, optional, default=2**16
Read block size in bytes
'''
if not self['downloaded_file_name']:
return None
ok_hash = {'sha1' : hashlib.sha1(),
'sha224' : hashlib.sha224(),
'sha256' : hashlib.sha256(),
'sha384' : hashlib.sha384(),
'sha512' : hashlib.sha512(),
'blake2b' : hashlib.blake2b(),
'blake2s' : hashlib.blake2s(),
'md5': hashlib.md5()}
with open(self['downloaded_file_name'], 'rb') as _fobj:
try:
_hash = ok_hash[prot]
except (UnboundLocalError, KeyError) as err:
message = ('Unsupported hash type. Valid values are '
f'{list(ok_hash)}.' )
LOGGER.exception(err)
LOGGER.exception(message)
LOGGER.exception(traceback.format_exc())
raise
fblock = _fobj.read(blocksize)
while fblock:
_hash.update(fblock)
fblock = _fobj.read(blocksize)
return _hash.hexdigest()
verify()
¶
Compares actual checksum with stated checksum
Source code in src/dataverse_utils/dvdata.py
def verify(self)->None:
'''
Compares actual checksum with stated checksum
'''
if not self.get('downloaded_file_name') or not self.get('downloaded'):
LOGGER.error('File has not been downloaded')
self['verified'] = None
self['downloaded_checksum'] = None
return None
_hash = self.produce_digest(self['dataFile']['checksum']['type'].lower())
if _hash == self['dataFile']['checksum']['value']:
self['verified'] = True
self['downloaded_checksum'] = _hash
return True
LOGGER.error('Checksum mismatch in %s', self.get('label'))
self['verified'] = False
self['downloaded_checksum'] = _hash
return False
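A download-and-verify sketch, assuming a hypothetical installation, key, and study; as the Notes say, the keyword arguments come from Study['file_info'].
```python
from dataverse_utils.dvdata import File, Study

URL = 'https://abacus.library.ubc.ca'   # placeholder installation
KEY = 'xxxx-xxxx-xxxx'                  # placeholder API key

study = Study('hdl:11272.1/AB2/EXAMPLE', URL, KEY)
fil = File(URL, KEY, **study['file_info'][0])
if fil.download_file():
    fil.verify()
    print(fil['verified'], fil['downloaded_checksum'])
    fil.del_tempfile()
```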
FileInfo
¶
Bases: dict
An object representing all of a dataverse study’s files. Easily parseable as a dict.
Source code in src/dataverse_utils/dvdata.py
class FileInfo(dict):
'''
An object representing all of a dataverse study's files.
Easily parseable as a dict.
'''
#Should this be incorporated into the above class? Probably.
def __init__(self, **kwargs)->None:
'''
Initialize a FileInfo object
Parameters
----------
**kwargs : dict
Keyword arguments as below
Other parameters
----------------
url : str, required
Base URL of dataverse installation
pid : str, required
Handle or DOI of study
apikey : str, optional
Dataverse API key; required for DRAFT or restricted material.
Or if the platform policy requires an API key.
timeout : int, optional
Optional timeout in seconds
'''
self.kwargs = kwargs
self['version_list'] = []
self.dv = None
self._get_json()
self._get_all_files()
self['headers'] = list(self[self['current_version']][0].keys())
def _get_json(self) -> None:
'''
Get study file json
'''
try:
headers={'X-Dataverse-key' : self.kwargs.get('apikey')}
headers.update(UAHEADER)
params = {'persistentId': self.kwargs['pid']}
self.dv = requests.get(f'{self.kwargs["url"]}/api/datasets/:persistentId/versions',
params=params,
timeout=self.kwargs.get('timeout', 100),
headers=headers)
self.dv.raise_for_status()
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.TooManyRedirects,
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.Timeout,
requests.exceptions.JSONDecodeError,
requests.exceptions.InvalidSchema) as err:
err.add_note(f'Connection error: {"\n".join((str(x) for x in err.args))}')
msg = '\n'.join(getattr(err, '__notes__', []))
LOGGER.critical(msg)
raise err
def _get_all_files(self):
'''
Iterates over self.dv_json()['data']. to produce a list of files
in self['files']
'''
try:
for num, version in enumerate(self.dv.json()['data']):
self._get_version_files(version, current=num)
except AttributeError as err:
err.add_note('No JSON present')
#LOGGER.exception('FileInfo AttributeError: %s', err)
#LOGGER.exception(traceback.format_exc())
raise err
except KeyError as err:
err.add_note(f'JSON parsing error: {err}')
err.add_note('Offending JSON:')
err.add_note(f'{self.dv.json()}')
msg = '\n'.join(getattr(err, '__notes__', []))
LOGGER.exception('FileInfo KeyError: %s', msg)
#LOGGER.exception(traceback.format_exc())
raise err
def _get_version_files(self, flist: list, current=1)->None:
'''
Set version number and assign file info a version key
Parameters
----------
flist : list
list of file metadata for a particular version
current: int, optional, default=1
Value of zero represents most current version
'''
if flist['versionState'] == 'DRAFT':
ver_info='DRAFT'
else:
ver_info = f"{flist['versionNumber']}.{flist['versionMinorNumber']}"
if current == 0:
self['current_version'] = ver_info
self['version_list'].append(ver_info)
self[ver_info] = []
for fil in flist['files']:
self[ver_info].append(self._get_file_info(fil,
ver_info=ver_info,
state_info=flist['versionState']))
def _get_file_info(self, file:dict, **kwargs)->dict:
'''
Returns a dict of required info from a chunk of dataverse study
version metadata
Parameters
----------
file : dict
The dict containing one file's metadata
**kwargs : dict
Keyword arguments
version_info: str
Version info string
state_info : str
Publication state
'''
# headers = ['file', 'description', 'pidURL','downloadURL', 'version', 'state']
file_name = file['dataFile'].get('originalFileName', file['label'])
filepath = pathlib.Path(file.get('directoryLabel', ''), file_name)
description = file.get('description', '')
try:
pid_url = file['dataFile']['pidURL']
except KeyError:
pid_url = f'{self.kwargs["url"]}/file.xhtml?fileId={file["dataFile"]["id"]}'
fid = file['dataFile']['id']
download_url = f'{self.kwargs["url"]}/api/access/datafile/{fid}?format=original'
out = {'file': str(filepath).strip(),
'description': description.strip(),
'pid_url': pid_url, 'download_url':download_url,
'version': kwargs['ver_info'],
'state' : kwargs['state_info']}
return out
__init__(**kwargs)
¶
Initialize a FileInfo object
Source code in src/dataverse_utils/dvdata.py
def __init__(self, **kwargs)->None:
'''
Initialize a FileInfo object
Parameters
----------
**kwargs : dict
Keyword arguments as below
Other parameters
----------------
url : str, required
Base URL of dataverse installation
pid : str, required
Handle or DOI of study
apikey : str, optional
Dataverse API key; required for DRAFT or restricted material.
Or if the platform policy requires an API key.
timeout : int, optional
Optional timeout in seconds
'''
self.kwargs = kwargs
self['version_list'] = []
self.dv = None
self._get_json()
self._get_all_files()
self['headers'] = list(self[self['current_version']][0].keys())
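A sketch listing the files in the current version of a study; url and pid are placeholders, and apikey is only needed for drafts or restricted material.
```python
from dataverse_utils.dvdata import FileInfo

info = FileInfo(url='https://abacus.library.ubc.ca',   # placeholder
                pid='hdl:11272.1/AB2/EXAMPLE')         # placeholder
print(info['current_version'], info['headers'])
for rec in info[info['current_version']]:
    print(rec['file'], rec['state'])
```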
Study
¶
Bases: dict
Dataverse record. Dataverse study records are pure metadata so this is represented with a dictionary.
Source code in src/dataverse_utils/dvdata.py
class Study(dict): #pylint: disable=too-few-public-methods
'''
Dataverse record. Dataverse study records are pure metadata so this
is represented with a dictionary.
'''
def __init__(self, pid: str,
url:str, key:str,
**kwargs):
'''
Initialize a Study object
Parameters
----------
pid : str
Record persistent identifier: hdl or doi
url : str
Base URL to host Dataverse instance
key : str
Dataverse API key with downloader privileges
**kwargs : dict
Keyword arguments
Other parameters
----------------
timeout : int
Request timeout in seconds
'''
self['pid'] = pid
self['url'] = url
self.__key = key
self['orig_json'] = None
self['timeout'] = kwargs.get('timeout',TIMEOUT)
if not self['orig_json']:
self['orig_json'] = self._orig_json()
self['upload_json'] = self._upload_json
self['file_info'] = self['orig_json']['files']
self['file_ids'] = [x['dataFile'].get('id') for x in self['orig_json']['files']]
self['file_persistentIds'] = self._get_file_pids()
self['source_version'] = Study.get_version(url)
self['target_version'] = None
if not self['target_version']:
self['target_version'] = Study.get_version(url)
@classmethod
def get_version(cls, url:str, timeout:int=100)->float:
'''
Returns a float representing a Dataverse version number.
Floating point value composed of:
float(f'{major_version}.{minor_version:03d}{patch:03d}')
ie, version 5.9.2 would be 5.009002
Parameters
----------
url : str
URL of base Dataverse instance. eg: 'https://abacus.library.ubc.ca'
timeout : int, default=100
Request timeout in seconds
'''
ver = requests.get(f'{url}/api/info/version',
headers=UAHEADER,
#headers = {'X-Dataverse-key' : key},
timeout = timeout)
try:
ver.raise_for_status()
except requests.exceptions.HTTPError as exc:
LOGGER.error('Error getting version for %s', url)
LOGGER.exception(exc)
LOGGER.exception(traceback.format_exc())
raise requests.exceptions.HTTPError
#Scholars Portal version is formatted as v5.13.9-SP, so. . .
verf = ver.json()['data']['version'].strip('v ').split('.')
verf = [x.split('-')[0] for x in verf]
verf =[int(b)/10**(3*a) for a,b in enumerate(verf)]
#it's 3*a in case for some reason we hit, say v5.99.99 and there's more before v6.
verf = sum(verf)
return verf
def set_version(self, url:str, timeout:int=100)->None:
'''
Sets self['target_version'] to the appropriate float value *AND*
formats self['upload_json'] to correct JSON format
Parameters
----------
url : str
URL of *target* Dataverse instance
timeout : int, optional, default=100
request timeout in seconds
'''
self['target_version'] = Study.get_version(url, timeout)
# Now fix the metadata to work with various versions
if self['target_version'] >= 5.010:
self.fix_licence()
if self['target_version'] >= 5.013:
self.production_location()
def _orig_json(self) -> dict:
'''
Latest study version record JSON. Retrieved from
Dataverse installation so an internet connection
is required.
'''
#curl -H "X-Dataverse-key:$API_TOKEN" /
#$SERVER_URL/api/datasets/:persistentId/?persistentId=$PERSISTENT_IDENTIFIER
headers = {'X-Dataverse-key' : self.__key}
headers.update(UAHEADER)
getjson = requests.get(self['url']+'/api/datasets/:persistentId',
headers=headers,
params = {'persistentId': self['pid']},
timeout = self['timeout'])
getjson.raise_for_status()
return getjson.json()['data']['latestVersion']
def __add_email(self, upjson):
'''
Adds contact information if it's not there. Fills with dummy data
Parameters
----------
upjson : dict
Metadata
'''
#pylint: disable=possibly-used-before-assignment
for n, v in enumerate((upjson['datasetVersion']
['metadataBlocks']['citation']['fields'])):
if v['typeName'] == 'datasetContact':
contact_no = n
for _x in (upjson['datasetVersion']['metadataBlocks']
['citation']['fields'][contact_no]['value']):
if not _x.get('datasetContactEmail'):
_x['datasetContactEmail'] = {'typeName':'datasetContactEmail',
'multiple': False,
'typeClass':'primitive',
'value': 'suppressed_value@test.invalid'}
return upjson
@property
def _upload_json(self)->dict:
'''
A Dataverse JSON record with PIDs and other information stripped
suitable for upload as a new Dataverse study record.
'''
upj = {'datasetVersion': {'license': self['orig_json']['license'],
'termsOfUse': self['orig_json'].get('termsOfUse',''),
'metadataBlocks': self['orig_json']['metadataBlocks']
}
}
return self.__add_email(upj)
@property
def _oldupload_json(self)->dict:
'''
A Dataverse JSON record with PIDs and other information stripped
suitable for upload as a new Dataverse study record.
'''
return {'datasetVersion': {'license': self['orig_json']['license'],
'termsOfUse': self['orig_json'].get('termsOfUse',''),
'metadataBlocks': self['orig_json']['metadataBlocks']
}
}
def _get_file_pids(self)->list:
'''
Returns a list of file ids representing the file
objects in dataverse record
'''
pids = [x['dataFile'].get('persistentId') for x in self['orig_json']['files']]
if not all(pids):
return None
return pids
######
#JSON metdata fixes for different versions
######
def fix_licence(self)->None:
'''
Replaces non-standard licence with None
Notes
-----
With Dataverse v5.10+, a licence type of 'NONE' is now forbidden.
Now, as per <https://guides.dataverse.org/en/5.14/api/sword.html\
?highlight=invalid%20license>,
non-standard licences may be replaced with None.
This function edits the same Study object *in place*, so returns nothing.
'''
if self['upload_json']['datasetVersion']['license'] == 'NONE':
self['upload_json']['datasetVersion']['license'] = None
if not self['upload_json']['datasetVersion']['termsOfUse']:
#This shouldn't happen, but UBC has datasets from the early 1970s
self['upload_json']['datasetVersion']['termsOfUse'] = 'Not available'
def production_location(self)->None:
'''
Changes "multiple" to True where typeName == 'productionPlace' in
Study['upload_json']. Changes are done
*in place*.
Notes
-----
Multiple production places came into effect with Dataverse v5.13
'''
#{'typeName': 'productionPlace', 'multiple': True, 'typeClass': 'primitive',
#'value': ['Vancouver, BC', 'Ottawa, ON']}
# get index
indy = None
for ind, val in enumerate(self['upload_json']['datasetVersion']\
['metadataBlocks']['citation']['fields']):
if val['typeName'] == 'productionPlace':
indy = ind
break
if indy and not self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['multiple']:
self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['multiple'] = True
self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['value'] = [self['upload_json']['datasetVersion']\
['metadataBlocks']['citation']\
['fields'][indy]['value']]
__add_email(upjson)
¶
Adds contact information if it’s not there. Fills with dummy data
Source code in src/dataverse_utils/dvdata.py
def __add_email(self, upjson):
'''
Adds contact information if it's not there. Fills with dummy data
Parameters
----------
upjson : dict
Metadata
'''
#pylint: disable=possibly-used-before-assignment
for n, v in enumerate((upjson['datasetVersion']
['metadataBlocks']['citation']['fields'])):
if v['typeName'] == 'datasetContact':
contact_no = n
for _x in (upjson['datasetVersion']['metadataBlocks']
['citation']['fields'][contact_no]['value']):
if not _x.get('datasetContactEmail'):
_x['datasetContactEmail'] = {'typeName':'datasetContactEmail',
'multiple': False,
'typeClass':'primitive',
'value': 'suppressed_value@test.invalid'}
return upjson
__init__(pid, url, key, **kwargs)
¶
Initialize a Study object
Source code in src/dataverse_utils/dvdata.py
def __init__(self, pid: str,
url:str, key:str,
**kwargs):
'''
Initialize a Study object
Parameters
----------
pid : str
Record persistent identifier: hdl or doi
url : str
Base URL to host Dataverse instance
key : str
Dataverse API key with downloader privileges
**kwargs : dict
Keyword arguments
Other parameters
----------------
timeout : int
Request timeout in seconds
'''
self['pid'] = pid
self['url'] = url
self.__key = key
self['orig_json'] = None
self['timeout'] = kwargs.get('timeout',TIMEOUT)
if not self['orig_json']:
self['orig_json'] = self._orig_json()
self['upload_json'] = self._upload_json
self['file_info'] = self['orig_json']['files']
self['file_ids'] = [x['dataFile'].get('id') for x in self['orig_json']['files']]
self['file_persistentIds'] = self._get_file_pids()
self['source_version'] = Study.get_version(url)
self['target_version'] = None
if not self['target_version']:
self['target_version'] = Study.get_version(url)
fix_licence()
¶
Replaces non-standard licence with None
Notes
With Dataverse v5.10+, a licence type of ‘NONE’ is now forbidden. Now, as per https://guides.dataverse.org/en/5.14/api/sword.html?highlight=invalid%20license, non-standard licences may be replaced with None.
This function edits the same Study object in place, so returns nothing.
Source code in src/dataverse_utils/dvdata.py
def fix_licence(self)->None:
'''
Replaces non-standard licence with None
Notes
-----
With Dataverse v5.10+, a licence type of 'NONE' is now forbidden.
Now, as per <https://guides.dataverse.org/en/5.14/api/sword.html\
?highlight=invalid%20license>,
non-standard licences may be replaced with None.
This function edits the same Study object *in place*, so returns nothing.
'''
if self['upload_json']['datasetVersion']['license'] == 'NONE':
self['upload_json']['datasetVersion']['license'] = None
if not self['upload_json']['datasetVersion']['termsOfUse']:
#This shouldn't happen, but UBC has datasets from the early 1970s
self['upload_json']['datasetVersion']['termsOfUse'] = 'Not available'
get_version(url, timeout=100)
classmethod
¶
Returns a float representing a Dataverse version number. Floating point value composed of: float(f’{major_version}.{minor_version:03d}{patch:03d}’) ie, version 5.9.2 would be 5.009002
Source code in src/dataverse_utils/dvdata.py
@classmethod
def get_version(cls, url:str, timeout:int=100)->float:
'''
Returns a float representing a Dataverse version number.
Floating point value composed of:
float(f'{major_version}.{minor_version:03d}{patch:03d}')
ie, version 5.9.2 would be 5.009002
Parameters
----------
url : str
URL of base Dataverse instance. eg: 'https://abacus.library.ubc.ca'
timeout : int, default=100
Request timeout in seconds
'''
ver = requests.get(f'{url}/api/info/version',
headers=UAHEADER,
#headers = {'X-Dataverse-key' : key},
timeout = timeout)
try:
ver.raise_for_status()
except requests.exceptions.HTTPError as exc:
LOGGER.error('Error getting version for %s', url)
LOGGER.exception(exc)
LOGGER.exception(traceback.format_exc())
raise requests.exceptions.HTTPError
#Scholars Portal version is formatted as v5.13.9-SP, so. . .
verf = ver.json()['data']['version'].strip('v ').split('.')
verf = [x.split('-')[0] for x in verf]
verf =[int(b)/10**(3*a) for a,b in enumerate(verf)]
#it's 3*a in case for some reason we hit, say v5.99.99 and there's more before v6.
verf = sum(verf)
return verf
production_location()
¶
Changes “multiple” to True where typeName == ‘productionPlace’ in Study[‘upload_json’]. Changes are done in place.
Notes
Multiple production places came into effect with Dataverse v5.13
Source code in src/dataverse_utils/dvdata.py
def production_location(self)->None:
'''
Changes "multiple" to True where typeName == 'productionPlace' in
Study['upload_json']. Changes are done
*in place*.
Notes
-----
Multiple production places came into effect with Dataverse v5.13
'''
#{'typeName': 'productionPlace', 'multiple': True, 'typeClass': 'primitive',
#'value': ['Vancouver, BC', 'Ottawa, ON']}
# get index
indy = None
for ind, val in enumerate(self['upload_json']['datasetVersion']\
['metadataBlocks']['citation']['fields']):
if val['typeName'] == 'productionPlace':
indy = ind
break
if indy and not self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['multiple']:
self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['multiple'] = True
self['upload_json']['datasetVersion']['metadataBlocks']\
['citation']['fields'][indy]['value'] = [self['upload_json']['datasetVersion']\
['metadataBlocks']['citation']\
['fields'][indy]['value']]
set_version(url, timeout=100)
¶
Sets self[‘target_version’] to the appropriate float value AND formats self[‘upload_json’] to the correct JSON format
Source code in src/dataverse_utils/dvdata.py
def set_version(self, url:str, timeout:int=100)->None:
'''
Sets self['target_version'] to the appropriate float value *AND*
formats self['upload_json'] to correct JSON format
Parameters
----------
url : str
URL of *target* Dataverse instance
timeout : int, optional, default=100
request timeout in seconds
'''
self['target_version'] = Study.get_version(url, timeout)
# Now fix the metadata to work with various versions
if self['target_version'] >= 5.010:
self.fix_licence()
if self['target_version'] >= 5.013:
self.production_location()
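A migration-oriented sketch: fetch a record from a source installation and shape its upload JSON for a target; both URLs and the key are assumptions.
```python
from dataverse_utils.dvdata import Study

study = Study('hdl:11272.1/AB2/EXAMPLE',         # placeholder PID
              'https://abacus.library.ubc.ca',   # placeholder source
              'xxxx-xxxx-xxxx')                  # placeholder key
# Adjust licence/productionPlace handling for the target's Dataverse version
study.set_version('https://target.example.invalid')
print(study['source_version'], study['target_version'])
payload = study['upload_json']   # stripped record, ready for re-deposit
```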
dataverse_utils.ldc
¶
Creates Dataverse JSON from a Linguistic Data Consortium website page.
Ldc
¶
Bases: Serializer
An LDC item (eg, LDC2021T01)
Source code in src/dataverse_utils/ldc.py
class Ldc(ds.Serializer):#pylint: disable=too-many-instance-attributes
'''
An LDC item (eg, LDC2021T01)
'''
#pylint: disable=super-init-not-called, arguments-differ
def __init__(self, ldc, cert=None):
'''
Returns a dict with keys created from an LDC catalogue web
page.
Parameters
----------
ldc : str
Linguistic Data Consortium catalogue number (eg. 'LDC2015T05').
This is what forms the last part of the LDC catalogue URL.
cert : str, optional, default=None
Path to certificate chain; LDC has had a problem
with intermediate certificates, so you can
download the chain with a browser and supply a
path to the .pem with this parameter
'''
self.ldc = ldc.strip().upper()
self.ldcHtml = None
self._ldcJson = None
self._dryadJson = None
self._dvJson = None
self.cert = cert
self.session = requests.Session()
self.session.mount('https://',
HTTPAdapter(max_retries=ds.constants.RETRY_STRATEGY))
if self.cert:
self.cert = os.path.expanduser(self.cert)
self.__fixdesc = None
@property
def ldcJson(self):
'''
Returns a JSON based on the LDC web page scraping
'''
if not self._ldcJson:
self._ldcJson = self.make_ldc_json()
return self._ldcJson
@property
def dryadJson(self):
'''
LDC metadata in Dryad JSON format
'''
if not self._dryadJson:
self._dryadJson = self.make_dryad_json()
return self._dryadJson
@property
def dvJson(self):
'''
LDC metadata in Dataverse JSON format
'''
#return False
if not self._dvJson:
self._dvJson = self.make_dv_json()
return self._dvJson
@property
def embargo(self)->bool:
'''
Boolean indicating embargo status
'''
return False
@property
def fileJson(self):
'''
Returns False: No attached files possible at LDC
'''
return False
@property
def files(self):
'''
Returns None. No files possible
'''
return None
@property
def oversize(self, maxsize=None):
'''
Make sure file is not too big for the Dataverse instance
Parameters
----------
maxsize : int, optional, default=None
Maximum size in bytes
'''
#pylint: disable=property-with-parameters
if not maxsize:
maxsize = ds.constants.MAX_UPLOAD
@property
def id(self):
'''
Returns LDC ID
'''
return self.ldc
def fetch_record(self, timeout=45):
'''
Downloads record from LDC website
Parameters
----------
timeout : int, optional, default=45
Request timeout in seconds
'''
interim = self.session.get(f'https://catalog.ldc.upenn.edu/{self.ldc}',
verify=self.cert, timeout=timeout)
interim.raise_for_status()
self.ldcHtml = interim.text
def make_ldc_json(self):
'''
Returns a dict with keys created from an LDC catalogue web
page.
'''
if not self.ldcHtml:
self.fetch_record()
soup = bs(self.ldcHtml, 'html.parser')
#Should data just look in the *first* table? Specifically tbody?
#Is it always the first? I assume yes.
tbody = soup.find('tbody')#new
data = [x.text.strip() for x in tbody.find_all('td')]#new
#data = [x.text.strip() for x in soup.find_all('td')]#original
LDC_dict = {data[x][:data[x].find('\n')].strip(): data[x+1].strip()
for x in range(0, len(data), 2)}
#Related Works appears to have an extra 'Hide' at the end
if LDC_dict.get('Related Works:'):
LDC_dict['Related Works'] = (x.strip() for x in LDC_dict['Related Works:'].split('\n'))
del LDC_dict['Related Works:'] #remove the renamed key
LDC_dict['Linguistic Data Consortium'] = LDC_dict['LDC Catalog No.']
del LDC_dict['LDC Catalog No.']#This key must be renamed for consistency
LDC_dict['Author(s)'] = [x.strip() for x in LDC_dict['Author(s)'].split(',')]
#Other metadata probably has HTML in it, so we keep as much as possible
other_meta = soup.find_all('div')
alldesc = [x for x in other_meta if x.attrs.get('itemprop') == 'description']
#sometimes they format pages oddly and we can use this for a
#quick and dirty fix
self.__fixdesc = copy.deepcopy(alldesc)
#sections use h3, so split on these
#24 Jan 23 Apparently, this is all done manually so some of them sometime use h4.
#Because reasons.
#was:
#alldesc = str(alldesc).split('<h3>')
#is now
alldesc = str(alldesc).replace('h4>', 'h3>').split('<h3>')
for i in range(1, len(alldesc)):
alldesc[i] = '<h3>' + alldesc[i]
#first one is not actually useful, so discard it
alldesc.pop(0)
#So far, so good. At this point the relative links need fixing
#and tables need to be converted to pre.
for desc in alldesc:
#It's already strings; replace relative links first
desc = desc.replace('../../../', 'https://catalog.ldc.upenn.edu/')
subsoup = bs(desc, 'html.parser')
key = subsoup.h3.text.strip()
#don't need the h3 tags anymore
subsoup.find('h3').extract()
# Convert tables to <pre>
for tab in subsoup.find_all('table'):
content = str(tab)
#convert to markdown
content = markdownify.markdownify(content)
tab.name = 'pre'
tab.string = content #There is not much documentation on the
#difference between tab.string and tab.content
#That was relatively easy
LDC_dict[key] = str(subsoup)
LDC_dict['Introduction'] = LDC_dict.get('Introduction',
self.__no_intro())
#LDC puts http in front of their DOI identifier
if LDC_dict.get('DOI'):
LDC_dict['DOI'] = LDC_dict['DOI'].removeprefix('https://doi.org/')
return LDC_dict
def __no_intro(self)->str:
'''
Makes an introduction even if they forgot to include the word "Introduction"
'''
#self.__fixdesc is set in make_ldc_json
intro = [x for x in self.__fixdesc if
self.__fixdesc[0]['itemprop']=='description'][0]
while intro.find('div'): #nested?, not cleaning properly
intro.find('div').unwrap() # remove the div tag
intro = str(intro)
#Normally, there's an <h3>Introduction</h3> but sometimes there's not
#Assumes that the first section up to "<h" is an intro.
#You know what they say about assuming
intro = intro[:intro.find('<h')]
start = intro.find('<div')
if start != -1:
end = intro.find('>',start)+1
intro = intro.replace(intro[start:end], '').strip()
return intro
@staticmethod
def name_parser(name):
'''
Returns lastName/firstName JSON snippet from a name
Parameters
----------
name : str
A name
Notes
-----
Can't be 100% accurate, because names can be split in many ways. However, as they
say, 80% is good enough.
'''
names = name.split(' ')
return {'lastName': names[-1], 'firstName': ' '.join(names[:-1]), 'affiliation':''}
def make_dryad_json(self, ldc=None):
'''
Creates a Dryad-style dict from an LDC dictionary
Parameters
----------
ldc : dict, optional, default=self.ldcJson
Dictionary containing LDC data. Defaults to self.ldcJson
'''
if not ldc:
ldc = self.ldcJson
print(ldc)
dryad = {}
dryad['title'] = ldc['Item Name']
dryad['authors'] = [Ldc.name_parser(x) for x in ldc['Author(s)']]
abstract = ('<p><b>Introduction</b></p>'
f"<p>{ldc['Introduction']}</p>"
'<p><b>Data</b></p>'
f"<p>{ldc['Data']}</p>")
if ldc.get('Acknowledgement'):
abstract += ('<p><b>Acknowledgement</b></p>'
f"<p>{ldc['Acknowledgement']}</p>")
dryad['abstract'] = abstract
dryad['keywords'] = ['Linguistics']
#Dataverse accepts only ISO formatted date
try:
releaseDate = time.strptime(ldc['Release Date'], '%B %d, %Y')
releaseDate = time.strftime('%Y-%m-%d', releaseDate)
except KeyError:
#Older surveys don't have a release date field
#so it must be created from the record number
if self.ldc[3] == '9':
releaseDate = '19' + self.ldc[3:5]
dryad['lastModificationDate'] = releaseDate
dryad['publicationDate'] = releaseDate
return dryad
def _make_note(self, ldc=None)->str:
'''
Creates a generalizes HTML notes field from a bunch of
LDC fields that don't fit into dataverse
Parameters
----------
ldc : dict, optional, default=self.ldcJson
Dictionary containing LDC data
'''
if not ldc:
ldc = self.ldcJson
note_fields = ['DCMI Type(s)',
'Sample Type',
'Sample Rate',
'Application(s)',
'Language(s)',
'Language ID(s)']
outhtml = []
for note in note_fields:
if ldc.get(note):
data = ldc[note].split(',')
data = [x.strip() for x in data]
data = ', '.join(data)
if note != 'Language ID(s)':
data = data[0].capitalize() + data[1:]
#data = [x.capitalize() for x in data]
outhtml.append(f'{note}: {data}')
outhtml.append(f'Metadata automatically created from '
f'<a href="https://catalog.ldc.upenn.edu/{self.ldc}">'
f'https://catalog.ldc.upenn.edu/{self.ldc}</a> '
f'[{time.strftime("%d %b %Y", time.localtime())}]')
return '<br />'.join(outhtml)
@staticmethod
def find_block_index(dvjson, key):
'''
Finds the index number of an item in Dataverse's idiotic JSON list
Parameters
----------
dvjson : dict
Dataverse JSON
key : str
key for which to find list index
'''
for num, item in enumerate(dvjson['datasetVersion']
['metadataBlocks']['citation']['fields']):
if item['typeName'] == key:
return num
return None
def make_dv_json(self, ldc=None):#pylint: disable=too-many-locals, too-many-statements
'''
Returns complete Dataverse JSON
Parameters
----------
ldc : dict, optional, default=self.ldcJson
LDC dictionary.
'''
if not ldc:
ldc = self.ldcJson
dvjson = super().dvJson.copy()
#ID Numbers
otherid = super()._typeclass('otherId', True, 'compound')
ids = []
for item in ['Linguistic Data Consortium', 'ISBN', 'ISLRN', 'DOI']:
if ldc.get(item):
out = {}
agency = super()._convert_generic(inJson={item:item},
dryField=item,
dvField='otherIdAgency')
value = super()._convert_generic(inJson={item:ldc[item]},
dryField=item,
dvField='otherIdValue')
out.update(agency)
out.update(value)
ids.append(out)
otherid['value'] = ids
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(otherid)
#Producer and publisher
prod = super()._typeclass('producer', True, 'compound')
p_name = super()._convert_generic(inJson={'producerName': 'Linguistic Data Consortium'},
dryField='producerName',
dvField='producerName')
p_affil = super()._convert_generic(inJson={'producerAffiliation':
'University of Pennsylvania'},
dryField='producerName',
dvField='producerName')
p_url = super()._convert_generic(inJson={'producerURL': 'https://www.ldc.upenn.edu/'},
dryField='producerURL',
dvField='producerURL')
p_name.update(p_affil)
p_name.update(p_url)
prod['value'] = [p_name]
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(prod)
#Kind of data
kind = super()._typeclass('kindOfData', True, 'primitive')
kind['value'] = 'Linguistic data'
#Series
series = super()._typeclass('series', False, 'compound')
s_name = super()._convert_generic(inJson={'seriesName': 'LDC'},
dryField='seriesName',
dvField='seriesName')
s_info = super()._convert_generic(inJson={'seriesInformation':
'Linguistic Data Consortium'},
dryField='seriesInformation',
dvField='seriesInformation')
s_name.update(s_info)
series['value'] = s_name #not a list
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)
#Data sources
series = super()._typeclass('dataSources', True, 'primitive')
data_sources = ldc['Data Source(s)'].split(',')
data_sources = [x.strip().capitalize() for x in data_sources]
series['value'] = data_sources
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)
#Fix keyword labels that are hardcoded for Dryad
#There should be only one keyword block
keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']['metadataBlocks']
['citation']['fields'])
if y.get('typeName') == 'keyword'][0]
key_pos = [x for x, y in enumerate(keyword_field[1]['value'])
if y['keywordVocabulary']['value'] == 'Dryad'][0]
dvjson['datasetVersion']['metadataBlocks']['citation']\
['fields'][keyword_field[0]]['value'][key_pos]\
['keywordVocabulary']['value'] = 'Linguistic Data Consortium'
#The first keyword field is hardcoded in by dryad2dataverse.serializer
#So I think it needs to be deleted
keyword_field = [(x, y) for x, y in
enumerate(dvjson['datasetVersion']['metadataBlocks']['citation']['fields'])
if y.get('typeName') == 'otherId'][0] #ibid
del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]
#Notes
note_index = Ldc.find_block_index(dvjson, 'notesText')
if note_index:
dvjson['datasetVersion']['metadataBlocks']['citation']\
['fields'][note_index]['value'] = self._make_note()
else:
notes = super()._typeclass('notesText', False, 'primitive')
notes['value'] = self._make_note()
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(notes)
#Deletes unused "publication" fields: rewrite to make it a function call.
keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']
['metadataBlocks']['citation']['fields'])
if y.get('typeName') == 'publication'][0] #ibid
del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]
#And now the licence:
dvjson['datasetVersion']['license'] = LIC_NAME
dvjson['datasetVersion']['termsOfUse'] = LICENCE
return dvjson
def upload_metadata(self, **kwargs) -> dict:
'''
Uploads metadata to dataverse. Returns json from
connection attempt.
Parameters
----------
**kwargs : dict
Parameters
Other parameters
----------------
url : str
base url to Dataverse installation
key : str
api key
dv : str
Dataverse to which it is being uploaded
'''
url = kwargs['url'].strip('/')
key = kwargs['key']
dv = kwargs['dv']
json = kwargs.get('json', self.dvJson)
headers = {'X-Dataverse-key':key}
headers.update(UAHEADER)
try:
upload = self.session.post(f'{url}/api/dataverses/{dv}/datasets',
headers=headers,
json=json)
upload.raise_for_status()
return upload.json()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError):
print(upload.text)
raise
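A catalogue-scrape sketch; the catalogue number follows the LDC format, while the installation URL, key, and collection alias are placeholders.
```python
from dataverse_utils.ldc import Ldc

ldc = Ldc('LDC2015T05')          # catalogue number from the LDC URL
dv_json = ldc.dvJson             # scrapes the page and builds Dataverse JSON
resp = ldc.upload_metadata(url='https://abacus.library.ubc.ca',  # placeholder
                           key='xxxx-xxxx-xxxx',                 # placeholder
                           dv='ldc')                             # placeholder alias
print(resp)
```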
dryadJson
property
¶
LDC metadata in Dryad JSON format
dvJson
property
¶
LDC metadata in Dataverse JSON format
embargo
property
¶
Boolean indicating embargo status
fileJson
property
¶
Returns False: No attached files possible at LDC
files
property
¶
Returns None. No files possible
id
property
¶
Returns LDC ID
ldcJson
property
¶
Returns a JSON based on the LDC web page scraping
oversize
property
¶
Make sure file is not too big for the Dataverse instance
__init__(ldc, cert=None)
¶
Returns a dict with keys created from an LDC catalogue web page.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/ldc.py
def __init__(self, ldc, cert=None):
'''
Returns a dict with keys created from an LDC catalogue web
page.
Parameters
----------
ldc : str
Linguistic Data Consortium catalogue number (eg. 'LDC2015T05').
This is what forms the last part of the LDC catalogue URL.
cert : str, optional, default=None
Path to certificate chain; LDC has had a problem
with intermediate certificates, so you can
download the chain with a browser and supply a
path to the .pem with this parameter
'''
self.ldc = ldc.strip().upper()
self.ldcHtml = None
self._ldcJson = None
self._dryadJson = None
self._dvJson = None
self.cert = cert
self.session = requests.Session()
self.session.mount('https://',
HTTPAdapter(max_retries=ds.constants.RETRY_STRATEGY))
if self.cert:
self.cert = os.path.expanduser(self.cert)
self.__fixdesc = None
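For orientation, a minimal instantiation sketch. The catalogue number is the example used in the docstring above, and the certificate path is a placeholder for a locally downloaded chain:

```
from dataverse_utils.ldc import Ldc

# The catalogue number is normalized to upper case internally.
ldc = Ldc('ldc2015t05')

# If the LDC site presents an incomplete certificate chain, point `cert`
# at a locally downloaded .pem (placeholder path below).
ldc_with_cert = Ldc('LDC2015T05', cert='~/certs/ldc_chain.pem')
```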
__no_intro()
¶
Makes an introduction even if they forgot to include the word “Introduction”
Source code in src/dataverse_utils/ldc.py
def __no_intro(self)->str:
'''
Makes an introduction even if they forgot to include the word "Introduction"
'''
#self.__fixdesc is set in make_ldc_json
intro = [x for x in self.__fixdesc if
self.__fixdesc[0]['itemprop']=='description'][0]
while intro.find('div'): #nested?, not cleaning properly
intro.find('div').unwrap() # remove the div tag
intro = str(intro)
#Normally, there's an <h3>Introduction</h3> but sometimes there's not
#Assumes that the first section up to "<h" is an intro.
#You know what they say about assuming
intro = intro[:intro.find('<h')]
start = intro.find('<div')
if start != -1:
end = intro.find('>',start)+1
intro = intro.replace(intro[start:end], '').strip()
return intro
fetch_record(timeout=45)
¶
Downloads record from LDC website
| Parameters: |
|
|---|
Source code in src/dataverse_utils/ldc.py
def fetch_record(self, timeout=45):
'''
Downloads record from LDC website
Parameters
----------
timeout : int, optional, default=45
Request timeout in seconds
'''
interim = self.session.get(f'https://catalog.ldc.upenn.edu/{self.ldc}',
verify=self.cert, timeout=timeout)
interim.raise_for_status()
self.ldcHtml = interim.text
find_block_index(dvjson, key)
staticmethod
¶
Finds the index number of an item in Dataverse’s idiotic JSON list
| Parameters: |
|
|---|
Source code in src/dataverse_utils/ldc.py
@staticmethod
def find_block_index(dvjson, key):
'''
Finds the index number of an item in Dataverse's idiotic JSON list
Parameters
----------
dvjson : dict
Dataverse JSON
key : str
key for which to find list index
'''
for num, item in enumerate(dvjson['datasetVersion']
['metadataBlocks']['citation']['fields']):
if item['typeName'] == key:
return num
return None
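A small illustration of find_block_index against a hand-built fragment; the dictionary below is only the minimal shape the method expects, not a complete Dataverse record:

```
from dataverse_utils.ldc import Ldc

dvjson = {'datasetVersion':
          {'metadataBlocks':
           {'citation':
            {'fields': [{'typeName': 'title', 'value': 'Example'},
                        {'typeName': 'notesText', 'value': 'A note'}]}}}

print(Ldc.find_block_index(dvjson, 'notesText'))  # 1
print(Ldc.find_block_index(dvjson, 'keyword'))    # None
```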
make_dryad_json(ldc=None)
¶
Creates a Dryad-style dict from an LDC dictionary
| Parameters: |
|
|---|
Source code in src/dataverse_utils/ldc.py
def make_dryad_json(self, ldc=None):
'''
Creates a Dryad-style dict from an LDC dictionary
Parameters
----------
ldc : dict, optional, default=self.ldcJson
Dictionary containing LDC data. Defaults to self.ldcJson
'''
if not ldc:
ldc = self.ldcJson
print(ldc)
dryad = {}
dryad['title'] = ldc['Item Name']
dryad['authors'] = [Ldc.name_parser(x) for x in ldc['Author(s)']]
abstract = ('<p><b>Introduction</b></p>'
f"<p>{ldc['Introduction']}</p>"
'<p><b>Data</b></p>'
f"<p>{ldc['Data']}</p>")
if ldc.get('Acknowledgement'):
abstract += ('<p><b>Acknowledgement</b></p>'
f"<p>{ldc['Acknowledgement']}</p>")
dryad['abstract'] = abstract
dryad['keywords'] = ['Linguistics']
#Dataverse accepts only ISO formatted date
try:
releaseDate = time.strptime(ldc['Release Date'], '%B %d, %Y')
releaseDate = time.strftime('%Y-%m-%d', releaseDate)
except KeyError:
#Older surveys don't have a release date field
#so it must be created from the record number
if self.ldc[3] == '9':
releaseDate = '19' + self.ldc[3:5]
dryad['lastModificationDate'] = releaseDate
dryad['publicationDate'] = releaseDate
return dryad
make_dv_json(ldc=None)
¶
Returns complete Dataverse JSON
| Parameters: |
|
|---|
Source code in src/dataverse_utils/ldc.py
def make_dv_json(self, ldc=None):#pylint: disable=too-many-locals, too-many-statements
'''
Returns complete Dataverse JSON
Parameters
----------
ldc : dict, optional, default=self.ldcJson
LDC dictionary.
'''
if not ldc:
ldc = self.ldcJson
dvjson = super().dvJson.copy()
#ID Numbers
otherid = super()._typeclass('otherId', True, 'compound')
ids = []
for item in ['Linguistic Data Consortium', 'ISBN', 'ISLRN', 'DOI']:
if ldc.get(item):
out = {}
agency = super()._convert_generic(inJson={item:item},
dryField=item,
dvField='otherIdAgency')
value = super()._convert_generic(inJson={item:ldc[item]},
dryField=item,
dvField='otherIdValue')
out.update(agency)
out.update(value)
ids.append(out)
otherid['value'] = ids
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(otherid)
#Producer and publisher
prod = super()._typeclass('producer', True, 'compound')
p_name = super()._convert_generic(inJson={'producerName': 'Linguistic Data Consortium'},
dryField='producerName',
dvField='producerName')
p_affil = super()._convert_generic(inJson={'producerAffiliation':
'University of Pennsylvania'},
dryField='producerName',
dvField='producerName')
p_url = super()._convert_generic(inJson={'producerURL': 'https://www.ldc.upenn.edu/'},
dryField='producerURL',
dvField='producerURL')
p_name.update(p_affil)
p_name.update(p_url)
prod['value'] = [p_name]
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(prod)
#Kind of data
kind = super()._typeclass('kindOfData', True, 'primitive')
kind['value'] = 'Linguistic data'
#Series
series = super()._typeclass('series', False, 'compound')
s_name = super()._convert_generic(inJson={'seriesName': 'LDC'},
dryField='seriesName',
dvField='seriesName')
s_info = super()._convert_generic(inJson={'seriesInformation':
'Linguistic Data Consortium'},
dryField='seriesInformation',
dvField='seriesInformation')
s_name.update(s_info)
series['value'] = s_name #not a list
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)
#Data sources
series = super()._typeclass('dataSources', True, 'primitive')
data_sources = ldc['Data Source(s)'].split(',')
data_sources = [x.strip().capitalize() for x in data_sources]
series['value'] = data_sources
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(series)
#Fix keyword labels that are hardcoded for Dryad
#There should be only one keyword block
keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']['metadataBlocks']
['citation']['fields'])
if y.get('typeName') == 'keyword'][0]
key_pos = [x for x, y in enumerate(keyword_field[1]['value'])
if y['keywordVocabulary']['value'] == 'Dryad'][0]
dvjson['datasetVersion']['metadataBlocks']['citation']\
['fields'][keyword_field[0]]['value'][key_pos]\
['keywordVocabulary']['value'] = 'Linguistic Data Consortium'
#The first keyword field is hardcoded in by dryad2dataverse.serializer
#So I think it needs to be deleted
keyword_field = [(x, y) for x, y in
enumerate(dvjson['datasetVersion']['metadataBlocks']['citation']['fields'])
if y.get('typeName') == 'otherId'][0] #ibid
del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]
#Notes
note_index = Ldc.find_block_index(dvjson, 'notesText')
if note_index:
dvjson['datasetVersion']['metadataBlocks']['citation']\
['fields'][note_index]['value'] = self._make_note()
else:
notes = super()._typeclass('notesText', False, 'primitive')
notes['value'] = self._make_note()
dvjson['datasetVersion']['metadataBlocks']['citation']['fields'].append(notes)
#Deletes unused "publication" fields: rewrite to make it a function call.
keyword_field = [(x, y) for x, y in enumerate(dvjson['datasetVersion']
['metadataBlocks']['citation']['fields'])
if y.get('typeName') == 'publication'][0] #ibid
del dvjson['datasetVersion']['metadataBlocks']['citation']['fields'][keyword_field[0]]
#And now the licence:
dvjson['datasetVersion']['license'] = LIC_NAME
dvjson['datasetVersion']['termsOfUse'] = LICENCE
return dvjson
make_ldc_json()
¶
Returns a dict with keys created from an LDC catalogue web page.
Source code in src/dataverse_utils/ldc.py
def make_ldc_json(self):
'''
Returns a dict with keys created from an LDC catalogue web
page.
'''
if not self.ldcHtml:
self.fetch_record()
soup = bs(self.ldcHtml, 'html.parser')
#Should data just look in the *first* table? Specifically tbody?
#Is it always the first? I assume yes.
tbody = soup.find('tbody')#new
data = [x.text.strip() for x in tbody.find_all('td')]#new
#data = [x.text.strip() for x in soup.find_all('td')]#original
LDC_dict = {data[x][:data[x].find('\n')].strip(): data[x+1].strip()
for x in range(0, len(data), 2)}
#Related Works appears to have an extra 'Hide' at the end
if LDC_dict.get('Related Works:'):
LDC_dict['Related Works'] = (x.strip() for x in LDC_dict['Related Works:'].split('\n'))
del LDC_dict['Related Works:'] #remove the renamed key
LDC_dict['Linguistic Data Consortium'] = LDC_dict['LDC Catalog No.']
del LDC_dict['LDC Catalog No.']#This key must be renamed for consistency
LDC_dict['Author(s)'] = [x.strip() for x in LDC_dict['Author(s)'].split(',')]
#Other metadata probably has HTML in it, so we keep as much as possible
other_meta = soup.find_all('div')
alldesc = [x for x in other_meta if x.attrs.get('itemprop') == 'description']
#sometimes they format pages oddly and we can use this for a
#quick and dirty fix
self.__fixdesc = copy.deepcopy(alldesc)
#sections use h3, so split on these
#24 Jan 23 Apparently, this is all done manually so some of them sometime use h4.
#Because reasons.
#was:
#alldesc = str(alldesc).split('<h3>')
#is now
alldesc = str(alldesc).replace('h4>', 'h3>').split('<h3>')
for i in range(1, len(alldesc)):
alldesc[i] = '<h3>' + alldesc[i]
#first one is not actually useful, so discard it
alldesc.pop(0)
#So far, so good. At this point the relative links need fixing
#and tables need to be converted to pre.
for desc in alldesc:
#It's already strings; replace relative links first
desc = desc.replace('../../../', 'https://catalog.ldc.upenn.edu/')
subsoup = bs(desc, 'html.parser')
key = subsoup.h3.text.strip()
#don't need the h3 tags anymore
subsoup.find('h3').extract()
# Convert tables to <pre>
for tab in subsoup.find_all('table'):
content = str(tab)
#convert to markdown
content = markdownify.markdownify(content)
tab.name = 'pre'
tab.string = content #There is not much documentation on the
#difference between tab.string and tab.content
#That was relatively easy
LDC_dict[key] = str(subsoup)
LDC_dict['Introduction'] = LDC_dict.get('Introduction',
self.__no_intro())
#LDC puts http in front of their DOI identifier
if LDC_dict.get('DOI'):
LDC_dict['DOI'] = LDC_dict['DOI'].strip('https://doi.org/')
return LDC_dict
name_parser(name)
staticmethod
¶
Returns lastName/firstName JSON snippet from a name
| Parameters: |
|
|---|
Notes
Can’t be 100% accurate, because names can be split in many ways. However, as they say, 80% is good enough.
Source code in src/dataverse_utils/ldc.py
@staticmethod
def name_parser(name):
'''
Returns lastName/firstName JSON snippet from a name
Parameters
----------
name : str
A name
Notes
-----
Can't be 100% accurate, because names can be split in many ways. However, as they
say, 80% is good enough.
'''
names = name.split(' ')
return {'lastName': names[-1], 'firstName': ' '.join(names[:-1]), 'affiliation':''}
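Because the split is on whitespace only, everything before the last token becomes the first name. For example:

```
from dataverse_utils.ldc import Ldc

print(Ldc.name_parser('Mark Y. Liberman'))
# {'lastName': 'Liberman', 'firstName': 'Mark Y.', 'affiliation': ''}
```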
upload_metadata(**kwargs)
¶
Uploads metadata to dataverse. Returns json from connection attempt.
| Parameters: |
|
|---|
|
Source code in src/dataverse_utils/ldc.py
def upload_metadata(self, **kwargs) -> dict:
'''
Uploads metadata to dataverse. Returns json from
connection attempt.
Parameters
----------
**kwargs : dict
Parameters
Other parameters
----------------
url : str
base url to Dataverse installation
key : str
api key
dv : str
Dataverse to which it is being uploaded
'''
url = kwargs['url'].strip('/')
key = kwargs['key']
dv = kwargs['dv']
json = kwargs.get('json', self.dvJson)
headers = {'X-Dataverse-key':key}
headers.update(UAHEADER)
try:
upload = self.session.post(f'{url}/api/dataverses/{dv}/datasets',
headers=headers,
json=json)
upload.raise_for_status()
return upload.json()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError):
print(upload.text)
raise
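A hedged end-to-end sketch of the LDC workflow. The Dataverse URL, API key and collection alias are placeholders; the catalogue number is the example used elsewhere in this module:

```
from dataverse_utils.ldc import Ldc

ldc = Ldc('LDC2015T05')
dvjson = ldc.dvJson  # builds the Dataverse JSON, fetching the catalogue page if needed
resp = ldc.upload_metadata(url='https://dataverse.example.edu',
                           key='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
                           dv='ldc_collection')
print(resp)  # JSON response from the dataset creation call
```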
dataverse_utils.collections
¶
Utilities for recursively analysing a Dataverse collection.
DvCollection
¶
Metadata for an entire dataverse collection, recursively.
Source code in src/dataverse_utils/collections.py
class DvCollection:
'''
Metadata for an *entire* dataverse collection, recursively.
'''
#pylint: disable=too-many-instance-attributes
def __init__(self, url:str, coll:str, key=None, **kwargs):
'''
All you need to start recursively crawling.
Parameters
----------
coll : str
short collection name or id number
url : str
base URL of Dataverse collection.
eg: https://borealisdata.ca
borealisdata.ca
key : str
API key (optional, only use if you want to see hidden material)
**kwargs: dict
Other parameters
Other parameters
----------------
timeout : int
retry timeout in seconds
'''
self.coll = coll
self.url = self.__clean_url(url)
self.headers = None
self.__key = key
if self.__key:
self.headers = {'X-Dataverse-key': self.__key}
self.headers.update(UAHEADER)
else:
self.headers = UAHEADER.copy()
if not kwargs.get('retry'):
self.retry_strategy = RETRY
else:
self.retry_strategy = kwargs['retry']
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=self.retry_strategy))
self.collections = None
self.studies = None
def __clean_url(self, badurl:str):
'''
Sanitize URL, return properly formatted HTTP string.
Parameters
----------
badurl: str
URL string
'''
clean = badurl.strip().strip('/')
if not clean.startswith('https://'):
clean = f'https://{clean}'
return clean
def __get_shortname(self, dvid):
'''
Get collection short name.
'''
shortname = self.session.get(f'{self.url}/api/dataverses/{dvid}', headers=self.headers)
shortname.raise_for_status()
return shortname.json()['data']['alias']
def get_collections(self, coll:str=None, output=None, **kwargs)->list:#pylint: disable=unused-argument
'''
Get a [recursive] listing of all dataverses in a collection.
Parameters
----------
coll : str, optional, default=None
Collection short name or id
output : list, optional, default=[]
output list to append to
**kwargs : dict
Other keyword arguments
'''
if not output:
output = []
if not coll:
coll = self.coll
x = self.session.get(f'{self.url}/api/dataverses/{coll}/contents',
headers=self.headers)
data = x.json().get('data')
#---
#Because it's possible that permissions errors can cause API read errors,
#we have this insane way of checking errors.
#I have no idea what kind of errors would be raised, so it has
#a bare except, which is bad. But what can you do?
dvs =[]
for _ in data:
if _['type'] == 'dataverse':
try:
out=self.__get_shortname(_['id'])
dvs.append((_['title'], out))
except Exception as e:
obscure_error = f'''
An error has occurred where a collection can be
identified by ID but its name cannot be determined.
This is (normally) caused by a configuration error where
administrator permissions are not correctly inherited by
the child collection.
Please check with the system administrator to determine
any exact issues.
Problematic collection id number: {_.get("id",
"not available")}'''
print(50*'-')
print(textwrap.dedent(obscure_error))
print(e)
LOGGER.error(textwrap.fill(textwrap.dedent(obscure_error).strip()))
traceback.print_exc()
print(50*'-')
raise e
#---
if not dvs:
dvs = []
output.extend(dvs)
for dv in dvs:
LOGGER.debug('%s/api/dataverses/%s/contents', self.url, dv[1])
LOGGER.debug('recursive')
self.get_collections(dv[1], output)
self.collections = output
return output
def get_studies(self, root:str=None):
'''
return [(pid, title)..(pid_n, title_n)] of a collection.
Parameters
----------
root : str
Short name or id of *top* level of tree. Default self.coll
'''
all_studies = []
if not root:
root=self.coll
all_studies = self.get_collection_listing(root)
#collections = self.get_collections(root, self.url)
collections = self.get_collections(root)
for collection in collections:
all_studies.extend(self.get_collection_listing(collection[1]))
self.studies = all_studies
return all_studies
def get_collection_listing(self, coll_id):
'''
Return a listing of studies in a collection, with pid.
Parameters
----------
coll_id : str
Short name or id of a dataverse collection
'''
cl = self.session.get(f'{self.url}/api/dataverses/{coll_id}/contents',
headers=self.headers)
cl.raise_for_status()
pids = [f"{z['protocol']}:{z['authority']}/{z['identifier']}"
for z in cl.json()['data'] if z['type'] == 'dataset']
out = [(self.get_study_info(pid), pid) for pid in pids]
for _ in out:
_[0].update({'pid': _[1]})
return [x[0] for x in out]
def get_study_info(self, pid):
'''
Returns a StudyMetadata object with complete metadata for a study.
Parameters
----------
pid : str
Persistent ID of a Dataverse study
'''
meta = self.session.get(f'{self.url}/api/datasets/:persistentId',
params={'persistentId': pid},
headers=self.headers)
meta.raise_for_status()
LOGGER.debug(pid)
return StudyMetadata(study_meta=meta.json(), key=self.__key, url=self.url)
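A minimal crawl sketch. The collection alias 'sample_collection' is a placeholder, borealisdata.ca is only used because it appears as the example host in the docstring, and the API key is omitted since it is optional:

```
from dataverse_utils.collections import DvCollection

coll = DvCollection(url='https://borealisdata.ca',
                    coll='sample_collection')
studies = coll.get_studies()  # StudyMetadata dicts, one per study, recursively
for study in studies:
    print(study['pid'])       # 'pid' is added to each record during the crawl
```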
__clean_url(badurl)
¶
Sanitize URL, return properly formatted HTTP string.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def __clean_url(self, badurl:str):
'''
Sanitize URL, return properly formatted HTTP string.
Parameters
----------
badurl: str
URL string
'''
clean = badurl.strip().strip('/')
if not clean.startswith('https://'):
clean = f'https://{clean}'
return clean
__get_shortname(dvid)
¶
Get collection short name.
Source code in src/dataverse_utils/collections.py
def __get_shortname(self, dvid):
'''
Get collection short name.
'''
shortname = self.session.get(f'{self.url}/api/dataverses/{dvid}', headers=self.headers)
shortname.raise_for_status()
return shortname.json()['data']['alias']
__init__(url, coll, key=None, **kwargs)
¶
All you need to start recursively crawling.
| Parameters: |
|
|---|
|
Source code in src/dataverse_utils/collections.py
def __init__(self, url:str, coll:str, key=None, **kwargs):
'''
All you need to start recursively crawling.
Parameters
----------
coll : str
short collection name or id number
url : str
base URL of Dataverse collection.
eg: https://borealisdata.ca
borealisdata.ca
key : str
API key (optional, only use if you want to see hidden material)
**kwargs: dict
Other parameters
Other parameters
----------------
timeout : int
retry timeout in seconds
'''
self.coll = coll
self.url = self.__clean_url(url)
self.headers = None
self.__key = key
if self.__key:
self.headers = {'X-Dataverse-key': self.__key}
self.headers.update(UAHEADER)
else:
self.headers = UAHEADER.copy()
if not kwargs.get('retry'):
self.retry_strategy = RETRY
else:
self.retry_strategy = kwargs['retry']
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=self.retry_strategy))
self.collections = None
self.studies = None
get_collection_listing(coll_id)
¶
Return a listing of studies in a collection, with pid.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def get_collection_listing(self, coll_id):
'''
Return a listing of studies in a collection, with pid.
Parameters
----------
coll_id : str
Short name or id of a dataverse collection
'''
cl = self.session.get(f'{self.url}/api/dataverses/{coll_id}/contents',
headers=self.headers)
cl.raise_for_status()
pids = [f"{z['protocol']}:{z['authority']}/{z['identifier']}"
for z in cl.json()['data'] if z['type'] == 'dataset']
out = [(self.get_study_info(pid), pid) for pid in pids]
for _ in out:
_[0].update({'pid': _[1]})
return [x[0] for x in out]
get_collections(coll=None, output=None, **kwargs)
¶
Get a [recursive] listing of all dataverses in a collection.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def get_collections(self, coll:str=None, output=None, **kwargs)->list:#pylint: disable=unused-argument
'''
Get a [recursive] listing of all dataverses in a collection.
Parameters
----------
coll : str, optional, default=None
Collection short name or id
output : list, optional, default=[]
output list to append to
**kwargs : dict
Other keyword arguments
'''
if not output:
output = []
if not coll:
coll = self.coll
x = self.session.get(f'{self.url}/api/dataverses/{coll}/contents',
headers=self.headers)
data = x.json().get('data')
#---
#Because it's possible that permissions errors can cause API read errors,
#we have this insane way of checking errors.
#I have no idea what kind of errors would be raised, so it has
#a bare except, which is bad. But what can you do?
dvs =[]
for _ in data:
if _['type'] == 'dataverse':
try:
out=self.__get_shortname(_['id'])
dvs.append((_['title'], out))
except Exception as e:
obscure_error = f'''
An error has occurred where a collection can be
identified by ID but its name cannot be determined.
This is (normally) caused by a configuration error where
administrator permissions are not correctly inherited by
the child collection.
Please check with the system administrator to determine
any exact issues.
Problematic collection id number: {_.get("id",
"not available")}'''
print(50*'-')
print(textwrap.dedent(obscure_error))
print(e)
LOGGER.error(textwrap.fill(textwrap.dedent(obscure_error).strip()))
traceback.print_exc()
print(50*'-')
raise e
#---
if not dvs:
dvs = []
output.extend(dvs)
for dv in dvs:
LOGGER.debug('%s/api/dataverses/%s/contents', self.url, dv[1])
LOGGER.debug('recursive')
self.get_collections(dv[1], output)
self.collections = output
return output
get_studies(root=None)
¶
Return a list of StudyMetadata dictionaries (one per study, each with its pid added) for a collection and all of its subcollections.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def get_studies(self, root:str=None):
'''
return [(pid, title)..(pid_n, title_n)] of a collection.
Parameters
----------
root : str
Short name or id of *top* level of tree. Default self.coll
'''
all_studies = []
if not root:
root=self.coll
all_studies = self.get_collection_listing(root)
#collections = self.get_collections(root, self.url)
collections = self.get_collections(root)
for collection in collections:
all_studies.extend(self.get_collection_listing(collection[1]))
self.studies = all_studies
return all_studies
get_study_info(pid)
¶
Returns a StudyMetadata object with complete metadata for a study.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def get_study_info(self, pid):
'''
Returns a StudyMetadata object with complete metadata for a study.
Parameters
----------
pid : str
Persistent ID of a Dataverse study
'''
meta = self.session.get(f'{self.url}/api/datasets/:persistentId',
params={'persistentId': pid},
headers=self.headers)
meta.raise_for_status()
LOGGER.debug(pid)
return StudyMetadata(study_meta=meta.json(), key=self.__key, url=self.url)
FileAnalysis
¶
Bases: dict
Download and analyze a file from a dataverse installation and produce useful metadata.
Source code in src/dataverse_utils/collections.py
class FileAnalysis(dict):
'''
Download and analyze a file from a dataverse installation and
produce useful metadata.
'''
def __init__(self, **kwargs):
'''
Initialize the object.
Parameters
----------
**kwargs : dict
Keyword arguments
Other parameters
----------------
local : str
Path to local file
url : str
URL of Dataverse instance
key : str
API key for downloading
fid : int
Integer file id
pid : str
Persistent ID of file
filename : str
File name (original)
filesize_bytes : int
File size in bytes
Notes
-----
Either `local` must be supplied, or `url`, `key` and at least one of
`fid` or `pid` must be supplied
'''
#self.url = self.__clean_url(url)
self.headers = UAHEADER.copy()
self.kwargs = kwargs
if self.kwargs.get('key'):
self.headers.update({'X-Dataverse-key':self.kwargs['key']})
self.local = None
if not self.__sufficient:
err = ('Insufficient required arguments. '
'Include (url, key, '
'(pid or id)) or (local) keyword parameters.')
raise TypeError(err)
self.tempfile = None
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=RETRY))
self.checkable = {'.sav': self.stat_file_metadata,
'.sas7bdat': self.stat_file_metadata,
'.dta': self.stat_file_metadata,
'.csv': self.generic_metadata,
'.tsv': self.generic_metadata,
'.rdata': self.generic_metadata,
'.rda': self.generic_metadata}
self.filename = None #get it later
self.enhance()
def __del__(self):
'''
Cleanup old temporary files on object deletion.
'''
self.session.close()
del self.tempfile
def __sufficient(self)->bool:
'''
Checks if sufficient information is supplied for initialization, with
local files taking preference over remote.
'''
if self.kwargs.get('local'):
return True
if (self.kwargs['url'] and self.kwargs['key']
and (self.kwargs.get('pid') or self.kwargs.get('id'))):
return True
return False
def __clean_url(self, badurl:str)->str:
'''
Sanitize URL. Ensures ssl and no trailing slash.
Parameters
----------
badurl: str
URL
'''
clean = badurl.strip().strip('/')
if not clean.startswith('https://'):
clean = f'https://{clean}'
return clean
def __get_filename(self, head:dict)->typing.Union[str, None]:
'''
Determines whether or not this is a file that should be
downloaded for further checking.
Parameters
----------
head : dict
Header from GET request
Returns
-------
True if extended metadata can be obtained
'''
fname = head.get('content-type')
if fname:
if 'name=' in fname:
start = head['content-type'].find('name=')+5
end = head['content-type'].find(';', start)
if end != -1:
fname = head['content-type'][start:end].strip('"')
else:
fname = head['content-type'][start:].strip('"')
fname = self.kwargs.get('filename', fname)
return fname
@property
def __whichfile(self):
'''
Returns the location of the path being analyzed.
'''
return self.tempfile.name if self.tempfile else self.local
def __check(self):
'''
Determines if this is one of the filetypes which supports extra metadata.
'''
if pathlib.Path(self.filename).suffix.lower() in self.checkable:
return True
return False
def download(self, block_size:int=1024, force=False, local=None)-> None:
'''
Download the file to a temporary location for analysis.
Parameters
----------
block_size : int
Streaming block size
force : bool
Download even if not a file that is checkable
local : str
Path to local file
'''
# pylint: disable=consider-using-with
self.tempfile = tempfile.NamedTemporaryFile(delete=True,
delete_on_close=False)
if local:
self.local = local
self.filename = local
self.tempfile.close()
del self.tempfile #to erase it
self.tempfile = None
return
start = datetime.datetime.now()
params = {'format':'original'}
url = self.__clean_url(self.kwargs['url'])
if self.kwargs.get('pid'):
params.update({'persistentId':self.kwargs['pid']})
data = self.session.get(f'{url}/api/access/datafile/:persistentId',
headers=self.headers,
params=params,
stream=True)
else:
data = self.session.get(f'{url}/api/access/datafile/{self.kwargs["id"]}',
headers=self.headers,
params=params,
stream=True)
data.raise_for_status()
finish = datetime.datetime.now()
self.filename = self.__get_filename(data.headers)
LOGGER.info('Downloaded header for %s. Elapsed time: %s',
self.filename, finish-start)
if self.__check() or force:
filesize = self.kwargs.get('filesize_bytes',
data.headers.get('content-length', 9e9))
filesize = int(filesize) # comes out as string from header
with tqdm.tqdm(total=filesize, unit='B', unit_scale=True, desc=self.filename) as t:
for _ in data.iter_content(block_size):
self.tempfile.file.write(_)
t.update(len(_))
self.tempfile.close()
def enhance(self):
'''
Convenience function for downloading and creating extra metadata,
ie, "enhancing" the metadata. Use this instead of going through the
steps manually.
'''
self.download(local=self.kwargs.get('local'))
do_it = pathlib.Path(self.filename).suffix.lower()
if do_it in self.checkable:
self.checkable[do_it](ext=do_it)
def stat_file_metadata(self, ext:str)->dict:
'''
Produces metadata from SAS, SPSS and Stata files.
Parameters
----------
ext : str
File extension of statistical package file. Include the '.'. Eg. '.sav'
'''
matcher = {'.sav': pyreadstat.read_sav,
'.dta': pyreadstat.read_dta,
'.sas7bdat': pyreadstat.read_sas7bdat}
if not self.filename or ext not in matcher:
return
#whichfile = self.tempfile.name if self.tempfile else self.local
statdata, meta = matcher[ext](self.__whichfile)
outmeta = {}
outmeta['variables'] = {_:{} for _ in meta.column_names_to_labels}
for k, v in meta.column_names_to_labels.items():
outmeta['variables'][k]['Variable label'] = v
for k, v in meta.original_variable_types.items():
outmeta['variables'][k]['Variable type'] = v
for k, v in meta.variable_to_label.items():
outmeta['variables'][k]['Value labels'] = meta.value_labels.get(v, '')
outmeta['encoding'] = meta.file_encoding
for dt in statdata.columns:
desc = {k:str(v) for k, v in dict(statdata[dt].describe()).items()}
outmeta['variables'][dt].update(desc)
self.update(outmeta)
return
def generic_metadata(self, ext)->None:
'''
Make metadata for a [ct]sv file and RData. Updates
self.
Parameters
----------
ext : str
extension ('.csv' or '.tsv')
'''
#if ext == '.tsv':
# data = pd.read_csv(self.__whichfile, sep='\t')
#else:
# data = pd.read_csv(self.__whichfile)
lookuptable ={'.tsv': {'func': pd.read_csv,
'kwargs' : {'sep':'\t'}},
'.csv': {'func' : pd.read_csv},
'.rda': {'func' : pyreadr.read_r},
'.rdata':{'func' : pyreadr.read_r}}
data = lookuptable[ext]['func'](self.__whichfile,
**lookuptable[ext].get('kwargs', {}))
if ext in ['.rda', '.rdata']:
data = data[None] #why pyreadr why
outmeta = {}
outmeta['variables'] = {_:{} for _ in data.columns}
for dt in data.columns:
outmeta['variables'][dt]['Variable type'] = str(data[dt].dtype)
# Make something from nothing
desc = {k:str(v) for k, v in dict(data[dt].describe()).items()}
outmeta['variables'][dt].update(desc)
self.update(outmeta)
@property
def md(self):
'''
Create Markdown text out of a FileAnalysis object.
'''
out = io.StringIO()
indent = '\u00A0' #non-breaking space
if not self.get('variables'):
return None
for k, v in self.items():
if k != 'variables':
out.write(f'**{k.capitalize()}** : {v} \n')
for k, v in self.get('variables',{}).items():
out.write(f"**{k}**: {v.get('Variable label', 'Description N/A')} \n")
for kk, vv, in v.items():
if kk == 'Variable label':
continue
if not isinstance(vv, dict):
out.write(f'**{kk.capitalize()}**: {vv} \n')
else:
out.write(f'**{kk.capitalize()}**: \n')
for kkk, vvv in vv.items():
#this one only originally
out.write(f'{4*indent}{kkk}: {vvv} \n')
out.write('\n')
out.seek(0)
return out.read()
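A sketch of local analysis, assuming a hypothetical SPSS file named survey.sav in the working directory; when `local` is supplied, nothing is downloaded:

```
from dataverse_utils.collections import FileAnalysis

fa = FileAnalysis(local='survey.sav')   # analysis runs on __init__ via enhance()
print(sorted(fa.get('variables', {})))  # variable names found in the file
print(fa.md)                            # Markdown data dictionary, or None
```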
__whichfile
property
¶
Returns the location of the path being analyzed.
md
property
¶
Create Markdown text out of a FileAnalysis object.
__check()
¶
Determines if this is one of the filetypes which supports extra metadata.
Source code in src/dataverse_utils/collections.py
def __check(self):
'''
Determines if this is one of the filetypes which supports extra metadata.
'''
if pathlib.Path(self.filename).suffix.lower() in self.checkable:
return True
return False
__clean_url(badurl)
¶
Sanitize URL. Ensures ssl and no trailing slash.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def __clean_url(self, badurl:str)->str:
'''
Sanitize URL. Ensures ssl and no trailing slash.
Parameters
----------
badurl: str
URL
'''
clean = badurl.strip().strip('/')
if not clean.startswith('https://'):
clean = f'https://{clean}'
return clean
__del__()
¶
Cleanup old temporary files on object deletion.
Source code in src/dataverse_utils/collections.py
def __del__(self):
'''
Cleanup old temporary files on object deletion.
'''
self.session.close()
del self.tempfile
__get_filename(head)
¶
Extracts the original file name from the response headers; this name is used to decide whether the file should be downloaded for further checking.
| Parameters: |
|
|---|
| Returns: |
|
|---|
Source code in src/dataverse_utils/collections.py
def __get_filename(self, head:dict)->typing.Union[str, None]:
'''
Determines whether or not this is a file that should be
downloaded for further checking.
Parameters
----------
head : dict
Header from GET request
Returns
-------
True if extended metadata can be obtained
'''
fname = head.get('content-type')
if fname:
if 'name=' in fname:
start = head['content-type'].find('name=')+5
end = head['content-type'].find(';', start)
if end != -1:
fname = head['content-type'][start:end].strip('"')
else:
fname = head['content-type'][start:].strip('"')
fname = self.kwargs.get('filename', fname)
return fname
__init__(**kwargs)
¶
Initialize the object.
| Parameters: |
|
|---|
|
Notes
Either local must be supplied, or url, key and at least one of
fid or pid must be supplied
Source code in src/dataverse_utils/collections.py
def __init__(self, **kwargs):
'''
Initialize the object.
Parameters
----------
**kwargs : dict
Keyword arguments
Other parameters
----------------
local : str
Path to local file
url : str
URL of Dataverse instance
key : str
API key for downloading
fid : int
Integer file id
pid : str
Persistent ID of file
filename : str
File name (original)
filesize_bytes : int
File size in bytes
Notes
-----
Either `local` must be supplied, or `url`, `key` and at least one of
`fid` or `pid` must be supplied
'''
#self.url = self.__clean_url(url)
self.headers = UAHEADER.copy()
self.kwargs = kwargs
if self.kwargs.get('key'):
self.headers.update({'X-Dataverse-key':self.kwargs['key']})
self.local = None
if not self.__sufficient:
err = ('Insufficient required arguments. '
'Include (url, key, '
'(pid or id)) or (local) keyword parameters.')
raise TypeError(err)
self.tempfile = None
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=RETRY))
self.checkable = {'.sav': self.stat_file_metadata,
'.sas7bdat': self.stat_file_metadata,
'.dta': self.stat_file_metadata,
'.csv': self.generic_metadata,
'.tsv': self.generic_metadata,
'.rdata': self.generic_metadata,
'.rda': self.generic_metadata}
self.filename = None #get it later
self.enhance()
__sufficient()
¶
Checks if sufficient information is supplied for initialization, with local files taking preference over remote.
Source code in src/dataverse_utils/collections.py
def __sufficient(self)->bool:
'''
Checks if sufficient information is supplied for initialization, with
local files taking preference over remote.
'''
if self.kwargs.get('local'):
return True
if (self.kwargs['url'] and self.kwargs['key']
and (self.kwargs.get('pid') or self.kwargs.get('id'))):
return True
return False
download(block_size=1024, force=False, local=None)
¶
Download the file to a temporary location for analysis.
Parameters: block_size (int), streaming block size; force (bool), download even if the file is not of a checkable type; local (str), path to a local file.
Source code in src/dataverse_utils/collections.py
def download(self, block_size:int=1024, force=False, local=None)-> None:
'''
Download the file to a temporary location for analysis.
Parameters
----------
block_size : int
Streaming block size
force : bool
Download even if not a file that is checkable
local : str
Path to local file
'''
# pylint: disable=consider-using-with
self.tempfile = tempfile.NamedTemporaryFile(delete=True,
delete_on_close=False)
if local:
self.local = local
self.filename = local
self.tempfile.close()
del self.tempfile #to erase it
self.tempfile = None
return
start = datetime.datetime.now()
params = {'format':'original'}
url = self.__clean_url(self.kwargs['url'])
if self.kwargs.get('pid'):
params.update({'persistentId':self.kwargs['pid']})
data = self.session.get(f'{url}/api/access/datafile/:persistentId',
headers=self.headers,
params=params,
stream=True)
else:
data = self.session.get(f'{url}/api/access/datafile/{self.kwargs["id"]}',
headers=self.headers,
params=params,
stream=True)
data.raise_for_status()
finish = datetime.datetime.now()
self.filename = self.__get_filename(data.headers)
LOGGER.info('Downloaded header for %s. Elapsed time: %s',
self.filename, finish-start)
if self.__check() or force:
filesize = self.kwargs.get('filesize_bytes',
data.headers.get('content-length', 9e9))
filesize = int(filesize) # comes out as string from header
with tqdm.tqdm(total=filesize, unit='B', unit_scale=True, desc=self.filename) as t:
for _ in data.iter_content(block_size):
self.tempfile.file.write(_)
t.update(len(_))
self.tempfile.close()
enhance()
¶
Convenience function for downloading and creating extra metadata, ie, “enhancing” the metadata. Use this instead of going through the steps manually.
Source code in src/dataverse_utils/collections.py
def enhance(self):
'''
Convenience function for downloading and creating extra metadata,
ie, "enhancing" the metadata. Use this instead of going through the
steps manually.
'''
self.download(local=self.kwargs.get('local'))
do_it = pathlib.Path(self.filename).suffix.lower()
if do_it in self.checkable:
self.checkable[do_it](ext=do_it)
generic_metadata(ext)
¶
Make metadata for a [ct]sv file and RData. Updates self.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def generic_metadata(self, ext)->None:
'''
Make metadata for a [ct]sv file and RData. Updates
self.
Parameters
----------
ext : str
extension ('.csv' or '.tsv')
'''
#if ext == '.tsv':
# data = pd.read_csv(self.__whichfile, sep='\t')
#else:
# data = pd.read_csv(self.__whichfile)
lookuptable ={'.tsv': {'func': pd.read_csv,
'kwargs' : {'sep':'\t'}},
'.csv': {'func' : pd.read_csv},
'.rda': {'func' : pyreadr.read_r},
'.rdata':{'func' : pyreadr.read_r}}
data = lookuptable[ext]['func'](self.__whichfile,
**lookuptable[ext].get('kwargs', {}))
if ext in ['.rda', '.rdata']:
data = data[None] #why pyreadr why
outmeta = {}
outmeta['variables'] = {_:{} for _ in data.columns}
for dt in data.columns:
outmeta['variables'][dt]['Variable type'] = str(data[dt].dtype)
# Make something from nothing
desc = {k:str(v) for k, v in dict(data[dt].describe()).items()}
outmeta['variables'][dt].update(desc)
self.update(outmeta)
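The per-column summaries come straight from pandas' describe(); stripped of the class plumbing, the core of generic_metadata for a CSV is roughly the following (data.csv is a placeholder path):

```
import pandas as pd

data = pd.read_csv('data.csv')
summary = {col: {'Variable type': str(data[col].dtype),
                 **{k: str(v) for k, v in data[col].describe().items()}}
           for col in data.columns}
```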
stat_file_metadata(ext)
¶
Produces metadata from SAS, SPSS and Stata files.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def stat_file_metadata(self, ext:str)->dict:
'''
Produces metadata from SAS, SPSS and Stata files.
Parameters
----------
ext : str
File extension of statistical package file. Include the '.'. Eg. '.sav'
'''
matcher = {'.sav': pyreadstat.read_sav,
'.dta': pyreadstat.read_dta,
'.sas7bdat': pyreadstat.read_sas7bdat}
if not self.filename or ext not in matcher:
return
#whichfile = self.tempfile.name if self.tempfile else self.local
statdata, meta = matcher[ext](self.__whichfile)
outmeta = {}
outmeta['variables'] = {_:{} for _ in meta.column_names_to_labels}
for k, v in meta.column_names_to_labels.items():
outmeta['variables'][k]['Variable label'] = v
for k, v in meta.original_variable_types.items():
outmeta['variables'][k]['Variable type'] = v
for k, v in meta.variable_to_label.items():
outmeta['variables'][k]['Value labels'] = meta.value_labels.get(v, '')
outmeta['encoding'] = meta.file_encoding
for dt in statdata.columns:
desc = {k:str(v) for k, v in dict(statdata[dt].describe()).items()}
outmeta['variables'][dt].update(desc)
self.update(outmeta)
return
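The heavy lifting is done by pyreadstat; a stand-alone approximation of what the method reads from an SPSS file (placeholder path and column name) looks like this:

```
import pyreadstat

statdata, meta = pyreadstat.read_sav('survey.sav')
print(meta.column_names_to_labels)  # e.g. {'q1': 'Question 1 label', ...}
print(meta.file_encoding)           # e.g. 'UTF-8'
print(statdata['q1'].describe())    # per-variable summary for a hypothetical column 'q1'
```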
MetadataError
¶
Bases: Exception
MetadataError
Source code in src/dataverse_utils/collections.py
class MetadataError(Exception):
'''
MetadataError
'''
ReadmeCreator
¶
Make formatted README documents out of a StudyMetadata object.
Source code in src/dataverse_utils/collections.py
class ReadmeCreator:
'''
Make formatted README documents out of a StudyMetadata object.
'''
def __init__(self, study_metadata_obj: StudyMetadata, **kwargs):
'''
Send in StudyMetadata dict to create a nicely formatted README document
Parameters
----------
study_metadata_obj : StudyMetadata
A study metadata object
**kwargs : dict
Keyword arguments
Other parameters
----------------
url : str
The base URL for a Dataverse instance
pid : typing.Union[str, int]
The persistent identifier of a file or a file id
key : str
A valid API key for performing operations on Dataverse studies
local : str
Path to the top level directory which holds study files.
If present, the Readme creator will try to create extended data from
local files instead of downloading.
Notes
-----
Either `local` must be supplied, or `url`, `pid` and `key` must be supplied
'''
self.meta = study_metadata_obj
self.kwargs = kwargs
warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)
#These values are the first part of the keys that need
#concatenation to make them more legible.
self.concat = ['author', 'datasetContact','otherId', 'keyword', 'topic', 'publication',
'producer', 'production', 'distributor', 'series', 'software',
'dsDescription', 'grant', 'contributor']
def __html_to_md(self, inval:str)->str:
'''
Convert any HTML to markdown, or as much as possible.
Parameters
----------
inval : str
HTML string to convert
'''
if isinstance(inval, str):
#markdownify kwargs are here:
#https://github.com/matthewwithanm/python-markdownify
return markdownify.markdownify(inval)
return str(inval)
def make_md_heads(self, inkey:str)->str:
'''
Make markdown H2 headings for selected sections, currently title, description,
licence and terms of use.
Parameters
----------
inkey : str
Section heading
'''
section_heads = {'Title':'## ',
'Description':'**Description**\n\n',
'Licence': '### Licence\n\n',
'Terms of Use': '### Terms of Use\n\n'}
if inkey in section_heads:
return section_heads[inkey]
multi = [self.rename_field(_) for _ in self.concat]
if inkey in multi:
if inkey not in ['Series', 'Software', 'Production']:
return f'{inkey}(s): \n'
return f'{inkey}: \n'
return f'{inkey}: '
@property
def file_metadata_md(self)->str:
'''
Produce pretty markdown for file metadata. Outputs
markdown text string.
'''
fmeta = []
for fil in self.meta.files:
fileout = {}
fileout['File'] = fil['filename']
for k, v in fil.items():
fileout[k.capitalize().replace('_',' ').replace('Pid', 'Persistent Identifier')] = v
fileout['Message digest'] = f'{fileout["Chk type"]}: {fileout["Chk digest"]}'
for rem in ['Chk type', 'Chk digest', 'Id', 'Has tab file', 'Study pid',
'File label', 'Filename']:
del fileout[rem]
#not everyone has a pid for the file
if not fileout.get('Persistent Identifier'):
del fileout['Persistent Identifier']
# Should I only have remote material here? What about
# local files?
if self.kwargs.get('local'):
#TODO, if local
fpath = pathlib.Path(self.kwargs['local'])
#And from here you have to walk the tree to get the file in fil['filename']
#One day I will do this
elif self.meta.kwargs.get('url'): # Should this be optional? ie,
# and self.kwargs.get('download') or summat
d_dict = FileAnalysis(url=self.meta.kwargs['url'],
key=self.meta.kwargs.get('key'),
**fil).md
#I test here
#d_dict = FileAnalysis(local='tmp/eics_2023_pumf_v1.sav').md
if d_dict:
fileout['Data Dictionary'] = d_dict
fmeta.append(fileout)
#----- original
#outtmp = []
#for li in fmeta:
# outtmp.append(' \n'.join(f'{k}: {v}' for k, v in li.items()))
#return '\n\n'.join(outtmp)
#-------
outtmp = []
for li in fmeta:
o2 = []
for k, v in li.items():
if k == 'Data Dictionary':
o2.append(f'### {k} for {li["File"]} \n{v}')
else:
o2.append(f'{k}: {v}')
outtmp.append(' \n'.join(o2))
outtmp = '\n\n'.join(outtmp)
return outtmp
@property
def readme_md(self)->str:
'''
Generate a Markdown text string (ie, the entire README) for an
entire StudyMetadata object.
'''
metatmp = self.meta.copy()
neworder = self.reorder_fields(metatmp)
addme = self.concatenator(metatmp)
metatmp.update(addme)
out = {_:None for _ in neworder} # A new dictionary with the correct order
for k, v in metatmp.items():
out[k]=v
#Now remove keys that should be gone
for rem in self.concat:
out = {k:v for k,v in out.items()
if not (k.startswith(rem) and len(k) > len(rem))}
fout = {self.rename_field(k): self.__fix_relation_type(self.__html_to_md(v))
for k, v in out.items()}
#cludgy geometry hack is best hack
if self.bbox():
fout.update(self.bbox())
delme = [_ for _ in fout if _.endswith('tude')]
for _ in delme:
del fout[_]
outstr = '\n\n'.join(f'{self.make_md_heads(k)}{v}' for k, v in fout.items())
outstr += '\n\n## File information\n\n'
outstr += self.file_metadata_md
return outstr
def bbox(self)->dict:
'''
Produce sane bounding boxes from Dataverse metadata.
Note that older versions of Dataverse used North and South *longitude*.
Outputs a dict with bounding boxes concatenated into a single line
with each coordinate suffixed by its direction (eg: '42.97 E'), with coordinates
separated by commas and multiple boxes separated by semi-colons.
'''
#Yes, northLongitude, etc. Blame Harvard.
bbox_order =['westLongitude',
'southLongitude',
'southLatitude',
'eastLongitude',
'northLongitude',
'northLatitude']
geog_me = {_: self.meta[_].split(';')
for _ in bbox_order if self.meta.get(_)}# Checking for existence causes problems
if not geog_me: #Sometimes there is no bounding box
return {}
bbox = {k: [f'{v} {k[0].capitalize()}'.strip()
for v in geog_me[k]] for k in bbox_order if geog_me.get(k)}
boxes = self.max_zip(*bbox.values())
boxes = [', '.join(_) for _ in boxes]
boxes = [f'({_})' for _ in boxes]
return {'Bounding box(es)': '; '.join(boxes)}
def __fix_relation_type(self, badstr:str)->str:
'''
For some reason, Dataverse puts camelCase values in the 'values' field
for publication relation. This will make it more readable.
Parameters
----------
badstr : str
Input string; problematic values will be fixed, all others returned as-is.
'''
fixthese = ['IsCitedBy', 'IsSupplementTo', 'IsSupplementedBy', 'IsReferencedBy']
for val in fixthese:
badstr=badstr.replace(val, self.rename_field(val))
return badstr
def reorder_fields(self, indict:dict)->list:
'''
Create a list which contains a list of keys in the right (corrected) order.
This ensures that concatenated fields are inserted into the right place
and not at the end of the dictionary, keeping the structure
of Dataverse metadata intact while concatenating values that need
combining.
Parameters
----------
indict : dict
Metadata dictionary
'''
fieldlist = list(indict)
for val in self.concat:
pts = [n for n, x in enumerate(fieldlist) if x.startswith(val)]
if pts:
ins_point = min(pts)
fieldlist.insert(ins_point, val)
#Geography fields are a special case yay.
#westLongitude is the first one
if 'westLongitude' in fieldlist:
ins_here = fieldlist.index('westLongitude')
fieldlist.insert(ins_here, 'Bounding box(es)')
return fieldlist
def rename_field(self, instr:str)->str:
'''
Split and capitalize camelCase fields as required.
eg: keywordValue -> Keyword Value
eg: termsOfUse -> Terms of Use
Parameters
----------
instr : str
Camel case string to split into words and capitalize.
'''
noncap = ['A', 'Of', 'The']
wordsp = ''.join(map(lambda x: x if x not in string.ascii_uppercase
else f' {x}', list(instr)))
wordsp = wordsp.split(' ')
#wordsp[0] = wordsp[0].capitalize()
#wordsp = ' '.join(map(lambda x: x if x not in noncap else x.lower(), wordsp))
wordsp = list(map(lambda x: x if x not in noncap else x.lower(), wordsp))
wordsp[0] = wordsp[0].capitalize()
wordsp = ' '.join(wordsp)
#because they can't even use camelCaseConsistently
#Also pluralization of concatenated fields
fixthese ={'U R L': 'URL',
'U R I': 'URI',
'I D':
'ID',
'Ds': '',
'Country':'Country(ies)',
'State':'State(s)',
'City':'City(ies)',
'Geographic Unit':'Geographic unit(s)'}
for k, v in fixthese.items():
wordsp = wordsp.replace(k, v)
return wordsp.strip()
def concatenator(self, meta:dict)->dict:
'''
Produce a concatenated dictionary with the key being just the prefix.
For fields like author[whatever], etc, where there are multiple
*components* of similar metadata held in completely separated
fields.
Parameters
----------
meta : dict
Input metadata
'''
#The keys are the first part of the fields that need concatenation
concat = {_:[] for _ in self.concat}
for k, v in meta.items():
for fk in concat:
if k.startswith(fk):
if v:
if concat[fk]:
concat[fk].append(v.split(';'))
else:
concat[fk] = [v.split(';')]
outdict = {}
for ke, va in concat.items():
if va:
interim = self.max_zip(*va)
interim = [' - '.join([y.strip() for y in _ if y]) for _ in interim ]
#interim = '; '.join(interim) # Should it be newline?
#interim = ' \n'.join(interim) # Should it be newline?
interim= '<br/>'.join(interim)# Markdownify strips internal spaces
#if ke.startswith('keyw'):
outdict[ke] = interim
return outdict
def max_zip(self, *args):
'''
Like built-in zip, only uses the *maximum* length and appends None if not found
instead of stopping at the shortest iterable.
Parameters
----------
*args : iterable
Any iterable
'''
length = max(map(len, args))
outlist=[]
for n in range(length):
vals = []
for arg in args:
try:
vals.append(arg[n])
except IndexError:
vals.append(None)
outlist.append(vals)
return outlist
def write_pdf(self, dest:str)->None:
'''
Make the PDF of a README and save it to a file.
Parameters
----------
dest : str
Destination of file, optionally including path.
eg: /Users/foo/study/README.pdf or
~/tmp/README_I_AM_METADATA.pdf
'''
dest = pathlib.Path(dest).expanduser().absolute()
output = markdown_pdf.MarkdownPdf(toc_level=1)
content = markdown_pdf.Section(self.readme_md, toc=False)
output.add_section(content)
output.save(dest)
def write_md(self, dest:str)->None:
'''
Write Markdown text of the complete documentation to a file.
Parameters
----------
dest : str
Destination of file, optionally including path.
eg: /Users/foo/study/README.md or
~/tmp/README_I_AM_METADATA.md
'''
dest = pathlib.Path(dest).expanduser().absolute()
with open(file=dest, mode='w', encoding='utf-8') as f:
f.write(self.readme_md)
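Putting the pieces together: a hedged sketch that fetches one study's metadata and writes README files. The URL, collection alias, persistent identifier and output paths are all placeholders:

```
from dataverse_utils.collections import DvCollection, ReadmeCreator

coll = DvCollection(url='https://borealisdata.ca', coll='sample_collection')
study = coll.get_study_info('doi:10.5072/FK2/EXAMPLE')
readme = ReadmeCreator(study)
readme.write_md('~/tmp/README.md')    # Markdown version
readme.write_pdf('~/tmp/README.pdf')  # PDF version via markdown_pdf
```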
file_metadata_md
property
¶
Produce pretty markdown for file metadata. Outputs markdown text string.
readme_md
property
¶
Generate a Markdown text string (ie, the entire README) for an entire StudyMetadata object.
__fix_relation_type(badstr)
¶
For some reason, Dataverse puts camelCase values in the ‘values’ field for publication relation. This will make it more readable.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def __fix_relation_type(self, badstr:str)->str:
'''
For some reason, Dataverse puts camelCase values in the 'values' field
for publication relation. This will make it more readable.
Parameters
----------
badstr : str
Input string; problematic values will be fixed, all others returned as-is.
'''
fixthese = ['IsCitedBy', 'IsSupplementTo', 'IsSupplementedBy', 'IsReferencedBy']
for val in fixthese:
badstr=badstr.replace(val, self.rename_field(val))
return badstr
__html_to_md(inval)
¶
Convert any HTML to markdown, or as much as possible.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def __html_to_md(self, inval:str)->str:
'''
Convert any HTML to markdown, or as much as possible.
Parameters
----------
inval : str
HTML string to convert
'''
if isinstance(inval, str):
#markdownify kwargs are here:
#https://github.com/matthewwithanm/python-markdownify
return markdownify.markdownify(inval)
return str(inval)
__init__(study_metadata_obj, **kwargs)
¶
Send in a StudyMetadata dict to create a nicely formatted README document
| Parameters: |
|
|---|
|
Notes
Either local must be supplied, or url, pid and key must be supplied
Source code in src/dataverse_utils/collections.py
def __init__(self, study_metadata_obj: StudyMetadata, **kwargs):
'''
Send in StudyMetadata dict to create a nicely formatted README document
Parameters
----------
study_metadata_obj : StudyMetadata
A study metadata object
**kwargs : dict
Keyword arguments
Other parameters
----------------
url : str
The base URL for a Dataverse instance
pid : typing.Union[str, int]
The persistent identifier of a file or a file id
key : str
A valid API key for performing operations on Dataverse studies
local : str
Path to the top level directory which holds study files.
If present, the Readme creator will try to create extended data from
local files instead of downloading.
Notes
-----
Either `local` must be supplied, or `url`, `pid` and `key` must be supplied
'''
self.meta = study_metadata_obj
self.kwargs = kwargs
warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)
#These values are the first part of the keys that need
#concatenation to make them more legible.
self.concat = ['author', 'datasetContact','otherId', 'keyword', 'topic', 'publication',
'producer', 'production', 'distributor', 'series', 'software',
'dsDescription', 'grant', 'contributor']
bbox()
¶
Produce sane bounding boxes from Dataverse metadata. Note that older versions of Dataverse used North and South longitude.
Outputs a dict with bounding boxes concatenated into a single line with each coordinate suffixed by its direction (eg: '42.97 E'), with coordinates separated by commas and multiple boxes separated by semi-colons.
Source code in src/dataverse_utils/collections.py
def bbox(self)->dict:
'''
Produce sane bounding boxes from Dataverse metadata.
Note that older versions of Dataverse used North and South *longitude*.
Outputs a dict with bounding boxes concatenated into a single line
with each coordinate suffixed by its direction (eg: '42.97 E'), with coordinates
separated by commas and multiple boxes separated by semi-colons.
'''
#Yes, northLongitude, etc. Blame Harvard.
bbox_order =['westLongitude',
'southLongitude',
'southLatitude',
'eastLongitude',
'northLongitude',
'northLatitude']
geog_me = {_: self.meta[_].split(';')
for _ in bbox_order if self.meta.get(_)}# Checking for existence causes problems
if not geog_me: #Sometimes there is no bounding box
return {}
bbox = {k: [f'{v} {k[0].capitalize()}'.strip()
for v in geog_me[k]] for k in bbox_order if geog_me.get(k)}
boxes = self.max_zip(*bbox.values())
boxes = [', '.join(_) for _ in boxes]
boxes = [f'({_})' for _ in boxes]
return {'Bounding box(es)': '; '.join(boxes)}
concatenator(meta)
¶
Produce a concatenated dictionary with the key being just the prefix. For fields like author[whatever], etc, where there are multiple components of similar metadata held in completely separated fields.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def concatenator(self, meta:dict)->dict:
'''
Produce a concatenated dictionary with the key being just the prefix.
For fields like author[whatever], etc, where there are multiple
*components* of similar metadata held in completely separated
fields.
Parameters
----------
meta : dict
Input metadata
'''
#The keys are the first part of the fields that need concatenation
concat = {_:[] for _ in self.concat}
for k, v in meta.items():
for fk in concat:
if k.startswith(fk):
if v:
if concat[fk]:
concat[fk].append(v.split(';'))
else:
concat[fk] = [v.split(';')]
outdict = {}
for ke, va in concat.items():
if va:
interim = self.max_zip(*va)
interim = [' - '.join([y.strip() for y in _ if y]) for _ in interim ]
#interim = '; '.join(interim) # Should it be newline?
#interim = ' \n'.join(interim) # Should it be newline?
interim= '<br/>'.join(interim)# Markdownify strips internal spaces
#if ke.startswith('keyw'):
outdict[ke] = interim
return outdict
make_md_heads(inkey)
¶
Make markdown H2 headings for selected sections, currently title, description, licence and terms of use.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def make_md_heads(self, inkey:str)->str:
'''
Make markdown H2 headings for selected sections, currently title, description,
licence and terms of use.
Parameters
----------
inkey : str
Section heading
'''
section_heads = {'Title':'## ',
'Description':'**Description**\n\n',
'Licence': '### Licence\n\n',
'Terms of Use': '### Terms of Use\n\n'}
if inkey in section_heads:
return section_heads[inkey]
multi = [self.rename_field(_) for _ in self.concat]
if inkey in multi:
if inkey not in ['Series', 'Software', 'Production']:
return f'{inkey}(s): \n'
return f'{inkey}: \n'
return f'{inkey}: '
max_zip(*args)
¶
Like built-in zip, only uses the maximum length and appends None if not found instead of stopping at the shortest iterable.
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def max_zip(self, *args):
'''
Like built-in zip, only uses the *maximum* length and appends None if not found
instead of stopping at the shortest iterable.
Parameters
----------
*args : iterable
Any iterable
'''
length = max(map(len, args))
outlist=[]
for n in range(length):
vals = []
for arg in args:
try:
vals.append(arg[n])
except IndexError:
vals.append(None)
outlist.append(vals)
return outlist
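A quick behavioural check (max_zip does not use self, so it is called unbound here purely for illustration):

```
from dataverse_utils.collections import ReadmeCreator

print(ReadmeCreator.max_zip(None, ['a', 'b', 'c'], ['x']))
# [['a', 'x'], ['b', None], ['c', None]]
```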
rename_field(instr)
¶
Split and capitalize camelCase fields as required. eg: keywordValue -> Keyword Value eg: termsOfUse -> Terms of Use
| Parameters: |
|
|---|
Source code in src/dataverse_utils/collections.py
def rename_field(self, instr:str)->str:
'''
Split and capitalize camelCase fields as required.
eg: keywordValue -> Keyword Value
eg: termsOfUse -> Terms of Use
Parameters
----------
instr : str
Camel case string to split into words and capitalize.
'''
noncap = ['A', 'Of', 'The']
wordsp = ''.join(map(lambda x: x if x not in string.ascii_uppercase
else f' {x}', list(instr)))
wordsp = wordsp.split(' ')
#wordsp[0] = wordsp[0].capitalize()
#wordsp = ' '.join(map(lambda x: x if x not in noncap else x.lower(), wordsp))
wordsp = list(map(lambda x: x if x not in noncap else x.lower(), wordsp))
wordsp[0] = wordsp[0].capitalize()
wordsp = ' '.join(wordsp)
#because they can't even use camelCaseConsistently
#Also pluralization of concatenated fields
fixthese ={'U R L': 'URL',
'U R I': 'URI',
'I D': 'ID',
'Ds': '',
'Country':'Country(ies)',
'State':'State(s)',
'City':'City(ies)',
'Geographic Unit':'Geographic unit(s)'}
for k, v in fixthese.items():
wordsp = wordsp.replace(k, v)
return wordsp.strip()
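Some illustrative calls on a hypothetical instance `coll`, following the splitting and fix-up rules above:
```
coll.rename_field('keywordValue')    # 'Keyword Value'
coll.rename_field('termsOfUse')      # 'Terms of Use'
coll.rename_field('alternativeURL')  # 'Alternative URL'
coll.rename_field('country')         # 'Country(ies)'
```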
reorder_fields(indict)
¶
Create a list of keys in the correct (corrected) order. This ensures that concatenated fields are inserted in the right place rather than at the end of the dictionary, keeping the structure of Dataverse metadata intact while combining the values that need concatenation.
| Parameter | Type | Description |
|---|---|---|
| `indict` | `dict` | Metadata dictionary |
Source code in src/dataverse_utils/collections.py
def reorder_fields(self, indict:dict)->list:
'''
Create a list of keys in the right (corrected) order.
This ensures that concatenated fields are inserted into the right place
and not at the end of the dictionary, keeping the structure
of Dataverse metadata intact while concatenating values that need
combining.
Parameters
----------
indict : dict
Metadata dictionary
'''
fieldlist = list(indict)
for val in self.concat:
pts = [n for n, x in enumerate(fieldlist) if x.startswith(val)]
if pts:
ins_point = min(pts)
fieldlist.insert(ins_point, val)
#Geography fields are a special case yay.
#westLongitude is the first one
if 'westLongitude' in fieldlist:
ins_here = fieldlist.index('westLongitude')
fieldlist.insert(ins_here, 'Bounding box(es)')
return fieldlist
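For example, on a hypothetical instance `coll` whose concatenation prefixes include `author`, the prefix key is inserted just before the first matching field:
```
coll.reorder_fields({'title': '...', 'authorName': '...',
                     'authorAffiliation': '...', 'subject': '...'})
# ['title', 'author', 'authorName', 'authorAffiliation', 'subject']
```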
write_md(dest)
¶
Write Markdown text of the complete documentation to a file.
| Parameter | Type | Description |
|---|---|---|
| `dest` | `str` | Destination of file, optionally including path |
Source code in src/dataverse_utils/collections.py
def write_md(self, dest:str)->None:
'''
Write Markdown text of the complete documentation to a file.
Parameters
----------
dest : str
Destination of file, optionally including path.
eg: /Users/foo/study/README.md or
~/tmp/README_I_AM_METADATA.md
'''
dest = pathlib.Path(dest).expanduser().absolute()
with open(file=dest, mode='w', encoding='utf-8') as f:
f.write(self.readme_md)
write_pdf(dest)
¶
Make the PDF of a README and save it to a file.
| Parameter | Type | Description |
|---|---|---|
| `dest` | `str` | Destination of file, optionally including path |
Source code in src/dataverse_utils/collections.py
def write_pdf(self, dest:str)->None:
'''
Make the PDF of a README and save it to a file.
Parameters
----------
dest : str
Destination of file, optionally including path.
eg: /Users/foo/study/README.pdf or
~/tmp/README_I_AM_METADATA.pdf
'''
dest = pathlib.Path(dest).expanduser().absolute()
output = markdown_pdf.MarkdownPdf(toc_level=1)
content = markdown_pdf.Section(self.readme_md, toc=False)
output.add_section(content)
output.save(dest)
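Typical usage of the two writers on a hypothetical instance `coll` with a populated `readme_md`:
```
coll.write_md('~/tmp/README.md')    # Markdown; '~' is expanded via expanduser()
coll.write_pdf('~/tmp/README.pdf')  # PDF rendered with markdown_pdf
```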
StudyMetadata
¶
Bases: dict
The metadata container for a single study.
Source code in src/dataverse_utils/collections.py
class StudyMetadata(dict):
'''
The metadata container for a single study.
'''
def __init__(self, **kwargs):
'''
Initialize a StudyMetadata object.
Parameters
----------
**kwargs: dict
At least some of the following
Other parameters
----------------
study_meta : dict, optional
The dataverse study metadata JSON
url : str, optional
Base URL to dataverse instance
pid : str, optional
Persistent ID of a study
key : str
Dataverse instance API key (needed for unpublished studies)
Notes
-----
Either `study_meta` is required OR `pid` and `url`. `key` _may_ be required
if either a draft study is being accessed or the Dataverse installation
requires API keys for all requests.
'''
self.kwargs = kwargs
self.study_meta = kwargs.get('study_meta')
self.url = kwargs.get('url')
self.pid = kwargs.get('pid')
self.headers = UAHEADER.copy()
if not (('study_meta' in kwargs) or ('url' in kwargs and 'pid' in kwargs)):
raise TypeError('At least one of a URL/pid combo (url, pid) (and possibly key) or '
'study metadata json (study_meta) is required.')
if not self.study_meta:
self.study_meta = self.__obtain_metadata()
try:
self.extract_metadata()
except KeyError as e:
raise MetadataError(f'Unable to parse study metadata. Do you need an API key?\n'
f'{e} key not found.\n'
f'Offending JSON: {self.study_meta}') from e
self.__files = None
def __obtain_metadata(self):
'''
Obtain study metadata as required.
'''
if self.kwargs.get('key'):
self.headers.update({'X-Dataverse-key':self.kwargs['key']})
params = {'persistentId': self.pid}
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=RETRY))
self.url = self.url.strip('/')
if not self.url.startswith('https://'):
self.url = f'https://{self.url}'
data = self.session.get(f'{self.url}/api/datasets/:persistentId',
headers=self.headers, params=params)
return data.json()
def __has_metadata(self)->bool:
'''
Returns a boolean indicating whether there *is* study metadata.
Deaccessioned items are notable for their lack of any indication
that they are deaccessioned. However, they lack the "latestVersion" key,
which serves as a proxy. Ideally.
'''
#try:
# t = self.study_meta['data']
# del t #OMG This is so dumb
#except KeyError as e:
# raise e
if not self.study_meta.get('data'):
raise KeyError('data')
testfields = ['id', 'identifier', 'authority', 'latestVersion']
if all(self.study_meta['data'].get(_) for _ in testfields):
return True
return False
def extract_metadata(self):
'''
Convenience function for parsing the study metadata of the latest version.
Results are written to self, accessible as a dictionary.
'''
if not self.__has_metadata():
return
for v in self.study_meta['data']['latestVersion']['metadataBlocks'].values():
for field in v['fields']:
self.extract_field_metadata(field)
self.__extract_licence_info()
self.__version()
#['data']['latestVersion']['versionNumber']
#['data']['latestVersion']['versionMinorNumber']
def extract_field_metadata(self, field):
'''
Extract the metadata from a single field and make it into a human-readable dict.
Output updates self.
'''
#pylint: disable=too-many-branches, too-many-nested-blocks
#typeClass: compound = dict, primitive = string
#multiple: false= one thing, true=list
# so typeClass:compound AND multiple:true = a list of dicts.
# also, typeClass can be "controlledVocabulary" because reasons.
#is this crap recursive or is one level enough?
#[[x['typeName'], x['typeClass'], x['multiple']] for x in citation['fields']]
# {('primitive', False), ('compound', True), ('compound', False),
# ('primitive', True), ('controlledVocabulary', True)}
if not field['multiple']:
if field['typeClass']=='primitive':
self.update({field['typeName']: field['value']})
if field['typeClass'] == 'compound':
for v2 in field['value']:
self.extract_field_metadata(field['value'][v2])
if field['multiple']:
if field['typeClass'] == 'compound':
#produce a list of similar values concatenated
for v3 in field['value']:
interim = {}
for insane_dict in field['value']:
for v3 in insane_dict.values():
if interim.get(v3['typeName']):
interim.update({v3['typeName']:
interim[v3['typeName']]+ [v3['value']]})
else:
#sometimes value is None because reasons.
interim[v3['typeName']] = [v3.get('value', [] )]
LOGGER.debug(interim)
for k9, v9 in interim.items():
self.update({k9: '; '.join(v9)})
if field['typeClass'] == 'primitive':
self.update({field['typeName'] : '; '.join(field['value'])})
if field['typeClass'] == 'controlledVocabulary':
if isinstance(field['value'], list):
self.update({field['typeName'] : '; '.join(field['value'])})
else:
self.update({field['typeName'] : field['value']})
# And that should cover every option!
@property
def files(self)->list:
'''
Return a list of dicts with file metadata.
'''
if not self.__files:
self.__extract_files()
return self.__files
def __extract_files(self):
'''
Extract file level metadata, and write to self.__files.
'''
#Note: ALL other dict values for this object are single values,
#but files would (usually) be an arbitrary number of files.
#That bothers me on an intellectual level. Therefore, it will be an attribute.
#Iterate over StudyMetadata.files if you want to know the contents
if not self.__files:
outie = []
for v in self.study_meta['data']['latestVersion']['files']:
innie = {}
fpath = v.get('directoryLabel', '').strip('/')
innie['filename'] = v['dataFile'].get('originalFileName', v['dataFile']['filename'])
#innie['full_path'] = '/'.join([fpath, innie['filename']])
#In case it's pathless, drop any leading slash, because
#'' is not the same as None, and None can't be joined.
innie['filename'] = '/'.join([fpath, innie['filename']]).strip('/')
innie['file_label'] = v.get('label')
innie['description'] = v.get('description')
innie['filesize_bytes'] = v['dataFile'].get('originalFileSize',
v['dataFile']['filesize'])
innie['chk_type'] = v['dataFile']['checksum']['type']
innie['chk_digest'] =v['dataFile']['checksum']['value']
innie['id'] = v['dataFile']['id']
innie['pid'] = v['dataFile'].get('persistentId')
innie['has_tab_file'] = v['dataFile'].get('tabularData', False)
innie['study_pid'] = (f"{self.study_meta['data']['protocol']}:"
f"{self.study_meta['data']['authority']}/"
f"{self.study_meta['data']['identifier']}")
innie['tags'] = ', '.join(v.get('categories', []))
if not innie['tags']:
del innie['tags']#tagless
#innie['path'] = v.get('directoryLabel', '')
outie.append(innie)
self.__files = outie
def __extract_licence_info(self):
'''
Extract all the licence information fields and add them
to self['licence'] *if present*.
'''
lic_fields = ('termsOfUse',
'confidentialityDeclaration',
'specialPermissions',
'restrictions',
'citationRequirements',
'depositorRequirements', 'conditions',
'disclaimer',
'dataAccessPlace',
'originalArchive',
'availabilityStatus',
'contactForAccess',
'sizeOfCollection',
'studyCompletion',
'fileAccessRequest')
for field in self.study_meta['data']['latestVersion']:
if field in lic_fields:
self[field] = self.study_meta['data']['latestVersion'][field]
common_lic = self.study_meta['data']['latestVersion'].get('license')
if isinstance(common_lic, str) and common_lic != 'NONE':
self['licence'] = common_lic
elif isinstance(common_lic, dict):
self['licence'] = self.study_meta['data']['latestVersion']['license'].get('name')
link = self.study_meta['data']['latestVersion']['license'].get('uri')
if link:
self['licenceLink'] = link
def __version(self):
'''
Obtain the current version and add it to self['studyVersion'].
'''
if self.study_meta['data']['latestVersion']['versionState'] == 'RELEASED':
self['studyVersion'] = (f"{self.study_meta['data']['latestVersion']['versionNumber']}."
f"{self.study_meta['data']['latestVersion']['versionMinorNumber']}")
return
self['studyVersion'] = self.study_meta['data']['latestVersion']['versionState']
return
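A minimal usage sketch; the installation URL and persistent ID below are hypothetical, and an API key is only needed for draft studies or installations that require keys on every request:
```
from dataverse_utils.collections import StudyMetadata

study = StudyMetadata(url='https://demo.dataverse.org',
                      pid='doi:10.5072/FK2/XXXXXX')  # hypothetical PID
print(study.get('title'))
print(study.get('studyVersion'))
for f in study.files:
    print(f['filename'], f['filesize_bytes'])
```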
files
property
¶
Return a list of dicts with file metadata.
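Each entry is a plain dict; a representative (hypothetical) entry looks like this, with `tags` omitted when the file has no categories:
```
{'filename': 'Data/2021/survey.csv',
 'file_label': 'survey.tab',
 'description': 'Survey results',
 'filesize_bytes': 102400,
 'chk_type': 'MD5',
 'chk_digest': 'd41d8cd98f00b204e9800998ecf8427e',
 'id': 12345,
 'pid': '',            # may be empty or None if file PIDs are not issued
 'has_tab_file': True,
 'study_pid': 'doi:10.5072/FK2/XXXXXX',
 'tags': 'Data'}
```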
__extract_files()
¶
Extract file level metadata, and write to self.__files.
Source code in src/dataverse_utils/collections.py
def __extract_files(self):
'''
Extract file level metadata, and write to self.__files.
'''
#Note: ALL other dict values for this object are single values,
#but files would (usually) be an arbitrary number of files.
#That bothers me on an intellectual level. Therefore, it will be an attribute.
#Iterate over StudyMetadata.files if you want to know the contents
if not self.__files:
outie = []
for v in self.study_meta['data']['latestVersion']['files']:
innie = {}
fpath = v.get('directoryLabel', '').strip('/')
innie['filename'] = v['dataFile'].get('originalFileName', v['dataFile']['filename'])
#innie['full_path'] = '/'.join([fpath, innie['filename']])
#In case it's pathless, drop any leading slash, because
#'' is not the same as None, and None can't be joined.
innie['filename'] = '/'.join([fpath, innie['filename']]).strip('/')
innie['file_label'] = v.get('label')
innie['description'] = v.get('description')
innie['filesize_bytes'] = v['dataFile'].get('originalFileSize',
v['dataFile']['filesize'])
innie['chk_type'] = v['dataFile']['checksum']['type']
innie['chk_digest'] =v['dataFile']['checksum']['value']
innie['id'] = v['dataFile']['id']
innie['pid'] = v['dataFile'].get('persistentId')
innie['has_tab_file'] = v['dataFile'].get('tabularData', False)
innie['study_pid'] = (f"{self.study_meta['data']['protocol']}:"
f"{self.study_meta['data']['authority']}/"
f"{self.study_meta['data']['identifier']}")
innie['tags'] = ', '.join(v.get('categories', []))
if not innie['tags']:
del innie['tags']#tagless
#innie['path'] = v.get('directoryLabel', '')
outie.append(innie)
self.__files = outie
__extract_licence_info()
¶
Extract all the licence information fields and add them to self['licence'] if present.
Source code in src/dataverse_utils/collections.py
def __extract_licence_info(self):
'''
Extract all the licence information fields and add them
to self['licence'] *if present*.
'''
lic_fields = ('termsOfUse',
'confidentialityDeclaration',
'specialPermissions',
'restrictions',
'citationRequirements',
'depositorRequirements', 'conditions',
'disclaimer',
'dataAccessPlace',
'originalArchive',
'availabilityStatus',
'contactForAccess',
'sizeOfCollection',
'studyCompletion',
'fileAccessRequest')
for field in self.study_meta['data']['latestVersion']:
if field in lic_fields:
self[field] = self.study_meta['data']['latestVersion'][field]
common_lic = self.study_meta['data']['latestVersion'].get('license')
if isinstance(common_lic, str) and common_lic != 'NONE':
self['licence'] = common_lic
elif isinstance(common_lic, dict):
self['licence'] = self.study_meta['data']['latestVersion']['license'].get('name')
link = self.study_meta['data']['latestVersion']['license'].get('uri')
if link:
self['licenceLink'] = link
__has_metadata()
¶
Returns a boolean indicating whether there is study metadata. Deaccessioned items are notable for their lack of any indication that they are deaccessioned. However, they lack the "latestVersion" key, which serves as a proxy. Ideally.
Source code in src/dataverse_utils/collections.py
def __has_metadata(self)->bool:
'''
Returns a boolean indicating whether there *is* study metadata.
Deaccessioned items are notable for their lack of any indication
that they are deaccessioned. However, they lack the "latestVersion" key,
which serves as a proxy. Ideally.
'''
#try:
# t = self.study_meta['data']
# del t #OMG This is so dumb
#except KeyError as e:
# raise e
if not self.study_meta.get('data'):
raise KeyError('data')
testfields = ['id', 'identifier', 'authority', 'latestVersion']
if all(self.study_meta['data'].get(_) for _ in testfields):
return True
return False
__init__(**kwargs)
¶
Initialize a StudyMetadata object.
| Parameter | Type | Description |
|---|---|---|
| `study_meta` | `dict`, optional | The Dataverse study metadata JSON |
| `url` | `str`, optional | Base URL of the Dataverse instance |
| `pid` | `str`, optional | Persistent ID of a study |
| `key` | `str` | Dataverse instance API key (needed for unpublished studies) |
Notes
Either study_meta is required OR pid and url. key may be required
if either a draft study is being accessed or the Dataverse installation
requires API keys for all requests.
Source code in src/dataverse_utils/collections.py
def __init__(self, **kwargs):
'''
Initialize a StudyMetadata object.
Parameters
----------
**kwargs: dict
At least some of the following
Other parameters
----------------
study_meta : dict, optional
The dataverse study metadata JSON
url : str, optional
Base URL to dataverse instance
pid : str, optional
Persistent ID of a study
key : str
Dataverse instance API key (needed for unpublished studies)
Notes
-----
Either `study_meta` is required OR `pid` and `url`. `key` _may_ be required
if either a draft study is being accessed or the Dataverse installation
requires API keys for all requests.
'''
self.kwargs = kwargs
self.study_meta = kwargs.get('study_meta')
self.url = kwargs.get('url')
self.pid = kwargs.get('pid')
self.headers = UAHEADER.copy()
if not (('study_meta' in kwargs) or ('url' in kwargs and 'pid' in kwargs)):
raise TypeError('At least one of a URL/pid combo (url, pid) (and possibly key) or '
'study metadata json (study_meta) is required.')
if not self.study_meta:
self.study_meta = self.__obtain_metadata()
try:
self.extract_metadata()
except KeyError as e:
raise MetadataError(f'Unable to parse study metadata. Do you need an API key?\n'
f'{e} key not found.\n'
f'Offending JSON: {self.study_meta}') from e
self.__files = None
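A note on the constructor contract described above; `already_fetched_json` is a hypothetical, previously retrieved dataset JSON:
```
StudyMetadata()                                  # raises TypeError: no study_meta, no url/pid
StudyMetadata(study_meta=already_fetched_json)   # no network request is made
```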
__obtain_metadata()
¶
Obtain study metadata as required.
Source code in src/dataverse_utils/collections.py
def __obtain_metadata(self):
'''
Obtain study metadata as required.
'''
if self.kwargs.get('key'):
self.headers.update({'X-Dataverse-key':self.kwargs['key']})
params = {'persistentId': self.pid}
self.session = requests.Session()
self.session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=RETRY))
self.url = self.url.strip('/')
if not self.url.startswith('https://'):
self.url = f'https://{self.url}'
data = self.session.get(f'{self.url}/api/datasets/:persistentId',
headers=self.headers, params=params)
return data.json()
__version()
¶
Obtain the current version and add it to self['studyVersion'].
Source code in src/dataverse_utils/collections.py
def __version(self):
'''
Obtain the current version and add it to self['studyVersion'].
'''
if self.study_meta['data']['latestVersion']['versionState'] == 'RELEASED':
self['studyVersion'] = (f"{self.study_meta['data']['latestVersion']['versionNumber']}."
f"{self.study_meta['data']['latestVersion']['versionMinorNumber']}")
return
self['studyVersion'] = self.study_meta['data']['latestVersion']['versionState']
return
extract_field_metadata(field)
¶
Extract the metadata from a single field and make it into a human-readable dict. Output updates self.
Source code in src/dataverse_utils/collections.py
def extract_field_metadata(self, field):
'''
Extract the metadata from a single field and make it into a human-readable dict.
Output updates self.
'''
#pylint: disable=too-many-branches, too-many-nested-blocks
#typeClass: compound = dict, primitive = string
#multiple: false= one thing, true=list
# so typeClass:compound AND multiple:true = a list of dicts.
# also, typeClass can be "controlledVocabulary" because reasons.
#is this crap recursive or is one level enough?
#[[x['typeName'], x['typeClass'], x['multiple']] for x in citation['fields']]
# {('primitive', False), ('compound', True), ('compound', False),
# ('primitive', True), ('controlledVocabulary', True)}
if not field['multiple']:
if field['typeClass']=='primitive':
self.update({field['typeName']: field['value']})
if field['typeClass'] == 'compound':
for v2 in field['value']:
self.extract_field_metadata(field['value'][v2])
if field['multiple']:
if field['typeClass'] == 'compound':
#produce a list of similar values concatenated
for v3 in field['value']:
interim = {}
for insane_dict in field['value']:
for v3 in insane_dict.values():
if interim.get(v3['typeName']):
interim.update({v3['typeName']:
interim[v3['typeName']]+ [v3['value']]})
else:
#sometimes value is None because reasons.
interim[v3['typeName']] = [v3.get('value', [] )]
LOGGER.debug(interim)
for k9, v9 in interim.items():
self.update({k9: '; '.join(v9)})
if field['typeClass'] == 'primitive':
self.update({field['typeName'] : '; '.join(field['value'])})
if field['typeClass'] == 'controlledVocabulary':
if isinstance(field['value'], list):
self.update({field['typeName'] : '; '.join(field['value'])})
else:
self.update({field['typeName'] : field['value']})
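Two hypothetical field dicts in the shape Dataverse returns, applied to an existing `StudyMetadata` instance `study`, show the primitive and controlledVocabulary branches:
```
study.extract_field_metadata({'typeName': 'title', 'typeClass': 'primitive',
                              'multiple': False, 'value': 'My Study'})
study.extract_field_metadata({'typeName': 'subject',
                              'typeClass': 'controlledVocabulary',
                              'multiple': True,
                              'value': ['Social Sciences', 'Other']})
study['title']    # 'My Study'
study['subject']  # 'Social Sciences; Other'
```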
extract_metadata()
¶
Convenience function for parsing the study metadata of the latest version.
Results are written to self, accessible as a dictionary.
Source code in src/dataverse_utils/collections.py
def extract_metadata(self):
'''
Convenience function for parsing the study metadata of the latest version.
Results are written to self, accessible as a dictionary.
'''
if not self.__has_metadata():
return
for v in self.study_meta['data']['latestVersion']['metadataBlocks'].values():
for field in v['fields']:
self.extract_field_metadata(field)
self.__extract_licence_info()
self.__version()