"""Source code for ndb_adapter.report_parser."""
import csv
import re
from io import BytesIO
from typing import Callable, List

import xlrd

from ndb_adapter.enums import ReportType
from ndb_adapter.html_parser import NDBHtmlParser
from ndb_adapter.ndb_base import NDBBase
from ndb_adapter.search_report import AdvancedReport, SimpleReport, StatisticReport
from ndb_adapter.search_result import SearchResult, SimpleResult, AdvancedResult
from ndb_adapter.summary_result import SummaryResult
def parse_to_table(text: str) -> List[str]:
    """Split text into lines with surrounding whitespace removed.

    :param text: input string
    :type text: str
    :return: list of stripped lines
    :rtype: List[str]
    """
    return list(map(str.strip, text.splitlines()))
def parse_csv(table: List[str], result_class: Callable[[dict], "AdvancedReport"]) -> List["AdvancedReport"]:
    """To parse table of string as csv to list of AdvancedReport

    The first line supplies the column headers; every following line is
    turned into a header->value dict and passed to ``result_class``.

    :param table: csv lines to parse, headers in the first element
    :type table: List[str]
    :param result_class: factory called with each record dict
    :type result_class: Callable[[dict], AdvancedReport]
    :return: list of advanced report
    :rtype: List[AdvancedReport]
    """
    if not table:
        return []
    # The csv module handles quoted fields containing commas as well as
    # doubled ("") escape quotes, which the previous hand-rolled scanner
    # mishandled (it stripped every quote character from the field data).
    reader = csv.DictReader(table)
    return [result_class(dict(row)) for row in reader]
def parse_xls(file: BytesIO) -> List[SimpleReport]:
    """Read the first sheet of an xls workbook into SimpleReport records.

    Row 0 supplies the column headers; every later row is zipped with the
    headers into a dict and wrapped in a SimpleReport.

    :param file: file bytes to parse
    :type file: BytesIO
    :return: list of SimpleReport
    :rtype: List[SimpleReport]
    """
    reports = []
    try:
        workbook = xlrd.open_workbook(file_contents=file.read())
        first_sheet = workbook.sheet_by_index(0)
        column_names = first_sheet.row_values(0)
        for row_index in range(1, first_sheet.nrows):
            row_values = first_sheet.row_values(row_index)
            reports.append(SimpleReport(dict(zip(column_names, row_values))))
    except TypeError:
        # best-effort: an unreadable workbook yields an empty result
        pass
    return reports
def parse_advanced_search_report(text: str, text_stats: str, report_type: ReportType) -> AdvancedResult:
    """Build an AdvancedResult from raw report and statistics texts.

    Line 2 of *text* is expected to end with ": <count>"; the csv data
    starts at line 3. Statistics, when present, follow the same layout.

    :param text: text to parse
    :type text: str
    :param text_stats: statistics text to parse (may be empty)
    :type text_stats: str
    :param report_type: type of report to parse; its value is the record class
    :type report_type: ReportType
    :return: advanced search result
    :rtype: AdvancedResult
    """
    result = AdvancedResult()
    record_class = report_type.value
    rows = parse_to_table(text)
    count_text = rows[1].rpartition(': ')[-1]
    report = parse_csv(rows[2:], record_class)
    if text_stats:
        stats_rows = parse_to_table(text_stats)
        result.statistics = parse_csv(stats_rows[2:], StatisticReport)
    try:
        result.count = int(count_text)
    except (TypeError, ValueError, OverflowError):
        pass
    result.report = report
    return result
def parse_search_report(html: str) -> SimpleResult:
    """Parse the simple-search result page and download its xls report.

    Locates the record-count span and the report file link in *html*,
    downloads the xls file from the NDB site and parses it.

    :param html: html string to parse
    :type html: str
    :return: simple search result
    :rtype: SimpleResult
    """
    # NOTE(review): constructs SearchResult although the signature promises
    # SimpleResult -- presumably compatible types; confirm in search_result.
    result = SearchResult()
    parser = NDBHtmlParser()
    parser.analyze(html)
    count_tag = parser.find_one('span', params={'id': 'numRec'})
    file_tag = parser.find_one('a', after=count_tag, params={'id': 'fileGal'})
    link = file_tag.attrs.get('href', '') if file_tag else ''
    # Imported here, not at module level -- presumably avoids a circular import.
    from ndb_adapter.ndb_download import DownloadHelper
    xls_file = DownloadHelper.download_file(NDBBase.siteUrl + link)
    report = parse_xls(xls_file)
    try:
        result.count = int(count_tag.data)
    except (TypeError, ValueError, OverflowError, AttributeError):
        pass
    result.report = report
    return result
def parse_summary(html: str) -> SummaryResult:
    """To parse summary search from html to SummaryResult

    :param html: html string to parse
    :type html: str
    :return: summary search result
    :rtype: SummaryResult
    """
    report = {}
    result = SummaryResult()
    parser = NDBHtmlParser()
    parser.analyze(html)
    # Everything of interest sits inside <div id="summary">; without it the
    # report stays empty.
    summary_tag = parser.find_one('div', params={'id': 'summary'})
    if summary_tag:
        heading_tag = parser.find_one('h2', params={'class': 'justHeading'})
        if heading_tag and "NDB ID" in heading_tag.data:
            # next(heading_tag) presumably advances to the following tag in
            # document order -- TODO confirm against NDBHtmlParser.
            ndb_id_tag = next(heading_tag)
            if ndb_id_tag:
                report["NDB ID"] = ndb_id_tag.data
                report["PDB ID"] = ndb_id_tag.next_data()
        # Each <h3 id="dataKey"> heading opens one labelled summary section.
        details_tags = parser.find_all('h3', after=heading_tag, params={'id': 'dataKey'})
        if details_tags:
            # NOTE(review): the last h3 is used only as a search boundary
            # (`before` below) and is never parsed itself -- confirm intended.
            for i in range(len(details_tags) - 1):
                tag = details_tags[i]
                if 'Nucleic Acid Sequence' in tag.data:
                    chains_tags = parser.find_one('div', params={'id': 'naSeq'})
                    if chains_tags:
                        report['Nucleic Acid Sequence'] = {}
                        # Chain-name tags carry class "blueBoldTxt"; the data
                        # right after each one is that chain's sequence.
                        for chain_tag in chains_tags.children:
                            if chain_tag and chain_tag.attrs.get('class') == 'blueBoldTxt':
                                report['Nucleic Acid Sequence'][chain_tag.data] = chain_tag.next_data()
                elif 'Protein Sequence' in tag.data:
                    # Same layout as the nucleic acid block, under div#protSeq.
                    chains_tags = parser.find_one('div', params={'id': 'protSeq'})
                    if chains_tags:
                        report['Protein Sequence'] = {}
                        for chain_tag in chains_tags.children:
                            if chain_tag and chain_tag.attrs.get('class') == 'blueBoldTxt':
                                report['Protein Sequence'][chain_tag.data] = chain_tag.next_data()
                elif 'Primary Citation' in tag.data:
                    report['Primary Citation'] = {}
                    report['Primary Citation']['Authors'] = tag.next_data()
                    # Restrict searches to this section: stop before the next
                    # h3 heading when one exists.
                    before = details_tags[i+1] if i + 1 < len(details_tags) else None
                    journal_tag = parser.find_one('i', after=tag.next(), before=before)
                    if journal_tag:
                        report['Primary Citation']['Journal'] = journal_tag.data
                    title_tag = parser.find_one('a', after=tag.next(), before=before)
                    if title_tag:
                        # Linked title: anchor text is the title; the href's
                        # last path segment is taken as the PubMed id.
                        report['Primary Citation']['Title'] = title_tag.data
                        report['Primary Citation']['Pubmed Id'] = title_tag.attrs.get("href", "").split("/")[-1]
                        next_data = tag.next().next_data()
                        if next_data:
                            # Trailing comma-separated citation text; the last
                            # two fields are read as pages and year.
                            next_data = next_data.split(',')
                            try:
                                report['Primary Citation']['Year'] = next_data[-1]
                                report['Primary Citation']['pp'] = next_data[-2]
                            except IndexError:
                                pass
                    else:
                        # No anchor: title, pages and year are all packed into
                        # one comma-separated text blob.
                        next_data = tag.next().next_data()
                        if next_data:
                            next_data = next_data.split(',')
                            try:
                                report['Primary Citation']['Year'] = next_data[-1]
                                report['Primary Citation']['pp'] = next_data[-2]
                                report['Primary Citation']['Title'] = ','.join(next_data[:-3])
                            except IndexError:
                                pass
                elif 'Download Data' in tag.data:
                    # NOTE(review): debug leftover? This prints the tag instead
                    # of storing any download links in the report.
                    print("Download: ", str(tag))
                elif 'Cell Constants' in tag.data:
                    text = ''
                    # Concatenate the run of <p> siblings holding "x = 1.23" pairs.
                    next_tag = next(tag)
                    while next_tag and next_tag.name == 'p':
                        text += next_tag.data
                        next_tag = next(next_tag)
                    # One letter (Unicode, non-digit), "=", then a decimal number.
                    pattern = re.compile(r"([^\W\d])\s+=\s+([\d\.]+)", re.UNICODE)
                    matches = pattern.findall(text)
                    if matches:
                        report['Cell Constants'] = {}
                        for match in matches:
                            try:
                                # Map Greek angle letters to spelled-out keys;
                                # any other letter (a, b, c, ...) is kept as-is.
                                if 'α' == match[0]:
                                    report['Cell Constants']['alpha'] = float(match[1])
                                elif 'β' == match[0]:
                                    report['Cell Constants']['beta'] = float(match[1])
                                elif 'γ' == match[0]:
                                    report['Cell Constants']['gamma'] = float(match[1])
                                else:
                                    report['Cell Constants'][match[0]] = float(match[1])
                            except ValueError:
                                pass
                else:
                    # Generic section: "<label>:" heading followed by its value.
                    report[tag.data.replace(":", "")] = tag.next_data()
    result.update(report)
    return result