Ticket #2705: for_update_tests_14778.diff
File for_update_tests_14778.diff, 20.0 KB (added by , 14 years ago) |
---|
-
django/db/models/sql/compiler.py
117 117 result.append('LIMIT %d' % val) 118 118 result.append('OFFSET %d' % self.query.low_mark) 119 119 120 if self.query.select_for_update and self.connection.features.has_select_for_update: 121 nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update 122 result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait)) 123 120 124 return ' '.join(result), tuple(params) 121 125 122 126 def as_nested_sql(self): -
django/db/models/sql/query.py
11 11 from django.utils.tree import Node 12 12 from django.utils.datastructures import SortedDict 13 13 from django.utils.encoding import force_unicode 14 from django.db import connections, DEFAULT_DB_ALIAS 14 from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError 15 15 from django.db.models import signals 16 16 from django.db.models.fields import FieldDoesNotExist 17 17 from django.db.models.query_utils import select_related_descend, InvalidQuery … … 23 23 ExtraWhere, AND, OR) 24 24 from django.core.exceptions import FieldError 25 25 26 __all__ = ['Query', 'RawQuery' ]26 __all__ = ['Query', 'RawQuery', 'LockNotAvailable'] 27 27 28 29 class LockNotAvailable(DatabaseError): 30 ''' 31 Raised when a query fails because a lock was not available. 32 ''' 33 pass 34 35 28 36 class RawQuery(object): 29 37 """ 30 38 A single raw SQL query … … 83 91 return "<RawQuery: %r>" % (self.sql % self.params) 84 92 85 93 def _execute_query(self): 86 self.cursor = connections[self.using].cursor() 87 self.cursor.execute(self.sql, self.params) 94 connection = connections[self.using] 95 self.cursor = connection.cursor() 96 try: 97 self.cursor.execute(self.sql, self.params) 98 except DatabaseError, e: 99 if connection.features.has_select_for_update_nowait and connection.ops.signals_lock_not_available(e): 100 raise LockNotAvailable(*e.args) 101 raise 88 102 89 90 103 class Query(object): 91 104 """ 92 105 A single SQL query. … … 131 144 self.order_by = [] 132 145 self.low_mark, self.high_mark = 0, None # Used for offset/limit 133 146 self.distinct = False 147 self.select_for_update = False 148 self.select_for_update_nowait = False 134 149 self.select_related = False 135 150 self.related_select_cols = [] 136 151 … … 260 275 obj.order_by = self.order_by[:] 261 276 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 262 277 obj.distinct = self.distinct 278 obj.select_for_update = self.select_for_update 279 obj.select_for_update_nowait = self.select_for_update_nowait 263 280 obj.select_related = self.select_related 264 281 obj.related_select_cols = [] 265 282 obj.aggregates = deepcopy(self.aggregates, memo=memo) … … 366 383 367 384 query.clear_ordering(True) 368 385 query.clear_limits() 386 query.select_for_update = False 369 387 query.select_related = False 370 388 query.related_select_cols = [] 371 389 query.related_select_fields = [] -
django/db/models/manager.py
164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/db/models/query.py
432 432 del_query._for_write = True 433 433 434 434 # Disable non-supported fields. 435 del_query.query.select_for_update = False 435 436 del_query.query.select_related = False 436 437 del_query.query.clear_ordering() 437 438 … … 580 581 else: 581 582 return self._filter_or_exclude(None, **filter_obj) 582 583 584 def select_for_update(self, **kwargs): 585 """ 586 Returns a new QuerySet instance that will select objects with a 587 FOR UPDATE lock. 588 """ 589 # Default to false for nowait 590 nowait = kwargs.pop('nowait', False) 591 obj = self._clone() 592 obj.query.select_for_update = True 593 obj.query.select_for_update_nowait = nowait 594 return obj 595 583 596 def select_related(self, *fields, **kwargs): 584 597 """ 585 598 Returns a new QuerySet instance that will select related objects. -
django/db/backends/mysql/base.py
23 23 raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__) 24 24 25 25 from MySQLdb.converters import conversions 26 from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT 26 from MySQLdb.constants import FIELD_TYPE, FLAG, CLIENT, ER 27 27 28 28 from django.db import utils 29 29 from django.db.backends import * … … 124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False … … 135 137 136 138 class DatabaseOperations(BaseDatabaseOperations): 137 139 compiler_module = "django.db.backends.mysql.compiler" 140 signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK 138 141 139 142 def date_extract_sql(self, lookup_type, field_name): 140 143 # http://dev.mysql.com/doc/mysql/en/date-and-time-functions.html -
django/db/backends/oracle/base.py
48 48 needs_datetime_string_cast = False 49 49 interprets_empty_strings_as_nulls = True 50 50 uses_savepoints = True 51 has_select_for_update = True 52 has_select_for_update_nowait = True 51 53 can_return_id_from_insert = True 52 54 allow_sliced_subqueries = False 53 55 supports_subqueries_in_group_by = False … … 285 287 'column': column_name}) 286 288 return output 287 289 290 def signals_deadlock(self, exception): 291 return exception.args[0].code == 60 292 293 def signals_lock_not_available(self, exception): 294 return exception.args[0].code == 54 295 288 296 def start_transaction_sql(self): 289 297 return '' 290 298 -
django/db/backends/__init__.py
103 103 # integer primary keys. 104 104 related_fields_match_type = False 105 105 allow_sliced_subqueries = True 106 has_select_for_update = False 107 has_select_for_update_nowait = False 106 108 107 109 # Does the default test database allow multiple connections? 108 110 # Usually an indication that the test database is in-memory … … 282 284 """ 283 285 return [] 284 286 287 def for_update_sql(self, nowait=False): 288 """ 289 Return FOR UPDATE SQL clause to lock row for update 290 """ 291 if nowait: 292 nowaitstr = ' NOWAIT' 293 else: 294 nowaitstr = '' 295 return 'FOR UPDATE' + nowaitstr 296 285 297 def fulltext_search_sql(self, field_name): 286 298 """ 287 299 Returns the SQL WHERE clause to use in order to perform a full-text -
django/db/backends/postgresql_psycopg2/base.py
19 19 try: 20 20 import psycopg2 as Database 21 21 import psycopg2.extensions 22 from psycopg2 import errorcodes 22 23 except ImportError, e: 23 24 from django.core.exceptions import ImproperlyConfigured 24 25 raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e) … … 70 71 requires_rollback_on_dirty_transaction = True 71 72 has_real_datatype = True 72 73 can_defer_constraint_checks = True 74 has_select_for_update = True 75 has_select_for_update_nowait = True 76 73 77 74 78 class DatabaseOperations(PostgresqlDatabaseOperations): 79 80 def _pg_error(self, e, code): 81 return getattr(e, 'pgcode', None) == code 82 83 def signals_deadlock(self, e): 84 return self._pg_error(e, errorcodes.DEADLOCK_DETECTED) 85 86 def signals_lock_not_available(self, e): 87 return self._pg_error(e, errorcodes.LOCK_NOT_AVAILABLE) 88 75 89 def last_executed_query(self, cursor, sql, params): 76 90 # With psycopg2, cursor objects have a "query" attribute that is the 77 91 # exact query sent to the database. See docs here: -
tests/modeltests/select_for_update/tests.py
1 import threading 2 import time 3 from django.conf import settings 4 from django.db import connection 5 from django.db import transaction, connection 6 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 7 from django.test import TransactionTestCase, skipUnlessDBFeature 8 9 from models import Person 10 11 class SelectForUpdateTests(TransactionTestCase): 12 13 def setUp(self): 14 connection._rollback() 15 connection._enter_transaction_management(True) 16 self.new_connections = ConnectionHandler(settings.DATABASES) 17 self.person = Person.objects.create(name='Reinhardt') 18 19 # We need to set settings.DEBUG to True so we can capture 20 # the output SQL to examine. 21 self._old_debug = settings.DEBUG 22 settings.DEBUG = True 23 24 def tearDown(self): 25 connection._leave_transaction_management(True) 26 settings.DEBUG = self._old_debug 27 try: 28 self.end_blocking_transaction() 29 except (DatabaseError, AttributeError): 30 pass 31 32 def start_blocking_transaction(self): 33 self.new_connection = self.new_connections[DEFAULT_DB_ALIAS] 34 self.new_connection._enter_transaction_management(True) 35 self.cursor = self.new_connection.cursor() 36 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 37 'db_table': Person._meta.db_table, 38 'for_update': self.new_connection.ops.for_update_sql(), 39 } 40 self.cursor.execute(sql, ()) 41 result = self.cursor.fetchone() 42 43 def end_blocking_transaction(self): 44 self.new_connection._rollback() 45 self.new_connection.close() 46 self.new_connection._leave_transaction_management(True) 47 48 def has_for_update_sql(self, tested_connection, nowait=False): 49 for_update_sql = tested_connection.ops.for_update_sql(nowait) 50 sql = tested_connection.queries[-1]['sql'] 51 return bool(sql.find(for_update_sql) > -1) 52 53 def check_exc(self, exc): 54 self.failUnless(isinstance(exc, DatabaseError)) 55 56 @skipUnlessDBFeature('has_select_for_update') 57 def test_for_update_sql_generated(self): 58 """ 59 Test that the backend's FOR UPDATE variant appears in 60 generated SQL when select_for_update is invoked. 61 """ 62 list(Person.objects.all().select_for_update()) 63 self.assertTrue(self.has_for_update_sql(connection)) 64 65 @skipUnlessDBFeature('has_select_for_update_nowait') 66 def test_for_update_sql_generated_nowait(self): 67 """ 68 Test that the backend's FOR UPDATE NOWAIT variant appears in 69 generated SQL when select_for_update is invoked. 70 """ 71 list(Person.objects.all().select_for_update(nowait=True)) 72 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 73 74 @skipUnlessDBFeature('has_select_for_update_nowait') 75 def test_nowait_raises_error_on_block(self): 76 """ 77 If nowait is specified, we expect an error to be raised rather 78 than blocking. 79 """ 80 self.start_blocking_transaction() 81 status = [] 82 thread = threading.Thread( 83 target=self.run_select_for_update, 84 args=(status,), 85 kwargs={'nowait': True}, 86 ) 87 88 thread.start() 89 time.sleep(1) 90 thread.join() 91 self.end_blocking_transaction() 92 self.check_exc(status[-1]) 93 94 def run_select_for_update(self, status, nowait=False): 95 status.append('started') 96 try: 97 connection._rollback() 98 people = list(Person.objects.all().select_for_update(nowait=nowait)) 99 people[0].name = 'Fred' 100 people[0].save() 101 connection._commit() 102 except DatabaseError, e: 103 status.append(e) 104 except Exception, e: 105 raise 106 107 @skipUnlessDBFeature('has_select_for_update') 108 def test_block(self): 109 """ 110 Check that a thread running a select_for_update that 111 accesses rows being touched by a similar operation 112 on another connection blocks correctly. 113 """ 114 # First, let's start the transaction in our thread. 115 self.start_blocking_transaction() 116 117 # Now, try it again using the ORM's select_for_update 118 # facility. Do this in a separate thread. 119 status = [] 120 thread = threading.Thread(target=self.run_select_for_update, args=(status,)) 121 122 # The thread should immediately block, but we'll sleep 123 # for a bit to make sure 124 thread.start() 125 sanity_count = 0 126 while len(status) != 1 and sanity_count < 10: 127 sanity_count += 1 128 time.sleep(1) 129 if sanity_count >= 10: 130 raise ValueError, 'Thread did not run and block' 131 132 # Check the person hasn't been updated. Since this isn't 133 # using FOR UPDATE, it won't block. 134 p = Person.objects.get(pk=self.person.pk) 135 self.assertEqual('Reinhardt', p.name) 136 137 # When we end our blocking transaction, our thread should 138 # be able to continue. 139 self.end_blocking_transaction() 140 thread.join(5.0) 141 142 # Check the thread has finished. Assuming it has, we should 143 # find that it has updated the person's name. 144 self.failIf(thread.is_alive()) 145 p = Person.objects.get(pk=self.person.pk) 146 self.assertEqual('Fred', p.name) 147 148 @skipUnlessDBFeature('has_select_for_update') 149 def test_raw_lock_not_available(self): 150 """ 151 Check that running a raw query which can't obtain a FOR UPDATE lock 152 raises the correct exception 153 """ 154 self.start_blocking_transaction() 155 def raw(status): 156 try: 157 list( 158 Person.objects.raw( 159 'SELECT * FROM %s %s' % ( 160 Person._meta.db_table, 161 connection.ops.for_update_sql(nowait=True) 162 ) 163 ) 164 ) 165 except DatabaseError, e: 166 status.append(e) 167 status = [] 168 thread = threading.Thread(target=raw, kwargs={'status': status}) 169 thread.start() 170 time.sleep(1) 171 thread.join() 172 self.end_blocking_transaction() 173 self.check_exc(status[-1]) -
tests/modeltests/select_for_update/models.py
1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30) 5 No newline at end of file -
AUTHORS
522 522 Gasper Zejn <zejn@kiberpipa.org> 523 523 Jarek Zgoda <jarek.zgoda@gmail.com> 524 524 Cheng Zhang 525 Dan Fairs <dan@fezconsulting.com> 525 526 526 527 A big THANK YOU goes to: 527 528 -
docs/ref/models/querysets.txt
975 975 # queries the database with the 'backup' alias 976 976 >>> Entry.objects.using('backup') 977 977 978 ``select_for_update(nowait=False)`` 979 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 980 981 Returns a queryset that will lock rows until the end of the transaction, 982 generating a SELECT ... FOR UPDATE statement on supported databases. 983 984 For example:: 985 986 entries = Entry.objects.select_for_update().filter(author=request.user) 987 988 All matched entries will be locked until the end of the transaction block, 989 meaning that other transactions will be prevented from changing or acquiring 990 locks on them. 991 992 Usually, if another transaction has already acquired a lock on one of the 993 selected rows, the query will block until the lock is released. If this is 994 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 995 make the call non-blocking. If a conflicting lock is already acquired by 996 another transaction, ``django.db.models.LockNotAvailable`` will be raised when 997 the queryset is evaluated. 998 999 Using blocking locks on a database can lead to deadlocks. This occurs when two 1000 concurrent transactions are both waiting on a lock the other transaction 1001 already holds. To deal with deadlocks, wrap your views that use 1002 ``select_for_update(nowait=False)`` with the 1003 ``django.views.decorators.deadlock.handle_deadlocks`` decorator. 1004 1005 For example:: 1006 1007 from django.db import transaction 1008 from django.views.decorators.deadlock import handle_deadlocks 1009 1010 @handle_deadlocks(max_retries=2) 1011 @transaction.commit_on_success 1012 def my_view(request): 1013 ... 1014 1015 If the database engine detects a deadlock involving ``my_view`` and decides 1016 to abort its transaction, it will be automatically retried. If deadlocks keep 1017 occurring after two repeated attempts, 1018 ``django.views.decorators.DeadlockError`` will be raised, which can be 1019 propagated to the user or handled in a middleware. 1020 1021 Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 1022 database backends support ``select_for_update()`` but MySQL has no 1023 support for the ``nowait`` argument. Other backends will simply 1024 generate queries as if ``select_for_update()`` had not been used. 978 1025 979 1026 Methods that do not return QuerySets 980 1027 ------------------------------------ -
docs/ref/databases.txt
362 362 column types have a maximum length restriction of 255 characters, regardless 363 363 of whether ``unique=True`` is specified or not. 364 364 365 Row locking with ``QuerySet.select_for_update()`` 366 ------------------------------------------------- 367 368 MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 369 statement. However, you may call the ``select_for_update()`` method of a 370 queryset with ``nowait=True``. In that case, the argument will be silently 371 discarded and the generated query will block until the requested lock can be 372 acquired. 373 365 374 .. _sqlite-notes: 366 375 367 376 SQLite notes