Ticket #2705: 2705-for_update-r15174.diff
File 2705-for_update-r15174.diff, 21.2 KB (added by , 14 years ago) |
---|
-
django/db/models/sql/compiler.py
1 1 from django.core.exceptions import FieldError 2 2 from django.db import connections 3 from django.db import transaction 3 4 from django.db.backends.util import truncate_name 4 5 from django.db.models.sql.constants import * 5 6 from django.db.models.sql.datastructures import EmptyResultSet 6 7 from django.db.models.sql.expressions import SQLEvaluator 7 8 from django.db.models.sql.query import get_proxied_model, get_order_dir, \ 8 9 select_related_descend, Query 10 from django.db.utils import DatabaseError 9 11 10 12 class SQLCompiler(object): 11 13 def __init__(self, query, connection, using): … … 117 119 result.append('LIMIT %d' % val) 118 120 result.append('OFFSET %d' % self.query.low_mark) 119 121 122 if self.query.select_for_update and self.connection.features.has_select_for_update: 123 # If we've been asked for a NOWAIT query but the backend does not support it, 124 # raise a DatabaseError otherwise we could get an unexpected deadlock. 125 nowait = self.query.select_for_update_nowait 126 if nowait and not self.connection.features.has_select_for_update_nowait: 127 raise DatabaseError('NOWAIT is not supported on this database backend.') 128 result.append(self.connection.ops.for_update_sql(nowait=nowait)) 129 120 130 return ' '.join(result), tuple(params) 121 131 122 132 def as_nested_sql(self): … … 677 687 resolve_columns = hasattr(self, 'resolve_columns') 678 688 fields = None 679 689 has_aggregate_select = bool(self.query.aggregate_select) 690 # Set transaction dirty if we're using SELECT FOR UPDATE to ensure 691 # a subsequent commit/rollback is executed, so any database locks 692 # are released. 693 if self.query.select_for_update and transaction.is_managed(self.using): 694 transaction.set_dirty(self.using) 680 695 for rows in self.execute_sql(MULTI): 681 696 for row in rows: 682 697 if resolve_columns: -
django/db/models/sql/query.py
131 131 self.order_by = [] 132 132 self.low_mark, self.high_mark = 0, None # Used for offset/limit 133 133 self.distinct = False 134 self.select_for_update = False 135 self.select_for_update_nowait = False 134 136 self.select_related = False 135 137 self.related_select_cols = [] 136 138 … … 260 262 obj.order_by = self.order_by[:] 261 263 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 262 264 obj.distinct = self.distinct 265 obj.select_for_update = self.select_for_update 266 obj.select_for_update_nowait = self.select_for_update_nowait 263 267 obj.select_related = self.select_related 264 268 obj.related_select_cols = [] 265 269 obj.aggregates = deepcopy(self.aggregates, memo=memo) … … 366 370 367 371 query.clear_ordering(True) 368 372 query.clear_limits() 373 query.select_for_update = False 369 374 query.select_related = False 370 375 query.related_select_cols = [] 371 376 query.related_select_fields = [] -
django/db/models/manager.py
164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/db/models/query.py
436 436 del_query._for_write = True 437 437 438 438 # Disable non-supported fields. 439 del_query.query.select_for_update = False 439 440 del_query.query.select_related = False 440 441 del_query.query.clear_ordering() 441 442 … … 584 585 else: 585 586 return self._filter_or_exclude(None, **filter_obj) 586 587 588 def select_for_update(self, **kwargs): 589 """ 590 Returns a new QuerySet instance that will select objects with a 591 FOR UPDATE lock. 592 """ 593 # Default to false for nowait 594 nowait = kwargs.pop('nowait', False) 595 obj = self._clone() 596 obj.query.select_for_update = True 597 obj.query.select_for_update_nowait = nowait 598 return obj 599 587 600 def select_related(self, *fields, **kwargs): 588 601 """ 589 602 Returns a new QuerySet instance that will select related objects. -
django/db/backends/mysql/base.py
124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False -
django/db/backends/oracle/base.py
70 70 needs_datetime_string_cast = False 71 71 interprets_empty_strings_as_nulls = True 72 72 uses_savepoints = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 73 75 can_return_id_from_insert = True 74 76 allow_sliced_subqueries = False 75 77 supports_subqueries_in_group_by = False -
django/db/backends/__init__.py
103 103 # integer primary keys. 104 104 related_fields_match_type = False 105 105 allow_sliced_subqueries = True 106 has_select_for_update = False 107 has_select_for_update_nowait = False 106 108 107 109 # Does the default test database allow multiple connections? 108 110 # Usually an indication that the test database is in-memory … … 291 293 """ 292 294 return [] 293 295 296 def for_update_sql(self, nowait=False): 297 """ 298 Returns the FOR UPDATE SQL clause to lock rows for an update operation. 299 """ 300 if nowait: 301 return 'FOR UPDATE NOWAIT' 302 else: 303 return 'FOR UPDATE' 304 294 305 def fulltext_search_sql(self, field_name): 295 306 """ 296 307 Returns the SQL WHERE clause to use in order to perform a full-text -
django/db/backends/postgresql_psycopg2/base.py
70 70 requires_rollback_on_dirty_transaction = True 71 71 has_real_datatype = True 72 72 can_defer_constraint_checks = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 75 73 76 74 77 class DatabaseOperations(PostgresqlDatabaseOperations): 75 78 def last_executed_query(self, cursor, sql, params): -
tests/modeltests/select_for_update/__init__.py
1 # -
tests/modeltests/select_for_update/tests.py
1 import time 2 from django.conf import settings 3 from django.db import transaction, connection 4 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 5 from django.test import (TransactionTestCase, skipIfDBFeature, 6 skipUnlessDBFeature) 7 from django.utils.functional import wraps 8 from django.utils import unittest 9 10 from models import Person 11 12 try: 13 import threading 14 def requires_threading(func): 15 return func 16 except ImportError: 17 # Note we can't use dummy_threading here, as our tests will actually 18 # block. We just have to skip the test completely. 19 def requires_threading(func): 20 @wraps(func) 21 def wrapped(*args, **kw): 22 raise unittest.SkipTest('threading required') 23 24 class SelectForUpdateTests(TransactionTestCase): 25 26 def setUp(self): 27 transaction.enter_transaction_management(True) 28 transaction.managed(True) 29 self.person = Person.objects.create(name='Reinhardt') 30 31 # We have to commit here so that code in run_select_for_update can 32 # see this data. 33 transaction.commit() 34 35 # We need another database connection to test that one connection 36 # issuing a SELECT ... FOR UPDATE will block. 37 new_connections = ConnectionHandler(settings.DATABASES) 38 self.new_connection = new_connections[DEFAULT_DB_ALIAS] 39 40 # We need to set settings.DEBUG to True so we can capture 41 # the output SQL to examine. 42 self._old_debug = settings.DEBUG 43 settings.DEBUG = True 44 45 def tearDown(self): 46 try: 47 # We don't really care if this fails - some of the tests will set 48 # this in the course of their run. 49 transaction.managed(False) 50 transaction.leave_transaction_management() 51 except transaction.TransactionManagementError: 52 pass 53 self.new_connection.close() 54 settings.DEBUG = self._old_debug 55 try: 56 self.end_blocking_transaction() 57 except (DatabaseError, AttributeError): 58 pass 59 60 def start_blocking_transaction(self): 61 # Start a blocking transaction. At some point, 62 # end_blocking_transaction() should be called. 63 self.cursor = self.new_connection.cursor() 64 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 65 'db_table': Person._meta.db_table, 66 'for_update': self.new_connection.ops.for_update_sql(), 67 } 68 self.cursor.execute(sql, ()) 69 result = self.cursor.fetchone() 70 71 def end_blocking_transaction(self): 72 # Roll back the blocking transaction. 73 self.new_connection._rollback() 74 75 def has_for_update_sql(self, tested_connection, nowait=False): 76 # Examine the SQL that was executed to determine whether it 77 # contains the 'SELECT..FOR UPDATE' stanza. 78 for_update_sql = tested_connection.ops.for_update_sql(nowait) 79 sql = tested_connection.queries[-1]['sql'] 80 return bool(sql.find(for_update_sql) > -1) 81 82 def check_exc(self, exc): 83 self.failUnless(isinstance(exc, DatabaseError)) 84 85 @skipUnlessDBFeature('has_select_for_update') 86 def test_for_update_sql_generated(self): 87 """ 88 Test that the backend's FOR UPDATE variant appears in 89 generated SQL when select_for_update is invoked. 90 """ 91 list(Person.objects.all().select_for_update()) 92 self.assertTrue(self.has_for_update_sql(connection)) 93 94 @skipUnlessDBFeature('has_select_for_update_nowait') 95 def test_for_update_sql_generated_nowait(self): 96 """ 97 Test that the backend's FOR UPDATE NOWAIT variant appears in 98 generated SQL when select_for_update is invoked. 99 """ 100 list(Person.objects.all().select_for_update(nowait=True)) 101 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 102 103 @requires_threading 104 @skipUnlessDBFeature('has_select_for_update_nowait') 105 def test_nowait_raises_error_on_block(self): 106 """ 107 If nowait is specified, we expect an error to be raised rather 108 than blocking. 109 """ 110 self.start_blocking_transaction() 111 status = [] 112 thread = threading.Thread( 113 target=self.run_select_for_update, 114 args=(status,), 115 kwargs={'nowait': True}, 116 ) 117 118 thread.start() 119 time.sleep(1) 120 thread.join() 121 self.end_blocking_transaction() 122 self.check_exc(status[-1]) 123 124 @skipIfDBFeature('has_select_for_update_nowait') 125 @skipUnlessDBFeature('has_select_for_update') 126 def test_unsupported_nowait_raises_error(self): 127 """ 128 If a SELECT...FOR UPDATE NOWAIT is run on a database backend 129 that supports FOR UPDATE but not NOWAIT, then we should find 130 that a DatabaseError is raised. 131 """ 132 self.assertRaises( 133 DatabaseError, 134 list, 135 Person.objects.all().select_for_update(nowait=True) 136 ) 137 138 def run_select_for_update(self, status, nowait=False): 139 """ 140 Utility method that runs a SELECT FOR UPDATE against all 141 Person instances. After the select_for_update, it attempts 142 to update the name of the only record, save, and commit. 143 144 In general, this will be run in a separate thread. 145 """ 146 status.append('started') 147 try: 148 # We need to enter transaction management again, as this is done on 149 # per-thread basis 150 transaction.enter_transaction_management(True) 151 transaction.managed(True) 152 people = list( 153 Person.objects.all().select_for_update(nowait=nowait) 154 ) 155 people[0].name = 'Fred' 156 people[0].save() 157 transaction.commit() 158 except DatabaseError, e: 159 status.append(e) 160 except Exception, e: 161 raise 162 163 @requires_threading 164 @skipUnlessDBFeature('has_select_for_update') 165 @skipUnlessDBFeature('supports_transactions') 166 def test_block(self): 167 """ 168 Check that a thread running a select_for_update that 169 accesses rows being touched by a similar operation 170 on another connection blocks correctly. 171 """ 172 # First, let's start the transaction in our thread. 173 self.start_blocking_transaction() 174 175 # Now, try it again using the ORM's select_for_update 176 # facility. Do this in a separate thread. 177 status = [] 178 thread = threading.Thread( 179 target=self.run_select_for_update, args=(status,) 180 ) 181 182 # The thread should immediately block, but we'll sleep 183 # for a bit to make sure. 184 thread.start() 185 sanity_count = 0 186 while len(status) != 1 and sanity_count < 10: 187 sanity_count += 1 188 time.sleep(1) 189 if sanity_count >= 10: 190 raise ValueError, 'Thread did not run and block' 191 192 # Check the person hasn't been updated. Since this isn't 193 # using FOR UPDATE, it won't block. 194 p = Person.objects.get(pk=self.person.pk) 195 self.assertEqual('Reinhardt', p.name) 196 197 # When we end our blocking transaction, our thread should 198 # be able to continue. 199 self.end_blocking_transaction() 200 thread.join(5.0) 201 202 # Check the thread has finished. Assuming it has, we should 203 # find that it has updated the person's name. 204 self.failIf(thread.isAlive()) 205 p = Person.objects.get(pk=self.person.pk) 206 self.assertEqual('Fred', p.name) 207 208 @requires_threading 209 @skipUnlessDBFeature('has_select_for_update') 210 def test_raw_lock_not_available(self): 211 """ 212 Check that running a raw query which can't obtain a FOR UPDATE lock 213 raises the correct exception 214 """ 215 self.start_blocking_transaction() 216 def raw(status): 217 try: 218 list( 219 Person.objects.raw( 220 'SELECT * FROM %s %s' % ( 221 Person._meta.db_table, 222 connection.ops.for_update_sql(nowait=True) 223 ) 224 ) 225 ) 226 except DatabaseError, e: 227 status.append(e) 228 status = [] 229 thread = threading.Thread(target=raw, kwargs={'status': status}) 230 thread.start() 231 time.sleep(1) 232 thread.join() 233 self.end_blocking_transaction() 234 self.check_exc(status[-1]) 235 236 @skipUnlessDBFeature('has_select_for_update') 237 def test_transaction_dirty_managed(self): 238 """ Check that a select_for_update sets the transaction to be 239 dirty when executed under txn management. Setting the txn dirty 240 means that it will be either committed or rolled back by Django, 241 which will release any locks held by the SELECT FOR UPDATE. 242 """ 243 people = list(Person.objects.select_for_update()) 244 self.assertTrue(transaction.is_dirty()) 245 246 @skipUnlessDBFeature('has_select_for_update') 247 def test_transaction_not_dirty_unmanaged(self): 248 """ If we're not under txn management, the txn will never be 249 marked as dirty. 250 """ 251 transaction.managed(False) 252 transaction.leave_transaction_management() 253 people = list(Person.objects.select_for_update()) 254 self.assertFalse(transaction.is_dirty()) -
tests/modeltests/select_for_update/models.py
1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30) -
AUTHORS
165 165 eriks@win.tue.nl 166 166 Tomáš Ehrlich <tomas.ehrlich@gmail.com> 167 167 Dirk Eschler <dirk.eschler@gmx.net> 168 Dan Fairs <dan@fezconsulting.com> 168 169 Marc Fargas <telenieko@telenieko.com> 169 170 Szilveszter Farkas <szilveszter.farkas@gmail.com> 170 171 Grigory Fateyev <greg@dial.com.ru> -
docs/ref/models/querysets.txt
940 940 # queries the database with the 'backup' alias 941 941 >>> Entry.objects.using('backup') 942 942 943 select_for_update 944 ~~~~~~~~~~~~~~~~~ 943 945 946 .. method:: select_for_update(nowait=False) 947 948 .. versionadded:: 1.4 949 950 Returns a queryset that will lock rows until the end of the transaction, 951 generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. 952 953 For example:: 954 955 entries = Entry.objects.select_for_update().filter(author=request.user) 956 957 All matched entries will be locked until the end of the transaction block, 958 meaning that other transactions will be prevented from changing or acquiring 959 locks on them. 960 961 Usually, if another transaction has already acquired a lock on one of the 962 selected rows, the query will block until the lock is released. If this is 963 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 964 make the call non-blocking. If a conflicting lock is already acquired by 965 another transaction, ``django.db.utils.DatabaseError`` will be raised when 966 the queryset is evaluated. 967 968 Note that using ``select_related`` will cause the current transaction to be set 969 dirty, if under transaction management. This is to ensure that Django issues a 970 ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR 971 UPDATE``. 972 973 Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 974 database backends support ``select_for_update()``. However, MySQL has no 975 support for the ``nowait`` argument. 976 977 Passing ``nowait=True`` to ``select_for_update`` using database backends that 978 do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be 979 raised. This is in order to prevent code unexpectedly blocking. 980 981 Using ``select_for_update`` on backends which do not support 982 ``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect. 983 984 944 985 Methods that do not return QuerySets 945 986 ------------------------------------ 946 987 -
docs/ref/databases.txt
368 368 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField` 369 369 respectively, a ``ValueError`` is raised rather than truncating data. 370 370 371 Row locking with ``QuerySet.select_for_update()`` 372 ------------------------------------------------- 373 374 MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` 375 statement. However, you may call the ``select_for_update()`` method of a 376 queryset with ``nowait=True``. In that case, the argument will be silently 377 discarded and the generated query will block until the requested lock can be 378 acquired. 379 371 380 .. _sqlite-notes: 372 381 373 382 SQLite notes