-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
113 lines (91 loc) · 3.37 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from collections import defaultdict
from Crypto.PublicKey import ElGamal
from Crypto.Random import get_random_bytes
from pypbc import *
class Server:
def __init__(self):
# Init data store
self.data = defaultdict(list)
# Init key lists
self.enc_keys = {}
self.td_keys = {}
# Init default encryption
_key = ElGamal.generate(bits=256, randfunc=get_random_bytes)
self._enc_prime = int(_key.p)
self._enc_generator = _key.g
# Init trapdoor encryption
self._td_params = Parameters(qbits=512, rbits=160)
self._td_pairing = Pairing(self._td_params)
self._td_generator = Element.random(self._td_pairing, G1)
def enc_prime(self):
"""Prime for default encryption"""
return self._enc_prime
def enc_generator(self):
"""Generator for default encryption"""
return self._enc_generator
def td_generator(self):
"""Generator for trapdoor encryption"""
return self._td_generator
def td_pairing(self):
"""Pairing for trapdoor encryption"""
return self._td_pairing
def register_user(self, user_id, enc_pub_key, td_pub_key):
self.enc_keys[user_id] = enc_pub_key
self.td_keys[user_id] = td_pub_key
def user_enc_pub(self, user_id):
"""Remember, user_id 0 is the consultant"""
return self.enc_keys[user_id]
def user_td_pub(self, user_id):
"""Remember, user_id 0 is the consultant"""
return self.td_keys[user_id]
def store_data(self, user_id, data):
"""
Store data in local datastore.
Data is stored as:
{
user_id: [(sigma, m_peck), (sigma, m_peck), ...],
...
}
"""
self.data[0].append(data) # Store separately for the consultant
self.data[user_id].append(data)
def evaluate_trapdoor(self, trapdoor, user_id):
"""
:return: List of sigma's for which the trapdoor was True
"""
data_list = self.data[user_id]
result = []
for (sigma, m_peck) in data_list:
if self.evaluate_trapdoor_single_mpeck(trapdoor, user_id, m_peck):
result.append(sigma)
return result
def evaluate_trapdoor_single_mpeck(self, trapdoor, user_id, m_peck):
"""
This method evaluates the given trapdoor with a single m_peck.
:return: True/False
"""
tjq1, tjq2, tjq3, indices = trapdoor
a, bs, cs = m_peck
# If the user id is not 0, get the second (1)
# element from bs later
uid = 1 if user_id != 0 else 0
# Return False if there are more keywords 'needed' than included in the m_peck.
# We can only return True if ALL keywords are present.
if max(indices) > len(cs) - 1:
return False
e = lambda e1, e2: self.td_pairing().apply(e1, e2)
left = Element.one(self.td_pairing(), G1)
for i in indices:
left = left * cs[i]
left = e(tjq1, left)
right1 = Element.one(self.td_pairing(), G1)
for i in indices:
right1 = right1 * tjq2[i]
right1 = e(a, right1)
g_s = bs[uid]
right2 = Element.one(self.td_pairing(), G1)
for i in indices:
right2 = right2 * tjq3[i]
right2 = e(g_s, right2)
right = right1 * right2
return left == right