-
Notifications
You must be signed in to change notification settings - Fork 0
/
search.py
83 lines (66 loc) · 2.68 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import string
from collections import defaultdict
from math import log
def update_url_scores(old: dict[str, float], new: dict[str, float]):
    """Accumulate the scores from *new* into *old* and return *old*.

    URLs already present in *old* have the new score added to their current
    value; URLs seen for the first time are inserted with the new score.
    *old* is modified in place and the same dict object is returned.
    """
    for link, points in new.items():
        if link not in old:
            # First time this URL is seen: start from the incoming score.
            old[link] = points
            continue
        old[link] += points
    return old
def normalize_string(input_string: str) -> str:
    """Lower-case *input_string* and strip punctuation and extra whitespace.

    Every character of ``string.punctuation`` is replaced by a space, runs of
    whitespace are collapsed into a single space, leading and trailing
    whitespace is dropped, and the result is lower-cased.
    """
    punctuation_to_space = str.maketrans(dict.fromkeys(string.punctuation, " "))
    tokens = input_string.translate(punctuation_to_space).split()
    return " ".join(tokens).lower()
class SearchEngine:
    """In-memory keyword search engine that ranks documents with BM25.

    Documents are registered under a URL with :meth:`index` or
    :meth:`bulk_index`. :meth:`search` returns a score for every URL that
    contains at least one of the query keywords.

    Document length is measured in characters of the raw content, both for a
    single document and for the average used in the length normalisation.
    """

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        """Create an empty engine.

        Args:
            k1: Scales the term-frequency component of the BM25 formula.
            b: Weight of the document-length ratio in the BM25 formula.
        """
        # term -> {url -> number of occurrences of the term in that document}
        self._index: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        # url -> raw content exactly as passed to index()
        self._documents: dict[str, str] = {}
        # Cached average document length; None means "recompute on next use".
        self._avdl = None
        self.k1 = k1
        self.b = b

    @property
    def posts(self) -> list[str]:
        """URLs of all indexed documents."""
        return list(self._documents)

    @property
    def number_of_documents(self) -> int:
        """Number of indexed documents."""
        return len(self._documents)

    @property
    def avdl(self) -> float:
        """Average document length in characters (0.0 when nothing is indexed)."""
        if self._avdl is None:
            if not self._documents:
                # Nothing indexed yet: avoid dividing by zero.
                return 0.0
            total_length = sum(len(content) for content in self._documents.values())
            self._avdl = total_length / len(self._documents)
        return self._avdl

    def idf(self, kw: str) -> float:
        """Return the inverse document frequency of *kw*.

        Computed as ``log((N - n + 0.5) / (n + 0.5) + 1)`` where ``N`` is the
        number of indexed documents and ``n`` the number containing *kw*.
        """
        n_docs = self.number_of_documents
        n_kw = len(self.get_urls(kw))
        return log((n_docs - n_kw + 0.5) / (n_kw + 0.5) + 1)

    def bm25(self, kw: str) -> dict[str, float]:
        """Return the BM25 score of *kw* for every document that contains it.

        Returns:
            Mapping of URL to score; empty when no document contains *kw*.
        """
        postings = self.get_urls(kw)
        if not postings:
            # No matches: skip the length statistics entirely.
            return {}
        idf_score = self.idf(kw)
        avdl = self.avdl
        result = {}
        for url, freq in postings.items():
            numerator = freq * (self.k1 + 1)
            length_ratio = len(self._documents[url]) / avdl
            denominator = freq + self.k1 * (1 - self.b + self.b * length_ratio)
            result[url] = idf_score * numerator / denominator
        return result

    def search(self, query: str) -> dict[str, float]:
        """Score all documents against *query*.

        The query is normalised and split into keywords; the per-keyword BM25
        scores are summed per URL. An empty or punctuation-only query yields
        an empty result.
        """
        # split() without arguments yields no tokens for an empty string,
        # whereas split(" ") would yield a single empty-string keyword.
        keywords = normalize_string(query).split()
        url_scores: dict[str, float] = {}
        for kw in keywords:
            url_scores = update_url_scores(url_scores, self.bm25(kw))
        return url_scores

    def index(self, url: str, content: str) -> None:
        """Index *content* under *url*, replacing any previous content for it."""
        if url in self._documents:
            # Re-indexing: drop the old postings so term counts are not
            # accumulated across the old and the new content.
            self._remove_postings(url)
        self._documents[url] = content
        for word in normalize_string(content).split():
            self._index[word][url] += 1
        # The set of documents changed, so the cached average is stale.
        self._avdl = None

    def bulk_index(self, documents: list[tuple[str, str]]):
        """Index every ``(url, content)`` pair in *documents*."""
        for url, content in documents:
            self.index(url, content)

    def get_urls(self, keyword: str) -> dict[str, int]:
        """Return ``{url: term frequency}`` for the normalised *keyword*.

        Unknown keywords yield an empty dict. The returned mapping for a known
        keyword is the live index entry and should be treated as read-only.
        """
        keyword = normalize_string(keyword)
        # .get() avoids the defaultdict inserting an empty entry for every
        # keyword that is merely looked up.
        return self._index.get(keyword, {})

    def _remove_postings(self, url: str) -> None:
        """Remove the index entries contributed by the stored content of *url*."""
        for word in set(normalize_string(self._documents[url]).split()):
            postings = self._index.get(word)
            if postings is None:
                continue
            postings.pop(url, None)
            if not postings:
                # Keep the index free of terms that no document contains.
                del self._index[word]
# Module-level SearchEngine instance, built with the default k1 and b values.
engine = SearchEngine()