-
Notifications
You must be signed in to change notification settings - Fork 2
/
init.lua
247 lines (227 loc) · 10.9 KB
/
init.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
local protocol = require "mongo.protocol"
-- Protocol request helpers
local request_count = protocol.request_count
local request_query = protocol.request_query
local request_update = protocol.request_update
local request_insert = protocol.request_insert
local request_delete = protocol.request_delete
local request_mapreduce = protocol.request_mapreduce
local request_aggregate = protocol.request_aggregate
local request_getindexes = protocol.request_getindexes
local request_dropindexes = protocol.request_dropindexes
local request_createindex = protocol.request_createindex
-- Handshake and authentication
local request_auth = protocol.request_auth
local request_handshake = protocol.request_handshake
local toint = math.tointeger
local tonumber = tonumber
local fmt = string.format
local tcp = require "internal.TCP"
local gridfs = require "mongo.gridfs"
local class = require "class"
---@class MongoDB @The `MongoDB` client object
local mongo = class("MongoDB")
---Constructor: stores the connection settings and creates the socket and GridFS helper.
---Defaults: `db` = "admin", `host` = "localhost", `port` = 27017, `auth_mode` = "SCRAM-SHA-1".
---@param opt table | nil @Optional settings (`SSL`/`db`/`host`/`port`/`username`/`password`/`auth_mode`)
function mongo:ctor(opt)
  -- Every setting has a default or may be nil, so a missing table is treated as empty
  -- instead of raising "attempt to index a nil value".
  opt = opt or {}
  self.reqid = 1
  self.SSL = opt.SSL
  self.db = opt.db or "admin"
  self.host = opt.host or "localhost"
  self.port = opt.port or 27017
  self.username = opt.username
  self.password = opt.password
  self.auth_mode = opt.auth_mode or "SCRAM-SHA-1"
  self.sock = tcp:new()
  self.gridfs = gridfs:new({ctx = self})
  self.have_transaction = false
  self.connected = false
end
---Sets the timeout on the underlying socket.
---Does nothing when there is no socket or `timeout` cannot be converted to a number.
---@param timeout number | string @Timeout value; stored on the socket exactly as given
function mongo:set_timeout(timeout)
  local sock = self.sock
  if not sock or not tonumber(timeout) then
    return
  end
  sock._timeout = timeout
end
---Connects to the server, then performs the protocol handshake and authentication.
---Calling it on an already connected client does nothing and reports success.
---@return boolean, string | nil @`true` on success (with a notice when already connected); `false` and an error message on failure
function mongo:connect()
  if self.connected then
    return true, "already connected."
  end
  -- close() discards both the socket and the GridFS helper; recreate whichever is missing
  -- so the client is fully usable again after a reconnect.
  if not self.sock then
    self.sock = tcp:new()
  end
  if not self.gridfs then
    self.gridfs = gridfs:new({ctx = self})
  end
  local sock = self.sock
  -- On any failure, release the half-open socket and install a fresh one so that a retry
  -- never reuses a connection left in an unknown state. A previously set timeout is kept.
  local function fail(reason)
    local timeout = sock._timeout
    sock:close()
    self.sock = tcp:new()
    if timeout then
      self.sock._timeout = timeout
    end
    return false, reason
  end
  local ok, err = sock:connect(self.host, self.port)
  if not ok then
    return fail(err)
  end
  if self.SSL then
    ok, err = sock:ssl_handshake()
    if not ok then
      return fail(err or "Mongo SSL handshake failed.")
    end
  end
  ok, err = request_handshake(self)
  if not ok then
    return fail(err)
  end
  ok, err = request_auth(self)
  if not ok then
    return fail(err)
  end
  self.reqid = self.reqid + 1
  self.connected = true
  return true
end
---Queries documents.
---@param database string @Name of the database to query
---@param collect string @Name of the collection to query
---@param filter table @Query conditions
---@param option table @Optional query parameters (`cursor`/`limit`/`skip`/`sort`)
---@return table | boolean, integer | string | nil @On success the result batch and the cursor `id`; on failure `false` and an error message
function mongo:find(database, collect, filter, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid find collect or database.")
  local reply, err = request_query(self, database, collect, filter, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local cursor = reply.cursor
  return cursor.firstBatch or cursor.nextBatch, cursor.id
end
---Inserts documents.
---@param database string @Name of the database to insert into
---@param collect string @Name of the collection to insert into
---@param documents table @Non-empty array of documents (tables) to insert
---@param option table @Optional insert parameters (`ordered`)
---@return table | boolean, string | nil @On success `{ acknowledged, insertedCount }`; on failure `false` and an error message
function mongo:insert(database, collect, documents, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid insert collect or database.")
  assert(type(documents) == 'table' and #documents > 0 and type(documents[1]) == "table", "Invalid insert documents.")
  local reply, err = request_insert(self, database, collect, documents, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local ok = reply['ok']
  return { acknowledged = (ok == 1 or ok == true), insertedCount = toint(reply['n']) }
end
---Updates documents.
---@param database string @Name of the database to update
---@param collect string @Name of the collection to update
---@param filter table @Filter selecting the documents to update
---@param update table @The update to apply
---@param option table @Optional update parameters (`upsert`/`multi`)
---@return table | boolean, string | nil @On success `{ acknowledged, matchedCount, modifiedCount }`; on failure `false` and an error message
function mongo:update(database, collect, filter, update, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid update collect or database.")
  assert(type(filter) == 'table', "Invalid update filter.")
  local reply, err = request_update(self, database, collect, filter, update, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local ok = reply['ok']
  return {
    acknowledged = (ok == 1 or ok == true),
    matchedCount = toint(reply['n']),
    modifiedCount = toint(reply['nModified']),
  }
end
---Deletes documents.
---@param database string @Name of the database to delete from
---@param collect string @Name of the collection to delete from
---@param filter table @Filter selecting the documents to delete
---@param option table @Optional delete parameters (`one`)
---@return table | boolean, string | nil @On success `{ acknowledged, deletedCount }`; on failure `false` and an error message
function mongo:delete(database, collect, filter, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid delete collect or database.")
  assert(type(filter) == 'table', "Invalid delete filter.")
  local reply, err = request_delete(self, database, collect, filter, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local ok = reply['ok']
  return { acknowledged = (ok == 1 or ok == true), deletedCount = toint(reply['n']) }
end
---COUNT - aggregate function.
---@param database string @Name of the database to query
---@param collect string @Name of the collection to query
---@param filter table @Query conditions
---@param option table @Optional query parameters
---@return table | boolean, integer | string | nil @On success `{ acknowledged, count }` and the cursor `id`; on failure `false` and an error message
function mongo:count(database, collect, filter, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid count collect or database.")
  local reply, err = request_count(self, database, collect, filter, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local cursor = reply.cursor
  -- The first batch may hold no document (presumably when nothing matched; depends on the
  -- command built by request_count). Report that as a count of 0 instead of crashing.
  local first = cursor.firstBatch and cursor.firstBatch[1]
  return { acknowledged = true, count = first and first.n or 0 }, cursor.id
end
---AGGREGATE - aggregate function.
---@param database string @Name of the database to query
---@param collect string @Name of the collection to query
---@param filter table @Query conditions
---@param option table @Optional query parameters
---@return table | boolean, integer | string | nil @On success the result batch and the cursor `id`; on failure `false` and an error message
function mongo:aggregate(database, collect, filter, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid aggregate collect or database.")
  local reply, err = request_aggregate(self, database, collect, filter, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  local cursor = reply.cursor
  return cursor.firstBatch or cursor.nextBatch, cursor.id
end
---MapReduce - computation function.
---@param database string @Name of the database to query
---@param collect string @Name of the collection to query
---@param map string @The map function (`javascript function`)
---@param reduce string @The reduce function (`javascript function`)
---@param option table @Query options (`query`/`limit`/`sort`/`out`)
---@return table | boolean | nil, string | nil @On success the `results` field of the reply; on failure `false` and an error message
function mongo:mapreduce(database, collect, map, reduce, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid mapreduce collect or database.")
  assert(type(map) == 'string' and map ~= '' and type(reduce) == 'string' and reduce ~= '', "Invalid Map or Reduce function.")
  local reply, err = request_mapreduce(self, database, collect, map, reduce, option)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  return reply.results
end
---Creates indexes.
---@param database string @Name of the target database
---@param collect string @Name of the target collection
---@param indexes table @Index names and definitions
---@param option table @Extra index parameters (`background`/`unique`, etc.); defaults to an empty table
---@return table | boolean, string | nil @On success the reply table; on failure `false` and an error message
function mongo:create_indexes(database, collect, indexes, option)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid Indexed collect or database.")
  assert(type(indexes) == 'table', "Invalid Index type.")
  local reply, err = request_createindex(self, database, collect, indexes, option or {})
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  return reply
end
---Lists indexes.
---@param database string @Name of the target database
---@param collect string @Name of the target collection
---@return table | boolean, string | nil @On success the first batch of the reply cursor; on failure `false` and an error message
function mongo:get_indexes(database, collect)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid Indexed collect or database.")
  local reply, err = request_getindexes(self, database, collect)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  return reply.cursor.firstBatch
end
---Drops an index.
---@param database string @Name of the target database
---@param collect string @Name of the target collection
---@param indexname string @Name of the index to drop
---@return table | boolean, string | nil @On success the reply table; on failure `false` and an error message
function mongo:drop_indexes(database, collect, indexname)
  assert(type(database) == 'string' and database ~= '' and type(collect) == 'string' and collect ~= '', "Invalid Indexed collect or database.")
  assert(type(indexname) == 'string', "Invalid Index name.")
  local reply, err = request_dropindexes(self, database, collect, indexname)
  if not reply then
    -- No reply at all: there is nothing to format, so pass the request error through
    -- rather than indexing a nil reply.
    return false, err
  end
  if reply.errmsg then
    -- `%d` raises on a missing code, so fall back to -1 when the reply carries none.
    return false, err or fmt('{"errcode":%d,"errmsg":"%s"}', toint(reply.code) or -1, reply.errmsg)
  end
  return reply
end
---Closes the connection: marks the client as disconnected, then closes and
---discards the socket and the GridFS helper if they exist.
function mongo:close()
  self.connected = false
  local sock = self.sock
  if sock then
    sock:close()
    self.sock = nil
  end
  local fs = self.gridfs
  if fs then
    fs:close()
    self.gridfs = nil
  end
end
return mongo