Ticket #2705: 2705-for_update-r16022.diff
File 2705-for_update-r16022.diff, 21.7 KB (added by , 14 years ago) |
---|
-
django/AUTHORS
168 168 eriks@win.tue.nl 169 169 Tomáš Ehrlich <tomas.ehrlich@gmail.com> 170 170 Dirk Eschler <dirk.eschler@gmx.net> 171 Dan Fairs <dan@fezconsulting.com> 171 172 Marc Fargas <telenieko@telenieko.com> 172 173 Szilveszter Farkas <szilveszter.farkas@gmail.com> 173 174 Grigory Fateyev <greg@dial.com.ru> -
django/docs/ref/models/querysets.txt
966 966 # queries the database with the 'backup' alias 967 967 >>> Entry.objects.using('backup') 968 968 969 select_for_update 970 ~~~~~~~~~~~~~~~~~ 969 971 972 .. method:: select_for_update(nowait=False) 973 974 .. versionadded:: 1.4 975 976 Returns a queryset that will lock rows until the end of the transaction, 977 generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. 978 979 For example:: 980 981 entries = Entry.objects.select_for_update().filter(author=request.user) 982 983 All matched entries will be locked until the end of the transaction block, 984 meaning that other transactions will be prevented from changing or acquiring 985 locks on them. 986 987 Usually, if another transaction has already acquired a lock on one of the 988 selected rows, the query will block until the lock is released. If this is 989 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 990 make the call non-blocking. If a conflicting lock is already acquired by 991 another transaction, ``django.db.utils.DatabaseError`` will be raised when 992 the queryset is evaluated. 993 994 Note that using ``select_for_update`` will cause the current transaction to be set 995 dirty, if under transaction management. This is to ensure that Django issues a 996 ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR 997 UPDATE``. 998 999 Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 1000 database backends support ``select_for_update()``. However, MySQL has no 1001 support for the ``nowait`` argument. 1002 1003 Passing ``nowait=True`` to ``select_for_update`` using database backends that 1004 do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be 1005 raised. This is in order to prevent code unexpectedly blocking. 1006 1007 Using ``select_for_update`` on backends which do not support 1008 ``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect. 1009 1010 970 1011 Methods that do not return QuerySets 971 1012 ------------------------------------ 972 1013 -
django/docs/ref/databases.txt
359 359 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField` 360 360 respectively, a ``ValueError`` is raised rather than truncating data. 361 361 362 Row locking with ``QuerySet.select_for_update()`` 363 ------------------------------------------------- 364 365 MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` 366 statement. If ``select_for_update()`` is used with ``nowait=True`` then a 367 ``DatabaseError`` will be raised. 368 362 369 .. _sqlite-notes: 363 370 364 371 SQLite notes … … 493 500 This will simply make SQLite wait a bit longer before throwing "database 494 501 is locked" errors; it won't really do anything to solve them. 495 502 503 ``QuerySet.select_for_update()`` not supported 504 ---------------------------------------------- 505 506 SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will 507 have no effect. 508 496 509 .. _oracle-notes: 497 510 498 511 Oracle notes -
django/django/db/models/sql/compiler.py
1 1 from django.core.exceptions import FieldError 2 2 from django.db import connections 3 from django.db import transaction 3 4 from django.db.backends.util import truncate_name 4 5 from django.db.models.sql.constants import * 5 6 from django.db.models.sql.datastructures import EmptyResultSet 6 7 from django.db.models.sql.expressions import SQLEvaluator 7 8 from django.db.models.sql.query import get_proxied_model, get_order_dir, \ 8 9 select_related_descend, Query 10 from django.db.utils import DatabaseError 9 11 10 12 class SQLCompiler(object): 11 13 def __init__(self, query, connection, using): … … 117 119 result.append('LIMIT %d' % val) 118 120 result.append('OFFSET %d' % self.query.low_mark) 119 121 122 if self.query.select_for_update and self.connection.features.has_select_for_update: 123 # If we've been asked for a NOWAIT query but the backend does not support it, 124 # raise a DatabaseError otherwise we could get an unexpected deadlock. 125 nowait = self.query.select_for_update_nowait 126 if nowait and not self.connection.features.has_select_for_update_nowait: 127 raise DatabaseError('NOWAIT is not supported on this database backend.') 128 result.append(self.connection.ops.for_update_sql(nowait=nowait)) 129 120 130 return ' '.join(result), tuple(params) 121 131 122 132 def as_nested_sql(self): … … 677 687 resolve_columns = hasattr(self, 'resolve_columns') 678 688 fields = None 679 689 has_aggregate_select = bool(self.query.aggregate_select) 690 # Set transaction dirty if we're using SELECT FOR UPDATE to ensure 691 # a subsequent commit/rollback is executed, so any database locks 692 # are released. 693 if self.query.select_for_update and transaction.is_managed(self.using): 694 transaction.set_dirty(self.using) 680 695 for rows in self.execute_sql(MULTI): 681 696 for row in rows: 682 697 if resolve_columns: -
django/django/db/models/sql/query.py
125 125 self.order_by = [] 126 126 self.low_mark, self.high_mark = 0, None # Used for offset/limit 127 127 self.distinct = False 128 self.select_for_update = False 129 self.select_for_update_nowait = False 128 130 self.select_related = False 129 131 self.related_select_cols = [] 130 132 … … 254 256 obj.order_by = self.order_by[:] 255 257 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 256 258 obj.distinct = self.distinct 259 obj.select_for_update = self.select_for_update 260 obj.select_for_update_nowait = self.select_for_update_nowait 257 261 obj.select_related = self.select_related 258 262 obj.related_select_cols = [] 259 263 obj.aggregates = copy.deepcopy(self.aggregates, memo=memo) … … 360 364 361 365 query.clear_ordering(True) 362 366 query.clear_limits() 367 query.select_for_update = False 363 368 query.select_related = False 364 369 query.related_select_cols = [] 365 370 query.related_select_fields = [] -
django/django/db/models/manager.py
164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/django/db/models/query.py
435 435 del_query._for_write = True 436 436 437 437 # Disable non-supported fields. 438 del_query.query.select_for_update = False 438 439 del_query.query.select_related = False 439 440 del_query.query.clear_ordering() 440 441 … … 583 584 else: 584 585 return self._filter_or_exclude(None, **filter_obj) 585 586 587 def select_for_update(self, **kwargs): 588 """ 589 Returns a new QuerySet instance that will select objects with a 590 FOR UPDATE lock. 591 """ 592 # Default to false for nowait 593 nowait = kwargs.pop('nowait', False) 594 obj = self._clone() 595 obj.query.select_for_update = True 596 obj.query.select_for_update_nowait = nowait 597 return obj 598 586 599 def select_related(self, *fields, **kwargs): 587 600 """ 588 601 Returns a new QuerySet instance that will select related objects. -
django/django/db/backends/mysql/base.py
124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False -
django/django/db/backends/oracle/base.py
70 70 needs_datetime_string_cast = False 71 71 interprets_empty_strings_as_nulls = True 72 72 uses_savepoints = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 73 75 can_return_id_from_insert = True 74 76 allow_sliced_subqueries = False 75 77 supports_subqueries_in_group_by = False -
django/django/db/backends/__init__.py
279 279 # integer primary keys. 280 280 related_fields_match_type = False 281 281 allow_sliced_subqueries = True 282 has_select_for_update = False 283 has_select_for_update_nowait = False 282 284 283 285 # Does the default test database allow multiple connections? 284 286 # Usually an indication that the test database is in-memory … … 476 478 """ 477 479 return [] 478 480 481 def for_update_sql(self, nowait=False): 482 """ 483 Returns the FOR UPDATE SQL clause to lock rows for an update operation. 484 """ 485 if nowait: 486 return 'FOR UPDATE NOWAIT' 487 else: 488 return 'FOR UPDATE' 489 479 490 def fulltext_search_sql(self, field_name): 480 491 """ 481 492 Returns the SQL WHERE clause to use in order to perform a full-text -
django/django/db/backends/postgresql_psycopg2/base.py
70 70 requires_rollback_on_dirty_transaction = True 71 71 has_real_datatype = True 72 72 can_defer_constraint_checks = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 75 73 76 74 77 class DatabaseWrapper(BaseDatabaseWrapper): 75 78 vendor = 'postgresql' -
django/tests/modeltests/select_for_update/__init__.py
1 # -
django/tests/modeltests/select_for_update/tests.py
1 import time 2 from django.conf import settings 3 from django.db import transaction, connection 4 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 5 from django.test import (TransactionTestCase, skipIfDBFeature, 6 skipUnlessDBFeature) 7 from django.utils.functional import wraps 8 from django.utils import unittest 9 10 from models import Person 11 12 try: 13 import threading 14 def requires_threading(func): 15 return func 16 except ImportError: 17 # Note we can't use dummy_threading here, as our tests will actually 18 # block. We just have to skip the test completely. 19 def requires_threading(func): 20 @wraps(func) 21 def wrapped(*args, **kw): 22 raise unittest.SkipTest('threading required') 23 24 class SelectForUpdateTests(TransactionTestCase): 25 26 def setUp(self): 27 transaction.enter_transaction_management(True) 28 transaction.managed(True) 29 self.person = Person.objects.create(name='Reinhardt') 30 31 # We have to commit here so that code in run_select_for_update can 32 # see this data. 33 transaction.commit() 34 35 # We need another database connection to test that one connection 36 # issuing a SELECT ... FOR UPDATE will block. 37 new_connections = ConnectionHandler(settings.DATABASES) 38 self.new_connection = new_connections[DEFAULT_DB_ALIAS] 39 40 # We need to set settings.DEBUG to True so we can capture 41 # the output SQL to examine. 42 self._old_debug = settings.DEBUG 43 settings.DEBUG = True 44 45 def tearDown(self): 46 try: 47 # We don't really care if this fails - some of the tests will set 48 # this in the course of their run. 49 transaction.managed(False) 50 transaction.leave_transaction_management() 51 except transaction.TransactionManagementError: 52 pass 53 self.new_connection.close() 54 settings.DEBUG = self._old_debug 55 try: 56 self.end_blocking_transaction() 57 except (DatabaseError, AttributeError): 58 pass 59 60 def start_blocking_transaction(self): 61 # Start a blocking transaction. At some point, 62 # end_blocking_transaction() should be called. 63 self.cursor = self.new_connection.cursor() 64 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 65 'db_table': Person._meta.db_table, 66 'for_update': self.new_connection.ops.for_update_sql(), 67 } 68 self.cursor.execute(sql, ()) 69 result = self.cursor.fetchone() 70 71 def end_blocking_transaction(self): 72 # Roll back the blocking transaction. 73 self.new_connection._rollback() 74 75 def has_for_update_sql(self, tested_connection, nowait=False): 76 # Examine the SQL that was executed to determine whether it 77 # contains the 'SELECT..FOR UPDATE' stanza. 78 for_update_sql = tested_connection.ops.for_update_sql(nowait) 79 sql = tested_connection.queries[-1]['sql'] 80 return bool(sql.find(for_update_sql) > -1) 81 82 def check_exc(self, exc): 83 self.failUnless(isinstance(exc, DatabaseError)) 84 85 @skipUnlessDBFeature('has_select_for_update') 86 def test_for_update_sql_generated(self): 87 """ 88 Test that the backend's FOR UPDATE variant appears in 89 generated SQL when select_for_update is invoked. 90 """ 91 list(Person.objects.all().select_for_update()) 92 self.assertTrue(self.has_for_update_sql(connection)) 93 94 @skipUnlessDBFeature('has_select_for_update_nowait') 95 def test_for_update_sql_generated_nowait(self): 96 """ 97 Test that the backend's FOR UPDATE NOWAIT variant appears in 98 generated SQL when select_for_update is invoked. 99 """ 100 list(Person.objects.all().select_for_update(nowait=True)) 101 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 102 103 @requires_threading 104 @skipUnlessDBFeature('has_select_for_update_nowait') 105 def test_nowait_raises_error_on_block(self): 106 """ 107 If nowait is specified, we expect an error to be raised rather 108 than blocking. 109 """ 110 self.start_blocking_transaction() 111 status = [] 112 thread = threading.Thread( 113 target=self.run_select_for_update, 114 args=(status,), 115 kwargs={'nowait': True}, 116 ) 117 118 thread.start() 119 time.sleep(1) 120 thread.join() 121 self.end_blocking_transaction() 122 self.check_exc(status[-1]) 123 124 @skipIfDBFeature('has_select_for_update_nowait') 125 @skipUnlessDBFeature('has_select_for_update') 126 def test_unsupported_nowait_raises_error(self): 127 """ 128 If a SELECT...FOR UPDATE NOWAIT is run on a database backend 129 that supports FOR UPDATE but not NOWAIT, then we should find 130 that a DatabaseError is raised. 131 """ 132 self.assertRaises( 133 DatabaseError, 134 list, 135 Person.objects.all().select_for_update(nowait=True) 136 ) 137 138 def run_select_for_update(self, status, nowait=False): 139 """ 140 Utility method that runs a SELECT FOR UPDATE against all 141 Person instances. After the select_for_update, it attempts 142 to update the name of the only record, save, and commit. 143 144 In general, this will be run in a separate thread. 145 """ 146 status.append('started') 147 try: 148 # We need to enter transaction management again, as this is done on 149 # per-thread basis 150 transaction.enter_transaction_management(True) 151 transaction.managed(True) 152 people = list( 153 Person.objects.all().select_for_update(nowait=nowait) 154 ) 155 people[0].name = 'Fred' 156 people[0].save() 157 transaction.commit() 158 except DatabaseError, e: 159 status.append(e) 160 except Exception, e: 161 raise 162 163 @requires_threading 164 @skipUnlessDBFeature('has_select_for_update') 165 @skipUnlessDBFeature('supports_transactions') 166 def test_block(self): 167 """ 168 Check that a thread running a select_for_update that 169 accesses rows being touched by a similar operation 170 on another connection blocks correctly. 171 """ 172 # First, let's start the transaction in our thread. 173 self.start_blocking_transaction() 174 175 # Now, try it again using the ORM's select_for_update 176 # facility. Do this in a separate thread. 177 status = [] 178 thread = threading.Thread( 179 target=self.run_select_for_update, args=(status,) 180 ) 181 182 # The thread should immediately block, but we'll sleep 183 # for a bit to make sure. 184 thread.start() 185 sanity_count = 0 186 while len(status) != 1 and sanity_count < 10: 187 sanity_count += 1 188 time.sleep(1) 189 if sanity_count >= 10: 190 raise ValueError, 'Thread did not run and block' 191 192 # Check the person hasn't been updated. Since this isn't 193 # using FOR UPDATE, it won't block. 194 p = Person.objects.get(pk=self.person.pk) 195 self.assertEqual('Reinhardt', p.name) 196 197 # When we end our blocking transaction, our thread should 198 # be able to continue. 199 self.end_blocking_transaction() 200 thread.join(5.0) 201 202 # Check the thread has finished. Assuming it has, we should 203 # find that it has updated the person's name. 204 self.failIf(thread.isAlive()) 205 p = Person.objects.get(pk=self.person.pk) 206 self.assertEqual('Fred', p.name) 207 208 @requires_threading 209 @skipUnlessDBFeature('has_select_for_update') 210 def test_raw_lock_not_available(self): 211 """ 212 Check that running a raw query which can't obtain a FOR UPDATE lock 213 raises the correct exception 214 """ 215 self.start_blocking_transaction() 216 def raw(status): 217 try: 218 list( 219 Person.objects.raw( 220 'SELECT * FROM %s %s' % ( 221 Person._meta.db_table, 222 connection.ops.for_update_sql(nowait=True) 223 ) 224 ) 225 ) 226 except DatabaseError, e: 227 status.append(e) 228 status = [] 229 thread = threading.Thread(target=raw, kwargs={'status': status}) 230 thread.start() 231 time.sleep(1) 232 thread.join() 233 self.end_blocking_transaction() 234 self.check_exc(status[-1]) 235 236 @skipUnlessDBFeature('has_select_for_update') 237 def test_transaction_dirty_managed(self): 238 """ Check that a select_for_update sets the transaction to be 239 dirty when executed under txn management. Setting the txn dirty 240 means that it will be either committed or rolled back by Django, 241 which will release any locks held by the SELECT FOR UPDATE. 242 """ 243 people = list(Person.objects.select_for_update()) 244 self.assertTrue(transaction.is_dirty()) 245 246 @skipUnlessDBFeature('has_select_for_update') 247 def test_transaction_not_dirty_unmanaged(self): 248 """ If we're not under txn management, the txn will never be 249 marked as dirty. 250 """ 251 transaction.managed(False) 252 transaction.leave_transaction_management() 253 people = list(Person.objects.select_for_update()) 254 self.assertFalse(transaction.is_dirty()) -
django/tests/modeltests/select_for_update/models.py
1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30)