%load_ext autoreload
%autoreload 2
Minibatch size is a trade-off: too small and we pay time overhead setting up connections for each batch; too large and we can run into memory issues. (The `batch_size` parameter of the cached runner, demonstrated below, controls this.)
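As a rough sketch of the idea, a runner could group records into batches with a helper like this (a hypothetical `batched` helper, not part of webrefine):
from itertools import islice

def batched(iterable, batch_size):
    # Hypothetical helper: yield lists of up to batch_size records,
    # trading per-batch connection overhead against memory use.
    it = iter(iterable)
    while batch := list(islice(it, batch_size)):
        yield batch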
In practice we'd use something like parsel, BeautifulSoup, or selectolax. However, for a simple demo the standard library's html.parser has no external dependencies.
from html.parser import HTMLParser

class SkeptricHTMLParser(HTMLParser):
    def __init__(self):
        super().__init__()
        self.extract = {}
        self.field = None

    def handle_starttag(self, tag, attrs):
        # Start capturing text when we enter the title or date elements
        attrs = dict(attrs)
        if attrs.get('class') == 'post-full-title':
            self.field = 'title'
        if attrs.get('class') == 'byline-meta-date':
            self.field = 'date'

    def handle_endtag(self, tag):
        self.field = None

    def handle_data(self, data):
        if self.field is not None:
            self.extract[self.field] = data
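A quick sanity check on a toy snippet (illustrative markup, not the real page):
parser = SkeptricHTMLParser()
parser.feed('<h1 class="post-full-title">Example Post</h1>'
            '<time class="byline-meta-date">26 November 2021</time>')
parser.extract
# {'title': 'Example Post', 'date': '26 November 2021'}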
def skeptric_filter(records):
    for r in records:
        if r.mime == 'text/html' and r.status == 200:
            yield r
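To see what the filter keeps, here it is on some stand-in records (FakeRecord is a hypothetical stand-in; real query records carry more fields):
from collections import namedtuple

FakeRecord = namedtuple('FakeRecord', ['url', 'mime', 'status'])
records = [FakeRecord('https://skeptric.com/pagination/', 'text/html', 200),
           FakeRecord('https://skeptric.com/style.css', 'text/css', 200),
           FakeRecord('https://skeptric.com/missing', 'text/html', 404)]
[r.url for r in skeptric_filter(records)]
# ['https://skeptric.com/pagination/']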
def skeptric_extract(content, metadata):
    parser = SkeptricHTMLParser()
    html = content.decode('utf-8')
    parser.feed(html)
    data = parser.extract
    data['url'] = metadata.url
    data['timestamp'] = metadata.timestamp
    return data
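We can run the extraction on the same toy snippet with a stand-in metadata object (the Meta namedtuple is just for illustration; in the pipeline the metadata comes from the query records):
from collections import namedtuple
from datetime import datetime

Meta = namedtuple('Meta', ['url', 'timestamp'])
skeptric_extract(b'<h1 class="post-full-title">Example Post</h1>'
                 b'<time class="byline-meta-date">26 November 2021</time>',
                 Meta('https://skeptric.com/example/', datetime(2021, 11, 26, 11, 28, 36)))
# {'title': 'Example Post', 'date': '26 November 2021',
#  'url': 'https://skeptric.com/example/',
#  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 36)}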
def skeptric_verify_extract(content, metadata):
    if not content.get('title'):
        raise ValueError('Missing title')
    if not content.get('date'):
        raise ValueError('Missing date')
    return content
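Verification fails fast on an incomplete extraction, for example:
try:
    skeptric_verify_extract({'date': '26 November 2021'}, metadata=None)
except ValueError as e:
    print(e)
# Missing title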
from datetime import datetime

def skeptric_normalise(content, metadata):
    content = content.copy()
    # Dates on the page look like '26 November 2021'
    content['date'] = datetime.strptime(content['date'], '%d %B %Y')
    return content
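For example:
skeptric_normalise({'date': '26 November 2021'}, metadata=None)
# {'date': datetime.datetime(2021, 11, 26, 0, 0)}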
from webrefine.query import WarcFileQuery
# Note: the module paths for Process and the runners are assumed here
from webrefine.process import Process
from webrefine.runners import RunnerMemory, RunnerCached

test_data = '../resources/test/skeptric.warc.gz'
skeptric_query = WarcFileQuery(test_data)
skeptric_process = Process(queries=[skeptric_query],
                           filter=skeptric_filter,
                           steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])
data = list(RunnerMemory(skeptric_process).run())
data
If a record fails, we can always look up the error and reconstruct the offending record to debug it:
It would be nicer if everything were a string so we didn't have to handle the imports...
from webrefine.query import WarcFileRecord
from pathlib import PosixPath
WarcFileRecord(url='https://skeptric.com/', timestamp=datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2')
%%time
from pathlib import Path
test_cache_path = Path('./test_skeptric_cache.sqlite')
if test_cache_path.exists():
test_cache_path.unlink()
data_cached = list(RunnerCached(skeptric_process, test_cache_path).run())
assert data_cached == data
assert data_cached == list(RunnerCached(skeptric_process, test_cache_path).run())
%%time
from pathlib import Path
test_cache_path = Path('./test_skeptric_cache.sqlite')
if test_cache_path.exists():
test_cache_path.unlink()
data_cached_small_batch = list(RunnerCached(skeptric_process, test_cache_path, batch_size=2).run())
assert data_cached == data_cached_small_batch
from webrefine.query import CommonCrawlQuery, WaybackQuery
from datetime import datetime
skeptric_cc = CommonCrawlQuery('skeptric.com/*', apis=['CC-MAIN-2021-43'])
skeptric_wb = WaybackQuery('skeptric.com/*', start='202103', end='202111')
def skeptric_filter_strict(records):
    for r in records:
        if r.mime != 'text/html' or r.status != 200:
            continue
        # Skip tag pages, notebook pages, and the homepage
        if '/tags/' in r.url or '/notebooks/' in r.url or r.url.endswith('skeptric.com/'):
            continue
        yield r
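Reusing the FakeRecord stand-in from above, the stricter filter drops the non-article pages:
records = [FakeRecord('https://skeptric.com/pagination/', 'text/html', 200),
           FakeRecord('https://skeptric.com/tags/sql/', 'text/html', 200),
           FakeRecord('https://skeptric.com/', 'text/html', 200)]
[r.url for r in skeptric_filter_strict(records)]
# ['https://skeptric.com/pagination/']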
skeptric_process_all = Process(queries=[skeptric_query, skeptric_cc, skeptric_wb],
                               filter=skeptric_filter_strict,
                               steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])
The cached runner has to fetch and evaluate everything the first time, so the initial run is slow.
TODO: We need to fix the fetch progress bar (callbacks?)
%time data_all = list(RunnerCached(skeptric_process_all, test_cache_path).run())
len(data_all)
Cache size in MB
test_cache_path.stat().st_size / 1024**2
It runs much faster the second time, since everything is already in the cache:
%time data_all_2 = list(RunnerCached(skeptric_process_all, test_cache_path).run())
assert data_all == data_all_2
test_cache_path.unlink()