1 | """
|
---|
2 | This file demonstrates writing tests using the unittest module. These will pass
|
---|
3 | when you run "manage.py test".
|
---|
4 |
|
---|
5 | Replace this with more appropriate tests for your application.
|
---|
6 | """
|
---|
7 |
|
---|
8 | from django.test import TestCase
|
---|
9 |
|
---|
10 | import threading
|
---|
11 | import time
|
---|
12 | from django.db import connection, connections
|
---|
13 |
|
---|
14 |
|
---|
15 | def wait_and_check(conn):
|
---|
16 | time.sleep(10)
|
---|
17 | cursor = conn.cursor()
|
---|
18 | cursor.execute("select datname from pg_stat_activity where procpid = pg_backend_pid()")
|
---|
19 | print cursor.fetchone()[0]
|
---|
20 |
|
---|
21 | def wait_and_check_shared(conn):
|
---|
22 | # different method so that stack trace shows us where we are...
|
---|
23 | wait_and_check(conn)
|
---|
24 |
|
---|
25 | def open_wait_and_check(conn):
|
---|
26 | cursor = conn.cursor()
|
---|
27 | cursor.execute("select 1")
|
---|
28 | print cursor.fetchone()[0]
|
---|
29 | wait_and_check(conn)
|
---|
30 |
|
---|
31 | class SimpleTest(TestCase):
|
---|
32 | def test_new_conn(self):
|
---|
33 | t = threading.Thread(target=wait_and_check,
|
---|
34 | args=(connection,))
|
---|
35 | t.start()
|
---|
36 |
|
---|
37 | def test_share_conn(self):
|
---|
38 | connections['default'].allow_thread_sharing = True
|
---|
39 | t = threading.Thread(target=wait_and_check_shared,
|
---|
40 | args=(connections['default'],))
|
---|
41 | t.start()
|
---|
42 |
|
---|
43 | """ Disabled by default - prevents dropping the test database
|
---|
44 | def test_open_conn(self):
|
---|
45 | t = threading.Thread(target=open_wait_and_check,
|
---|
46 | args=(connection,))
|
---|
47 | t.start()
|
---|
48 | """
|
---|