Ticket #2705: 2705-for_update-r16053.diff
File 2705-for_update-r16053.diff, 22.1 KB (added by , 14 years ago) |
---|
-
django/AUTHORS
168 168 eriks@win.tue.nl 169 169 Tomáš Ehrlich <tomas.ehrlich@gmail.com> 170 170 Dirk Eschler <dirk.eschler@gmx.net> 171 Dan Fairs <dan@fezconsulting.com> 171 172 Marc Fargas <telenieko@telenieko.com> 172 173 Szilveszter Farkas <szilveszter.farkas@gmail.com> 173 174 Grigory Fateyev <greg@dial.com.ru> -
django/docs/ref/models/querysets.txt
966 966 # queries the database with the 'backup' alias 967 967 >>> Entry.objects.using('backup') 968 968 969 select_for_update 970 ~~~~~~~~~~~~~~~~~ 969 971 972 .. method:: select_for_update(nowait=False) 973 974 .. versionadded:: 1.4 975 976 Returns a queryset that will lock rows until the end of the transaction, 977 generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. 978 979 For example:: 980 981 entries = Entry.objects.select_for_update().filter(author=request.user) 982 983 All matched entries will be locked until the end of the transaction block, 984 meaning that other transactions will be prevented from changing or acquiring 985 locks on them. 986 987 Usually, if another transaction has already acquired a lock on one of the 988 selected rows, the query will block until the lock is released. If this is 989 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 990 make the call non-blocking. If a conflicting lock is already acquired by 991 another transaction, ``django.db.utils.DatabaseError`` will be raised when 992 the queryset is evaluated. 993 994 Note that using ``select_for_update`` will cause the current transaction to be set 995 dirty, if under transaction management. This is to ensure that Django issues a 996 ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR 997 UPDATE``. 998 999 Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 1000 database backends support ``select_for_update()``. However, MySQL has no 1001 support for the ``nowait`` argument. 1002 1003 Passing ``nowait=True`` to ``select_for_update`` using database backends that 1004 do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be 1005 raised. This is in order to prevent code unexpectedly blocking. 1006 1007 Using ``select_for_update`` on backends which do not support 1008 ``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect. 1009 1010 970 1011 Methods that do not return QuerySets 971 1012 ------------------------------------ 972 1013 -
django/docs/ref/databases.txt
359 359 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField` 360 360 respectively, a ``ValueError`` is raised rather than truncating data. 361 361 362 Row locking with ``QuerySet.select_for_update()`` 363 ------------------------------------------------- 364 365 MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` 366 statement. If ``select_for_update()`` is used with ``nowait=True`` then a 367 ``DatabaseError`` will be raised. 368 362 369 .. _sqlite-notes: 363 370 364 371 SQLite notes … … 493 500 This will simply make SQLite wait a bit longer before throwing "database 494 501 is locked" errors; it won't really do anything to solve them. 495 502 503 ``QuerySet.select_for_update()`` not supported 504 ---------------------------------------------- 505 506 SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will 507 have no effect. 508 496 509 .. _oracle-notes: 497 510 498 511 Oracle notes -
django/tests/modeltests/select_for_update/__init__.py
1 # -
django/tests/modeltests/select_for_update/tests.py
1 import sys 2 import time 3 import unittest 4 from django.conf import settings 5 from django.db import transaction, connection 6 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 7 from django.test import (TransactionTestCase, skipIfDBFeature, 8 skipUnlessDBFeature) 9 from django.utils.functional import wraps 10 from django.utils import unittest 11 12 from models import Person 13 14 try: 15 import threading 16 def requires_threading(func): 17 return func 18 except ImportError: 19 # Note we can't use dummy_threading here, as our tests will actually 20 # block. We just have to skip the test completely. 21 def requires_threading(func): 22 @wraps(func) 23 def wrapped(*args, **kw): 24 raise unittest.SkipTest('threading required') 25 26 class SelectForUpdateTests(TransactionTestCase): 27 28 def setUp(self): 29 transaction.enter_transaction_management(True) 30 transaction.managed(True) 31 self.person = Person.objects.create(name='Reinhardt') 32 33 # We have to commit here so that code in run_select_for_update can 34 # see this data. 35 transaction.commit() 36 37 # We need another database connection to test that one connection 38 # issuing a SELECT ... FOR UPDATE will block. 39 new_connections = ConnectionHandler(settings.DATABASES) 40 self.new_connection = new_connections[DEFAULT_DB_ALIAS] 41 42 # We need to set settings.DEBUG to True so we can capture 43 # the output SQL to examine. 44 self._old_debug = settings.DEBUG 45 settings.DEBUG = True 46 47 def tearDown(self): 48 try: 49 # We don't really care if this fails - some of the tests will set 50 # this in the course of their run. 51 transaction.managed(False) 52 transaction.leave_transaction_management() 53 except transaction.TransactionManagementError: 54 pass 55 self.new_connection.close() 56 settings.DEBUG = self._old_debug 57 try: 58 self.end_blocking_transaction() 59 except (DatabaseError, AttributeError): 60 pass 61 62 def start_blocking_transaction(self): 63 # Start a blocking transaction. At some point, 64 # end_blocking_transaction() should be called. 65 self.cursor = self.new_connection.cursor() 66 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 67 'db_table': Person._meta.db_table, 68 'for_update': self.new_connection.ops.for_update_sql(), 69 } 70 self.cursor.execute(sql, ()) 71 result = self.cursor.fetchone() 72 73 def end_blocking_transaction(self): 74 # Roll back the blocking transaction. 75 self.new_connection._rollback() 76 77 def has_for_update_sql(self, tested_connection, nowait=False): 78 # Examine the SQL that was executed to determine whether it 79 # contains the 'SELECT..FOR UPDATE' stanza. 80 for_update_sql = tested_connection.ops.for_update_sql(nowait) 81 sql = tested_connection.queries[-1]['sql'] 82 return bool(sql.find(for_update_sql) > -1) 83 84 def check_exc(self, exc): 85 self.failUnless(isinstance(exc, DatabaseError)) 86 87 @skipUnlessDBFeature('has_select_for_update') 88 def test_for_update_sql_generated(self): 89 """ 90 Test that the backend's FOR UPDATE variant appears in 91 generated SQL when select_for_update is invoked. 92 """ 93 list(Person.objects.all().select_for_update()) 94 self.assertTrue(self.has_for_update_sql(connection)) 95 96 @skipUnlessDBFeature('has_select_for_update_nowait') 97 def test_for_update_sql_generated_nowait(self): 98 """ 99 Test that the backend's FOR UPDATE NOWAIT variant appears in 100 generated SQL when select_for_update is invoked. 101 """ 102 list(Person.objects.all().select_for_update(nowait=True)) 103 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 104 105 # In Python 2.6 beta and some final releases, exceptions raised in __len__ 106 # are swallowed (Python issue 1242657), so these cases return an empty 107 # list, rather than raising an exception. Not a lot we can do about that, 108 # unfortunately, due to the way Python handles list() calls internally. 109 # Thus, we skip this test for Python 2.6. 110 @requires_threading 111 @skipUnlessDBFeature('has_select_for_update_nowait') 112 @unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6") 113 def test_nowait_raises_error_on_block(self): 114 """ 115 If nowait is specified, we expect an error to be raised rather 116 than blocking. 117 """ 118 self.start_blocking_transaction() 119 status = [] 120 thread = threading.Thread( 121 target=self.run_select_for_update, 122 args=(status,), 123 kwargs={'nowait': True}, 124 ) 125 126 thread.start() 127 time.sleep(1) 128 thread.join() 129 self.end_blocking_transaction() 130 self.check_exc(status[-1]) 131 132 @skipIfDBFeature('has_select_for_update_nowait') 133 @skipUnlessDBFeature('has_select_for_update') 134 def test_unsupported_nowait_raises_error(self): 135 """ 136 If a SELECT...FOR UPDATE NOWAIT is run on a database backend 137 that supports FOR UPDATE but not NOWAIT, then we should find 138 that a DatabaseError is raised. 139 """ 140 self.assertRaises( 141 DatabaseError, 142 list, 143 Person.objects.all().select_for_update(nowait=True) 144 ) 145 146 def run_select_for_update(self, status, nowait=False): 147 """ 148 Utility method that runs a SELECT FOR UPDATE against all 149 Person instances. After the select_for_update, it attempts 150 to update the name of the only record, save, and commit. 151 152 In general, this will be run in a separate thread. 153 """ 154 status.append('started') 155 try: 156 # We need to enter transaction management again, as this is done on 157 # per-thread basis 158 transaction.enter_transaction_management(True) 159 transaction.managed(True) 160 people = list( 161 Person.objects.all().select_for_update(nowait=nowait) 162 ) 163 people[0].name = 'Fred' 164 people[0].save() 165 transaction.commit() 166 except DatabaseError, e: 167 status.append(e) 168 except Exception, e: 169 raise 170 171 @requires_threading 172 @skipUnlessDBFeature('has_select_for_update') 173 @skipUnlessDBFeature('supports_transactions') 174 def test_block(self): 175 """ 176 Check that a thread running a select_for_update that 177 accesses rows being touched by a similar operation 178 on another connection blocks correctly. 179 """ 180 # First, let's start the transaction in our thread. 181 self.start_blocking_transaction() 182 183 # Now, try it again using the ORM's select_for_update 184 # facility. Do this in a separate thread. 185 status = [] 186 thread = threading.Thread( 187 target=self.run_select_for_update, args=(status,) 188 ) 189 190 # The thread should immediately block, but we'll sleep 191 # for a bit to make sure. 192 thread.start() 193 sanity_count = 0 194 while len(status) != 1 and sanity_count < 10: 195 sanity_count += 1 196 time.sleep(1) 197 if sanity_count >= 10: 198 raise ValueError, 'Thread did not run and block' 199 200 # Check the person hasn't been updated. Since this isn't 201 # using FOR UPDATE, it won't block. 202 p = Person.objects.get(pk=self.person.pk) 203 self.assertEqual('Reinhardt', p.name) 204 205 # When we end our blocking transaction, our thread should 206 # be able to continue. 207 self.end_blocking_transaction() 208 thread.join(5.0) 209 210 # Check the thread has finished. Assuming it has, we should 211 # find that it has updated the person's name. 212 self.failIf(thread.isAlive()) 213 p = Person.objects.get(pk=self.person.pk) 214 self.assertEqual('Fred', p.name) 215 216 @requires_threading 217 @skipUnlessDBFeature('has_select_for_update') 218 def test_raw_lock_not_available(self): 219 """ 220 Check that running a raw query which can't obtain a FOR UPDATE lock 221 raises the correct exception 222 """ 223 self.start_blocking_transaction() 224 def raw(status): 225 try: 226 list( 227 Person.objects.raw( 228 'SELECT * FROM %s %s' % ( 229 Person._meta.db_table, 230 connection.ops.for_update_sql(nowait=True) 231 ) 232 ) 233 ) 234 except DatabaseError, e: 235 status.append(e) 236 status = [] 237 thread = threading.Thread(target=raw, kwargs={'status': status}) 238 thread.start() 239 time.sleep(1) 240 thread.join() 241 self.end_blocking_transaction() 242 self.check_exc(status[-1]) 243 244 @skipUnlessDBFeature('has_select_for_update') 245 def test_transaction_dirty_managed(self): 246 """ Check that a select_for_update sets the transaction to be 247 dirty when executed under txn management. Setting the txn dirty 248 means that it will be either committed or rolled back by Django, 249 which will release any locks held by the SELECT FOR UPDATE. 250 """ 251 people = list(Person.objects.select_for_update()) 252 self.assertTrue(transaction.is_dirty()) 253 254 @skipUnlessDBFeature('has_select_for_update') 255 def test_transaction_not_dirty_unmanaged(self): 256 """ If we're not under txn management, the txn will never be 257 marked as dirty. 258 """ 259 transaction.managed(False) 260 transaction.leave_transaction_management() 261 people = list(Person.objects.select_for_update()) 262 self.assertFalse(transaction.is_dirty()) -
django/tests/modeltests/select_for_update/models.py
1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30) -
django/django/db/models/sql/compiler.py
1 1 from django.core.exceptions import FieldError 2 2 from django.db import connections 3 from django.db import transaction 3 4 from django.db.backends.util import truncate_name 4 5 from django.db.models.sql.constants import * 5 6 from django.db.models.sql.datastructures import EmptyResultSet 6 7 from django.db.models.sql.expressions import SQLEvaluator 7 8 from django.db.models.sql.query import get_proxied_model, get_order_dir, \ 8 9 select_related_descend, Query 10 from django.db.utils import DatabaseError 9 11 10 12 class SQLCompiler(object): 11 13 def __init__(self, query, connection, using): … … 117 119 result.append('LIMIT %d' % val) 118 120 result.append('OFFSET %d' % self.query.low_mark) 119 121 122 if self.query.select_for_update and self.connection.features.has_select_for_update: 123 # If we've been asked for a NOWAIT query but the backend does not support it, 124 # raise a DatabaseError otherwise we could get an unexpected deadlock. 125 nowait = self.query.select_for_update_nowait 126 if nowait and not self.connection.features.has_select_for_update_nowait: 127 raise DatabaseError('NOWAIT is not supported on this database backend.') 128 result.append(self.connection.ops.for_update_sql(nowait=nowait)) 129 120 130 return ' '.join(result), tuple(params) 121 131 122 132 def as_nested_sql(self): … … 677 687 resolve_columns = hasattr(self, 'resolve_columns') 678 688 fields = None 679 689 has_aggregate_select = bool(self.query.aggregate_select) 690 # Set transaction dirty if we're using SELECT FOR UPDATE to ensure 691 # a subsequent commit/rollback is executed, so any database locks 692 # are released. 693 if self.query.select_for_update and transaction.is_managed(self.using): 694 transaction.set_dirty(self.using) 680 695 for rows in self.execute_sql(MULTI): 681 696 for row in rows: 682 697 if resolve_columns: -
django/django/db/models/sql/query.py
125 125 self.order_by = [] 126 126 self.low_mark, self.high_mark = 0, None # Used for offset/limit 127 127 self.distinct = False 128 self.select_for_update = False 129 self.select_for_update_nowait = False 128 130 self.select_related = False 129 131 self.related_select_cols = [] 130 132 … … 254 256 obj.order_by = self.order_by[:] 255 257 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 256 258 obj.distinct = self.distinct 259 obj.select_for_update = self.select_for_update 260 obj.select_for_update_nowait = self.select_for_update_nowait 257 261 obj.select_related = self.select_related 258 262 obj.related_select_cols = [] 259 263 obj.aggregates = copy.deepcopy(self.aggregates, memo=memo) … … 360 364 361 365 query.clear_ordering(True) 362 366 query.clear_limits() 367 query.select_for_update = False 363 368 query.select_related = False 364 369 query.related_select_cols = [] 365 370 query.related_select_fields = [] -
django/django/db/models/manager.py
164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/django/db/models/query.py
435 435 del_query._for_write = True 436 436 437 437 # Disable non-supported fields. 438 del_query.query.select_for_update = False 438 439 del_query.query.select_related = False 439 440 del_query.query.clear_ordering() 440 441 … … 583 584 else: 584 585 return self._filter_or_exclude(None, **filter_obj) 585 586 587 def select_for_update(self, **kwargs): 588 """ 589 Returns a new QuerySet instance that will select objects with a 590 FOR UPDATE lock. 591 """ 592 # Default to false for nowait 593 nowait = kwargs.pop('nowait', False) 594 obj = self._clone() 595 obj.query.select_for_update = True 596 obj.query.select_for_update_nowait = nowait 597 return obj 598 586 599 def select_related(self, *fields, **kwargs): 587 600 """ 588 601 Returns a new QuerySet instance that will select related objects. -
django/django/db/backends/mysql/base.py
124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False -
django/django/db/backends/oracle/base.py
70 70 needs_datetime_string_cast = False 71 71 interprets_empty_strings_as_nulls = True 72 72 uses_savepoints = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 73 75 can_return_id_from_insert = True 74 76 allow_sliced_subqueries = False 75 77 supports_subqueries_in_group_by = False -
django/django/db/backends/__init__.py
279 279 # integer primary keys. 280 280 related_fields_match_type = False 281 281 allow_sliced_subqueries = True 282 has_select_for_update = False 283 has_select_for_update_nowait = False 282 284 283 285 # Does the default test database allow multiple connections? 284 286 # Usually an indication that the test database is in-memory … … 476 478 """ 477 479 return [] 478 480 481 def for_update_sql(self, nowait=False): 482 """ 483 Returns the FOR UPDATE SQL clause to lock rows for an update operation. 484 """ 485 if nowait: 486 return 'FOR UPDATE NOWAIT' 487 else: 488 return 'FOR UPDATE' 489 479 490 def fulltext_search_sql(self, field_name): 480 491 """ 481 492 Returns the SQL WHERE clause to use in order to perform a full-text -
django/django/db/backends/postgresql_psycopg2/base.py
70 70 requires_rollback_on_dirty_transaction = True 71 71 has_real_datatype = True 72 72 can_defer_constraint_checks = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 75 73 76 74 77 class DatabaseWrapper(BaseDatabaseWrapper): 75 78 vendor = 'postgresql'