%load_ext autoreload
%autoreload 2

class Process[source]

Process(queries: list[Callable], steps: list[Callable], filter: Callable)

Simple in-memory runner

class RunnerMemory[source]

RunnerMemory(process:Process, progress_bar:bool=True)

Minibatch: too small a batch adds time overhead setting up connections; too large a batch can lead to memory issues.

minibatch[source]

minibatch(seq, size)
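A sketch of what minibatch presumably does, based only on the signature (the real implementation may differ):

from itertools import islice

def minibatch_sketch(seq, size):
    # Yield successive lists of at most `size` items from `seq`.
    it = iter(seq)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

list(minibatch_sketch(range(5), 2))  # [[0, 1], [2, 3], [4]]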

compress_encode[source]

compress_encode(obj:bytes)

compress_decode[source]

compress_decode(obj)
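These look like inverse helpers for shrinking records before they go into the cache; a quick round-trip check (assuming compress_decode inverts compress_encode, which isn't verified here):

payload = b'<html>example</html>'
assert compress_decode(compress_encode(payload)) == payload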

class RunnerCached[source]

RunnerCached(process:Process, path:Union[str, Path], progress_bar:bool=True, batch_size:int=1024)

Simple test process

In practice we'd use something like parsel, BeautifulSoup or selectolax. However, for a simple demo, the standard library's HTMLParser needs no external dependencies.

from html.parser import HTMLParser

class SkeptricHTMLParser(HTMLParser):
    """Extract the title and date of a Skeptric post by CSS class."""
    def __init__(self):
        super().__init__()
        self.extract = {}
        self.field = None
        
    def handle_starttag(self, tag, attrs):
        cls = dict(attrs).get('class')
        if cls == 'post-full-title':
            self.field = 'title'
        elif cls == 'byline-meta-date':
            self.field = 'date'

    def handle_endtag(self, tag):
        self.field = None

    def handle_data(self, data):
        if self.field is not None:
            self.extract[self.field] = data

def skeptric_filter(records):
    for r in records:
        if r.mime == 'text/html' and r.status == 200:
            yield r
        
def skeptric_extract(content, metadata):
    parser = SkeptricHTMLParser()
    html = content.decode('utf-8')
    parser.feed(html)
    data = parser.extract
    data['url'] = metadata.url
    data['timestamp'] = metadata.timestamp
    return data
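
For comparison, a hypothetical BeautifulSoup version of skeptric_extract (it needs the external bs4 package the demo deliberately avoids; the CSS classes are the same ones matched above):

from bs4 import BeautifulSoup

def skeptric_extract_bs4(content, metadata):
    # Same fields as the HTMLParser version, via CSS selectors.
    soup = BeautifulSoup(content.decode('utf-8'), 'html.parser')
    title = soup.select_one('.post-full-title')
    date = soup.select_one('.byline-meta-date')
    return {
        'title': title.get_text(strip=True) if title else None,
        'date': date.get_text(strip=True) if date else None,
        'url': metadata.url,
        'timestamp': metadata.timestamp,
    }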

def skeptric_verify_extract(content, metadata):
    if not content.get('title'):
        raise ValueError('Missing title')
    if not content.get('date'):
        raise ValueError('Missing date')
    return content
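
Raising inside a step lets the runner log the failure and drop the record, as in the error output further down. For example:

try:
    skeptric_verify_extract({'date': '23 November 2021'}, metadata=None)
except ValueError as e:
    print(e)  # Missing title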

from datetime import datetime
def skeptric_normalise(content, metadata):
    content = content.copy()
    content['date'] = datetime.strptime(content['date'], '%d %B %Y')
    return content
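
A quick check of the format string:

skeptric_normalise({'date': '23 November 2021'}, metadata=None)
# {'date': datetime.datetime(2021, 11, 23, 0, 0)}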

from webrefine.query import WarcFileQuery
test_data = '../resources/test/skeptric.warc.gz'

skeptric_query = WarcFileQuery(test_data)

Test Memory Runner

skeptric_process = Process(queries=[skeptric_query],
                           filter=skeptric_filter,
                           steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])
data = list(RunnerMemory(skeptric_process).run())
data
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title
[{'title': "Pagination in Internet Archive's Wayback Machine with CDX",
  'date': datetime.datetime(2021, 11, 23, 0, 0),
  'url': 'https://skeptric.com/pagination-wayback-cdx/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 34)},
 {'title': 'About Skeptric',
  'date': datetime.datetime(2021, 10, 18, 0, 0),
  'url': 'https://skeptric.com/about/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 37)},
 {'title': 'Searching 100 Billion Webpages Pages With Capture Index',
  'date': datetime.datetime(2020, 6, 11, 0, 0),
  'url': 'https://skeptric.com/searching-100b-pages-cdx/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 39)},
 {'title': 'Fast Web Dataset Extraction Worfklow',
  'date': datetime.datetime(2021, 11, 21, 0, 0),
  'url': 'https://skeptric.com/fast-web-data-workflow/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 39)},
 {'title': 'Unique Key for Web Captures',
  'date': datetime.datetime(2021, 11, 19, 0, 0),
  'url': 'https://skeptric.com/key-web-captures/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 40)},
 {'title': 'Hugo Readdir Error with Emacs',
  'date': datetime.datetime(2021, 11, 22, 0, 0),
  'url': 'https://skeptric.com/emacs-tempfile-hugo/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 40)}]

We can always look up an error record by reconstructing it from the log

It would be nicer if everything were a string so we didn't have to handle the imports...

from webrefine.query import WarcFileRecord
from pathlib import PosixPath
WarcFileRecord(url='https://skeptric.com/', timestamp=datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2')
WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2')

Test Persistent Runner

%%time
from pathlib import Path
test_cache_path = Path('./test_skeptric_cache.sqlite')

if test_cache_path.exists():
    test_cache_path.unlink()

data_cached = list(RunnerCached(skeptric_process, test_cache_path).run())
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title
CPU times: user 868 ms, sys: 40.2 ms, total: 908 ms
Wall time: 908 ms
assert data_cached == data
assert data_cached == list(RunnerCached(skeptric_process, test_cache_path).run())
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title
%%time
from pathlib import Path
test_cache_path = Path('./test_skeptric_cache.sqlite')

if test_cache_path.exists():
    test_cache_path.unlink()

data_cached_small_batch = list(RunnerCached(skeptric_process, test_cache_path, batch_size=2).run())
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title
CPU times: user 797 ms, sys: 25 ms, total: 822 ms
Wall time: 858 ms
assert data_cached == data_cached_small_batch
from webrefine.query import CommonCrawlQuery, WaybackQuery
from datetime import datetime

skeptric_cc = CommonCrawlQuery('skeptric.com/*', apis=['CC-MAIN-2021-43'])

skeptric_wb = WaybackQuery('skeptric.com/*', start='202103', end='202111')

def skeptric_filter_strict(records):
    for r in records:
        if r.mime != 'text/html' or r.status != 200:
            continue
        if '/tags/' in r.url or '/notebooks/' in r.url or r.url.endswith('skeptric.com/'):
            continue
        yield r

skeptric_process_all = Process(queries=[skeptric_query, skeptric_cc, skeptric_wb],
                               filter=skeptric_filter_strict,
                               steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])

The cached runner has to fetch and process everything the first time, so it is slow.

TODO: We need to fix the fetch progress bar (callbacks?)

%time data_all = list(RunnerCached(skeptric_process_all, test_cache_path).run())
len(data_all)
CPU times: user 18.9 s, sys: 1.47 s, total: 20.4 s
Wall time: 1min 14s
882

Cache size in MB

test_cache_path.stat().st_size / 1024**2
1.828125

It runs much faster the second time

%time data_all_2 = list(RunnerCached(skeptric_process_all, test_cache_path).run())
assert data_all == data_all_2
CPU times: user 8.93 s, sys: 187 ms, total: 9.11 s
Wall time: 9.02 s
test_cache_path.unlink()