# Example implementation of the countsketch streaming algorithm
import hashlib
import random
import string
from collections import defaultdict
from functools import lru_cache
from math import log
from statistics import median

import numpy as np
from numpy.random import zipf

try:
    from tqdm import tqdm
except ImportError:
    # tqdm only draws a progress bar; the sketch itself does not need it.
    def tqdm(iterable, *args, **kwargs):
        """Stand-in for tqdm.tqdm: return the iterable without a progress bar."""
        return iterable


def is_power_of_two(n):
    """Return True if n is nonzero and n & (n - 1) is zero (1, 2, 4, 8, ...)."""
    return (n != 0) and (n & (n - 1) == 0)


@lru_cache(maxsize=4096)
def _digest_bits(val, algorithm):
    """Return the `algorithm` digest of the string val as a bit string.

    The digest bytes are read as a little-endian integer and rendered in
    binary, left-padded with zeros to the full digest width (256 characters
    for sha256, 128 for md5). Results are cached because the same stream
    symbols are hashed once per row for every update.
    """
    digest = hashlib.new(algorithm, val.encode()).digest()
    width = 8 * len(digest)
    return format(int.from_bytes(digest, "little"), f"0{width}b")


def h(val, idx, k):
    """Bucket hash number idx of val: an integer between 0 and k-1.

    Hash idx consumes its own slice of log2(k) bits of the SHA-256 digest of
    val, so different idx values read disjoint bits.

    Args:
        val: the stream item (a string).
        idx: which hash function to use (0-based).
        k: number of buckets; must be a power of two.

    Raises:
        ValueError: if idx is negative, or if hash idx would need bits beyond
            the 256 that SHA-256 provides.
    """
    assert is_power_of_two(k)
    # Exact log2 of a power of two; avoids float rounding in math.log.
    bits_per_hash = int(k).bit_length() - 1
    if idx < 0:
        raise ValueError(f"idx must be non-negative, got {idx}")
    if (idx + 1) * bits_per_hash > 256:
        raise ValueError("Run out of bits in h() function")
    if bits_per_hash == 0:
        # k == 1: there is a single bucket and no bits to read.
        return 0
    bits = _digest_bits(val, "sha256")
    start = idx * bits_per_hash
    bucket = int(bits[start:start + bits_per_hash], 2)
    assert 0 <= bucket < k
    return bucket


def g(val, idx):
    """Sign hash number idx of val: -1 or +1.

    The sign is taken from bit idx of the MD5 digest of val.

    Raises:
        ValueError: if idx is beyond the 128 bits that MD5 provides.
    """
    if idx > 127:
        raise ValueError("Run out of bits in g() function")
    return int(_digest_bits(val, "md5")[idx]) * 2 - 1  # Map to {-1, 1}


def countsketch(seq, t, k):
    """Build a t-row sketch of a stream of (count, item) pairs.

    Args:
        seq: iterable of (c, x) pairs, where c is the count change for item x.
        t: number of rows (independent hash pairs) in the sketch.
        k: requested number of buckets per row; rounded up to a power of two.

    Returns:
        (C, k): the t-by-k array of counters and the bucket count actually
        used after rounding.
    """
    k = next_power_of_2(k)  # Round up to the next power of two
    print("k =", k)
    C = np.zeros((t, k))
    for c, x in tqdm(seq):
        for j in range(t):
            C[j, h(x, j, k)] += c * g(x, j)
    return C, k


def next_power_of_2(x):
    """Return the smallest power of two that is >= x (1 for any x <= 1)."""
    if x <= 1:
        return 1
    return 2 ** (x - 1).bit_length()


def createturnstilesequence(length):
    """Generate `length` random (count, symbol) updates.

    Counts are drawn uniformly from {-1, 1, 2}. Symbols are drawn from
    string.printable with weights 1, 1, 2**-1.5, 3**-1.5, ... so that early
    symbols are much more frequent than late ones.
    """
    a = 1.5
    weights = [1] + [x ** (-a) for x in range(1, len(string.printable))]
    seq = []
    for _ in range(length):
        c = random.choice([-1, 1, 2])  # random counts
        symbol = random.choices(string.printable, weights=weights, k=1)[0]
        seq.append((c, symbol))
    return seq


def estimate(val, C, t, k):
    """Estimate the total count of val from sketch C.

    Each of the t rows gives one estimate, g(val, j) * C[j, h(val, j, k)];
    the median of those is returned. k must be the bucket count returned by
    countsketch().
    """
    return median([g(val, j) * C[j, h(val, j, k)] for j in range(t)])


def main():
    """Sketch a random stream and print estimated vs. true counts per symbol."""
    random.seed(7)
    seq = createturnstilesequence(10**5)
    t = 4
    k = 32
    C, k = countsketch(seq, t, k)  # compute the countsketch

    truecounts = defaultdict(int)
    for (c, x) in seq:
        truecounts[x] += c

    for l in string.printable:
        print("*"+l+"*", "estimate", estimate(l, C, t, k), "true count",
              truecounts[l])


if __name__ == "__main__":
    main()