Ticket #2705: 2705-for_update-r15021.diff
File 2705-for_update-r15021.diff, 18.7 KB (added by , 14 years ago) |
---|
-
AUTHORS
diff --git a/AUTHORS b/AUTHORS
a b 164 164 eriks@win.tue.nl 165 165 Tomáš Ehrlich <tomas.ehrlich@gmail.com> 166 166 Dirk Eschler <dirk.eschler@gmx.net> 167 Dan Fairs <dan@fezconsulting.com> 167 168 Marc Fargas <telenieko@telenieko.com> 168 169 Szilveszter Farkas <szilveszter.farkas@gmail.com> 169 170 Grigory Fateyev <greg@dial.com.ru> -
django/db/backends/__init__.py
diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py
a b 103 103 # integer primary keys. 104 104 related_fields_match_type = False 105 105 allow_sliced_subqueries = True 106 has_select_for_update = False 107 has_select_for_update_nowait = False 106 108 107 109 # Does the default test database allow multiple connections? 108 110 # Usually an indication that the test database is in-memory … … 282 284 """ 283 285 return [] 284 286 287 def for_update_sql(self, nowait=False): 288 """ 289 Returns the FOR UPDATE SQL clause to lock rows for an update operation. 290 """ 291 if nowait: 292 return 'FOR UPDATE NOWAIT' 293 else: 294 return 'FOR UPDATE' 295 285 296 def fulltext_search_sql(self, field_name): 286 297 """ 287 298 Returns the SQL WHERE clause to use in order to perform a full-text -
django/db/backends/mysql/base.py
diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py
a b 124 124 allows_group_by_pk = True 125 125 related_fields_match_type = True 126 126 allow_sliced_subqueries = False 127 has_select_for_update = True 128 has_select_for_update_nowait = False 127 129 supports_forward_references = False 128 130 supports_long_model_names = False 129 131 supports_microsecond_precision = False -
django/db/backends/oracle/base.py
diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py
a b 70 70 needs_datetime_string_cast = False 71 71 interprets_empty_strings_as_nulls = True 72 72 uses_savepoints = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 73 75 can_return_id_from_insert = True 74 76 allow_sliced_subqueries = False 75 77 supports_subqueries_in_group_by = False -
django/db/backends/postgresql_psycopg2/base.py
diff --git a/django/db/backends/postgresql_psycopg2/base.py b/django/db/backends/postgresql_psycopg2/base.py
a b 70 70 requires_rollback_on_dirty_transaction = True 71 71 has_real_datatype = True 72 72 can_defer_constraint_checks = True 73 has_select_for_update = True 74 has_select_for_update_nowait = True 75 73 76 74 77 class DatabaseOperations(PostgresqlDatabaseOperations): 75 78 def last_executed_query(self, cursor, sql, params): -
django/db/models/manager.py
diff --git a/django/db/models/manager.py b/django/db/models/manager.py
a b 164 164 def order_by(self, *args, **kwargs): 165 165 return self.get_query_set().order_by(*args, **kwargs) 166 166 167 def select_for_update(self, *args, **kwargs): 168 return self.get_query_set().select_for_update(*args, **kwargs) 169 167 170 def select_related(self, *args, **kwargs): 168 171 return self.get_query_set().select_related(*args, **kwargs) 169 172 -
django/db/models/query.py
diff --git a/django/db/models/query.py b/django/db/models/query.py
a b 432 432 del_query._for_write = True 433 433 434 434 # Disable non-supported fields. 435 del_query.query.select_for_update = False 435 436 del_query.query.select_related = False 436 437 del_query.query.clear_ordering() 437 438 … … 580 581 else: 581 582 return self._filter_or_exclude(None, **filter_obj) 582 583 584 def select_for_update(self, **kwargs): 585 """ 586 Returns a new QuerySet instance that will select objects with a 587 FOR UPDATE lock. 588 """ 589 # Default to false for nowait 590 nowait = kwargs.pop('nowait', False) 591 obj = self._clone() 592 obj.query.select_for_update = True 593 obj.query.select_for_update_nowait = nowait 594 return obj 595 583 596 def select_related(self, *fields, **kwargs): 584 597 """ 585 598 Returns a new QuerySet instance that will select related objects. -
django/db/models/sql/compiler.py
diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
a b 1 1 from django.core.exceptions import FieldError 2 2 from django.db import connections 3 from django.db import transaction 3 4 from django.db.backends.util import truncate_name 4 5 from django.db.models.sql.constants import * 5 6 from django.db.models.sql.datastructures import EmptyResultSet … … 117 118 result.append('LIMIT %d' % val) 118 119 result.append('OFFSET %d' % self.query.low_mark) 119 120 121 if self.query.select_for_update and self.connection.features.has_select_for_update: 122 nowait = self.query.select_for_update_nowait and self.connection.features.has_select_for_update 123 result.append(self.connection.ops.for_update_sql(nowait=nowait)) 124 120 125 return ' '.join(result), tuple(params) 121 126 122 127 def as_nested_sql(self): … … 677 682 resolve_columns = hasattr(self, 'resolve_columns') 678 683 fields = None 679 684 has_aggregate_select = bool(self.query.aggregate_select) 685 # Set transaction dirty if we're using SELECT FOR UPDATE to ensure 686 # a subsequent commit/rollback is executed, so any database locks 687 # are released. 688 if self.query.select_for_update and transaction.is_managed(self.using): 689 transaction.set_dirty(self.using) 680 690 for rows in self.execute_sql(MULTI): 681 691 for row in rows: 682 692 if resolve_columns: -
django/db/models/sql/query.py
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
a b 131 131 self.order_by = [] 132 132 self.low_mark, self.high_mark = 0, None # Used for offset/limit 133 133 self.distinct = False 134 self.select_for_update = False 135 self.select_for_update_nowait = False 134 136 self.select_related = False 135 137 self.related_select_cols = [] 136 138 … … 260 262 obj.order_by = self.order_by[:] 261 263 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 262 264 obj.distinct = self.distinct 265 obj.select_for_update = self.select_for_update 266 obj.select_for_update_nowait = self.select_for_update_nowait 263 267 obj.select_related = self.select_related 264 268 obj.related_select_cols = [] 265 269 obj.aggregates = deepcopy(self.aggregates, memo=memo) … … 366 370 367 371 query.clear_ordering(True) 368 372 query.clear_limits() 373 query.select_for_update = False 369 374 query.select_related = False 370 375 query.related_select_cols = [] 371 376 query.related_select_fields = [] -
docs/ref/databases.txt
diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt
a b 364 364 column types have a maximum length restriction of 255 characters, regardless 365 365 of whether ``unique=True`` is specified or not. 366 366 367 Row locking with ``QuerySet.select_for_update()`` 368 ------------------------------------------------- 369 370 MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` 371 statement. However, you may call the ``select_for_update()`` method of a 372 queryset with ``nowait=True``. In that case, the argument will be silently 373 discarded and the generated query will block until the requested lock can be 374 acquired. 375 367 376 .. _sqlite-notes: 368 377 369 378 SQLite notes -
docs/ref/models/querysets.txt
diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
a b 975 975 # queries the database with the 'backup' alias 976 976 >>> Entry.objects.using('backup') 977 977 978 select_for_update 979 ~~~~~~~~~~~~~~~~~ 980 981 .. method:: select_for_update(nowait=False) 982 983 .. versionadded:: 1.3 984 985 Returns a queryset that will lock rows until the end of the transaction, 986 generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. 987 988 For example:: 989 990 entries = Entry.objects.select_for_update().filter(author=request.user) 991 992 All matched entries will be locked until the end of the transaction block, 993 meaning that other transactions will be prevented from changing or acquiring 994 locks on them. 995 996 Usually, if another transaction has already acquired a lock on one of the 997 selected rows, the query will block until the lock is released. If this is 998 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 999 make the call non-blocking. If a conflicting lock is already acquired by 1000 another transaction, ``django.db.utils.DatabaseError`` will be raised when 1001 the queryset is evaluated. 1002 1003 Note that using ``select_related`` will cause the current transaction to be set 1004 dirty, if under transaction management. This is to ensure that Django issues a 1005 ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR 1006 UPDATE``. 1007 1008 Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 1009 database backends support ``select_for_update()`` but MySQL has no 1010 support for the ``nowait`` argument. Other backends will simply 1011 generate queries as if ``select_for_update()`` had not been used. 978 1012 979 1013 Methods that do not return QuerySets 980 1014 ------------------------------------ … … 1253 1287 the only restriction on the :class:`QuerySet` that is updated is that it can 1254 1288 only update columns in the model's main table. Filtering based on related 1255 1289 fields is still possible. You cannot call ``update()`` on a 1256 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 1290 :class:`QuerySet` that has had a slice taken or can otherwise no longer be 1257 1291 filtered. 1258 1292 1259 1293 For example, if you wanted to update all the entries in a particular blog -
new file tests/modeltests/select_for_update/__init__.py
diff --git a/tests/modeltests/select_for_update/__init__.py b/tests/modeltests/select_for_update/__init__.py new file mode 100644
- + 1 # -
new file tests/modeltests/select_for_update/models.py
diff --git a/tests/modeltests/select_for_update/models.py b/tests/modeltests/select_for_update/models.py new file mode 100644
- + 1 from django.db import models 2 3 class Person(models.Model): 4 name = models.CharField(max_length=30) -
new file tests/modeltests/select_for_update/tests.py
diff --git a/tests/modeltests/select_for_update/tests.py b/tests/modeltests/select_for_update/tests.py new file mode 100644
- + 1 import time 2 from django.conf import settings 3 from django.db import transaction, connection 4 from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError 5 from django.test import TransactionTestCase, skipUnlessDBFeature 6 from django.utils.functional import wraps 7 from django.utils import unittest 8 9 from models import Person 10 11 try: 12 import threading 13 def requires_threading(func): 14 return func 15 except ImportError: 16 # Note we can't use dummy_threading here, as our tests will actually 17 # block. We just have to skip the test completely. 18 def requires_threading(func): 19 @wraps(func) 20 def wrapped(*args, **kw): 21 raise unittest.SkipTest('threading required') 22 23 class SelectForUpdateTests(TransactionTestCase): 24 25 def setUp(self): 26 connection._rollback() 27 connection._enter_transaction_management(True) 28 self.new_connections = ConnectionHandler(settings.DATABASES) 29 self.person = Person.objects.create(name='Reinhardt') 30 31 # We need to set settings.DEBUG to True so we can capture 32 # the output SQL to examine. 33 self._old_debug = settings.DEBUG 34 settings.DEBUG = True 35 36 def tearDown(self): 37 connection._leave_transaction_management(True) 38 settings.DEBUG = self._old_debug 39 try: 40 self.end_blocking_transaction() 41 except (DatabaseError, AttributeError): 42 pass 43 44 def start_blocking_transaction(self): 45 # Start a blocking transaction. At some point, 46 # end_blocking_transaction() should be called. 47 self.new_connection = self.new_connections[DEFAULT_DB_ALIAS] 48 self.new_connection._enter_transaction_management(True) 49 self.cursor = self.new_connection.cursor() 50 sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { 51 'db_table': Person._meta.db_table, 52 'for_update': self.new_connection.ops.for_update_sql(), 53 } 54 self.cursor.execute(sql, ()) 55 result = self.cursor.fetchone() 56 57 def end_blocking_transaction(self): 58 # Roll back the blocking transaction. 59 self.new_connection._rollback() 60 self.new_connection.close() 61 self.new_connection._leave_transaction_management(True) 62 63 def has_for_update_sql(self, tested_connection, nowait=False): 64 # Examine the SQL that was executed to determine whether it 65 # contains the 'SELECT..FOR UPDATE' stanza. 66 for_update_sql = tested_connection.ops.for_update_sql(nowait) 67 sql = tested_connection.queries[-1]['sql'] 68 return bool(sql.find(for_update_sql) > -1) 69 70 def check_exc(self, exc): 71 self.failUnless(isinstance(exc, DatabaseError)) 72 73 @skipUnlessDBFeature('has_select_for_update') 74 def test_for_update_sql_generated(self): 75 """ 76 Test that the backend's FOR UPDATE variant appears in 77 generated SQL when select_for_update is invoked. 78 """ 79 list(Person.objects.all().select_for_update()) 80 self.assertTrue(self.has_for_update_sql(connection)) 81 82 @skipUnlessDBFeature('has_select_for_update_nowait') 83 def test_for_update_sql_generated_nowait(self): 84 """ 85 Test that the backend's FOR UPDATE NOWAIT variant appears in 86 generated SQL when select_for_update is invoked. 87 """ 88 list(Person.objects.all().select_for_update(nowait=True)) 89 self.assertTrue(self.has_for_update_sql(connection, nowait=True)) 90 91 @requires_threading 92 @skipUnlessDBFeature('has_select_for_update_nowait') 93 def test_nowait_raises_error_on_block(self): 94 """ 95 If nowait is specified, we expect an error to be raised rather 96 than blocking. 97 """ 98 self.start_blocking_transaction() 99 status = [] 100 thread = threading.Thread( 101 target=self.run_select_for_update, 102 args=(status,), 103 kwargs={'nowait': True}, 104 ) 105 106 thread.start() 107 time.sleep(1) 108 thread.join() 109 self.end_blocking_transaction() 110 self.check_exc(status[-1]) 111 112 def run_select_for_update(self, status, nowait=False): 113 status.append('started') 114 try: 115 connection._rollback() 116 people = list(Person.objects.all().select_for_update(nowait=nowait)) 117 people[0].name = 'Fred' 118 people[0].save() 119 connection._commit() 120 except DatabaseError, e: 121 status.append(e) 122 except Exception, e: 123 raise 124 125 @requires_threading 126 @skipUnlessDBFeature('has_select_for_update') 127 @skipUnlessDBFeature('supports_transactions') 128 def test_block(self): 129 """ 130 Check that a thread running a select_for_update that 131 accesses rows being touched by a similar operation 132 on another connection blocks correctly. 133 """ 134 # First, let's start the transaction in our thread. 135 self.start_blocking_transaction() 136 137 # Now, try it again using the ORM's select_for_update 138 # facility. Do this in a separate thread. 139 status = [] 140 thread = threading.Thread(target=self.run_select_for_update, args=(status,)) 141 142 # The thread should immediately block, but we'll sleep 143 # for a bit to make sure 144 thread.start() 145 sanity_count = 0 146 while len(status) != 1 and sanity_count < 10: 147 sanity_count += 1 148 time.sleep(1) 149 if sanity_count >= 10: 150 raise ValueError, 'Thread did not run and block' 151 152 # Check the person hasn't been updated. Since this isn't 153 # using FOR UPDATE, it won't block. 154 p = Person.objects.get(pk=self.person.pk) 155 self.assertEqual('Reinhardt', p.name) 156 157 # When we end our blocking transaction, our thread should 158 # be able to continue. 159 self.end_blocking_transaction() 160 thread.join(5.0) 161 162 # Check the thread has finished. Assuming it has, we should 163 # find that it has updated the person's name. 164 self.failIf(thread.isAlive()) 165 p = Person.objects.get(pk=self.person.pk) 166 self.assertEqual('Fred', p.name) 167 168 @requires_threading 169 @skipUnlessDBFeature('has_select_for_update') 170 def test_raw_lock_not_available(self): 171 """ 172 Check that running a raw query which can't obtain a FOR UPDATE lock 173 raises the correct exception 174 """ 175 self.start_blocking_transaction() 176 def raw(status): 177 try: 178 list( 179 Person.objects.raw( 180 'SELECT * FROM %s %s' % ( 181 Person._meta.db_table, 182 connection.ops.for_update_sql(nowait=True) 183 ) 184 ) 185 ) 186 except DatabaseError, e: 187 status.append(e) 188 status = [] 189 thread = threading.Thread(target=raw, kwargs={'status': status}) 190 thread.start() 191 time.sleep(1) 192 thread.join() 193 self.end_blocking_transaction() 194 self.check_exc(status[-1]) 195 196 @skipUnlessDBFeature('has_select_for_update') 197 def test_transaction_dirty_managed(self): 198 """ Check that a select_for_update sets the transaction to be 199 dirty when executed under txn management. Setting the txn dirty 200 means that it will be either committed or rolled back by Django, 201 which will release any locks held by the SELECT FOR UPDATE. 202 """ 203 transaction.enter_transaction_management(True) 204 transaction.managed(True) 205 try: 206 people = list(Person.objects.select_for_update()) 207 self.assertTrue(transaction.is_dirty()) 208 finally: 209 transaction.rollback() 210 transaction.leave_transaction_management() 211 212 @skipUnlessDBFeature('has_select_for_update') 213 def test_transaction_not_dirty_unmanaged(self): 214 """ If we're not under txn management, the txn will never be 215 marked as dirty. 216 """ 217 people = list(Person.objects.select_for_update()) 218 self.assertFalse(transaction.is_dirty())