Ticket #2705: for_update_r15007.diff
File for_update_r15007.diff, 19.2 KB (added by , 14 years ago) |
---|
-
django/db/models/sql/compiler.py
1 1 from django.core.exceptions import FieldError 2 2 from django.db import connections 3 from django.db import transaction 3 4 from django.db.backends.util import truncate_name 4 5 from django.db.models.sql.constants import * 5 6 from django.db.models.sql.datastructures import EmptyResultSet … … 117 118 result.append('LIMIT %d' % val) 118 119 result.append('OFFSET %d' % self.query.low_mark) 119 120 121 if self.query.select_for_update and self.connection.features.has_select_for_update: 122 nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update 123 result.append(self.connection.ops.for_update_sql(nowait=nowait)) 124 120 125 return ' '.join(result), tuple(params) 121 126 122 127 def as_nested_sql(self): … … 677 682 resolve_columns = hasattr(self, 'resolve_columns') 678 683 fields = None 679 684 has_aggregate_select = bool(self.query.aggregate_select) 685 # Set transaction dirty if we're using SELECT FOR UPDATE to ensure 686 # a subsequent commit/rollback is exectuted, so any database locks 687 # are released. 688 if self.query.select_for_update and transaction.is_managed(self.using): 689 transaction.set_dirty(self.using) 680 690 for rows in self.execute_sql(MULTI): 681 691 for row in rows: 682 692 if resolve_columns: -
django/db/models/sql/query.py
131 131 self.order_by = [] 132 132 self.low_mark, self.high_mark = 0, None # Used for offset/limit 133 133 self.distinct = False 134 self.select_for_update = False 135 self.select_for_update_nowait = False 134 136 self.select_related = False 135 137 self.related_select_cols = [] 136 138 … … 260 262 obj.order_by = self.order_by[:] 261 263 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 262 264 obj.distinct = self.distinct 265 obj.select_for_update = self.select_for_update 266 obj.select_for_update_nowait = self.select_for_update_nowait 263 267 obj.select_related = self.select_related 264 268 obj.related_select_cols = [] 265 269 obj.aggregates = deepcopy(self.aggregates, memo=memo) … … 366 370 367 371 query.clear_ordering(True) 368 372 query.clear_limits() 373 query.select_for_update = False 369 374 query.select_related = False 370 375 query.related_select_cols = [] 371 376 query.related_select_fields = [] -
django/db/models/manager.py
164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/db/models/query.py
432 432 del_query._for_write = True 433 433 434 434 # Disable non-supported fields. 435 del_query.query.select_for_update = False 435 436 del_query.query.select_related = False 436 437 del_query.query.clear_ordering() 437 438 … … 580 581 else: 581 582 return self._filter_or_exclude(None, **filter_obj) 582 583 584 def select_for_update(self, **kwargs): 585 """ 586 Returns a new QuerySet instance that will select objects with a 587 FOR UPDATE lock. 588 """ 589 # Default to false for nowait 590 nowait = kwargs.pop('nowait', False) 591 obj = self._clone() 592 obj.query.select_for_update = True 593 obj.query.select_for_update_nowait = nowait 594 return obj 595 583 596 def select_related(self, *fields, **kwargs): 584 597 """ 585 598 Returns a new QuerySet instance that will select related objects. -
django/db/backends/mysql/base.py
124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False -
django/db/backends/oracle/base.py
70 70 needs_datetime_string_cast = False 71 71 interprets_empty_strings_as_nulls = True 72 72 uses_savepoints = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 73 75 can_return_id_from_insert = True 74 76 allow_sliced_subqueries = False 75 77 supports_subqueries_in_group_by = False -
django/db/backends/__init__.py
103 103 # integer primary keys. 104 104 related_fields_match_type = False 105 105 allow_sliced_subqueries = True 106 has_select_for_update = False 107 has_select_for_update_nowait = False 106 108 107 109 # Does the default test database allow multiple connections? 108 110 # Usually an indication that the test database is in-memory … … 282 284 """ 283 285 return [] 284 286 287 def for_update_sql(self, nowait=False): 288 """ 289 Return FOR UPDATE SQL clause to lock row for update 290 """ 291 if nowait: 292 return 'FOR UPDATE NOWAIT' 293 else: 294 return 'FOR UPDATE' 295 285 296 def fulltext_search_sql(self, field_name): 286 297 """ 287 298 Returns the SQL WHERE clause to use in order to perform a full-text -
django/db/backends/postgresql_psycopg2/base.py
70 70 requires_rollback_on_dirty_transaction = True 71 71 has_real_datatype = True 72 72 can_defer_constraint_checks = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 75 73 76 74 77 class DatabaseOperations(PostgresqlDatabaseOperations): 75 78 def last_executed_query(self, cursor, sql, params): -
tests/modeltests/select_for_update/tests.py
1 import functools 2 import time 3 from django.conf import settings 4 from django.db import connection 5 from django.db import transaction, connection 6 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 7 from django.test import TransactionTestCase, skipUnlessDBFeature 8 from django.utils import unittest as ut2 9 10 from models import Person 11 12 try: 13 import threading 14 def requires_threading(func): 15 return func 16 except ImportError: 17 # Note we can't use dummy_threading here, as our tests will actually 18 # block. We just have to skip the test completely. 19 def requires_threading(func): 20 @functools.wraps(func) 21 def wrapped(*args, **kw): 22 raise ut2.SkipTest('threading required') 23 24 class SelectForUpdateTests(TransactionTestCase): 25 26 def setUp(self): 27 connection._rollback() 28 connection._enter_transaction_management(True) 29 self.new_connections = ConnectionHandler(settings.DATABASES) 30 self.person = Person.objects.create(name='Reinhardt') 31 32 # We need to set settings.DEBUG to True so we can capture 33 # the output SQL to examine. 34 self._old_debug = settings.DEBUG 35 settings.DEBUG = True 36 37 def tearDown(self): 38 connection._leave_transaction_management(True) 39 settings.DEBUG = self._old_debug 40 try: 41 self.end_blocking_transaction() 42 except (DatabaseError, AttributeError): 43 pass 44 45 def start_blocking_transaction(self): 46 # Start a blocking transaction. At some point, 47 # end_blocking_transaction() should be called. 48 self.new_connection = self.new_connections[DEFAULT_DB_ALIAS] 49 self.new_connection._enter_transaction_management(True) 50 self.cursor = self.new_connection.cursor() 51 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 52 'db_table': Person._meta.db_table, 53 'for_update': self.new_connection.ops.for_update_sql(), 54 } 55 self.cursor.execute(sql, ()) 56 result = self.cursor.fetchone() 57 58 def end_blocking_transaction(self): 59 # Roll back the blocking transaction. 60 self.new_connection._rollback() 61 self.new_connection.close() 62 self.new_connection._leave_transaction_management(True) 63 64 def has_for_update_sql(self, tested_connection, nowait=False): 65 # Examine the SQL that was executed to determine whether it 66 # contains the 'SELECT..FOR UPDATE' stanza. 67 for_update_sql = tested_connection.ops.for_update_sql(nowait) 68 sql = tested_connection.queries[-1]['sql'] 69 return bool(sql.find(for_update_sql) > -1) 70 71 def check_exc(self, exc): 72 self.failUnless(isinstance(exc, DatabaseError)) 73 74 @skipUnlessDBFeature('has_select_for_update') 75 def test_for_update_sql_generated(self): 76 """ 77 Test that the backend's FOR UPDATE variant appears in 78 generated SQL when select_for_update is invoked. 79 """ 80 list(Person.objects.all().select_for_update()) 81 self.assertTrue(self.has_for_update_sql(connection)) 82 83 @skipUnlessDBFeature('has_select_for_update_nowait') 84 def test_for_update_sql_generated_nowait(self): 85 """ 86 Test that the backend's FOR UPDATE NOWAIT variant appears in 87 generated SQL when select_for_update is invoked. 88 """ 89 list(Person.objects.all().select_for_update(nowait=True)) 90 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 91 92 @requires_threading 93 @skipUnlessDBFeature('has_select_for_update_nowait') 94 def test_nowait_raises_error_on_block(self): 95 """ 96 If nowait is specified, we expect an error to be raised rather 97 than blocking. 98 """ 99 self.start_blocking_transaction() 100 status = [] 101 thread = threading.Thread( 102 target=self.run_select_for_update, 103 args=(status,), 104 kwargs={'nowait': True}, 105 ) 106 107 thread.start() 108 time.sleep(1) 109 thread.join() 110 self.end_blocking_transaction() 111 self.check_exc(status[-1]) 112 113 def run_select_for_update(self, status, nowait=False): 114 status.append('started') 115 try: 116 connection._rollback() 117 people = list(Person.objects.all().select_for_update(nowait=nowait)) 118 people[0].name = 'Fred' 119 people[0].save() 120 connection._commit() 121 except DatabaseError, e: 122 status.append(e) 123 except Exception, e: 124 raise 125 126 @requires_threading 127 @skipUnlessDBFeature('has_select_for_update') 128 def test_block(self): 129 """ 130 Check that a thread running a select_for_update that 131 accesses rows being touched by a similar operation 132 on another connection blocks correctly. 133 """ 134 # First, let's start the transaction in our thread. 135 self.start_blocking_transaction() 136 137 # Now, try it again using the ORM's select_for_update 138 # facility. Do this in a separate thread. 139 status = [] 140 thread = threading.Thread(target=self.run_select_for_update, args=(status,)) 141 142 # The thread should immediately block, but we'll sleep 143 # for a bit to make sure 144 thread.start() 145 sanity_count = 0 146 while len(status) != 1 and sanity_count < 10: 147 sanity_count += 1 148 time.sleep(1) 149 if sanity_count >= 10: 150 raise ValueError, 'Thread did not run and block' 151 152 # Check the person hasn't been updated. Since this isn't 153 # using FOR UPDATE, it won't block. 154 p = Person.objects.get(pk=self.person.pk) 155 self.assertEqual('Reinhardt', p.name) 156 157 # When we end our blocking transaction, our thread should 158 # be able to continue. 159 self.end_blocking_transaction() 160 thread.join(5.0) 161 162 # Check the thread has finished. Assuming it has, we should 163 # find that it has updated the person's name. 164 self.failIf(thread.is_alive()) 165 p = Person.objects.get(pk=self.person.pk) 166 self.assertEqual('Fred', p.name) 167 168 @requires_threading 169 @skipUnlessDBFeature('has_select_for_update') 170 def test_raw_lock_not_available(self): 171 """ 172 Check that running a raw query which can't obtain a FOR UPDATE lock 173 raises the correct exception 174 """ 175 self.start_blocking_transaction() 176 def raw(status): 177 try: 178 list( 179 Person.objects.raw( 180 'SELECT * FROM %s %s' % ( 181 Person._meta.db_table, 182 connection.ops.for_update_sql(nowait=True) 183 ) 184 ) 185 ) 186 except DatabaseError, e: 187 status.append(e) 188 status = [] 189 thread = threading.Thread(target=raw, kwargs={'status': status}) 190 thread.start() 191 time.sleep(1) 192 thread.join() 193 self.end_blocking_transaction() 194 self.check_exc(status[-1]) 195 196 @skipUnlessDBFeature('has_select_for_update') 197 def test_transaction_dirty_managed(self): 198 """ Check that a select_for_update sets the transaction to be 199 dirty when executed under txn management. Setting the txn dirty 200 means that it will be either committed or rolled back by Django, 201 which will release any locks held by the SELECT FOR UPDATE. 202 """ 203 transaction.enter_transaction_management(True) 204 transaction.managed(True) 205 try: 206 people = list(Person.objects.select_for_update()) 207 self.assertTrue(transaction.is_dirty()) 208 finally: 209 transaction.rollback() 210 transaction.leave_transaction_management() 211 212 @skipUnlessDBFeature('has_select_for_update') 213 def test_transaction_not_dirty_unmanaged(self): 214 """ If we're not under txn management, the txn will never be 215 marked as dirty. 216 """ 217 people = list(Person.objects.select_for_update()) 218 self.assertFalse(transaction.is_dirty()) -
tests/modeltests/select_for_update/models.py
1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30) 5 No newline at end of file -
AUTHORS
524 524 Gasper Zejn <zejn@kiberpipa.org> 525 525 Jarek Zgoda <jarek.zgoda@gmail.com> 526 526 Cheng Zhang 527 Dan Fairs <dan@fezconsulting.com> 527 528 528 529 A big THANK YOU goes to: 529 530 -
docs/ref/models/querysets.txt
975 975 # queries the database with the 'backup' alias 976 976 >>> Entry.objects.using('backup') 977 977 978 select_for_update 979 ~~~~~~~~~~~~~~~~~ 978 980 981 .. method:: select_for_update(nowait=False) 982 983 .. versionadded:: 1.3 984 985 Returns a queryset that will lock rows until the end of the transaction, 986 generating a SELECT ... FOR UPDATE statement on supported databases. 987 988 For example:: 989 990 entries = Entry.objects.select_for_update().filter(author=request.user) 991 992 All matched entries will be locked until the end of the transaction block, 993 meaning that other transactions will be prevented from changing or acquiring 994 locks on them. 995 996 Usually, if another transaction has already acquired a lock on one of the 997 selected rows, the query will block until the lock is released. If this is 998 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 999 make the call non-blocking. If a conflicting lock is already acquired by 1000 another transaction, ``django.db.utils.DatabaseError`` will be raised when 1001 the queryset is evaluated. 1002 1003 Note that using ``select_related`` will cause the current transaction to be 1004 set dirty, if under transaction management. This is to ensure that Django 1005 issues a COMMIT or ROLLBACK, releasing any locks held by the SELECT FOR UPDATE. 1006 1007 Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 1008 database backends support ``select_for_update()`` but MySQL has no 1009 support for the ``nowait`` argument. Other backends will simply 1010 generate queries as if ``select_for_update()`` had not been used. 1011 979 1012 Methods that do not return QuerySets 980 1013 ------------------------------------ 981 1014 … … 1253 1286 the only restriction on the :class:`QuerySet` that is updated is that it can 1254 1287 only update columns in the model's main table. Filtering based on related 1255 1288 fields is still possible. You cannot call ``update()`` on a 1256 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 1289 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 1257 1290 filtered. 1258 1291 1259 1292 For example, if you wanted to update all the entries in a particular blog -
docs/ref/databases.txt
364 364 column types have a maximum length restriction of 255 characters, regardless 365 365 of whether ``unique=True`` is specified or not. 366 366 367 Row locking with ``QuerySet.select_for_update()`` 368 ------------------------------------------------- 369 370 MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 371 statement. However, you may call the ``select_for_update()`` method of a 372 queryset with ``nowait=True``. In that case, the argument will be silently 373 discarded and the generated query will block until the requested lock can be 374 acquired. 375 367 376 .. _sqlite-notes: 368 377 369 378 SQLite notes