Package cherrypy :: Package test :: Module test_session
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.test_session

  1  from cherrypy.test import test 
  2  test.prefer_parent_path() 
  3   
  4  import os 
  5  localDir = os.path.dirname(__file__) 
  6  import sys 
  7  import threading 
  8  import time 
  9   
 10  import cherrypy 
 11  from cherrypy.lib import sessions 
 12   
 13   
14 -def setup_server():
15 class Root: 16 17 _cp_config = {'tools.sessions.on': True, 18 'tools.sessions.storage_type' : 'ram', 19 'tools.sessions.storage_path' : localDir, 20 'tools.sessions.timeout': 0.017, # 1.02 secs 21 'tools.sessions.clean_freq': 0.017, 22 } 23 24 def testGen(self): 25 counter = cherrypy.session.get('counter', 0) + 1 26 cherrypy.session['counter'] = counter 27 yield str(counter)
28 testGen.exposed = True 29 30 def testStr(self): 31 counter = cherrypy.session.get('counter', 0) + 1 32 cherrypy.session['counter'] = counter 33 return str(counter) 34 testStr.exposed = True 35 36 def setsessiontype(self, newtype): 37 self.__class__._cp_config.update({'tools.sessions.storage_type': newtype}) 38 setsessiontype.exposed = True 39 40 def index(self): 41 sess = cherrypy.session 42 c = sess.get('counter', 0) + 1 43 time.sleep(0.01) 44 sess['counter'] = c 45 return str(c) 46 index.exposed = True 47 48 def keyin(self, key): 49 return str(key in cherrypy.session) 50 keyin.exposed = True 51 52 def delete(self): 53 cherrypy.session.delete() 54 sessions.expire() 55 return "done" 56 delete.exposed = True 57 58 def delkey(self, key): 59 del cherrypy.session[key] 60 return "OK" 61 delkey.exposed = True 62 63 def blah(self): 64 return self._cp_config['tools.sessions.storage_type'] 65 blah.exposed = True 66 67 def iredir(self): 68 raise cherrypy.InternalRedirect('/blah') 69 iredir.exposed = True 70 71 cherrypy.tree.mount(Root()) 72 cherrypy.config.update({'environment': 'test_suite'}) 73 74 75 from cherrypy.test import helper 76
77 -class SessionTest(helper.CPWebCase):
78
79 - def test_0_Session(self):
80 self.getPage('/testStr') 81 self.assertBody('1') 82 self.getPage('/testGen', self.cookies) 83 self.assertBody('2') 84 self.getPage('/testStr', self.cookies) 85 self.assertBody('3') 86 self.getPage('/delkey?key=counter', self.cookies) 87 self.assertStatus(200) 88 89 self.getPage('/setsessiontype/file') 90 self.getPage('/testStr') 91 self.assertBody('1') 92 self.getPage('/testGen', self.cookies) 93 self.assertBody('2') 94 self.getPage('/testStr', self.cookies) 95 self.assertBody('3') 96 self.getPage('/delkey?key=counter', self.cookies) 97 self.assertStatus(200) 98 99 # Wait for the session.timeout (1.02 secs) 100 time.sleep(1.25) 101 self.getPage('/') 102 self.assertBody('1') 103 104 # Test session __contains__ 105 self.getPage('/keyin?key=counter', self.cookies) 106 self.assertBody("True") 107 108 # Test session delete 109 self.getPage('/delete', self.cookies) 110 self.assertBody("done") 111 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')] 112 self.assertEqual(f(), []) 113 114 # Wait for the cleanup thread to delete remaining session files 115 self.getPage('/') 116 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')] 117 self.assertNotEqual(f(), []) 118 time.sleep(2) 119 self.assertEqual(f(), [])
120
121 - def test_1_Ram_Concurrency(self):
122 self.getPage('/setsessiontype/ram') 123 self._test_Concurrency()
124
125 - def test_2_File_Concurrency(self):
126 self.getPage('/setsessiontype/file') 127 self._test_Concurrency()
128
129 - def _test_Concurrency(self):
130 client_thread_count = 5 131 request_count = 30 132 133 # Get initial cookie 134 self.getPage("/") 135 self.assertBody("1") 136 cookies = self.cookies 137 138 data_dict = {} 139 140 def request(index): 141 for i in xrange(request_count): 142 self.getPage("/", cookies) 143 # Uncomment the following line to prove threads overlap. 144 ## print index, 145 data_dict[index] = v = int(self.body)
146 147 # Start <request_count> concurrent requests from 148 # each of <client_thread_count> clients 149 ts = [] 150 for c in xrange(client_thread_count): 151 data_dict[c] = 0 152 t = threading.Thread(target=request, args=(c,)) 153 ts.append(t) 154 t.start() 155 156 for t in ts: 157 t.join() 158 159 hitcount = max(data_dict.values()) 160 expected = 1 + (client_thread_count * request_count) 161 self.assertEqual(hitcount, expected)
162
163 - def test_3_Redirect(self):
164 # Start a new session 165 self.getPage('/testStr') 166 self.getPage('/iredir', self.cookies) 167 self.assertBody("file")
168
169 - def test_4_File_deletion(self):
170 # Start a new session 171 self.getPage('/testStr') 172 # Delete the session file manually and retry. 173 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 174 path = os.path.join(localDir, "session-" + id) 175 os.unlink(path) 176 self.getPage('/testStr', self.cookies)
177 178 179 180 if __name__ == "__main__": 181 setup_server() 182 helper.testmain() 183