Ticket #13376: messages_expirity_capabilities.2.diff
File messages_expirity_capabilities.2.diff, 35.8 KB (added by , 14 years ago) |
---|
-
contrib/messages/restrictions.py
1 import time as time_provider 2 3 4 class Restriction (object): 5 def __init__(self, name): 6 self.type = name 7 8 def is_expired(self): 9 ''' indicates whether given message should be removed ''' 10 raise NotImplementedError () 11 12 def on_display (self): 13 ''' called when iterated - does nothing by default ''' 14 15 16 class TimeRestriction (Restriction): 17 def __init__(self, seconds): 18 """ 19 seconds - expiration time since now 20 """ 21 Restriction.__init__(self, 'time') 22 created = time_provider.time() 23 self.expires = created + int(seconds) 24 25 def set_expirity_time(self, expiration_time): 26 """ 27 Sets expilcity expiration time 28 """ 29 self.expires = int(expiration_time) 30 31 def is_expired (self): 32 return self.expires < time_provider.time() 33 34 def __eq__(self, other): 35 return self.expires == other.expires 36 37 38 class AmountRestriction (Restriction): 39 def __init__(self, amount): 40 assert int(amount) >= 0 41 Restriction.__init__(self, 'amount') 42 self.can_be_shown = int(amount) 43 44 def on_display (self): 45 self.can_be_shown -= 1 46 47 def is_expired(self): 48 return int(self.can_be_shown) <= 0 49 50 def __eq__(self, other): 51 return self.can_be_shown == other.can_be_shown 52 -
contrib/messages/api.py
13 13 pass 14 14 15 15 16 def add_message(request, level, message, extra_tags='', fail_silently=False ):16 def add_message(request, level, message, extra_tags='', fail_silently=False, restrictions = []): 17 17 """ 18 18 Attempts to add a message to the request using the 'messages' app, falling 19 19 back to the user's message_set if MessageMiddleware hasn't been enabled. 20 20 """ 21 21 if hasattr(request, '_messages'): 22 return request._messages.add(level, message, extra_tags )22 return request._messages.add(level, message, extra_tags, restrictions = restrictions) 23 23 if hasattr(request, 'user') and request.user.is_authenticated(): 24 24 return request.user.message_set.create(message=message) 25 25 if not fail_silently: -
contrib/messages/tests/restrictions.py
1 from django.test import TestCase 2 from django.contrib.messages import restrictions 3 from django.contrib.messages.restrictions import AmountRestriction, TimeRestriction 4 from django.contrib.messages.tests.time_provider import TestTimeProvider 5 6 restrictions.time_provider = TestTimeProvider () 7 8 class RestrictionsTest(TestCase): 9 def __check_expired(self, amount_restriction, iterations_amount): 10 """ 11 Checks whether after iterations_amount of on_displayate given restriction will become expired 12 But before iterations_amount given amount_restriction must not indicate is_expired 13 """ 14 for i in range(iterations_amount): 15 self.assertFalse(amount_restriction.is_expired()) 16 amount_restriction.on_display() 17 self.assertTrue(amount_restriction.is_expired()) 18 19 def test_amount_restrictions(self): 20 res = AmountRestriction(4) 21 self.__check_expired(res, 4) 22 23 def test_amount_restrictions_invalid_argument(self): 24 self.assertRaises(AssertionError, AmountRestriction, -1) 25 26 def test_equal(self): 27 self.assertEqual(AmountRestriction(5), AmountRestriction(5)) 28 self.assertFalse(AmountRestriction(1) == AmountRestriction(3)) 29 self.assertEqual(TimeRestriction(2), TimeRestriction(2)) 30 self.assertFalse(TimeRestriction(3) == TimeRestriction(4)) 31 -
contrib/messages/tests/time_provider.py
1 class TestTimeProvider(object): 2 def __init__(self, act_time = 0): 3 self.act_time = act_time 4 def set_act_time(self, act_time): 5 self.act_time = act_time 6 def time(self): 7 return self.act_time 8 def inc_act_time(self): 9 self.act_time += 1 -
contrib/messages/tests/cookie.py
1 1 from django.contrib.messages import constants 2 2 from django.contrib.messages.tests.base import BaseTest 3 from django.contrib.messages.tests.time_provider import TestTimeProvider 4 from django.contrib.messages import restrictions 5 from django.contrib.messages.restrictions import TimeRestriction, AmountRestriction 3 6 from django.contrib.messages.storage.cookie import CookieStorage, \ 4 7 MessageEncoder, MessageDecoder 5 8 from django.contrib.messages.storage.base import Message … … 46 49 def test_get(self): 47 50 storage = self.storage_class(self.get_request()) 48 51 # Set initial data. 49 example_messages = ['test', 'me'] 52 example_messages = [Message(constants.INFO, 'test'), Message(constants.INFO, 'me')] 53 expected_messages = [Message(constants.INFO, 'test', restrictions = [AmountRestriction(0),]),\ 54 Message(constants.INFO, 'me', restrictions = [AmountRestriction(0),])] 50 55 set_cookie_data(storage, example_messages) 51 56 # Test that the message actually contains what we expect. 52 self.assertEqual(list(storage), ex ample_messages)57 self.assertEqual(list(storage), expected_messages) 53 58 54 59 def test_get_bad_cookie(self): 55 60 request = self.get_request() … … 91 96 instances is properly encoded/decoded by the custom JSON 92 97 encoder/decoder classes. 93 98 """ 99 restrictions.time_provider = TestTimeProvider() 100 restrictions.time_provider.set_act_time(0) 94 101 messages = [ 95 102 { 96 103 'message': Message(constants.INFO, 'Test message'), 97 'message_list': [Message(constants.INFO, 'message %s') \ 104 'message_list': [Message(constants.INFO, 'message %s', \ 105 restrictions = [TimeRestriction(5), AmountRestriction(3), TimeRestriction(2), AmountRestriction(10)]) \ 98 106 for x in xrange(5)] + [{'another-message': \ 99 107 Message(constants.ERROR, 'error')}], 100 108 }, … … 104 112 value = encoder.encode(messages) 105 113 decoded_messages = json.loads(value, cls=MessageDecoder) 106 114 self.assertEqual(messages, decoded_messages) 115 116 def transport_storage(self, response, request): 117 request.COOKIES['messages'] = response.cookies['messages'].value 118 -
contrib/messages/tests/base.py
5 5 from django.conf import settings 6 6 from django.utils.translation import ugettext_lazy 7 7 from django.utils.unittest import skipIf 8 from django.contrib.messages import constants, utils, get_level, set_level 8 from django.contrib.messages import constants, utils, get_level, set_level, restrictions 9 9 from django.contrib.messages.api import MessageFailure 10 from django.contrib.messages.restrictions import AmountRestriction, TimeRestriction 10 11 from django.contrib.messages.storage import default_storage, base 11 12 from django.contrib.messages.storage.base import Message 13 from django.contrib.messages.tests.time_provider import TestTimeProvider 12 14 from django.core.urlresolvers import reverse 13 15 from django.contrib.auth.models import User 14 16 17 restrictions.time_provider = TestTimeProvider() 15 18 16 19 def skipUnlessAuthIsInstalled(func): 17 20 return skipIf( … … 102 105 storage._loaded_data = data or [] 103 106 return storage 104 107 108 def read_storage(self, storage): 109 """ 110 Simulates reading all messages from given storage 111 Returns list of read messages 112 """ 113 return list(storage) 114 105 115 def test_add(self): 106 116 storage = self.get_storage() 107 117 self.assertFalse(storage.added_new) … … 177 187 response = self.client.post(add_url, data, follow=True) 178 188 self.assertRedirects(response, show_url) 179 189 self.assertTrue('messages' in response.context) 180 messages = [Message(self.levels[level], msg ) for msg in190 messages = [Message(self.levels[level], msg, restrictions = [AmountRestriction(0),]) for msg in 181 191 data['messages']] 182 192 self.assertEqual(list(response.context['messages']), messages) 183 193 for msg in data['messages']: … … 210 220 """ 211 221 settings.MESSAGE_LEVEL = constants.DEBUG 212 222 data = { 213 'messages': [' Test message%d' % x for x in xrange(10)],223 'messages': ['%d' % x for x in xrange(10)], 214 224 } 215 225 show_url = reverse('django.contrib.messages.tests.urls.show') 216 226 messages = [] … … 221 231 args=(level,)) 222 232 self.client.post(add_url, data) 223 233 response = self.client.get(show_url) 234 new_messages = [] 235 for level in ('debug', 'info', 'success', 'warning', 'error'): 236 new_messages.extend([Message(self.levels[level], msg, \ 237 restrictions = [AmountRestriction(0),]) for msg in data['messages']]) 224 238 self.assertTrue('messages' in response.context) 225 self.assertEqual(list(response.context['messages']), messages)239 self.assertEqual(list(response.context['messages']), new_messages) 226 240 for msg in data['messages']: 227 241 self.assertContains(response, msg) 228 242 … … 437 451 # Ensure the level tags constant is put back like we found it. 438 452 self.restore_setting('MESSAGE_TAGS') 439 453 base.LEVEL_TAGS = utils.get_level_tags() 454 455 def test_storing_restrictoins(self): 456 storage = self.get_storage() 457 response = self.get_response() 458 459 get_res = lambda ar1, ar2, tr1, tr2: [AmountRestriction(ar1), AmountRestriction(ar2), TimeRestriction(tr1), TimeRestriction(tr2)] 460 461 restrictions.time_provider.set_act_time(0) 462 storage.add(constants.WARNING, 'First msg', restrictions = get_res(10, 20, 30, 40)) 463 messages = self.__read_messages(storage, response) 464 self.assertEqual(len(messages), 1) 465 rest = messages[0].restrictions 466 for a in (9, 19): 467 self.assertTrue(a in [r.can_be_shown for r in rest if r.type == 'amount']) 468 for t in (30, 40): 469 self.assertTrue(t in [r.expires for r in rest if r.type == 'time']) 470 471 restrictions.time_provider.set_act_time(4) 472 storage = self.get_storage() 473 response = self.get_response() 474 storage.add(constants.WARNING, 'First msg', restrictions = get_res(10, 20, 30, 40)) 475 messages = self.__read_messages(storage, response) 476 rest = messages[0].restrictions 477 for a in (9, 19): 478 self.assertTrue(a in [r.can_be_shown for r in rest if r.type == 'amount']) 479 for t in (34, 44): 480 self.assertTrue(t in [r.expires for r in rest if r.type == 'time']) 481 482 def test_expire_default(self): 483 """ 484 Tests against message expiration in default behaviour (no explicity defined restrictions on expire -> show only once) 485 """ 486 storage = self.get_existing_storage() 487 response = self.get_response() 488 489 list (storage) 490 storage.update(response) 491 492 storing = self.stored_messages_count(storage, response) 493 self.assertEqual(storing, 0) 494 495 def test_expire_after_showing_given_amount_of_times(self): 496 """ 497 Tests against expiring message after beeing shown given amount of times 498 """ 499 storage = self.get_existing_storage() 500 response = self.get_response() 501 502 read_messages = lambda: self.__read_messages(storage, response) 503 504 storage.add(constants.WARNING, 'Third message, should be shown four times', restrictions = [AmountRestriction(4),]) 505 storage.add(constants.ERROR, "Four'th message, should be shown five times", restrictions = [AmountRestriction(5),]) 506 507 508 messages = read_messages () 509 self.assertEqual(len(messages), 4) 510 storing = self.stored_messages_count(storage, response) 511 self.assertEqual(storing, 2) 512 513 514 for i in range(2): 515 messages = read_messages () 516 self.assertEqual(len(messages), 2) 517 storing = self.stored_messages_count(storage, response) 518 self.assertEqual(storing, 2) 519 [self.assertTrue(level in [x.level for x in messages]) for level in (constants.ERROR, constants.WARNING)] 520 521 522 messages = read_messages () 523 self.assertEqual(len(messages), 2) 524 storing = self.stored_messages_count(storage, response) 525 self.assertEqual(storing, 1) 526 527 528 messages = read_messages() 529 self.assertEqual(len(messages), 1) 530 self.assertEqual(messages[0].level, constants.ERROR) 531 storing = self.stored_messages_count(storage, response) 532 self.assertEqual(storing, 0) 533 534 535 messages = read_messages() 536 self.assertEqual(len(messages), 0) 537 storing = self.stored_messages_count(storage, response) 538 self.assertEqual(storing, 0) 539 540 def test_expire_after_time(self): 541 storage = self.get_storage() 542 response = self.get_response() 543 res = restrictions 544 res.time_provider.set_act_time(0) 545 546 storage.add(constants.WARNING, 'Third message, should be shown four times', restrictions = [TimeRestriction(4),]) 547 storage.add(constants.ERROR, "Four'th message, should be shown five times", restrictions = [TimeRestriction(6),]) 548 549 for i in range(10): 550 messages = self.__read_messages(storage, response) 551 self.assertEqual(len(messages), 2) 552 storing = self.stored_messages_count(storage, response) 553 self.assertEqual(storing, 2) 554 555 res.time_provider.set_act_time(5) 556 messages = self.__read_messages(storage, response) 557 self.assertEqual(len(messages), 1) 558 self.assertEqual(messages[0].level, constants.ERROR) 559 storing = self.stored_messages_count(storage, response) 560 self.assertEqual(storing, 1) 561 562 res.time_provider.set_act_time(7) 563 messages = self.__read_messages(storage, response) 564 self.assertEqual(len(messages), 0) 565 storing = self.stored_messages_count(storage, response) 566 self.assertEqual(storing, 0) 567 568 def test_expire_fixed_restrictions(self): 569 """ 570 Check message expirity when defining few sort of restrictions 571 of different kind 572 """ 573 get_storage_and_response = lambda: (self.get_storage(), self.get_response()) 574 storage, response = get_storage_and_response() 575 get_res = lambda ar1, ar2, tr1, tr2: [AmountRestriction(ar1), AmountRestriction(ar2), TimeRestriction(tr1), TimeRestriction(tr2)] 576 577 578 def check_storage(storage, iterations, expected_amount, fnct = None): 579 """ 580 Checks whether given storage would contain expected_amount of messages before each of iterations iterations 581 Performs iterations amount of iterations 582 Calls fnct at the end of each iteration 583 """ 584 for i in range(iterations): #read 2 times 585 messages = self.__read_messages(storage, response) 586 self.assertEqual(len(messages), expected_amount) 587 storing = self.stored_messages_count(storage, response) 588 self.assertEqual(storing, expected_amount) 589 if fnct: fnct() 590 591 # first - expires after amount of showing 592 restrictions.time_provider.set_act_time(0) 593 storage.add(constants.ERROR, "Some message", restrictions = get_res(7, 30, 5, 20)) 594 storage.add(constants.INFO, "another message", restrictions = get_res(3, 10, 4, 11)) 595 check_storage(storage, 2, 2) 596 messages = self.__read_messages(storage, response) # 3 times read 597 self.assertEqual(len(messages), 2) 598 storing = self.stored_messages_count(storage, response) 599 self.assertEqual(storing, 1) 600 check_storage(storage, 3, 1) 601 602 603 storage, response = get_storage_and_response() 604 restrictions.time_provider.set_act_time(0) 605 storage.add(constants.ERROR, "Some message", restrictions = get_res(7, 30, 5, 20)) 606 storage.add(constants.INFO, "another message", restrictions = get_res(3, 10, 4, 11)) 607 restrictions.time_provider.set_act_time(2) 608 check_storage(storage, 2, 2, lambda:restrictions.time_provider.set_act_time(3)) 609 610 # second - expires after amount of time 611 storage, response = get_storage_and_response() 612 restrictions.time_provider.set_act_time(0) 613 storage.add(constants.ERROR, "Some message", restrictions = get_res(10, 30, 5, 20)) 614 storage.add(constants.INFO, "another message", restrictions = get_res(10, 20, 3, 11)) 615 check_storage(storage, 2, 2) 616 restrictions.time_provider.set_act_time(2) 617 check_storage(storage, 2, 2) 618 restrictions.time_provider.set_act_time(4) 619 check_storage(storage, 2, 1) 620 621 def transport_storage(self, response, request): 622 """ 623 Transports storage from response to request 624 This method can be overriden by subclasses 625 """ 626 627 def __read_messages(self, storage, response):#simulate next call to render messages 628 """ 629 Simulates iterating storage in template 630 Returns messages from that iteration that would be returned 631 """ 632 messages = self.read_storage(storage) 633 storage.update(response) 634 self.transport_storage (response, storage.request) 635 storage.used = False 636 storage._queued_messages = [] 637 del storage._loaded_data 638 return messages 639 -
contrib/messages/tests/__init__.py
1 from django.contrib.messages.tests.session import SessionTest 1 2 from django.contrib.messages.tests.cookie import CookieTest 2 3 from django.contrib.messages.tests.fallback import FallbackTest 3 from django.contrib.messages.tests. middleware import MiddlewareTest4 from django.contrib.messages.tests. session import SessionTest4 from django.contrib.messages.tests.restrictions import RestrictionsTest 5 from django.contrib.messages.tests.message import MessageTest 5 6 from django.contrib.messages.tests.user_messages import \ 6 7 UserMessagesTest, LegacyFallbackTest 8 9 from django.contrib.messages.tests.middleware import MiddlewareTest 10 -
contrib/messages/tests/fallback.py
1 1 from django.contrib.messages import constants 2 from django.contrib.messages.storage.base import Message 2 3 from django.contrib.messages.storage.fallback import FallbackStorage, \ 3 4 CookieStorage 4 5 from django.contrib.messages.tests.base import BaseTest 5 6 from django.contrib.messages.tests.cookie import set_cookie_data, \ 6 7 stored_cookie_messages_count 8 from django.contrib.messages.restrictions import AmountRestriction 7 9 from django.contrib.messages.tests.session import set_session_data, \ 8 10 stored_session_messages_count 9 11 … … 44 46 cookie_storage = self.get_cookie_storage(storage) 45 47 46 48 # Set initial cookie data. 47 example_messages = [str(i) for i in range(5)] 49 example_messages = [Message(constants.INFO, str(i)) for i in range(5)] 50 expected_messages = [Message(constants.INFO, str(i), restrictions = [AmountRestriction(0),]) for i in range(5)] 48 51 set_cookie_data(cookie_storage, example_messages) 49 52 50 53 # Overwrite the _get method of the fallback storage to prove it is not … … 52 55 self.get_session_storage(storage)._get = None 53 56 54 57 # Test that the message actually contains what we expect. 55 self.assertEqual(list(storage), ex ample_messages)58 self.assertEqual(list(storage), expected_messages) 56 59 57 60 def test_get_empty(self): 58 61 request = self.get_request() … … 72 75 session_storage = self.get_session_storage(storage) 73 76 74 77 # Set initial cookie and session data. 75 example_messages = [str(i) for i in range(5)] 78 example_messages = [Message(constants.INFO, str(i)) for i in range(5)] 79 expected_messages = [Message(constants.INFO, str(i), restrictions = [AmountRestriction(0),]) for i in range(5)] 76 80 set_cookie_data(cookie_storage, example_messages[:4] + 77 81 [CookieStorage.not_finished]) 78 82 set_session_data(session_storage, example_messages[4:]) 79 83 80 84 # Test that the message actually contains what we expect. 81 self.assertEqual(list(storage), ex ample_messages)85 self.assertEqual(list(storage), expected_messages) 82 86 83 87 def test_get_fallback_only(self): 84 88 request = self.get_request() … … 87 91 session_storage = self.get_session_storage(storage) 88 92 89 93 # Set initial cookie and session data. 90 example_messages = [ str(i) for i in range(5)]94 example_messages = [Message(constants.INFO, str(i)) for i in range(5)] 91 95 set_cookie_data(cookie_storage, [CookieStorage.not_finished], 92 96 encode_empty=True) 93 97 set_session_data(session_storage, example_messages) … … 102 106 session_storage = self.get_session_storage(storage) 103 107 104 108 # Set initial cookie and session data. 105 set_cookie_data(cookie_storage, [ 'cookie', CookieStorage.not_finished])106 set_session_data(session_storage, [ 'session'])109 set_cookie_data(cookie_storage, [Message(constants.INFO, 'cookie'), CookieStorage.not_finished]) 110 set_session_data(session_storage, [Message(constants.INFO, 'session')]) 107 111 108 112 # When updating, previously used but no longer needed backends are 109 113 # flushed. … … 173 177 self.assertEqual(cookie_storing, 0) 174 178 session_storing = self.stored_session_messages_count(storage, response) 175 179 self.assertEqual(session_storing, 1) 180 181 def transport_storage(self, response, request): 182 request.COOKIES['messages'] = response.cookies['messages'].value -
contrib/messages/tests/session.py
1 1 from django.contrib.messages.tests.base import BaseTest 2 2 from django.contrib.messages.storage.session import SessionStorage 3 from django.contrib.messages.storage.base import Message 3 4 4 5 5 6 def set_session_data(storage, messages): … … 32 33 def test_get(self): 33 34 storage = self.storage_class(self.get_request()) 34 35 # Set initial data. 35 example_messages = [ 'test', 'me']36 example_messages = [Message(1, 'test') , Message(1, 'me')] 36 37 set_session_data(storage, example_messages) 37 38 # Test that the message actually contains what we expect. 38 39 self.assertEqual(list(storage), example_messages) -
contrib/messages/tests/message.py
1 from django.test import TestCase 2 from django.contrib.messages.storage.base import Message 3 from django.contrib.messages import restrictions 4 from django.contrib.messages.restrictions import AmountRestriction, TimeRestriction, time_provider 5 from django.contrib.messages.tests.time_provider import TestTimeProvider 6 from django.contrib.messages import constants 7 8 9 class MessageTest(TestCase): 10 def setUp(self): 11 self.tp = restrictions.time_provider = TestTimeProvider () 12 def __check_active(self, msg, iterations): 13 """ 14 Reads msg given amount of iterations, and after each read 15 checks whether before each read message is active 16 """ 17 for i in range(iterations): 18 self.assertTrue(msg.active()) 19 msg.on_display() 20 self.assertFalse(msg.active()) 21 msg.on_display() 22 self.assertFalse(msg.active()) 23 24 def test_active_default(self): 25 msg = Message(constants.INFO, "Test message") 26 self.__check_active(msg, 1) 27 28 def test_active_custom_one_amount_restriction(self): 29 msg = Message(constants.INFO, "Test message", restrictions = [AmountRestriction(3),]) 30 self.__check_active(msg, 3) 31 32 def test_active_custom_few_amount_restriction(self): 33 msg = Message(constants.INFO, "Test message", restrictions = [AmountRestriction(x) for x in (2, 3, 5)]) 34 self.__check_active(msg, 2) 35 36 def test_active_custom_one_time_restriction(self): 37 msg = Message(constants.INFO, "Test message", restrictions = [TimeRestriction(3),]) 38 def check_iter(): 39 for i in range(10): # iteration doesn't have direct impact for TimeRestriction 40 self.assertTrue(msg.active()) 41 msg.on_display() 42 check_iter() 43 self.tp.set_act_time(3) 44 check_iter() 45 self.tp.set_act_time(4) 46 self.assertFalse(msg.active()) 47 48 49 def test_mixed_restrictions(self): 50 get_restrictions = lambda:[TimeRestriction(3), TimeRestriction(5), AmountRestriction(2), AmountRestriction(3)] 51 get_msg = lambda:Message(constants.INFO, "Test message", restrictions = get_restrictions()) 52 53 msg = get_msg() 54 for i in range(2): 55 self.assertTrue(msg.active()) 56 msg.on_display() 57 self.assertFalse(msg.active()) 58 59 msg = get_msg() 60 self.assertTrue(msg.active()) 61 msg.on_display() 62 self.assertTrue(msg.active()) 63 self.tp.set_act_time(4) 64 self.assertFalse(msg.active()) 65 for i in range(10): 66 self.assertFalse(msg.active()) 67 msg.on_display() 68 69 -
contrib/messages/tests/user_messages.py
1 1 from django import http 2 2 from django.contrib.auth.models import User 3 from django.contrib.messages import constants 4 from django.contrib.messages.storage.base import Message 3 5 from django.contrib.messages.storage.user_messages import UserMessagesStorage,\ 4 6 LegacyFallbackStorage 5 7 from django.contrib.messages.tests.base import skipUnlessAuthIsInstalled … … 60 62 storage = self.storage_class(request) 61 63 cookie_storage = self.get_cookie_storage(storage) 62 64 self.user.message_set.create(message='user message') 63 set_cookie_data(cookie_storage, [ 'cookie'])65 set_cookie_data(cookie_storage, [Message(constants.INFO,'cookie')]) 64 66 65 67 # Test that the message actually contains what we expect. 66 68 self.assertEqual(len(storage), 2) 67 69 self.assertEqual(list(storage)[0].message, 'user message') 68 self.assertEqual(list(storage)[1] , 'cookie')70 self.assertEqual(list(storage)[1].message, 'cookie') 69 71 70 72 LegacyFallbackTest = skipUnlessAuthIsInstalled(LegacyFallbackTest) -
contrib/messages/storage/cookie.py
5 5 from django.utils import simplejson as json 6 6 from django.utils.crypto import salted_hmac, constant_time_compare 7 7 8 from django.contrib.messages.restrictions import AmountRestriction, TimeRestriction 8 9 9 10 class MessageEncoder(json.JSONEncoder): 10 11 """ … … 12 13 """ 13 14 message_key = '__json_message' 14 15 16 def __encode_restriction (self, r): 17 if r.type == 'amount': ret = '%s@%s' % ('a', r.can_be_shown) 18 elif r.type == 'time': ret = '%s@%s' % ('t', r.expires) 19 return ret 20 15 21 def default(self, obj): 16 22 if isinstance(obj, Message): 17 23 message = [self.message_key, obj.level, obj.message] 18 24 if obj.extra_tags: 19 25 message.append(obj.extra_tags) 26 restrictions = [self.__encode_restriction(r) for r in obj.restrictions] 27 message.append (restrictions) 20 28 return message 21 29 return super(MessageEncoder, self).default(obj) 22 30 … … 25 33 """ 26 34 Decodes JSON that includes serialized ``Message`` instances. 27 35 """ 36 def __get_restrictions (self, restrictions): 37 def _get_r(type, value): 38 if type == 'a': return AmountRestriction (value) 39 elif type == 't': 40 ret = TimeRestriction(0) 41 ret.set_expirity_time(value) 42 return ret 43 ret = [] 44 for r in restrictions: 45 rtype, value = r.split('@') 46 added = _get_r (rtype, value) 47 ret.append (added) 48 return ret 28 49 50 51 def create_message(self, vars): 52 ''' creates message on the basis of encoded data ''' 53 args = vars[:-1] 54 restrictions = vars[-1] 55 restrictions = self.__get_restrictions (restrictions) 56 return Message(*args, **{'restrictions': restrictions}) 57 29 58 def process_messages(self, obj): 30 59 if isinstance(obj, list) and obj: 31 60 if obj[0] == MessageEncoder.message_key: 32 return Message(*obj[1:])61 return self.create_message (obj[1:]) 33 62 return [self.process_messages(item) for item in obj] 34 63 if isinstance(obj, dict): 35 64 return dict([(key, self.process_messages(value)) … … 148 177 # with the data. 149 178 self.used = True 150 179 return None 180 181 def transport_storage(self, response, request): 182 storage.request.COOKIES['messages'] = response.cookies['messages'].value -
contrib/messages/storage/base.py
1 1 from django.conf import settings 2 2 from django.utils.encoding import force_unicode, StrAndUnicode 3 from django.contrib.messages import constants, utils 3 from django.contrib.messages import constants, utils, restrictions as res 4 4 5 5 6 6 LEVEL_TAGS = utils.get_level_tags() 7 7 8 8 9 9 10 class Message(StrAndUnicode): 10 11 """ … … 13 14 or template. 14 15 """ 15 16 16 def __init__(self, level, message, extra_tags=None ):17 def __init__(self, level, message, extra_tags=None, restrictions = []): 17 18 self.level = int(level) 18 19 self.message = message 19 20 self.extra_tags = extra_tags 20 21 21 def _prepare(self): 22 self.restrictions = restrictions or list([res.AmountRestriction(1)]) 23 # if not given any restriction - one show by default 24 25 def _prepare(self): # todo: slef.restrictions = 22 26 """ 23 27 Prepares the message for serialization by forcing the ``message`` 24 28 and ``extra_tags`` to unicode in case they are lazy translations. … … 31 35 32 36 def __eq__(self, other): 33 37 return isinstance(other, Message) and self.level == other.level and \ 34 self.message == other.message 38 self.message == other.message and \ 39 self.restrictions == other.restrictions 35 40 36 41 def __unicode__(self): 37 return force_unicode(self.message) 42 return force_unicode(self.message) + ':' + unicode('CFVB') 38 43 44 39 45 def _get_tags(self): 40 46 label_tag = force_unicode(LEVEL_TAGS.get(self.level, ''), 41 47 strings_only=True) … … 50 56 tags = property(_get_tags) 51 57 52 58 59 def active (self): 60 for r in self.restrictions: 61 if r.is_expired(): return False 62 return True 63 64 65 def on_display (self): 66 for r in self.restrictions: 67 r.on_display () 68 69 53 70 class BaseStorage(object): 54 71 """ 55 72 This is the base backend for temporary message storage. … … 60 77 61 78 def __init__(self, request, *args, **kwargs): 62 79 self.request = request 80 self.used = False 63 81 self._queued_messages = [] 64 self.used = False65 82 self.added_new = False 66 83 super(BaseStorage, self).__init__(*args, **kwargs) 67 84 … … 69 86 return len(self._loaded_messages) + len(self._queued_messages) 70 87 71 88 def __iter__(self): 72 self.used = True73 if self._queued_messages:74 self._loaded_messages.extend(self._queued_messages)75 self._queued_messages = []76 return iter(self._loaded_messages)89 if not self.used: 90 self.used = True 91 if self._queued_messages: 92 self._loaded_messages.extend(self._queued_messages) 93 self._queued_messages = [] 77 94 95 active_messages = [x for x in self._loaded_messages if x.active ()] 96 for x in active_messages: 97 x.on_display () 98 self._queued_messages.extend (active_messages) 99 return iter(self._queued_messages) 100 101 78 102 def __contains__(self, item): 79 103 return item in self._loaded_messages or item in self._queued_messages 80 104 … … 104 128 """ 105 129 raise NotImplementedError() 106 130 131 def filter_store(self, messages, response, *args, **kwargs): 132 ''' stores only active messages from given messages in storage ''' 133 filtered_messages = [x for x in messages if x.active ()] 134 return self._store(filtered_messages, response, *args, **kwargs) 135 136 107 137 def _store(self, messages, response, *args, **kwargs): 108 138 """ 109 139 Stores a list of messages, returning a list of any messages which could … … 130 160 be stored again. Otherwise, only messages added after the last 131 161 iteration will be stored. 132 162 """ 163 # if used or used and added, then _queued_messages contains all messages that should be saved 164 # if added, then save: all messages currently stored and added ones 133 165 self._prepare_messages(self._queued_messages) 134 166 if self.used: 135 return self. _store(self._queued_messages, response)167 return self.filter_store(self._queued_messages, response) 136 168 elif self.added_new: 137 169 messages = self._loaded_messages + self._queued_messages 138 return self. _store(messages, response)170 return self.filter_store(messages, response) 139 171 140 def add(self, level, message, extra_tags='' ):172 def add(self, level, message, extra_tags='', restrictions = []): 141 173 """ 142 174 Queues a message to be stored. 143 175 … … 152 184 return 153 185 # Add the message. 154 186 self.added_new = True 155 message = Message(level, message, extra_tags=extra_tags )187 message = Message(level, message, extra_tags=extra_tags, restrictions = restrictions) 156 188 self._queued_messages.append(message) 157 189 158 190 def _get_level(self):