Ticket #10154: 10154.diff
File 10154.diff, 20.1 KB (added by , 14 years ago) |
---|
-
django/db/models/expressions.py
1 from datetimeimport datetime1 import datetime 2 2 3 3 from django.utils import tree 4 4 from django.utils.copycompat import deepcopy … … 26 26 super(ExpressionNode, self).__init__(children, connector, negated) 27 27 28 28 def _combine(self, other, connector, reversed, node=None): 29 if isinstance(other, datetime.timedelta): 30 return DateModifierNode([self, other], connector) 31 29 32 if reversed: 30 33 obj = ExpressionNode([other], connector) 31 34 obj.add(node or self, connector) … … 111 114 112 115 def evaluate(self, evaluator, qn, connection): 113 116 return evaluator.evaluate_leaf(self, qn, connection) 117 118 class DateModifierNode(ExpressionNode): 119 """ 120 Node that implements the following syntax: 121 filter(end_date__gt=F('start_date') + datetime.timedelta(days=3, seconds=200)) 122 123 which translates into: 124 POSTGRES: 125 WHERE end_date > (start_date + INTERVAL '3 days 200 seconds') 126 127 MYSQL: 128 WHERE end_date > (start_date + INTERVAL '3 0:0:200:0' DAY_MICROSECOND) 129 130 ORACLE: 131 WHERE end_date > (start_date + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6)) 132 133 SQLITE: 134 WHERE end_date > django_format_dtdelta(start_date, "+" "3", "200", "0") 135 (A custom function is used in order to preserve six digits of fractional 136 second information on sqlite, and to format both date and datetime values.) 137 138 Note that microsecond comparisons are not well supported with MySQL, since 139 MySQL does not store microsecond information. 140 141 Only adding and subtracting timedeltas is supported, attempts to use other 142 operations raise a TypeError. 143 """ 144 def __init__(self, children, connector, negated=False): 145 if len(children) != 2: 146 raise TypeError('Must specify a node and a timedelta.') 147 if not isinstance(children[1], datetime.timedelta): 148 raise TypeError('Second child must be a timedelta.') 149 if connector not in (self.ADD, self.SUB): 150 raise TypeError('Connector must be + or -, not %s' % connector) 151 super(DateModifierNode, self).__init__(children, connector, negated) 152 153 def evaluate(self, evaluator, qn, connection): 154 timedelta = self.children.pop() 155 sql, params = evaluator.evaluate_node(self, qn, connection) 156 157 if timedelta.days == 0 and timedelta.seconds == 0 and \ 158 timedelta.microseconds == 0: 159 return sql, params 160 161 return connection.ops.date_interval_sql(sql, self.connector, timedelta), params -
django/db/backends/postgresql/operations.py
27 27 else: 28 28 return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name) 29 29 30 def date_interval_sql(self, sql, connector, timedelta): 31 """ 32 implements the interval functionality for expressions 33 format for Postgres: 34 (datefield + interval '3 days 200 seconds 5 microseconds') 35 """ 36 modifiers = [] 37 if timedelta.days: 38 modifiers.append(u'%s days' % timedelta.days) 39 if timedelta.seconds: 40 modifiers.append(u'%s seconds' % timedelta.seconds) 41 if timedelta.microseconds: 42 modifiers.append(u'%s microseconds' % timedelta.microseconds) 43 mods = u' '.join(modifiers) 44 conn = u' %s ' % connector 45 return u'(%s)' % conn.join([sql, u'interval \'%s\'' % mods]) 46 30 47 def date_trunc_sql(self, lookup_type, field_name): 31 48 # http://www.postgresql.org/docs/8.0/static/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC 32 49 return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name) -
django/db/backends/sqlite3/base.py
9 9 10 10 import re 11 11 import sys 12 import datetime 12 13 13 14 from django.db import utils 14 15 from django.db.backends import * … … 69 70 # cause a collision with a field name). 70 71 return "django_extract('%s', %s)" % (lookup_type.lower(), field_name) 71 72 73 def date_interval_sql(self, sql, connector, timedelta): 74 # It would be more straightforward if we could use the sqlite strftime 75 # function, but it does not allow for keeping six digits of fractional 76 # second information, nor does it allow for formatting date and datetime 77 # values differently. So instead we register our own function that 78 # formats the datetime combined with the delta in a manner suitable 79 # for comparisons. 80 return u'django_format_dtdelta(%s, "%s", "%d", "%d", "%d")' % (sql, 81 connector, timedelta.days, timedelta.seconds, timedelta.microseconds) 82 72 83 def date_trunc_sql(self, lookup_type, field_name): 73 84 # sqlite doesn't support DATE_TRUNC, so we fake it with a user-defined 74 85 # function django_date_trunc that's registered in connect(). Note that … … 176 187 self.connection.create_function("django_extract", 2, _sqlite_extract) 177 188 self.connection.create_function("django_date_trunc", 2, _sqlite_date_trunc) 178 189 self.connection.create_function("regexp", 2, _sqlite_regexp) 190 self.connection.create_function("django_format_dtdelta", 5, _sqlite_format_dtdelta) 179 191 connection_created.send(sender=self.__class__) 180 192 return self.connection.cursor(factory=SQLiteCursorWrapper) 181 193 … … 239 251 elif lookup_type == 'day': 240 252 return "%i-%02i-%02i 00:00:00" % (dt.year, dt.month, dt.day) 241 253 254 def _sqlite_format_dtdelta(dt, conn, days, secs, usecs): 255 try: 256 dt = util.typecast_timestamp(dt) 257 delta = datetime.timedelta(int(days), int(secs), int(usecs)) 258 if conn.strip() == '+': 259 dt = dt + delta 260 else: 261 dt = dt - delta 262 except (ValueError, TypeError): 263 return None 264 265 if isinstance(dt, datetime.datetime): 266 rv = dt.strftime("%Y-%m-%d %H:%M:%S") 267 if dt.microsecond: 268 rv = "%s.%0.6d" % (rv, dt.microsecond) 269 else: 270 rv = dt.strftime("%Y-%m-%d") 271 return rv 272 242 273 def _sqlite_regexp(re_pattern, re_string): 243 274 import re 244 275 try: -
django/db/backends/mysql/base.py
150 150 sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str) 151 151 return sql 152 152 153 def date_interval_sql(self, sql, connector, timedelta): 154 return "(%s %s INTERVAL '%d 0:0:%d:%d' DAY_MICROSECOND)" % (sql, connector, 155 timedelta.days, timedelta.seconds, timedelta.microseconds) 156 153 157 def drop_foreignkey_sql(self): 154 158 return "DROP FOREIGN KEY" 155 159 -
django/db/backends/oracle/base.py
93 93 else: 94 94 return "EXTRACT(%s FROM %s)" % (lookup_type, field_name) 95 95 96 def date_interval_sql(self, sql, connector, timedelta): 97 """ 98 Implements the interval functionality for expressions 99 format for Oracle: 100 (datefield + INTERVAL '3 00:03:20.000000' DAY(1) TO SECOND(6)) 101 """ 102 minutes, seconds = divmod(timedelta.seconds, 60) 103 hours, minutes = divmod(minutes, 60) 104 days = str(timedelta.days) 105 day_precision = len(days) 106 fmt = "(%s %s INTERVAL '%s %02d:%02d:%02d.%06d' DAY(%d) TO SECOND(6))" 107 return fmt % (sql, connector, days, hours, minutes, seconds, 108 timedelta.microseconds, day_precision) 109 96 110 def date_trunc_sql(self, lookup_type, field_name): 97 111 # Oracle uses TRUNC() for both dates and numbers. 98 112 # http://download-east.oracle.com/docs/cd/B10501_01/server.920/a96540/functions155a.htm#SQLRF06151 -
django/db/backends/__init__.py
124 124 """ 125 125 raise NotImplementedError() 126 126 127 def date_interval_sql(self, sql, connector, timedelta): 128 """ 129 Implements the date interval functionality for expressions 130 """ 131 raise NotImplementedError() 132 127 133 def date_trunc_sql(self, lookup_type, field_name): 128 134 """ 129 135 Given a lookup_type of 'year', 'month' or 'day', returns the SQL that -
tests/regressiontests/expressions_regress/tests.py
1 import datetime 2 3 from django.test import TestCase 4 from django.db.models import F 5 from django.conf import settings 6 from django.db import DEFAULT_DB_ALIAS 7 8 from models import Experiment 9 10 class FTimeDeltaTests(TestCase): 11 12 def setUp(self): 13 db = settings.DATABASES[DEFAULT_DB_ALIAS] 14 self.db_is_mysql = db['ENGINE'] == 'django.db.backends.mysql' 15 self.db_is_sqlite = db['ENGINE'] == 'django.db.backends.sqlite3' 16 17 sday = datetime.date(2010, 6, 25) 18 stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000) 19 midnight = datetime.time(0) 20 21 delta0 = datetime.timedelta(0) 22 delta1 = datetime.timedelta(microseconds=253000) 23 delta2 = datetime.timedelta(seconds=44) 24 delta3 = datetime.timedelta(hours=21, minutes=8) 25 delta4 = datetime.timedelta(days=10) 26 27 # Test data is set so that deltas and delays will be 28 # strictly increasing. 29 self.deltas = [] 30 self.delays = [] 31 self.days_long = [] 32 33 # e0: started same day as assigned, zero duration 34 end = stime+delta0 35 e0 = Experiment.objects.create(name='e0', assigned=sday, start=stime, 36 end=end, completed=end.date()) 37 self.deltas.append(delta0) 38 self.delays.append(e0.start- 39 datetime.datetime.combine(e0.assigned, midnight)) 40 self.days_long.append(e0.completed-e0.assigned) 41 42 # e1: started one day after assigned, tiny duration, data 43 # set so that end time has no fractional seconds, which 44 # tests an edge case on sqlite. This Experiment is only 45 # included in the test data when DB is not MySQL (since 46 # on MySQL microseconds are dropped from datetime fields). 47 if not self.db_is_mysql: 48 delay = datetime.timedelta(1) 49 end = stime + delay + delta1 50 e1 = Experiment.objects.create(name='e1', assigned=sday, 51 start=stime+delay, end=end, completed=end.date()) 52 self.deltas.append(delta1) 53 self.delays.append(e1.start- 54 datetime.datetime.combine(e1.assigned, midnight)) 55 self.days_long.append(e1.completed-e1.assigned) 56 57 # e2: started three days after assigned, small duration 58 end = stime+delta2 59 e2 = Experiment.objects.create(name='e2', 60 assigned=sday-datetime.timedelta(3), start=stime, end=end, 61 completed=end.date()) 62 self.deltas.append(delta2) 63 self.delays.append(e2.start- 64 datetime.datetime.combine(e2.assigned, midnight)) 65 self.days_long.append(e2.completed-e2.assigned) 66 67 # e3: started four days after assigned, medium duration 68 delay = datetime.timedelta(4) 69 end = stime + delay + delta3 70 e3 = Experiment.objects.create(name='e3', 71 assigned=sday, start=stime+delay, end=end, completed=end.date()) 72 self.deltas.append(delta3) 73 self.delays.append(e3.start- 74 datetime.datetime.combine(e3.assigned, midnight)) 75 self.days_long.append(e3.completed-e3.assigned) 76 77 # e4: started 10 days after assignment, long duration 78 end = stime + delta4 79 e4 = Experiment.objects.create(name='e4', 80 assigned=sday-datetime.timedelta(10), start=stime, end=end, 81 completed=end.date()) 82 self.deltas.append(delta4) 83 self.delays.append(e4.start- 84 datetime.datetime.combine(e4.assigned, midnight)) 85 self.days_long.append(e4.completed-e4.assigned) 86 self.expnames = [e.name for e in Experiment.objects.all()] 87 88 def test_delta_add(self): 89 for i in range(len(self.deltas)): 90 delta = self.deltas[i] 91 test_set = [e.name for e in 92 Experiment.objects.filter(end__lt=F('start')+delta)] 93 self.assertEqual(test_set, self.expnames[:i]) 94 95 test_set = [e.name for e in 96 Experiment.objects.filter(end__lte=F('start')+delta)] 97 self.assertEqual(test_set, self.expnames[:i+1]) 98 99 def test_delta_subtract(self): 100 for i in range(len(self.deltas)): 101 delta = self.deltas[i] 102 test_set = [e.name for e in 103 Experiment.objects.filter(start__gt=F('end')-delta)] 104 self.assertEqual(test_set, self.expnames[:i]) 105 106 test_set = [e.name for e in 107 Experiment.objects.filter(start__gte=F('end')-delta)] 108 self.assertEqual(test_set, self.expnames[:i+1]) 109 110 def test_exclude(self): 111 for i in range(len(self.deltas)): 112 delta = self.deltas[i] 113 test_set = [e.name for e in 114 Experiment.objects.exclude(end__lt=F('start')+delta)] 115 self.assertEqual(test_set, self.expnames[i:]) 116 117 test_set = [e.name for e in 118 Experiment.objects.exclude(end__lte=F('start')+delta)] 119 self.assertEqual(test_set, self.expnames[i+1:]) 120 121 def test_date_comparison(self): 122 for i in range(len(self.days_long)): 123 days = self.days_long[i] 124 test_set = [e.name for e in 125 Experiment.objects.filter(completed__lt=F('assigned')+days)] 126 self.assertEqual(test_set, self.expnames[:i]) 127 128 test_set = [e.name for e in 129 Experiment.objects.filter(completed__lte=F('assigned')+days)] 130 self.assertEqual(test_set, self.expnames[:i+1]) 131 132 def test_mixed_comparisons1(self): 133 for i in range(len(self.delays)): 134 delay = self.delays[i] 135 if self.db_is_mysql: 136 delay = datetime.timedelta(delay.days, delay.seconds) 137 test_set = [e.name for e in 138 Experiment.objects.filter(assigned__gt=F('start')-delay)] 139 self.assertEqual(test_set, self.expnames[:i]) 140 141 test_set = [e.name for e in 142 Experiment.objects.filter(assigned__gte=F('start')-delay)] 143 self.assertEqual(test_set, self.expnames[:i+1]) 144 145 def test_mixed_comparisons2(self): 146 delays = [datetime.timedelta(delay.days) for delay in self.delays] 147 for i in range(len(delays)): 148 delay = delays[i] 149 test_set = [e.name for e in 150 Experiment.objects.filter(start__lt=F('assigned')+delay)] 151 self.assertEqual(test_set, self.expnames[:i]) 152 153 test_set = [e.name for e in 154 Experiment.objects.filter(start__lte=F('assigned')+delay+ 155 datetime.timedelta(1))] 156 self.assertEqual(test_set, self.expnames[:i+1]) 157 158 def test_delta_update(self): 159 for i in range(len(self.deltas)): 160 delta = self.deltas[i] 161 exps = Experiment.objects.all() 162 expected_durations = [e.duration() for e in exps] 163 expected_starts = [e.start+delta for e in exps] 164 expected_ends = [e.end+delta for e in exps] 165 166 Experiment.objects.update(start=F('start')+delta, end=F('end')+delta) 167 exps = Experiment.objects.all() 168 new_starts = [e.start for e in exps] 169 new_ends = [e.end for e in exps] 170 new_durations = [e.duration() for e in exps] 171 self.assertEqual(expected_starts, new_starts) 172 self.assertEqual(expected_ends, new_ends) 173 self.assertEqual(expected_durations, new_durations) 174 175 def test_delta_invalid_op_mult(self): 176 raised = False 177 try: 178 r = repr(Experiment.objects.filter(end__lt=F('start')*self.deltas[0])) 179 except TypeError: 180 raised = True 181 self.assertTrue(raised, "TypeError not raised on attempt to multiply datetime by timedelta.") 182 183 def test_delta_invalid_op_div(self): 184 raised = False 185 try: 186 r = repr(Experiment.objects.filter(end__lt=F('start')/self.deltas[0])) 187 except TypeError: 188 raised = True 189 self.assertTrue(raised, "TypeError not raised on attempt to divide datetime by timedelta.") 190 191 def test_delta_invalid_op_mod(self): 192 raised = False 193 try: 194 r = repr(Experiment.objects.filter(end__lt=F('start')%self.deltas[0])) 195 except TypeError: 196 raised = True 197 self.assertTrue(raised, "TypeError not raised on attempt to modulo divide datetime by timedelta.") 198 199 def test_delta_invalid_op_and(self): 200 raised = False 201 try: 202 r = repr(Experiment.objects.filter(end__lt=F('start')&self.deltas[0])) 203 except TypeError: 204 raised = True 205 self.assertTrue(raised, "TypeError not raised on attempt to binary and a datetime with a timedelta.") 206 207 def test_delta_invalid_op_or(self): 208 raised = False 209 try: 210 r = repr(Experiment.objects.filter(end__lt=F('start')|self.deltas[0])) 211 except TypeError: 212 raised = True 213 self.assertTrue(raised, "TypeError not raised on attempt to binary or a datetime with a timedelta.") 214 -
tests/regressiontests/expressions_regress/models.py
Property changes on: tests/regressiontests/expressions_regress/tests.py ___________________________________________________________________ Added: svn:executable + * Added: svn:eol-style + native
15 15 def __unicode__(self): 16 16 return u'%i, %.3f' % (self.integer, self.float) 17 17 18 class Experiment(models.Model): 19 name = models.CharField(max_length=24) 20 assigned = models.DateField() 21 completed = models.DateField() 22 start = models.DateTimeField() 23 end = models.DateTimeField() 24 25 class Meta: 26 ordering = ('name',) 27 28 def duration(self): 29 return self.end - self.start 30 18 31 19 32 __test__ = {'API_TESTS': """ 20 33 >>> from django.db.models import F -
docs/topics/db/queries.txt
38 38 headline = models.CharField(max_length=255) 39 39 body_text = models.TextField() 40 40 pub_date = models.DateTimeField() 41 mod_date = models.DateTimeField() 41 42 authors = models.ManyToManyField(Author) 42 43 n_comments = models.IntegerField() 43 44 n_pingbacks = models.IntegerField() … … 535 536 536 537 >>> Entry.objects.filter(authors__name=F('blog__name')) 537 538 539 For date fields, you can add or subtract a ``datetime.timedelta`` object. The 540 following would return all entries that were modified more than 3 days after 541 they were published: 542 543 >>> from datetime import timedelta 544 >>> Entry.objects.filter(mod_date__gt=F('pub_date') + timedelta(days=3)) 545 538 546 The pk lookup shortcut 539 547 ---------------------- 540 548