Ticket #10154: 10154-r14446.diff
File 10154-r14446.diff, 19.9 KB (added by , 14 years ago) |
---|
-
django/db/models/expressions.py
1 from datetimeimport datetime1 import datetime 2 2 3 3 from django.utils import tree 4 4 from django.utils.copycompat import deepcopy … … 26 26 super(ExpressionNode, self).__init__(children, connector, negated) 27 27 28 28 def _combine(self, other, connector, reversed, node=None): 29 if isinstance(other, datetime.timedelta): 30 return DateModifierNode([self, other], connector) 31 29 32 if reversed: 30 33 obj = ExpressionNode([other], connector) 31 34 obj.add(node or self, connector) … … 111 114 112 115 def evaluate(self, evaluator, qn, connection): 113 116 return evaluator.evaluate_leaf(self, qn, connection) 117 118 class DateModifierNode(ExpressionNode): 119 """ 120 Node that implements the following syntax: 121 filter(end_date__gt=F('start_date') + datetime.timedelta(days=3, seconds=200)) 122 123 which translates into: 124 POSTGRES: 125 WHERE end_date > (start_date + INTERVAL '3 days 200 seconds') 126 127 MYSQL: 128 WHERE end_date > (start_date + INTERVAL '3 0:0:200:0' DAY_MICROSECOND) 129 130 ORACLE: 131 WHERE end_date > (start_date + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6)) 132 133 SQLITE: 134 WHERE end_date > django_format_dtdelta(start_date, "+" "3", "200", "0") 135 (A custom function is used in order to preserve six digits of fractional 136 second information on sqlite, and to format both date and datetime values.) 137 138 Note that microsecond comparisons are not well supported with MySQL, since 139 MySQL does not store microsecond information. 140 141 Only adding and subtracting timedeltas is supported, attempts to use other 142 operations raise a TypeError. 143 """ 144 def __init__(self, children, connector, negated=False): 145 if len(children) != 2: 146 raise TypeError('Must specify a node and a timedelta.') 147 if not isinstance(children[1], datetime.timedelta): 148 raise TypeError('Second child must be a timedelta.') 149 if connector not in (self.ADD, self.SUB): 150 raise TypeError('Connector must be + or -, not %s' % connector) 151 super(DateModifierNode, self).__init__(children, connector, negated) 152 153 def evaluate(self, evaluator, qn, connection): 154 timedelta = self.children.pop() 155 sql, params = evaluator.evaluate_node(self, qn, connection) 156 157 if timedelta.days == 0 and timedelta.seconds == 0 and \ 158 timedelta.microseconds == 0: 159 return sql, params 160 161 return connection.ops.date_interval_sql(sql, self.connector, timedelta), params -
django/db/backends/postgresql/operations.py
27 27 else: 28 28 return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name) 29 29 30 def date_interval_sql(self, sql, connector, timedelta): 31 """ 32 implements the interval functionality for expressions 33 format for Postgres: 34 (datefield + interval '3 days 200 seconds 5 microseconds') 35 """ 36 modifiers = [] 37 if timedelta.days: 38 modifiers.append(u'%s days' % timedelta.days) 39 if timedelta.seconds: 40 modifiers.append(u'%s seconds' % timedelta.seconds) 41 if timedelta.microseconds: 42 modifiers.append(u'%s microseconds' % timedelta.microseconds) 43 mods = u' '.join(modifiers) 44 conn = u' %s ' % connector 45 return u'(%s)' % conn.join([sql, u'interval \'%s\'' % mods]) 46 30 47 def date_trunc_sql(self, lookup_type, field_name): 31 48 # http://www.postgresql.org/docs/8.0/static/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC 32 49 return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name) -
django/db/backends/sqlite3/base.py
9 9 10 10 import re 11 11 import sys 12 import datetime 12 13 13 14 from django.db import utils 14 15 from django.db.backends import * … … 90 91 # cause a collision with a field name). 91 92 return "django_extract('%s', %s)" % (lookup_type.lower(), field_name) 92 93 94 def date_interval_sql(self, sql, connector, timedelta): 95 # It would be more straightforward if we could use the sqlite strftime 96 # function, but it does not allow for keeping six digits of fractional 97 # second information, nor does it allow for formatting date and datetime 98 # values differently. So instead we register our own function that 99 # formats the datetime combined with the delta in a manner suitable 100 # for comparisons. 101 return u'django_format_dtdelta(%s, "%s", "%d", "%d", "%d")' % (sql, 102 connector, timedelta.days, timedelta.seconds, timedelta.microseconds) 103 93 104 def date_trunc_sql(self, lookup_type, field_name): 94 105 # sqlite doesn't support DATE_TRUNC, so we fake it with a user-defined 95 106 # function django_date_trunc that's registered in connect(). Note that … … 197 208 self.connection.create_function("django_extract", 2, _sqlite_extract) 198 209 self.connection.create_function("django_date_trunc", 2, _sqlite_date_trunc) 199 210 self.connection.create_function("regexp", 2, _sqlite_regexp) 211 self.connection.create_function("django_format_dtdelta", 5, _sqlite_format_dtdelta) 200 212 connection_created.send(sender=self.__class__, connection=self) 201 213 return self.connection.cursor(factory=SQLiteCursorWrapper) 202 214 … … 260 272 elif lookup_type == 'day': 261 273 return "%i-%02i-%02i 00:00:00" % (dt.year, dt.month, dt.day) 262 274 275 def _sqlite_format_dtdelta(dt, conn, days, secs, usecs): 276 try: 277 dt = util.typecast_timestamp(dt) 278 delta = datetime.timedelta(int(days), int(secs), int(usecs)) 279 if conn.strip() == '+': 280 dt = dt + delta 281 else: 282 dt = dt - delta 283 except (ValueError, TypeError): 284 return None 285 286 if isinstance(dt, datetime.datetime): 287 rv = dt.strftime("%Y-%m-%d %H:%M:%S") 288 if dt.microsecond: 289 rv = "%s.%0.6d" % (rv, dt.microsecond) 290 else: 291 rv = dt.strftime("%Y-%m-%d") 292 return rv 293 263 294 def _sqlite_regexp(re_pattern, re_string): 264 295 import re 265 296 try: -
django/db/backends/mysql/base.py
158 158 sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str) 159 159 return sql 160 160 161 def date_interval_sql(self, sql, connector, timedelta): 162 return "(%s %s INTERVAL '%d 0:0:%d:%d' DAY_MICROSECOND)" % (sql, connector, 163 timedelta.days, timedelta.seconds, timedelta.microseconds) 164 161 165 def drop_foreignkey_sql(self): 162 166 return "DROP FOREIGN KEY" 163 167 -
django/db/backends/oracle/base.py
95 95 else: 96 96 return "EXTRACT(%s FROM %s)" % (lookup_type, field_name) 97 97 98 def date_interval_sql(self, sql, connector, timedelta): 99 """ 100 Implements the interval functionality for expressions 101 format for Oracle: 102 (datefield + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6)) 103 """ 104 minutes, seconds = divmod(timedelta.seconds, 60) 105 hours, minutes = divmod(minutes, 60) 106 days = str(timedelta.days) 107 day_precision = len(days) 108 fmt = "(%s %s INTERVAL '%s %02d:%02d:%02d.%06d' DAY(%d) TO SECOND(6))" 109 return fmt % (sql, connector, days, hours, minutes, seconds, 110 timedelta.microseconds, day_precision) 111 98 112 def date_trunc_sql(self, lookup_type, field_name): 99 113 # Oracle uses TRUNC() for both dates and numbers. 100 114 # http://download-east.oracle.com/docs/cd/B10501_01/server.920/a96540/functions155a.htm#SQLRF06151 -
django/db/backends/__init__.py
216 216 """ 217 217 raise NotImplementedError() 218 218 219 def date_interval_sql(self, sql, connector, timedelta): 220 """ 221 Implements the date interval functionality for expressions 222 """ 223 raise NotImplementedError() 224 219 225 def date_trunc_sql(self, lookup_type, field_name): 220 226 """ 221 227 Given a lookup_type of 'year', 'month' or 'day', returns the SQL that -
tests/regressiontests/expressions_regress/tests.py
1 import datetime 2 3 from django.test import TestCase 4 from django.db.models import F 5 from django.conf import settings 6 from django.db import DEFAULT_DB_ALIAS 7 8 from models import Experiment 9 10 class FTimeDeltaTests(TestCase): 11 12 def setUp(self): 13 db = settings.DATABASES[DEFAULT_DB_ALIAS] 14 self.db_is_mysql = db['ENGINE'] == 'django.db.backends.mysql' 15 self.db_is_sqlite = db['ENGINE'] == 'django.db.backends.sqlite3' 16 17 sday = datetime.date(2010, 6, 25) 18 stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000) 19 midnight = datetime.time(0) 20 21 delta0 = datetime.timedelta(0) 22 delta1 = datetime.timedelta(microseconds=253000) 23 delta2 = datetime.timedelta(seconds=44) 24 delta3 = datetime.timedelta(hours=21, minutes=8) 25 delta4 = datetime.timedelta(days=10) 26 27 # Test data is set so that deltas and delays will be 28 # strictly increasing. 29 self.deltas = [] 30 self.delays = [] 31 self.days_long = [] 32 33 # e0: started same day as assigned, zero duration 34 end = stime+delta0 35 e0 = Experiment.objects.create(name='e0', assigned=sday, start=stime, 36 end=end, completed=end.date()) 37 self.deltas.append(delta0) 38 self.delays.append(e0.start- 39 datetime.datetime.combine(e0.assigned, midnight)) 40 self.days_long.append(e0.completed-e0.assigned) 41 42 # e1: started one day after assigned, tiny duration, data 43 # set so that end time has no fractional seconds, which 44 # tests an edge case on sqlite. This Experiment is only 45 # included in the test data when DB is not MySQL (since 46 # on MySQL microseconds are dropped from datetime fields). 47 if not self.db_is_mysql: 48 delay = datetime.timedelta(1) 49 end = stime + delay + delta1 50 e1 = Experiment.objects.create(name='e1', assigned=sday, 51 start=stime+delay, end=end, completed=end.date()) 52 self.deltas.append(delta1) 53 self.delays.append(e1.start- 54 datetime.datetime.combine(e1.assigned, midnight)) 55 self.days_long.append(e1.completed-e1.assigned) 56 57 # e2: started three days after assigned, small duration 58 end = stime+delta2 59 e2 = Experiment.objects.create(name='e2', 60 assigned=sday-datetime.timedelta(3), start=stime, end=end, 61 completed=end.date()) 62 self.deltas.append(delta2) 63 self.delays.append(e2.start- 64 datetime.datetime.combine(e2.assigned, midnight)) 65 self.days_long.append(e2.completed-e2.assigned) 66 67 # e3: started four days after assigned, medium duration 68 delay = datetime.timedelta(4) 69 end = stime + delay + delta3 70 e3 = Experiment.objects.create(name='e3', 71 assigned=sday, start=stime+delay, end=end, completed=end.date()) 72 self.deltas.append(delta3) 73 self.delays.append(e3.start- 74 datetime.datetime.combine(e3.assigned, midnight)) 75 self.days_long.append(e3.completed-e3.assigned) 76 77 # e4: started 10 days after assignment, long duration 78 end = stime + delta4 79 e4 = Experiment.objects.create(name='e4', 80 assigned=sday-datetime.timedelta(10), start=stime, end=end, 81 completed=end.date()) 82 self.deltas.append(delta4) 83 self.delays.append(e4.start- 84 datetime.datetime.combine(e4.assigned, midnight)) 85 self.days_long.append(e4.completed-e4.assigned) 86 self.expnames = [e.name for e in Experiment.objects.all()] 87 88 def test_delta_add(self): 89 for i in range(len(self.deltas)): 90 delta = self.deltas[i] 91 test_set = [e.name for e in 92 Experiment.objects.filter(end__lt=F('start')+delta)] 93 self.assertEqual(test_set, self.expnames[:i]) 94 95 test_set = [e.name for e in 96 Experiment.objects.filter(end__lte=F('start')+delta)] 97 self.assertEqual(test_set, self.expnames[:i+1]) 98 99 def test_delta_subtract(self): 100 for i in range(len(self.deltas)): 101 delta = self.deltas[i] 102 test_set = [e.name for e in 103 Experiment.objects.filter(start__gt=F('end')-delta)] 104 self.assertEqual(test_set, self.expnames[:i]) 105 106 test_set = [e.name for e in 107 Experiment.objects.filter(start__gte=F('end')-delta)] 108 self.assertEqual(test_set, self.expnames[:i+1]) 109 110 def test_exclude(self): 111 for i in range(len(self.deltas)): 112 delta = self.deltas[i] 113 test_set = [e.name for e in 114 Experiment.objects.exclude(end__lt=F('start')+delta)] 115 self.assertEqual(test_set, self.expnames[i:]) 116 117 test_set = [e.name for e in 118 Experiment.objects.exclude(end__lte=F('start')+delta)] 119 self.assertEqual(test_set, self.expnames[i+1:]) 120 121 def test_date_comparison(self): 122 for i in range(len(self.days_long)): 123 days = self.days_long[i] 124 test_set = [e.name for e in 125 Experiment.objects.filter(completed__lt=F('assigned')+days)] 126 self.assertEqual(test_set, self.expnames[:i]) 127 128 test_set = [e.name for e in 129 Experiment.objects.filter(completed__lte=F('assigned')+days)] 130 self.assertEqual(test_set, self.expnames[:i+1]) 131 132 def test_mixed_comparisons1(self): 133 for i in range(len(self.delays)): 134 delay = self.delays[i] 135 if self.db_is_mysql: 136 delay = datetime.timedelta(delay.days, delay.seconds) 137 test_set = [e.name for e in 138 Experiment.objects.filter(assigned__gt=F('start')-delay)] 139 self.assertEqual(test_set, self.expnames[:i]) 140 141 test_set = [e.name for e in 142 Experiment.objects.filter(assigned__gte=F('start')-delay)] 143 self.assertEqual(test_set, self.expnames[:i+1]) 144 145 def test_mixed_comparisons2(self): 146 delays = [datetime.timedelta(delay.days) for delay in self.delays] 147 for i in range(len(delays)): 148 delay = delays[i] 149 test_set = [e.name for e in 150 Experiment.objects.filter(start__lt=F('assigned')+delay)] 151 self.assertEqual(test_set, self.expnames[:i]) 152 153 test_set = [e.name for e in 154 Experiment.objects.filter(start__lte=F('assigned')+delay+ 155 datetime.timedelta(1))] 156 self.assertEqual(test_set, self.expnames[:i+1]) 157 158 def test_delta_update(self): 159 for i in range(len(self.deltas)): 160 delta = self.deltas[i] 161 exps = Experiment.objects.all() 162 expected_durations = [e.duration() for e in exps] 163 expected_starts = [e.start+delta for e in exps] 164 expected_ends = [e.end+delta for e in exps] 165 166 Experiment.objects.update(start=F('start')+delta, end=F('end')+delta) 167 exps = Experiment.objects.all() 168 new_starts = [e.start for e in exps] 169 new_ends = [e.end for e in exps] 170 new_durations = [e.duration() for e in exps] 171 self.assertEqual(expected_starts, new_starts) 172 self.assertEqual(expected_ends, new_ends) 173 self.assertEqual(expected_durations, new_durations) 174 175 def test_delta_invalid_op_mult(self): 176 raised = False 177 try: 178 r = repr(Experiment.objects.filter(end__lt=F('start')*self.deltas[0])) 179 except TypeError: 180 raised = True 181 self.assertTrue(raised, "TypeError not raised on attempt to multiply datetime by timedelta.") 182 183 def test_delta_invalid_op_div(self): 184 raised = False 185 try: 186 r = repr(Experiment.objects.filter(end__lt=F('start')/self.deltas[0])) 187 except TypeError: 188 raised = True 189 self.assertTrue(raised, "TypeError not raised on attempt to divide datetime by timedelta.") 190 191 def test_delta_invalid_op_mod(self): 192 raised = False 193 try: 194 r = repr(Experiment.objects.filter(end__lt=F('start')%self.deltas[0])) 195 except TypeError: 196 raised = True 197 self.assertTrue(raised, "TypeError not raised on attempt to modulo divide datetime by timedelta.") 198 199 def test_delta_invalid_op_and(self): 200 raised = False 201 try: 202 r = repr(Experiment.objects.filter(end__lt=F('start')&self.deltas[0])) 203 except TypeError: 204 raised = True 205 self.assertTrue(raised, "TypeError not raised on attempt to binary and a datetime with a timedelta.") 206 207 def test_delta_invalid_op_or(self): 208 raised = False 209 try: 210 r = repr(Experiment.objects.filter(end__lt=F('start')|self.deltas[0])) 211 except TypeError: 212 raised = True 213 self.assertTrue(raised, "TypeError not raised on attempt to binary or a datetime with a timedelta.") 214 1 215 """ 2 216 Spanning tests for all the operations that F() expressions can perform. 3 217 """ -
tests/regressiontests/expressions_regress/models.py
10 10 def __unicode__(self): 11 11 return u'%i, %.3f' % (self.integer, self.float) 12 12 13 class Experiment(models.Model): 14 name = models.CharField(max_length=24) 15 assigned = models.DateField() 16 completed = models.DateField() 17 start = models.DateTimeField() 18 end = models.DateTimeField() 19 20 class Meta: 21 ordering = ('name',) 22 23 def duration(self): 24 return self.end - self.start 25 -
docs/topics/db/queries.txt
36 36 headline = models.CharField(max_length=255) 37 37 body_text = models.TextField() 38 38 pub_date = models.DateTimeField() 39 mod_date = models.DateTimeField() 39 40 authors = models.ManyToManyField(Author) 40 41 n_comments = models.IntegerField() 41 42 n_pingbacks = models.IntegerField() … … 538 539 539 540 >>> Entry.objects.filter(authors__name=F('blog__name')) 540 541 542 For date fields, you can add or subtract a ``datetime.timedelta`` object. The 543 following would return all entries that were modified more than 3 days after 544 they were published: 545 546 >>> from datetime import timedelta 547 >>> Entry.objects.filter(mod_date__gt=F('pub_date') + timedelta(days=3)) 548 541 549 The pk lookup shortcut 542 550 ---------------------- 543 551