Ticket #2705: for_update_9975.diff
File for_update_9975.diff, 19.6 KB (added by , 16 years ago) |
---|
-
django/db/models/sql/query.py
13 13 from django.utils.datastructures import SortedDict 14 14 from django.utils.encoding import force_unicode 15 15 from django.db.backends.util import truncate_name 16 from django.db import connection 16 from django.db import connection, DatabaseError 17 17 from django.db.models import signals 18 18 from django.db.models.fields import FieldDoesNotExist 19 19 from django.db.models.query_utils import select_related_descend … … 29 29 except NameError: 30 30 from sets import Set as set # Python 2.3 fallback 31 31 32 __all__ = ['Query', 'BaseQuery' ]32 __all__ = ['Query', 'BaseQuery', 'LockNotAvailable'] 33 33 34 class LockNotAvailable(DatabaseError): 35 ''' 36 Raised when a query fails because a lock was not available. 37 ''' 38 pass 39 34 40 class BaseQuery(object): 35 41 """ 36 42 A single SQL query. … … 74 80 self.order_by = [] 75 81 self.low_mark, self.high_mark = 0, None # Used for offset/limit 76 82 self.distinct = False 83 self.select_for_update = False 84 self.select_for_update_nowait = False 77 85 self.select_related = False 78 86 self.related_select_cols = [] 79 87 … … 189 197 obj.order_by = self.order_by[:] 190 198 obj.low_mark, obj.high_mark = self.low_mark, self.high_mark 191 199 obj.distinct = self.distinct 200 obj.select_for_update = self.select_for_update 201 obj.select_for_update_nowait = self.select_for_update_nowait 192 202 obj.select_related = self.select_related 193 203 obj.related_select_cols = [] 194 204 obj.aggregates = self.aggregates.copy() … … 310 320 311 321 query.clear_ordering(True) 312 322 query.clear_limits() 323 query.select_for_update = False 313 324 query.select_related = False 314 325 query.related_select_cols = [] 315 326 query.related_select_fields = [] … … 428 439 result.append('LIMIT %d' % val) 429 440 result.append('OFFSET %d' % self.low_mark) 430 441 442 if self.select_for_update and self.connection.features.has_select_for_update: 443 nowait = self.select_for_update_nowait and self.connection.features.has_select_for_update 444 result.append("%s" % self.connection.ops.for_update_sql(nowait=nowait)) 445 431 446 params.extend(self.extra_params) 432 447 return ' '.join(result), tuple(params) 433 448 … … 2079 2094 else: 2080 2095 return 2081 2096 cursor = self.connection.cursor() 2082 cursor.execute(sql, params) 2097 try: 2098 cursor.execute(sql, params) 2099 except DatabaseError, e: 2100 if self.connection.features.has_select_for_update_nowait and self.connection.ops.signals_lock_not_available(e): 2101 raise LockNotAvailable(*e.args) 2102 raise 2083 2103 2084 2104 if not result_type: 2085 2105 return cursor -
django/db/models/manager.py
125 125 def order_by(self, *args, **kwargs): 126 126 return self.get_query_set().order_by(*args, **kwargs) 127 127 128 def select_for_update(self, *args, **kwargs): 129 return self.get_query_set().select_for_update(*args, **kwargs) 130 128 131 def select_related(self, *args, **kwargs): 129 132 return self.get_query_set().select_related(*args, **kwargs) 130 133 -
django/db/models/__init__.py
11 11 from django.db.models.fields.subclassing import SubfieldBase 12 12 from django.db.models.fields.files import FileField, ImageField 13 13 from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel, ManyToManyRel, OneToOneRel 14 from django.db.models.sql.query import LockNotAvailable 14 15 from django.db.models import signals 15 16 16 17 # Admin stages. -
django/db/models/query.py
418 418 del_query = self._clone() 419 419 420 420 # Disable non-supported fields. 421 del_query.query.select_for_update = False 421 422 del_query.query.select_related = False 422 423 del_query.query.clear_ordering() 423 424 … … 557 558 else: 558 559 return self._filter_or_exclude(None, **filter_obj) 559 560 561 def select_for_update(self, **kwargs): 562 """ 563 Returns a new QuerySet instance that will select objects with a 564 FOR UPDATE lock. 565 """ 566 # Default to false for nowait 567 nowait = kwargs.pop('nowait', False) 568 obj = self._clone() 569 obj.query.select_for_update = True 570 obj.query.select_for_update_nowait = nowait 571 transaction.commit_unless_managed() 572 return obj 573 560 574 def select_related(self, *fields, **kwargs): 561 575 """ 562 576 Returns a new QuerySet instance that will select related objects. -
django/db/backends/mysql/base.py
22 22 raise ImproperlyConfigured("MySQLdb-1.2.1p2 or newer is required; you have %s" % Database.__version__) 23 23 24 24 from MySQLdb.converters import conversions 25 from MySQLdb.constants import FIELD_TYPE, FLAG 25 from MySQLdb.constants import FIELD_TYPE, FLAG, ER 26 26 27 27 from django.db.backends import * 28 28 from django.db.backends.mysql.client import DatabaseClient … … 112 112 update_can_self_select = False 113 113 allows_group_by_pk = True 114 114 related_fields_match_type = True 115 has_select_for_update = True 116 has_select_for_update_nowait = False 115 117 116 118 class DatabaseOperations(BaseDatabaseOperations): 117 119 def date_extract_sql(self, lookup_type, field_name): … … 207 209 # MySQL doesn't support microseconds 208 210 return unicode(value.replace(microsecond=0)) 209 211 212 signals_deadlock = lambda self, e: e.args[0] == ER.LOCK_DEADLOCK 213 210 214 def year_lookup_bounds(self, value): 211 215 # Again, no microseconds 212 216 first = '%s-01-01 00:00:00' -
django/db/backends/oracle/base.py
36 36 needs_datetime_string_cast = False 37 37 uses_custom_query_class = True 38 38 interprets_empty_strings_as_nulls = True 39 has_select_for_update = True 40 has_select_for_update_nowait = True 39 41 40 42 41 43 class DatabaseOperations(BaseDatabaseOperations): … … 201 203 'column': column_name}) 202 204 return output 203 205 206 def signals_deadlock(self, exception): 207 return exception.args[0].code == 60 208 209 def signals_lock_not_available(self, exception): 210 return exception.args[0].code == 54 211 204 212 def start_transaction_sql(self): 205 213 return '' 206 214 -
django/db/backends/__init__.py
81 81 # If True, don't use integer foreign keys referring to, e.g., positive 82 82 # integer primary keys. 83 83 related_fields_match_type = False 84 has_select_for_update = False 85 has_select_for_update_nowait = False 84 86 85 87 class BaseDatabaseOperations(object): 86 88 """ … … 158 160 """ 159 161 return [] 160 162 163 def for_update_sql(self, nowait=False): 164 """ 165 Return FOR UPDATE SQL clause to lock row for update 166 """ 167 if nowait: 168 nowaitstr = ' NOWAIT' 169 else: 170 nowaitstr = '' 171 return 'FOR UPDATE' + nowaitstr 172 161 173 def fulltext_search_sql(self, field_name): 162 174 """ 163 175 Returns the SQL WHERE clause to use in order to perform a full-text -
django/db/backends/postgresql_psycopg2/base.py
15 15 try: 16 16 import psycopg2 as Database 17 17 import psycopg2.extensions 18 from psycopg2 import errorcodes 18 19 except ImportError, e: 19 20 from django.core.exceptions import ImproperlyConfigured 20 21 raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e) … … 29 30 class DatabaseFeatures(BaseDatabaseFeatures): 30 31 needs_datetime_string_cast = False 31 32 uses_savepoints = True 33 has_select_for_update = True 34 has_select_for_update_nowait = True 32 35 33 36 class DatabaseOperations(PostgresqlDatabaseOperations): 34 37 def last_executed_query(self, cursor, sql, params): … … 37 40 # http://www.initd.org/tracker/psycopg/wiki/psycopg2_documentation#postgresql-status-message-and-executed-query 38 41 return cursor.query 39 42 43 signals_deadlock = lambda self, e: e.pgcode == errorcodes.DEADLOCK_DETECTED 44 45 signals_lock_not_available = lambda self, e: e.pgcode == errorcodes.LOCK_NOT_AVAILABLE 46 47 40 48 class DatabaseWrapper(BaseDatabaseWrapper): 41 49 operators = { 42 50 'exact': '= %s', -
django/views/decorators/deadlock.py
1 """ 2 Decorators for deadlock handling. 3 """ 4 import sys 5 try: 6 from functools import wraps 7 except ImportError: 8 from django.utils.functional import wraps # Python 2.3, 2.4 fallback. 9 10 from django.db import transaction, connection, DatabaseError 11 12 class DeadlockError(Exception): 13 """ 14 Thrown by a view decorated by handle_deadlock(max_retries) when a deadlock 15 has been detected and the view won't be called again to retry the aborted 16 transaction. 17 """ 18 pass 19 20 def handle_deadlocks(max_retries=2): 21 """ 22 Decorator to retry a view when a database deadlock is detected. 23 24 When there are no retries left, raises DeadlockError with the traceback of 25 the original, database backend-specific exception. 26 27 Views using querysets constructed with select_for_update() should use this 28 decorator. If the backend does not support locking and/or deadlock 29 handling, this doesn't do anything. 30 """ 31 if connection.features.has_select_for_update: 32 signals_deadlock = connection.ops.signals_deadlock 33 else: 34 return lambda f: f 35 def decorator(func): 36 def inner(*args, **kwargs): 37 retries = 0 38 while 1: 39 try: 40 return func(*args, **kwargs) 41 except DatabaseError, e: 42 if signals_deadlock(e): 43 # Rollback needed by PostgreSQL and Oracle 44 transaction.rollback() 45 transaction.set_clean() 46 if retries == max_retries: 47 raise DeadlockError, 'Deadlock detected', sys.exc_info()[2] 48 retries += 1 49 continue 50 raise 51 return wraps(func)(inner) 52 return decorator 53 -
tests/regressiontests/select_for_update/__init__.py
1 2 -
tests/regressiontests/select_for_update/tests.py
1 import time 2 import threading 3 from unittest import TestCase 4 5 from django.conf import settings 6 from django.db import transaction, connection 7 from django.db.models import LockNotAvailable 8 from django.views.decorators.deadlock import DeadlockError, handle_deadlocks 9 10 from regressiontests.select_for_update.models import Tag 11 12 class SelectForUpdateTests(TestCase): 13 14 def setUp(self): 15 Tag.objects.create(name='1') 16 Tag.objects.create(name='2') 17 18 def test_basics(self): 19 def test(): 20 t = Tag(name='update') 21 t.save() 22 transaction.commit() 23 tfound = Tag.objects.select_for_update().get(pk=t.id) 24 tfound.name = 'update2' 25 tfound.save() 26 transaction.commit() 27 tfound = Tag.objects.select_for_update().get(pk=t.id) 28 tfound.delete() 29 transaction.commit() 30 test = transaction.commit_manually(test) 31 test() 32 33 def test_backend_features(self): 34 if settings.DATABASE_ENGINE in ('postgresql_psycopg2', 'oracle'): 35 self.failUnless(hasattr(connection.ops, 'signals_deadlock')) 36 self.failUnless(hasattr(connection.ops, 'signals_lock_not_available')) 37 elif settings.DATABASE_ENGINE == 'mysql': 38 self.failUnless(hasattr(connection.ops, 'signals_deadlock')) 39 40 def test_deadlock(self): 41 ''' 42 This test will fail on MySQL if the storage engine is not InnoDB. 43 ''' 44 # Don't look for deadlocks if the backend doesn't support SELECT FOR UPDATE 45 if not connection.features.has_select_for_update: 46 return 47 def test(max_retries): 48 vars = {0: None, 1: None} 49 def view0(): 50 t1 = Tag.objects.select_for_update().get(pk=1) 51 time.sleep(1) 52 t2 = Tag.objects.select_for_update().get(pk=2) 53 transaction.commit() 54 view0 = transaction.commit_manually(handle_deadlocks(max_retries=max_retries)(view0)) 55 def view1(): 56 t2 = Tag.objects.select_for_update().get(pk=2) 57 time.sleep(1) 58 t1 = Tag.objects.select_for_update().get(pk=1) 59 transaction.commit() 60 view1 = transaction.commit_manually(handle_deadlocks(max_retries=max_retries)(view1)) 61 def thread0(vars): 62 try: 63 view0() 64 except Exception, e: 65 vars[0] = e 66 def thread1(vars): 67 try: 68 view1() 69 except Exception, e: 70 vars[1] = e 71 t0 = threading.Thread(target=thread0, args=(vars,)) 72 t1 = threading.Thread(target=thread1, args=(vars,)) 73 t0.start() 74 t1.start() 75 t0.join() 76 t1.join() 77 return vars[0], vars[1] 78 # Make a deadlock and don't retry the aborted transaction 79 # We are expecting a DeadlockError 80 e0, e1 = test(0) 81 self.assertEqual(e0 or e1, e1 or e0) 82 self.assert_(isinstance(e0 or e1, DeadlockError)) 83 # Make a deadlock and retry the aborted transaction 84 # We expect no errors 85 e0, e1 = test(1) 86 self.assertEqual(e0 or e1, None) 87 88 def test_nowait(self): 89 if not connection.features.has_select_for_update_nowait: 90 return 91 def view(): 92 try: 93 t1 = Tag.objects.select_for_update(nowait=True).get(pk=1) 94 time.sleep(1) 95 finally: 96 transaction.rollback() 97 view = transaction.commit_manually(view) 98 t = threading.Thread(target=view) 99 t.start() 100 time.sleep(.25) 101 try: 102 view() 103 except LockNotAvailable: 104 pass 105 else: 106 self.fail('Expected view to raise LockNotAvailable') 107 t.join() -
tests/regressiontests/select_for_update/models.py
1 from django.db import models 2 3 class Tag(models.Model): 4 name = models.CharField(max_length=10) 5 parent = models.ForeignKey('self', blank=True, null=True, 6 related_name='children') 7 8 class Meta: 9 ordering = ['name'] 10 11 def __unicode__(self): 12 return self.name 13 -
docs/ref/models/querysets.txt
768 768 769 769 Entry.objects.extra(where=['headline=%s'], params=['Lennon']) 770 770 771 ``select_for_update(nowait=False)`` 772 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 773 774 Returns a queryset that will lock rows until the end of the transaction, 775 generating a SELECT ... FOR UPDATE statement on supported databases. 776 777 For example:: 778 779 entries = Entry.objects.select_for_update().filter(author=request.user) 780 781 All matched entries will be locked until the end of the transaction block, 782 meaning that other transactions will be prevented from changing or acquiring 783 locks on them. 784 785 Usually, if another transaction has already acquired a lock on one of the 786 selected rows, the query will block until the lock is released. If this is 787 not the behaviour you want, call ``select_for_update(nowait=True)``. This will 788 make the call non-blocking. If a conflicting lock is already acquired by 789 another transaction, ``django.db.models.LockNotAvailable`` will be raised when 790 the queryset is evaluated. 791 792 Using blocking locks on a database can lead to deadlocks. This occurs when two 793 concurrent transactions are both waiting on a lock the other transaction 794 already holds. To deal with deadlocks, wrap your views that use 795 ``select_for_update(nowait=False)`` with the 796 ``django.views.decorators.deadlock.handle_deadlocks`` decorator. 797 798 For example:: 799 800 from django.db import transaction 801 from django.views.decorators.deadlock import handle_deadlocks 802 803 @handle_deadlocks(max_retries=2) 804 @transaction.commit_on_success 805 def my_view(request): 806 ... 807 808 If the database engine detects a deadlock involving ``my_view`` and decides 809 to abort its transaction, it will be automatically retried. If deadlocks keep 810 occurring after two repeated attempts, 811 ``django.views.decorators.DeadlockError`` will be raised, which can be 812 propagated to the user or handled in a middleware. 813 814 Currently the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` 815 database backends support ``select_for_update()`` but MySQL has no 816 support for the ``nowait`` argument. Other backends will simply 817 generate queries as if ``select_for_update()`` had not been used. 818 771 819 QuerySet methods that do not return QuerySets 772 820 --------------------------------------------- 773 821 -
docs/ref/databases.txt
276 276 column types have a maximum length restriction of 255 characters, regardless 277 277 of whether ``unique=True`` is specified or not. 278 278 279 Row locking with ``QuerySet.select_for_update()`` 280 ------------------------------------------------- 281 282 MySQL does not support the NOWAIT option to the SELECT ... FOR UPDATE 283 statement. However, you may call the ``select_for_update()`` method of a 284 queryset with ``nowait=True``. In that case, the argument will be silently 285 discarded and the generated query will block until the requested lock can be 286 acquired. 287 279 288 .. _sqlite-notes: 280 289 281 290 SQLite notes