-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.py
142 lines (114 loc) · 3.71 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#coding=utf-8
__author__ = 'xiyuanbupt'
import unittest
#from tests.example import TestFoo
#from tests.testFileDownloader import TestFileDownloader
#from tests.testSender import TestSender
import requests
import urlparse
import json
import pprint
import time
import datetime
from SOAPpy import SOAPProxy
url_base = 'http://localhost:8000'
def test_sender():
print u'测试推送xmly音频文件,推送两个文件并且未强制推送'
url = urlparse.urljoin(url_base,'api/sender/vod/xmly')
body = {
"post_address":"http://httpbin.org/post",
"_ids":["571a2e0de1382377590c23e7","571a2e0ce1382377590c23e6"],
"force_push":True
}
res = requests.post(url,json=body)
print(u'测试的结果是: ')
pprint.pprint(json.loads(res.text))
assert res.ok == True
print u''
print u'测试推送qt音频文件与图片文件,推送两个文件并强制推送'
url = urlparse.urljoin(url_base,'api/sender/vod/qt')
body = {
"post_address":"http://httpbin.org/post",
"_ids":["582568ef0e640d11bb38329c","582568ef0e640d11bb38329b","582568ef0e640d11bb38329a"],
"force_push":True
}
res = requests.post(url,json=body)
pprint.pprint(json.loads(res.text))
assert res.ok == True
def test_stop_runner():
url = urlparse.urljoin(url_base,'api/xmly/full')
res = requests.get(url)
pprint.pprint(res.text)
assert json.loads(res.text).get('statename') == 'STOPPED'
class TestApi(unittest.TestCase):
def testSender(self):
url = urlparse.urljoin(url_base,'api/sender/vod/xmly')
body = {
"post_address":"http://localhost:8080/",
"_ids":["571a2e0de1382377590c23e7","571a2e0ce1382377590c23e6"],
"force_push":True
}
res = requests.post(url,json=body)
self.assertTrue(res.ok)
url = urlparse.urljoin(url_base,'api/sender/vod/qt')
body = {
"post_address":"http://localhost:8080/",
"_ids":["582568ef0e640d11bb38329c","582568ef0e640d11bb38329b","582568ef0e640d11bb38329a"],
"force_push":True
}
res = requests.post(url,json=body)
pprint.pprint(json.loads(res.text))
self.assertTrue(res.ok)
'''
def testXMLYFullStopRunner(self):
url = urlparse.urljoin(url_base,'api/xmly/full')
res = requests.get(url)
self.assertEqual(
json.loads(res.text).get('statename'),
'STOPPED'
)
def testXMLYTopnStopRunner(self):
url = urlparse.urljoin(url_base,'api/xmly/topn')
before = datetime.datetime.now()
print requests.delete(url).text
after = datetime.datetime.now()
print (after-before).seconds
res = requests.get(url)
print res.text
self.assertEqual(
json.loads(res.text).get('state'),
100
)
res = requests.post(
url,
json={
"topn_n":12,
"urls":[]
}
)
print res.text
time.sleep(2)
res = requests.get(url)
self.assertEqual(
json.loads(res.text).get('statename'),
'RUNNING'
)
before = datetime.datetime.now()
print requests.delete(url).text
after = datetime.datetime.now()
print (after-before).seconds
'''
def test_cnr_api():
namespace = ("m", "urn:mpc")
url = "http://10.20.30.21:8088/"
proxy = SOAPProxy(url,namespace)
proxy.config.debug = 1
proxy.mpccommit(
strInput = "foo"
)
if __name__ == "__main__":
test_cnr_api()
unittest.main()
pass
# unittest.main()
# sudo docker run --name mongo_instance_001 -d my/repo --noprealloc --smallfiles