-
Notifications
You must be signed in to change notification settings - Fork 19
/
score.py
146 lines (122 loc) · 5.67 KB
/
score.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from time import time
from pathlib import Path
import os
import codecs
import random
import gzip
random.seed(3) # set random seed for each run of the script to produce the same results
# Dataset parts whose predictions get scored by score_preds().
SCORED_PARTS = ('train', 'dev', 'test', 'dev-b', 'test-b')
# Default dataset location: a "FILIMDB" directory next to this script file.
FILIMDB_PATH = Path(__file__).with_name("FILIMDB")
def load_dataset_fast(data_dir=FILIMDB_PATH, parts=('train', 'dev', 'test')):
    """
    Load texts and (when available) labels for every requested dataset part.

    Returns a dict mapping part name -> (ids, texts, labels), where ids are
    synthetic "<part>/<index>" strings and labels is None for unlabeled parts.
    """
    part2xy = {}
    for part in parts:
        print('Loading %s set ' % part)

        texts_path = os.path.join(data_dir, '%s.texts' % part)
        with codecs.open(texts_path, 'r', encoding='utf-8') as fin:
            texts = [line.strip() for line in fin.read().strip().split('\n')]

        labels_path = os.path.join(data_dir, '%s.labels' % part)
        if not os.path.exists(labels_path):
            # no label file for this part -> treat it as unlabeled
            labels = None
            print('unlabeled', len(texts))
        else:
            with codecs.open(labels_path, 'r', encoding='utf-8') as fin:
                labels = [line.strip() for line in fin.readlines()]
            assert len(labels) == len(texts), 'Number of labels and texts differ in %s set!' % part
            # report per-class example counts
            for cls in set(labels):
                print(cls, sum(1 for lab in labels if lab == cls))

        ids = ['%s/%d' % (part, idx) for idx in range(len(texts))]
        part2xy[part] = (ids, texts, labels)
    return part2xy
def load_labels_only(data_dir=FILIMDB_PATH, parts=('train', 'dev', 'test')):
    """
    Load only the label files for every requested dataset part.

    Returns a dict mapping part name -> (ids, labels); for a part without a
    label file, labels is None and the id list is empty.
    """
    part2xy = {}
    for part in parts:
        print(f"Loading {part} set labels")
        labels_path = os.path.join(data_dir, f"{part}.labels")
        if not os.path.exists(labels_path):
            # unlabeled part: nothing to score against
            labels, n_examples = None, 0
        else:
            with codecs.open(labels_path, 'r', encoding='utf-8') as fin:
                labels = [line.strip() for line in fin.readlines()]
            n_examples = len(labels)
        ids = ['%s/%d' % (part, idx) for idx in range(n_examples)]
        part2xy[part] = (ids, labels)
    return part2xy
# def load_dataset(data_dir='ILIMDB', parts=('train', 'dev', 'test', 'train_unlabeled')):
# """
# Deprecated! Use load_dataset_fast() instead!
# Loads data from specified directory. Returns dictionary part->(list of texts, list of corresponding labels).
# """
# part2xy = {} # tuple(list of texts, list of their labels) for train and test parts
# for part in parts:
# print('Loading %s set ' % part)
#
# unlabeled_subdir = os.path.join(data_dir, part, 'unlabeled')
# unlabeled = os.path.exists(unlabeled_subdir)
# examples = []
#
# if unlabeled:
# load_dir(unlabeled_subdir, None, examples)
# else:
# for cls in ('pos', 'neg'):
# subdir = os.path.join(data_dir, part, cls)
# load_dir(subdir, cls, examples)
# # shuffle examples: if the classifiers overfits to a particular order of labels,
# # it will show bad results on dev/test set;
# # train set should be shuffled by the train() function if the classifier can overfit to the order!
# if part != 'train':
# random.shuffle(examples)
# ids, texts, labels = list(zip(*examples)) # convert list of (text,label) pairs to 2 parallel lists
# part2xy[part] = (ids, texts, None) if unlabeled else (ids, texts, labels)
# for cls in set(labels):
# print(cls, sum((1 for l in labels if l == cls)))
# return part2xy
def load_dir(subdir, cls, examples):
    """Append one (path, text, cls) triple per file in *subdir* to *examples*."""
    started = time()
    for name in os.listdir(subdir):
        path = os.path.join(subdir, name)
        with codecs.open(path, mode='r', encoding='utf-8') as fin:
            # collapse the whole file into a single space-joined line
            text = ' '.join(fin.readlines())
            examples.append((path, text, cls))
    print(subdir, time() - started)
def score(y_pred, y_true):
    """Return the accuracy (in percent) of predictions y_pred w.r.t. y_true."""
    assert len(y_pred) == len(y_true), 'Received %d but expected %d labels' % (len(y_pred), len(y_true))
    correct = sum(int(p == t) for p, t in zip(y_pred, y_true))
    print('Number of correct/total predictions: %d/%d' % (correct, len(y_pred)))
    return 100.0 * correct / len(y_pred)
def save_preds(preds, preds_fname):
    """
    Save classifier predictions in format appropriate for scoring.

    Each (id, label) pair from *preds* is written as one tab-separated line
    to *preds_fname*.
    """
    # Write explicit utf-8: the rest of this module reads/writes utf-8, and
    # the original call relied on the platform default encoding, which can
    # fail on non-ASCII labels/ids.
    with codecs.open(preds_fname, 'w', encoding='utf-8') as outp:
        for pred_id, pred_label in preds:
            print(pred_id, pred_label, sep='\t', file=outp)
    print('Predictions saved to %s' % preds_fname)
def load_preds(preds_fname, compressed=False):
    """
    Load classifier predictions previously written by save_preds().

    Returns (ids, preds): parallel tuples of example ids and predicted
    labels. Set compressed=True to read a gzip-compressed predictions file.
    (The original docstring said "Save" — copy-paste error; this loads.)
    """
    # Read as utf-8 explicitly, matching how the rest of this module does I/O.
    opener = (gzip.open(preds_fname, 'rt', encoding='utf-8') if compressed
              else codecs.open(preds_fname, 'r', encoding='utf-8'))
    with opener as inp:
        pairs = [line.strip().split('\t') for line in inp.readlines()]
    ids, preds = zip(*pairs)
    return ids, preds
def score_preds(preds_fname, data_dir=FILIMDB_PATH, compressed=False):
    """Score a predictions file against the labels stored under *data_dir*."""
    labels_by_part = load_labels_only(parts=SCORED_PARTS, data_dir=data_dir)
    return score_preds_loaded(labels_by_part, preds_fname, compressed=compressed)
def score_preds_loaded(part2labels, preds_fname, compressed=False):
    """
    Score predictions from *preds_fname* against already-loaded labels.

    Returns a dict part -> accuracy (percent); unlabeled parts are skipped.
    """
    ids, labels = load_preds(preds_fname, compressed=compressed)
    id2pred = dict(zip(ids, labels))
    scores = {}
    for part, (true_ids, true_y) in part2labels.items():
        if true_y is None:
            print('no labels for %s set' % part)
        else:
            # look predictions up by id so file order doesn't matter
            predictions = [id2pred[tid] for tid in true_ids]
            part_acc = score(predictions, true_y)
            print('%s set accuracy: %.2f' % (part, part_acc))
            scores[part] = part_acc
    return scores