| 1 | # -*- coding: utf-8 -*- |
| 2 | from datetime import datetime |
| 3 | from StringIO import StringIO |
| 4 | from xml.dom import minidom |
| 5 | |
| 6 | from django.core import serializers |
| 7 | from django.db import transaction |
| 8 | from django.test import TestCase |
| 9 | from django.utils import simplejson |
| 10 | |
| 11 | from models import Category, Author, Article, AuthorProfile, Actor, \ |
| 12 | Movie, Score, Player, Team |
| 13 | |
| 14 | class SerializersTestBase(object): |
| 15 | |
| 16 | def setUp(self): |
| 17 | self.categories = {"sports": Category(name="Sports"), |
| 18 | "music": Category(name="Music"), |
| 19 | "op_ed": Category(name="Op-Ed"),} |
| 20 | for category in self.categories: |
| 21 | self.categories[category].save() |
| 22 | |
| 23 | self.authors ={"joe": Author(name="Joe"), |
| 24 | "jane": Author(name="Jane")} |
| 25 | for author in self.authors: |
| 26 | self.authors[author].save() |
| 27 | |
| 28 | self.a1 = Article( |
| 29 | author = self.authors["jane"], |
| 30 | headline = "Poker has no place on ESPN", |
| 31 | pub_date = datetime(2006, 6, 16, 11, 00)) |
| 32 | self.a2 = Article( |
| 33 | author = self.authors["joe"], |
| 34 | headline = "Time to reform copyright", |
| 35 | pub_date = datetime(2006, 6, 16, 13, 00, 11, 345)) |
| 36 | self.a1.save() |
| 37 | self.a2.save() |
| 38 | self.a1.categories = [self.categories["sports"], |
| 39 | self.categories["op_ed"]] |
| 40 | self.a2.categories = [self.categories["music"], |
| 41 | self.categories["op_ed"]] |
| 42 | |
| 43 | def test_serialize(self): |
| 44 | """Tests that basic serialization works.""" |
| 45 | serial_str = serializers.serialize(self.serializer_name, |
| 46 | Article.objects.all()) |
| 47 | self.assertTrue(self._validate_output(serial_str)) |
| 48 | |
| 49 | def test_serializer_roundtrip(self): |
| 50 | """Tests that serialized content can be deserialized.""" |
| 51 | serial_str = serializers.serialize(self.serializer_name, |
| 52 | Article.objects.all()) |
| 53 | models = list(serializers.deserialize(self.serializer_name, serial_str)) |
| 54 | self.assertEqual(len(models), 2) |
| 55 | |
| 56 | def test_altering_serialized_output(self): |
| 57 | """ |
| 58 | Tests the ability to create new objects by |
| 59 | modifying serialized content. |
| 60 | """ |
| 61 | old_headline = "Poker has no place on ESPN" |
| 62 | new_headline = "Poker has no place on television" |
| 63 | serial_str = serializers.serialize(self.serializer_name, |
| 64 | Article.objects.all()) |
| 65 | serial_str = serial_str.replace("Poker has no place on ESPN", |
| 66 | "Poker has no place on television") |
| 67 | models = list(serializers.deserialize(self.serializer_name, serial_str)) |
| 68 | |
| 69 | self.assertTrue(Article.objects.filter(headline=old_headline)) |
| 70 | self.assertFalse(Article.objects.filter(headline=new_headline)) |
| 71 | |
| 72 | for model in models: |
| 73 | model.save() |
| 74 | self.assertTrue(Article.objects.filter(headline=new_headline)) |
| 75 | self.assertFalse(Article.objects.filter(headline=old_headline)) |
| 76 | |
| 77 | def test_one_to_one_as_pk(self): |
| 78 | """ |
| 79 | Tests that if you use your own primary key field |
| 80 | (such as a OneToOneField), it doesn't appear in the |
| 81 | serialized field list - it replaces the pk identifier. |
| 82 | """ |
| 83 | profile = AuthorProfile(author=self.authors["joe"], |
| 84 | date_of_birth=datetime(1970,1,1)) |
| 85 | profile.save() |
| 86 | serial_str = serializers.serialize(self.serializer_name, |
| 87 | AuthorProfile.objects.all()) |
| 88 | for obj in serializers.deserialize(self.serializer_name, serial_str): |
| 89 | obj_pk = obj.object.pk |
| 90 | # If we are serializing using XML we need to convert the primary |
| 91 | # key to an int before comparing |
| 92 | if self.serializer_name == "xml": |
| 93 | obj_pk = int(obj_pk) |
| 94 | self.assertEqual(obj_pk, self.authors["joe"].pk) |
| 95 | |
| 96 | def test_forward_refs(self): |
| 97 | """ |
| 98 | Tests that objects ids can be referenced before they are |
| 99 | defined in the serialization data. |
| 100 | """ |
| 101 | # Clear out the existing models so that our primary keys won't |
| 102 | # already be taken |
| 103 | Category.objects.all().delete() |
| 104 | Author.objects.all().delete() |
| 105 | Article.objects.all().delete() |
| 106 | |
| 107 | # The deserialization process will need to be contained |
| 108 | # within a transaction. |
| 109 | transaction.enter_transaction_management() |
| 110 | transaction.managed(True) |
| 111 | objs = serializers.deserialize(self.serializer_name, self.fwd_ref_str) |
| 112 | for obj in objs: |
| 113 | obj.save() |
| 114 | transaction.commit() |
| 115 | transaction.leave_transaction_management() |
| 116 | |
| 117 | for model_cls in (Category, Author, Article): |
| 118 | self.assertEqual(model_cls.objects.all().count(), 1) |
| 119 | art_obj = Article.objects.all()[0] |
| 120 | self.assertEqual(art_obj.categories.all().count(), 1) |
| 121 | self.assertEqual(art_obj.author.name, "Agnes") |
| 122 | |
| 123 | def test_serialize_field_subset(self): |
| 124 | """Tests that output can be restricted to a subset of fields""" |
| 125 | valid_fields = ('headline','pub_date') |
| 126 | invalid_fields = ("author", "categories") |
| 127 | serial_str = serializers.serialize(self.serializer_name, |
| 128 | Article.objects.all(), |
| 129 | fields=valid_fields) |
| 130 | for field_name in invalid_fields: |
| 131 | self.assertFalse(self._get_field_values(serial_str, field_name)) |
| 132 | |
| 133 | for field_name in valid_fields: |
| 134 | self.assertTrue(self._get_field_values(serial_str, field_name)) |
| 135 | |
| 136 | def test_serialize_unicode(self): |
| 137 | """Tests that unicode makes the roundtrip intact""" |
| 138 | actor_name = u"Za\u017c\u00f3\u0142\u0107" |
| 139 | json_name = r"Za\u017c\u00f3\u0142\u0107" |
| 140 | movie_title = u'G\u0119\u015bl\u0105 ja\u017a\u0144' |
| 141 | json_title = r'G\u0119\u015bl\u0105 ja\u017a\u0144' |
| 142 | ac = Actor(name=actor_name) |
| 143 | mv = Movie(title=movie_title, actor=ac) |
| 144 | ac.save() |
| 145 | mv.save() |
| 146 | |
| 147 | serial_str = serializers.serialize(self.serializer_name, [mv]) |
| 148 | if self.serializer_name == "xml": |
| 149 | self.assertTrue(serial_str.find(movie_title.encode("utf8")) > -1) |
| 150 | self.assertTrue(serial_str.find(actor_name.encode("utf8")) > -1) |
| 151 | elif self.serializer_name == "json": |
| 152 | self.assertTrue(serial_str.find(json_title) > -1) |
| 153 | self.assertTrue(serial_str.find(json_name) > -1) |
| 154 | elif self.serializer_name == "yaml": |
| 155 | # Because YAML can split strings across multiple lines |
| 156 | # we need to use _get_field_values to get values to compare to |
| 157 | cmp_title = self._get_field_values(serial_str, "title")[0] |
| 158 | cmp_name = self._get_field_values(serial_str, "actor")[0] |
| 159 | self.assertEqual(cmp_title, movie_title) |
| 160 | self.assertEqual(cmp_name, actor_name) |
| 161 | else: |
| 162 | self.assertTrue(False) |
| 163 | |
| 164 | obj_list = list(serializers.deserialize(self.serializer_name, |
| 165 | serial_str)) |
| 166 | mv_obj = obj_list[0].object |
| 167 | self.assertEqual(mv_obj.title, movie_title) |
| 168 | |
| 169 | def test_serialize_with_null_pk(self): |
| 170 | """ |
| 171 | Tests that serialized data with no primary key results |
| 172 | in a model instance with no id |
| 173 | """ |
| 174 | category = Category(name="Reference") |
| 175 | serial_str = serializers.serialize(self.serializer_name, [category]) |
| 176 | cat_obj = list(serializers.deserialize(self.serializer_name, |
| 177 | serial_str))[0].object |
| 178 | self.assertEqual(cat_obj.id, None) |
| 179 | |
| 180 | def test_float_serialization(self): |
| 181 | """Tests that float values serialize and deserialize intact""" |
| 182 | sc = Score(score=3.4) |
| 183 | sc.save() |
| 184 | serial_str = serializers.serialize(self.serializer_name, [sc]) |
| 185 | deserial_objs = list(serializers.deserialize(self.serializer_name, |
| 186 | serial_str)) |
| 187 | self.assertEqual(deserial_objs[0].object.score, 3.4) |
| 188 | |
| 189 | def test_custom_field_serialization(self): |
| 190 | """Tests that custom fields serialize and deserialize intact""" |
| 191 | team_str = "Spartak Moskva" |
| 192 | player = Player() |
| 193 | player.name = "Soslan Djanaev" |
| 194 | player.rank = 1 |
| 195 | player.team = Team(team_str) |
| 196 | player.save() |
| 197 | serial_str = serializers.serialize(self.serializer_name, |
| 198 | Player.objects.all()) |
| 199 | team = self._get_field_values(serial_str, "team") |
| 200 | self.assertTrue(team) |
| 201 | self.assertEqual(team[0], team_str) |
| 202 | |
| 203 | deserial_objs = list(serializers.deserialize(self.serializer_name, |
| 204 | serial_str)) |
| 205 | self.assertEqual(deserial_objs[0].object.team.to_string(), |
| 206 | player.team.to_string()) |
| 207 | |
| 208 | def test_pre_1000ad_date(self): |
| 209 | """Tests that year values before 1000AD are properly formatted""" |
| 210 | # Regression for #12524 -- dates before 1000AD get prefixed |
| 211 | # 0's on the year |
| 212 | a = Article.objects.create( |
| 213 | author = self.authors["jane"], |
| 214 | headline = "Nobody remembers the early years", |
| 215 | pub_date = datetime(1, 2, 3, 4, 5, 6)) |
| 216 | |
| 217 | serial_str = serializers.serialize(self.serializer_name, [a]) |
| 218 | date_values = self._get_field_values(serial_str, "pub_date") |
| 219 | self.assertTrue(date_values) |
| 220 | assert date_values[0].startswith("0001") |
| 221 | |
| 222 | |
| 223 | class XmlSerializerTestCase(SerializersTestBase, TestCase): |
| 224 | serializer_name = "xml" |
| 225 | fwd_ref_str = ('<?xml version="1.0" encoding="utf-8"?>' |
| 226 | '<django-objects version="1.0">' |
| 227 | '<object pk="1" model="serializers.article">' |
| 228 | '<field to="serializers.author" name="author" ' |
| 229 | 'rel="ManyToOneRel">1</field>' |
| 230 | '<field type="CharField" name="headline">' |
| 231 | 'Forward references pose no problem</field>' |
| 232 | '<field type="DateTimeField" name="pub_date">' |
| 233 | '2006-06-16 15:00:00</field>' |
| 234 | '<field to="serializers.category" name="categories"' |
| 235 | ' rel="ManyToManyRel">' |
| 236 | '<object pk="1"></object></field></object>' |
| 237 | '<object pk="1" model="serializers.author">' |
| 238 | '<field type="CharField" name="name">' |
| 239 | 'Agnes</field></object>' |
| 240 | '<object pk="1" model="serializers.category">' |
| 241 | '<field type="CharField" name="name">' |
| 242 | 'Reference</field></object>' |
| 243 | '</django-objects>') |
| 244 | |
| 245 | @staticmethod |
| 246 | def _validate_output(serial_str): |
| 247 | try: |
| 248 | minidom.parseString(serial_str) |
| 249 | except Exception: |
| 250 | return False |
| 251 | else: |
| 252 | return True |
| 253 | |
| 254 | @staticmethod |
| 255 | def _get_field_values(serial_str, field_name): |
| 256 | ret_list = [] |
| 257 | dom = minidom.parseString(serial_str) |
| 258 | fields = dom.getElementsByTagName("field") |
| 259 | for field in fields: |
| 260 | if field.getAttribute("name") == field_name: |
| 261 | temp = [] |
| 262 | for child in field.childNodes: |
| 263 | temp.append(child.nodeValue) |
| 264 | ret_list.append("".join(temp)) |
| 265 | return ret_list |
| 266 | |
| 267 | |
| 268 | class JsonSerializerTestCase(SerializersTestBase, TestCase): |
| 269 | serializer_name = "json" |
| 270 | fwd_ref_str = ('[{"pk": 1, "model": "serializers.article", "fields":' |
| 271 | '{"headline": "Forward references pose no problem",' |
| 272 | '"pub_date": "2006-06-16 15:00:00",' |
| 273 | '"categories": [1], "author": 1}},' |
| 274 | '{"pk": 1, "model": "serializers.category",' |
| 275 | '"fields": {"name": "Reference"}},' |
| 276 | '{"pk": 1, "model": "serializers.author",' |
| 277 | '"fields": {"name": "Agnes"}}]') |
| 278 | |
| 279 | @staticmethod |
| 280 | def _validate_output(serial_str): |
| 281 | try: |
| 282 | simplejson.loads(serial_str) |
| 283 | except Exception: |
| 284 | return False |
| 285 | else: |
| 286 | return True |
| 287 | |
| 288 | @staticmethod |
| 289 | def _get_field_values(serial_str, field_name): |
| 290 | ret_list = [] |
| 291 | serial_list = simplejson.loads(serial_str) |
| 292 | for obj_dict in serial_list: |
| 293 | if field_name in obj_dict["fields"]: |
| 294 | ret_list.append(obj_dict["fields"][field_name]) |
| 295 | return ret_list |
| 296 | |
| 297 | try: |
| 298 | import yaml |
| 299 | except ImportError: |
| 300 | pass |
| 301 | else: |
| 302 | class YamlSerializerTestCase(SerializersTestBase, TestCase): |
| 303 | serializer_name = "yaml" |
| 304 | fwd_ref_str = "\n".join(["- fields:", |
| 305 | " headline: Forward references pose no problem", |
| 306 | ' pub_date: 2006-06-16 15:00:00', |
| 307 | ' categories: [1]', |
| 308 | ' author: 1', |
| 309 | ' pk: 1', |
| 310 | ' model: serializers.article', |
| 311 | '- fields:', |
| 312 | ' name: Reference', |
| 313 | ' pk: 1', |
| 314 | ' model: serializers.category', |
| 315 | '- fields:', |
| 316 | ' name: Agnes', |
| 317 | ' pk: 1', |
| 318 | ' model: serializers.author',]) |
| 319 | |
| 320 | @staticmethod |
| 321 | def _validate_output(serial_str): |
| 322 | try: |
| 323 | yaml.load(StringIO(serial_str)) |
| 324 | except Exception: |
| 325 | return False |
| 326 | else: |
| 327 | return True |
| 328 | |
| 329 | @staticmethod |
| 330 | def _get_field_values(serial_str, field_name): |
| 331 | ret_list = [] |
| 332 | stream = StringIO(serial_str) |
| 333 | for obj_dict in yaml.load(stream): |
| 334 | if "fields" in obj_dict and field_name in obj_dict["fields"]: |
| 335 | field_value = obj_dict["fields"][field_name] |
| 336 | # yaml.load will return non-string objects for some |
| 337 | # of the fields we are interested in, this ensures that |
| 338 | # everything comes back as a string |
| 339 | if isinstance(field_value, basestring): |
| 340 | ret_list.append(field_value) |
| 341 | else: |
| 342 | ret_list.append(str(field_value)) |
| 343 | return ret_list |