"""Example implementation of the CountSketch streaming algorithm.

The sketch is a ``t x k`` matrix of counters.  Every stream update
``(count, item)`` adds ``g(item, j) * count`` to bucket ``h(item, j, k)`` of
each row ``j``; the frequency of an item is estimated as the median over the
rows of ``g(item, j) * C[j, h(item, j, k)]``.
"""

import hashlib
import random
import string
from collections import defaultdict
from math import log
from statistics import median

import numpy as np

try:
    from tqdm import tqdm
except ImportError:
    # The progress bar is purely cosmetic, so degrade gracefully without it.
    def tqdm(iterable, *args, **kwargs):
        """Return *iterable* unchanged (fallback when tqdm is not installed)."""
        return iterable


def is_power_of_two(n):
    """Return True if the integer *n* is a non-zero power of two."""
    return (n != 0) and (n & (n - 1) == 0)


def h(val, idx, k):
    """Return the bucket index in ``[0, k - 1]`` for *val* in sketch row *idx*.

    The row index is mixed into the hashed message so that each row gets its
    own bucket assignment.  SHA-256 is used here, whereas ``g`` draws its sign
    bits from MD5, so the bucket and the sign are not derived from the same
    digest.

    Args:
        val: The stream item (a string).
        idx: The sketch row index.
        k: The number of buckets per row; must be at least 1.

    Returns:
        An int between 0 and ``k - 1`` inclusive.

    Raises:
        ValueError: If *k* is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be at least 1")
    digest = hashlib.sha256(f"{idx}:{val}".encode()).digest()
    return int.from_bytes(digest, "little") % k


def g(val, idx):
    """Return the sign (-1 or 1) for *val* in sketch row *idx*.

    The sign is bit *idx* (counted from the most significant end of the
    128-bit value) of the MD5 digest of *val* read as a little-endian integer,
    mapped from {0, 1} to {-1, 1}.

    Args:
        val: The stream item (a string).
        idx: The sketch row index, between 0 and 127 inclusive.

    Returns:
        -1 or 1.

    Raises:
        ValueError: If *idx* is outside 0..127, i.e. the digest has no bit
            left for that row.
    """
    if not 0 <= idx <= 127:
        raise ValueError("Run out of bits in g() function")
    digest = int.from_bytes(hashlib.md5(val.encode()).digest(), "little")
    bit = (digest >> (127 - idx)) & 1
    return bit * 2 - 1  # Map to {-1, 1}


def next_power_of_2(x):
    """Return the smallest power of two that is >= *x* (1 when *x* is 0)."""
    return 1 if x == 0 else 2 ** (x - 1).bit_length()


def countsketch(seq, t, k):
    """Compute the CountSketch matrix of a turnstile stream.

    Args:
        seq: Iterable of ``(count, item)`` pairs; counts may be negative.
        t: Number of rows (independent hash functions) in the sketch.
        k: Requested number of buckets per row; rounded up to the next power
            of two.

    Returns:
        A tuple ``(C, k)`` where ``C`` is the ``t x k`` NumPy array of
        counters and ``k`` is the rounded-up number of buckets actually used.
    """
    k = next_power_of_2(k)  # Round up to the next power of two
    print("k =", k)
    C = np.zeros((t, k))
    for count, item in tqdm(seq):
        # Every row receives the signed update in the bucket chosen for it.
        for j in range(t):
            C[j, h(item, j, k)] += g(item, j) * count
    return C, k


def createturnstilesequence(length):
    """Create a random turnstile stream of *length* ``(count, item)`` pairs.

    Items are single characters of ``string.printable`` drawn with weights
    ``[1] + [x ** -1.5 for x in 1..99]``; counts are drawn uniformly from
    ``[-1, 1, 2]``.
    """
    a = 1.5
    weights = [1] + [x ** (-a) for x in range(1, len(string.printable))]
    seq = []
    for _ in range(length):
        # Keep the order of the two draws so a seeded run stays reproducible.
        count = random.choice([-1, 1, 2])  # random counts
        item = random.choices(string.printable, weights=weights, k=1)[0]
        seq.append((count, item))
    return seq


def estimate(val, C, t, k):
    """Estimate the total count of *val* from the sketch *C*.

    Args:
        val: The stream item to query.
        C: The sketch matrix returned by ``countsketch``.
        t: Number of rows in *C*.
        k: Number of buckets per row in *C*.

    Returns:
        The median over the *t* rows of ``g(val, j) * C[j, h(val, j, k)]``.
    """
    return median([g(val, j) * C[j, h(val, j, k)] for j in range(t)])


def main():
    """Sketch a seeded random stream and print estimates next to true counts."""
    random.seed(7)
    seq = createturnstilesequence(10**5)
    t = 4
    k = 32
    C, k = countsketch(seq, t, k)  # compute the countsketch

    truecounts = defaultdict(int)
    for (c, x) in seq:
        truecounts[x] += c

    for l in string.printable:
        print("*"+l+"*", "estimate", estimate(l, C, t, k), "true count", truecounts[l])


if __name__ == "__main__":
    main()