Ticket #2705: 2705-for_update-r15174.diff

File 2705-for_update-r15174.diff, 21.2 KB (added by Dan Fairs, 14 years ago)

Updated patch to resolve incorrect transaction handling in the tests, and modify NOWAIT behaviour on backends that do not support it.

  • django/db/models/sql/compiler.py

     
    11from django.core.exceptions import FieldError
    22from django.db import connections
     3from django.db import transaction
    34from django.db.backends.util import truncate_name
    45from django.db.models.sql.constants import *
    56from django.db.models.sql.datastructures import EmptyResultSet
    67from django.db.models.sql.expressions import SQLEvaluator
    78from django.db.models.sql.query import get_proxied_model, get_order_dir, \
    89     select_related_descend, Query
     10from django.db.utils import DatabaseError
    911
    1012class SQLCompiler(object):
    1113    def __init__(self, query, connection, using):
     
    117119                        result.append('LIMIT %d' % val)
    118120                result.append('OFFSET %d' % self.query.low_mark)
    119121
     122        if self.query.select_for_update and self.connection.features.has_select_for_update:
     123            # If we've been asked for a NOWAIT query but the backend does not support it,
     124            # raise a DatabaseError otherwise we could get an unexpected deadlock.
     125            nowait = self.query.select_for_update_nowait
     126            if nowait and not self.connection.features.has_select_for_update_nowait:
     127                raise DatabaseError('NOWAIT is not supported on this database backend.')
     128            result.append(self.connection.ops.for_update_sql(nowait=nowait))
     129
    120130        return ' '.join(result), tuple(params)
    121131
    122132    def as_nested_sql(self):
     
    677687        resolve_columns = hasattr(self, 'resolve_columns')
    678688        fields = None
    679689        has_aggregate_select = bool(self.query.aggregate_select)
     690        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
     691        # a subsequent commit/rollback is executed, so any database locks
     692        # are released.
     693        if self.query.select_for_update and transaction.is_managed(self.using):
     694            transaction.set_dirty(self.using)
    680695        for rows in self.execute_sql(MULTI):
    681696            for row in rows:
    682697                if resolve_columns:
  • django/db/models/sql/query.py

     
    131131        self.order_by = []
    132132        self.low_mark, self.high_mark = 0, None  # Used for offset/limit
    133133        self.distinct = False
     134        self.select_for_update = False
     135        self.select_for_update_nowait = False
    134136        self.select_related = False
    135137        self.related_select_cols = []
    136138
     
    260262        obj.order_by = self.order_by[:]
    261263        obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
    262264        obj.distinct = self.distinct
     265        obj.select_for_update = self.select_for_update
     266        obj.select_for_update_nowait = self.select_for_update_nowait
    263267        obj.select_related = self.select_related
    264268        obj.related_select_cols = []
    265269        obj.aggregates = deepcopy(self.aggregates, memo=memo)
     
    366370
    367371        query.clear_ordering(True)
    368372        query.clear_limits()
     373        query.select_for_update = False
    369374        query.select_related = False
    370375        query.related_select_cols = []
    371376        query.related_select_fields = []
  • django/db/models/manager.py

     
    164164    def order_by(self, *args, **kwargs):
    165165        return self.get_query_set().order_by(*args, **kwargs)
    166166
     167    def select_for_update(self, *args, **kwargs):
     168        return self.get_query_set().select_for_update(*args, **kwargs)
     169
    167170    def select_related(self, *args, **kwargs):
    168171        return self.get_query_set().select_related(*args, **kwargs)
    169172
  • django/db/models/query.py

     
    436436        del_query._for_write = True
    437437
    438438        # Disable non-supported fields.
     439        del_query.query.select_for_update = False
    439440        del_query.query.select_related = False
    440441        del_query.query.clear_ordering()
    441442
     
    584585        else:
    585586            return self._filter_or_exclude(None, **filter_obj)
    586587
     588    def select_for_update(self, **kwargs):
     589        """
     590        Returns a new QuerySet instance that will select objects with a
     591        FOR UPDATE lock.
     592        """
     593        # Default to false for nowait
     594        nowait = kwargs.pop('nowait', False)
     595        obj = self._clone()
     596        obj.query.select_for_update = True
     597        obj.query.select_for_update_nowait = nowait
     598        return obj
     599
    587600    def select_related(self, *fields, **kwargs):
    588601        """
    589602        Returns a new QuerySet instance that will select related objects.
  • django/db/backends/mysql/base.py

     
    124124    allows_group_by_pk = True
    125125    related_fields_match_type = True
    126126    allow_sliced_subqueries = False
     127    has_select_for_update = True
     128    has_select_for_update_nowait = False
    127129    supports_forward_references = False
    128130    supports_long_model_names = False
    129131    supports_microsecond_precision = False
  • django/db/backends/oracle/base.py

     
    7070    needs_datetime_string_cast = False
    7171    interprets_empty_strings_as_nulls = True
    7272    uses_savepoints = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
    7375    can_return_id_from_insert = True
    7476    allow_sliced_subqueries = False
    7577    supports_subqueries_in_group_by = False
  • django/db/backends/__init__.py

     
    103103    # integer primary keys.
    104104    related_fields_match_type = False
    105105    allow_sliced_subqueries = True
     106    has_select_for_update = False
     107    has_select_for_update_nowait = False
    106108
    107109    # Does the default test database allow multiple connections?
    108110    # Usually an indication that the test database is in-memory
     
    291293        """
    292294        return []
    293295
     296    def for_update_sql(self, nowait=False):
     297        """
     298        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
     299        """
     300        if nowait:
     301            return 'FOR UPDATE NOWAIT'
     302        else:
     303            return 'FOR UPDATE'
     304
    294305    def fulltext_search_sql(self, field_name):
    295306        """
    296307        Returns the SQL WHERE clause to use in order to perform a full-text
  • django/db/backends/postgresql_psycopg2/base.py

     
    7070    requires_rollback_on_dirty_transaction = True
    7171    has_real_datatype = True
    7272    can_defer_constraint_checks = True
     73    has_select_for_update = True
     74    has_select_for_update_nowait = True
     75   
    7376
    7477class DatabaseOperations(PostgresqlDatabaseOperations):
    7578    def last_executed_query(self, cursor, sql, params):
  • tests/modeltests/select_for_update/__init__.py

     
     1#
  • tests/modeltests/select_for_update/tests.py

     
     1import time
     2from django.conf import settings
     3from django.db import transaction, connection
     4from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
     5from django.test import (TransactionTestCase, skipIfDBFeature,
     6    skipUnlessDBFeature)
     7from django.utils.functional import wraps
     8from django.utils import unittest
     9
     10from models import Person
     11
     12try:
     13    import threading
     14    def requires_threading(func):
     15        return func
     16except ImportError:
     17    # Note we can't use dummy_threading here, as our tests will actually
     18    # block. We just have to skip the test completely.
     19    def requires_threading(func):
     20        @wraps(func)
     21        def wrapped(*args, **kw):
     22            raise unittest.SkipTest('threading required')
     23
     24class SelectForUpdateTests(TransactionTestCase):
     25
     26    def setUp(self):
     27        transaction.enter_transaction_management(True)
     28        transaction.managed(True)
     29        self.person = Person.objects.create(name='Reinhardt')
     30
     31        # We have to commit here so that code in run_select_for_update can
     32        # see this data.
     33        transaction.commit()
     34
     35        # We need another database connection to test that one connection
     36        # issuing a SELECT ... FOR UPDATE will block.
     37        new_connections = ConnectionHandler(settings.DATABASES)
     38        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
     39
     40        # We need to set settings.DEBUG to True so we can capture
     41        # the output SQL to examine.
     42        self._old_debug = settings.DEBUG
     43        settings.DEBUG = True
     44
     45    def tearDown(self):
     46        try:
     47            # We don't really care if this fails - some of the tests will set
     48            # this in the course of their run.
     49            transaction.managed(False)
     50            transaction.leave_transaction_management()
     51        except transaction.TransactionManagementError:
     52            pass
     53        self.new_connection.close()
     54        settings.DEBUG = self._old_debug
     55        try:
     56            self.end_blocking_transaction()
     57        except (DatabaseError, AttributeError):
     58            pass
     59
     60    def start_blocking_transaction(self):
     61        # Start a blocking transaction. At some point,
     62        # end_blocking_transaction() should be called.
     63        self.cursor = self.new_connection.cursor()
     64        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
     65            'db_table': Person._meta.db_table,
     66            'for_update': self.new_connection.ops.for_update_sql(),
     67            }
     68        self.cursor.execute(sql, ())
     69        result = self.cursor.fetchone()
     70
     71    def end_blocking_transaction(self):
     72        # Roll back the blocking transaction.
     73        self.new_connection._rollback()
     74
     75    def has_for_update_sql(self, tested_connection, nowait=False):
     76        # Examine the SQL that was executed to determine whether it
     77        # contains the 'SELECT..FOR UPDATE' stanza.
     78        for_update_sql = tested_connection.ops.for_update_sql(nowait)
     79        sql = tested_connection.queries[-1]['sql']
     80        return bool(sql.find(for_update_sql) > -1)
     81
     82    def check_exc(self, exc):
     83        self.failUnless(isinstance(exc, DatabaseError))
     84
     85    @skipUnlessDBFeature('has_select_for_update')
     86    def test_for_update_sql_generated(self):
     87        """
     88        Test that the backend's FOR UPDATE variant appears in
     89        generated SQL when select_for_update is invoked.
     90        """
     91        list(Person.objects.all().select_for_update())
     92        self.assertTrue(self.has_for_update_sql(connection))
     93
     94    @skipUnlessDBFeature('has_select_for_update_nowait')
     95    def test_for_update_sql_generated_nowait(self):
     96        """
     97        Test that the backend's FOR UPDATE NOWAIT variant appears in
     98        generated SQL when select_for_update is invoked.
     99        """
     100        list(Person.objects.all().select_for_update(nowait=True))
     101        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
     102
     103    @requires_threading
     104    @skipUnlessDBFeature('has_select_for_update_nowait')
     105    def test_nowait_raises_error_on_block(self):
     106        """
     107        If nowait is specified, we expect an error to be raised rather
     108        than blocking.
     109        """
     110        self.start_blocking_transaction()
     111        status = []
     112        thread = threading.Thread(
     113            target=self.run_select_for_update,
     114            args=(status,),
     115            kwargs={'nowait': True},
     116        )
     117
     118        thread.start()
     119        time.sleep(1)
     120        thread.join()
     121        self.end_blocking_transaction()
     122        self.check_exc(status[-1])
     123
     124    @skipIfDBFeature('has_select_for_update_nowait')
     125    @skipUnlessDBFeature('has_select_for_update')
     126    def test_unsupported_nowait_raises_error(self):
     127        """
     128        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
     129        that supports FOR UPDATE but not NOWAIT, then we should find
     130        that a DatabaseError is raised.
     131        """
     132        self.assertRaises(
     133            DatabaseError,
     134            list,
     135            Person.objects.all().select_for_update(nowait=True)
     136        )
     137
     138    def run_select_for_update(self, status, nowait=False):
     139        """
     140        Utility method that runs a SELECT FOR UPDATE against all
     141        Person instances. After the select_for_update, it attempts
     142        to update the name of the only record, save, and commit.
     143
     144        In general, this will be run in a separate thread.
     145        """
     146        status.append('started')
     147        try:
     148            # We need to enter transaction management again, as this is done on
     149            # per-thread basis
     150            transaction.enter_transaction_management(True)
     151            transaction.managed(True)
     152            people = list(
     153                Person.objects.all().select_for_update(nowait=nowait)
     154            )
     155            people[0].name = 'Fred'
     156            people[0].save()
     157            transaction.commit()
     158        except DatabaseError, e:
     159            status.append(e)
     160        except Exception, e:
     161            raise
     162
     163    @requires_threading
     164    @skipUnlessDBFeature('has_select_for_update')
     165    @skipUnlessDBFeature('supports_transactions')
     166    def test_block(self):
     167        """
     168        Check that a thread running a select_for_update that
     169        accesses rows being touched by a similar operation
     170        on another connection blocks correctly.
     171        """
     172        # First, let's start the transaction in our thread.
     173        self.start_blocking_transaction()
     174
     175        # Now, try it again using the ORM's select_for_update
     176        # facility. Do this in a separate thread.
     177        status = []
     178        thread = threading.Thread(
     179            target=self.run_select_for_update, args=(status,)
     180        )
     181
     182        # The thread should immediately block, but we'll sleep
     183        # for a bit to make sure.
     184        thread.start()
     185        sanity_count = 0
     186        while len(status) != 1 and sanity_count < 10:
     187            sanity_count += 1
     188            time.sleep(1)
     189        if sanity_count >= 10:
     190            raise ValueError, 'Thread did not run and block'
     191
     192        # Check the person hasn't been updated. Since this isn't
     193        # using FOR UPDATE, it won't block.
     194        p = Person.objects.get(pk=self.person.pk)
     195        self.assertEqual('Reinhardt', p.name)
     196
     197        # When we end our blocking transaction, our thread should
     198        # be able to continue.
     199        self.end_blocking_transaction()
     200        thread.join(5.0)
     201
     202        # Check the thread has finished. Assuming it has, we should
     203        # find that it has updated the person's name.
     204        self.failIf(thread.isAlive())
     205        p = Person.objects.get(pk=self.person.pk)
     206        self.assertEqual('Fred', p.name)
     207
     208    @requires_threading
     209    @skipUnlessDBFeature('has_select_for_update')
     210    def test_raw_lock_not_available(self):
     211        """
     212        Check that running a raw query which can't obtain a FOR UPDATE lock
     213        raises the correct exception
     214        """
     215        self.start_blocking_transaction()
     216        def raw(status):
     217            try:
     218                list(
     219                    Person.objects.raw(
     220                        'SELECT * FROM %s %s' % (
     221                            Person._meta.db_table,
     222                            connection.ops.for_update_sql(nowait=True)
     223                        )
     224                    )
     225                )
     226            except DatabaseError, e:
     227                status.append(e)
     228        status = []
     229        thread = threading.Thread(target=raw, kwargs={'status': status})
     230        thread.start()
     231        time.sleep(1)
     232        thread.join()
     233        self.end_blocking_transaction()
     234        self.check_exc(status[-1])
     235
     236    @skipUnlessDBFeature('has_select_for_update')
     237    def test_transaction_dirty_managed(self):
     238        """ Check that a select_for_update sets the transaction to be
     239        dirty when executed under txn management. Setting the txn dirty
     240        means that it will be either committed or rolled back by Django,
     241        which will release any locks held by the SELECT FOR UPDATE.
     242        """
     243        people = list(Person.objects.select_for_update())
     244        self.assertTrue(transaction.is_dirty())
     245
     246    @skipUnlessDBFeature('has_select_for_update')
     247    def test_transaction_not_dirty_unmanaged(self):
     248        """ If we're not under txn management, the txn will never be
     249        marked as dirty.
     250        """
     251        transaction.managed(False)
     252        transaction.leave_transaction_management()
     253        people = list(Person.objects.select_for_update())
     254        self.assertFalse(transaction.is_dirty())
  • tests/modeltests/select_for_update/models.py

     
     1from django.db import models
     2
     3class Person(models.Model):
     4    name = models.CharField(max_length=30)
  • AUTHORS

     
    165165    eriks@win.tue.nl
    166166    Tomáš Ehrlich <tomas.ehrlich@gmail.com>
    167167    Dirk Eschler <dirk.eschler@gmx.net>
     168    Dan Fairs <dan@fezconsulting.com>
    168169    Marc Fargas <telenieko@telenieko.com>
    169170    Szilveszter Farkas <szilveszter.farkas@gmail.com>
    170171    Grigory Fateyev <greg@dial.com.ru>
  • docs/ref/models/querysets.txt

     
    940940    # queries the database with the 'backup' alias
    941941    >>> Entry.objects.using('backup')
    942942
     943select_for_update
     944~~~~~~~~~~~~~~~~~
    943945
     946.. method:: select_for_update(nowait=False)
     947
     948.. versionadded:: 1.4
     949
     950Returns a queryset that will lock rows until the end of the transaction,
     951generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
     952
     953For example::
     954
     955    entries = Entry.objects.select_for_update().filter(author=request.user)
     956
     957All matched entries will be locked until the end of the transaction block,
     958meaning that other transactions will be prevented from changing or acquiring
     959locks on them.
     960
     961Usually, if another transaction has already acquired a lock on one of the
     962selected rows, the query will block until the lock is released. If this is
     963not the behaviour you want, call ``select_for_update(nowait=True)``. This will
     964make the call non-blocking. If a conflicting lock is already acquired by
     965another transaction, ``django.db.utils.DatabaseError`` will be raised when
     966the queryset is evaluated.
     967
     968Note that using ``select_related`` will cause the current transaction to be set
     969dirty, if under transaction management. This is to ensure that Django issues a
     970``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
     971UPDATE``.
     972
     973Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
     974database backends support ``select_for_update()``. However, MySQL has no
     975support for the ``nowait`` argument.
     976
     977Passing ``nowait=True`` to ``select_for_update`` using database backends that
     978do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
     979raised. This is in order to prevent code unexpectedly blocking.
     980
     981Using ``select_for_update`` on backends which do not support
     982``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
     983
     984
    944985Methods that do not return QuerySets
    945986------------------------------------
    946987
  • docs/ref/databases.txt

     
    368368:class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
    369369respectively, a ``ValueError`` is raised rather than truncating data.
    370370
     371Row locking with ``QuerySet.select_for_update()``
     372-------------------------------------------------
     373
     374MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
     375statement. However, you may call the ``select_for_update()`` method of a
     376queryset with ``nowait=True``. In that case, the argument will be silently
     377discarded and the generated query will block until the requested lock can be
     378acquired.
     379
    371380.. _sqlite-notes:
    372381
    373382SQLite notes
Back to Top