-
Notifications
You must be signed in to change notification settings - Fork 1
/
elasticInsert.py
executable file
·66 lines (59 loc) · 2.05 KB
/
elasticInsert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python3
from elasticsearch import helpers, Elasticsearch
import csv
# Client for the Elasticsearch node at localhost:9200.
es = Elasticsearch([{"host": "localhost", "port": 9200}])

# Name of the index this script creates and fills.
indexName = "movie_index1"

# Path to movies.csv; change it here if the file lives somewhere else.
csvFilePath = "movies.csv"
# Index definition. BM25 is set as the index's default similarity, so searches
# are scored with the metric we want without naming it per query.
request_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "similarity": {
            "default": {"type": "BM25", "b": 0.5, "k1": 0},
        },
    },
    "mappings": {
        "properties": {
            "movieId": {"type": "integer"},
            # Titles are analyzed with the built-in "english" analyzer.
            "title": {"type": "text", "analyzer": "english"},
            "genres": {"type": "text"},
            # Every rating of a movie is stored as a nested object on the
            # movie document itself.
            "ratingArr": {
                "type": "nested",
                "properties": {
                    "movieId": {"type": "integer"},
                    "rating": {"type": "float"},
                    "userId": {"type": "integer"},
                },
            },
        },
    },
}
# Announce the index name, then create the index from request_body.
print(f"Creating {indexName}")
es.indices.create(index=indexName, body=request_body)
def _group_ratings_by_movie(rating_rows):
    """Group rating rows by their 'movieId' value.

    Args:
        rating_rows: iterable of mappings that each have the keys
            'movieId', 'rating' and 'userId' (e.g. a csv.DictReader).

    Returns:
        dict mapping each movieId value to the list of its ratings, in the
        order they were read. Every rating is a new dict holding only
        'movieId', 'rating' and 'userId', with the values copied as-is.
    """
    grouped = {}
    for rating in rating_rows:
        grouped.setdefault(rating['movieId'], []).append({
            "movieId": rating['movieId'],
            "rating": rating['rating'],
            "userId": rating['userId'],
        })
    return grouped


def _attach_ratings(movie_rows, ratings_by_movie):
    """Return the movie rows as a list, each with a 'ratingArr' list added.

    Args:
        movie_rows: iterable of mutable mappings with a 'movieId' key.
        ratings_by_movie: dict as produced by _group_ratings_by_movie.

    Returns:
        list of the same row objects, in input order. A movie without any
        rating gets an empty 'ratingArr'.
    """
    docs = []
    for movie in movie_rows:
        movie['ratingArr'] = ratings_by_movie.get(movie['movieId'], [])
        docs.append(movie)
    return docs


# Ratings are looked up by movieId instead of walking both files in lockstep:
# the lockstep cursor stalled as soon as a rating referenced a movieId missing
# from movies.csv (or the movies were not in ascending id order), leaving
# every later movie with an empty ratingArr.
# newline="" is what the csv module documents for files passed to a reader.
with open(csvFilePath, encoding="utf-8", newline="") as movies, \
        open('ratings.csv', encoding="utf-8", newline="") as ratings:
    ratings_by_movie = _group_ratings_by_movie(csv.DictReader(ratings))
    rows = _attach_ratings(csv.DictReader(movies), ratings_by_movie)

# One document per movie, sent through the bulk helper.
helpers.bulk(es, rows, index=indexName)