diff options
-rwxr-xr-x | scripts/digest.py | 197 |
1 files changed, 197 insertions, 0 deletions
#!/usr/bin/env python3

__program_description = '''
A script that digests the output of onion-grab. Meant to be used for sorting
out which domains set what Onion-Location values; as well as which domains set
the same onion addresses when pruned to two labels. See also stderr prints.
'''

import sys
import argparse
import logging

log = logging.getLogger(__name__)

import base64
import hashlib


def main(args):
    '''
    Read all input files, parse them, log summary statistics, and write the
    two output files: one keyed on domain names, one keyed on two-label
    onion addresses.
    '''
    input_lines = []
    for inputFile in args.input_file:
        with open(inputFile) as fp:
            input_lines += [line for line in fp]

    onion2domains, domain2onions, numHTTP, numHTML = parse_input(input_lines)
    log.info(f'found {numHTTP} HTTP headers with Onion-Location')
    log.info(f'found {numHTML} HTML meta attributes with Onion-Location')
    # fixed typo in log message: "unqiue" -> "unique"
    log.info(f'found {len(domain2onions)} unique domain names that set Onion-Location')
    log.info(f'found {len(onion2domains)} unique two-label onion addresses in the process')

    log.info(f'storing domains with valid Onion-Location configurations in {args.output_domains}')
    with open(args.output_domains, "w") as fp:
        for d, o in domain2onions.items():
            fp.write(f'{d} {" ".join(o)}\n')

    log.info(f'storing two-label onion addresses that domains referenced in {args.output_onions}')
    with open(args.output_onions, "w") as fp:
        for o, d in onion2domains.items():
            fp.write(f'{o} {" ".join(d)}\n')


def parse_input(lines):
    '''
    Outputs the following two dictionaries and integers:
    - onion2domain: key is a unique two-label onion address; the value is a list
      of domain names that referenced it with Onion-Location.
    - domain2onion: key is the domain name that configured Onion-Location; the
      value is a non-modified list of onion URLs, i.e., including an http or
      https scheme, subdomains (if any), port (if any), and path (if any).
    - number of valid HTTP headers (at most one per non-unique domain)
    - number of valid HTML meta attributes (at most one per non-unique domain)

    Lines that fail to parse or validate are skipped (logged at debug level).
    '''
    onion2domain = {}
    domain2onion = {}
    numHTTP = 0
    numHTML = 0
    for line in lines:
        try:
            # Strip only the trailing newline.  The previous code removed the
            # last character unconditionally (line[:len(line)-1]), which
            # silently corrupted a final line that lacked a terminating '\n'.
            line = line.rstrip('\n')
            domain, onions, isHTTP, isHTML = parse_line(line)
            for onion in onions:
                # normalize to a two-label onion address before keying on it
                addr = trim_onion(trimPath(trimScheme(onion)))

                onion2domain.setdefault(addr, [])
                onion2domain[addr] += [domain]

                domain2onion.setdefault(domain, [])
                domain2onion[domain] += [onion]

            if isHTTP:
                numHTTP += 1
            if isHTML:
                numHTML += 1
        except Exception as e:
            # best-effort parsing: one bad line must not abort the digest
            log.debug(f'"{line}": {e}')

    return deduplicate_values(onion2domain), deduplicate_values(domain2onion), numHTTP, numHTML


def parse_line(line):
    '''
    Line format is:

        <domain> http=[value] html=[value]

    where at least one of http or html should have a value. Note: there has
    been no vetting of what <value> is. Outputs domain and a list of values,
    and boolean values indicating if the domain used an HTTP and/or HTML config.

    Raises if the line does not have exactly three space-separated fields or
    if the http=/html= fields are malformed.
    '''
    s = line.split(" ")
    if len(s) != 3:
        raise Exception(f'invalid line split')

    domain = s[0]
    http2onion = s[1]
    html2onion = s[2]
    onions = []

    isHTTP = False
    s = http2onion.split("=")
    if len(s) < 2:
        raise Exception(f'invalid http split')
    # NOTE(review): requires the value to be at least two characters; a
    # one-character value is silently ignored.  Presumably intentional (no
    # one-character string is a usable URL) -- confirm against onion-grab.
    if len(s[1]) > 1:
        # re-join in case the URL itself contained '=' characters
        onions += ["=".join(s[1:])]
        isHTTP = True

    isHTML = False
    s = html2onion.split("=")
    if len(s) < 2:
        raise Exception(f'invalid html split')
    if len(s[1]) > 1:
        onions += ["=".join(s[1:])]
        isHTML = True

    if isHTTP and isHTML:
        log.debug(f'{domain} sets both http header and html attribute')

    return domain, onions, isHTTP, isHTML


def trimScheme(url):
    '''
    Removes required http:// or https:// scheme from url.  Raises if the url
    has neither scheme.
    '''
    for scheme in ["http://", "https://"]:
        if url.startswith(scheme):
            return url[len(scheme):]

    raise Exception(f'no http or https scheme')


def trimPath(url):
    '''
    Trims the path off from the url, keeping only the host[:port] part.
    '''
    return url.split("/")[0]


def trim_onion(host):
    '''
    Parses host as a v3 onion address; ports and subdomains are trimmed.
    Returns the two-label form "<56-char-base32>.onion".  Raises if host is
    not a valid v3 onion address (a non-numeric port also raises, via int()).
    '''
    s = host.split(":")
    if len(s) > 2:
        raise Exception(f'invalid host name')
    if len(s) == 2:
        port = int(s[1])
        if port < 1 or port > 2**16 - 1:
            raise Exception(f'port number not in [1, {2**16 - 1}]')

    domain = s[0]
    s = domain.split(".")
    if len(s) < 2:
        raise Exception(f'too few labels to be an onion address')
    if s[len(s)-1] != "onion":
        raise Exception(f'the final DNS label must be "onion"')
    if len(s[len(s)-2]) != 56:
        raise Exception(f'the DNS label before ".onion" must be 56 bytes')

    # 56 base32 characters decode to exactly 35 bytes: pubkey || checksum || version
    assert_v3(base64.b32decode(s[len(s)-2].upper().encode('UTF-8')))
    return ".".join(s[len(s)-2:])


def assert_v3(blob):
    '''
    Validates the decoded onion address blob per the v3 rendezvous spec:
    blob = PUBKEY(32) || CHECKSUM(2) || VERSION(1), where
    CHECKSUM = sha3_256(".onion checksum" || PUBKEY || VERSION)[:2].

    https://gitweb.torproject.org/torspec.git/tree/rend-spec-v3.txt#n2240
    '''
    pubkey = blob[:32]
    checksum = blob[32:34]
    version = blob[34:35]
    if version[0] != 3:
        raise Exception(f'invalid version: {version[0]}')

    h = hashlib.sha3_256()
    h.update(b'.onion checksum')
    h.update(pubkey)
    h.update(version)
    c = h.digest()
    if checksum[0] != c[0] or checksum[1] != c[1]:
        raise Exception(f'invalid checksum')


def deduplicate_values(key2values):
    '''
    Sorts and deduplicates each value list in-place; returns the same dict.
    '''
    for key in key2values:
        # sorted() already returns a list; the extra list() wrapper was redundant
        key2values[key] = sorted(set(key2values[key]))

    return key2values


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__program_description)
    parser.add_argument("-v", "--verbosity", type=str, default="info",
            help="logging verbosity, select from debug, info, warning, error, and critical")
    parser.add_argument("-o", "--output-onions", required=False, type=str, default="onions.txt",
            help="path to output file keyed on two-label onion addresses")
    parser.add_argument("-d", "--output-domains", required=False, type=str, default="domains.txt",
            help="path to output file keyed on domains with Onion-Location")
    parser.add_argument('-i', '--input-file', nargs='+', required=True,
            help='input file with collected data')

    args = parser.parse_args()
    logging.basicConfig(level=logging.__dict__[args.verbosity.upper()],
            format='%(filename)s:%(lineno)d %(levelname)s: %(message)s')

    sys.exit(main(args))