Ticket #9164: introspection.py

File introspection.py, 5.4 KB (added by cristofaro.campagna@…, 16 years ago)
Line 
1from django.db.backends import BaseDatabaseIntrospection
2from MySQLdb import ProgrammingError, OperationalError
3from MySQLdb.constants import FIELD_TYPE
4import re
5
# Matches one single-column foreign key clause in "SHOW CREATE TABLE" output,
# e.g.  CONSTRAINT `x` FOREIGN KEY (`col`) REFERENCES `other` (`other_col`)
# and captures (local column, referenced table, referenced column).
# Multi-column keys (comma-separated column lists) are not matched.
foreign_key_re = re.compile(r"\sCONSTRAINT `[^`]*` FOREIGN KEY \(`([^`]*)`\) REFERENCES `([^`]*)` \(`([^`]*)`\)")
7
def visitDFS(g):
    """
    Returns the vertices of the graph ``g`` in depth-first postorder.

    ``g`` maps each vertex to an iterable of its neighbours. A vertex is
    appended to the result only after every neighbour reachable from it has
    been appended, so for an acyclic "depends on" graph the dependencies
    come before their dependents. Cycles are tolerated: each vertex is
    visited exactly once.

    A neighbour that is not itself a key of ``g`` is treated as a vertex
    with no outgoing edges instead of raising KeyError.
    """
    vertices_order = []
    seen = set()
    for root in g:
        if root in seen:
            continue
        seen.add(root)
        # Explicit stack of (vertex, iterator over its remaining neighbours)
        # so that long dependency chains cannot exhaust the recursion limit.
        stack = [(root, iter(g[root]))]
        while stack:
            vertex, neighbours = stack[-1]
            for candidate in neighbours:
                if candidate not in seen:
                    seen.add(candidate)
                    stack.append((candidate, iter(g.get(candidate, ()))))
                    # Descend first; this vertex is resumed afterwards.
                    break
            else:
                # All neighbours handled: emit the vertex (postorder).
                stack.pop()
                vertices_order.append(vertex)
    return vertices_order
29
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspection for a MySQL database accessed through MySQLdb.

    Differs from a plain table listing in that get_table_list() orders the
    tables by their foreign key dependencies.
    """

    # Maps MySQLdb FIELD_TYPE codes to the name of the field class used to
    # represent columns of that type.
    data_types_reverse = {
        FIELD_TYPE.BLOB: 'TextField',
        FIELD_TYPE.CHAR: 'CharField',
        FIELD_TYPE.DECIMAL: 'DecimalField',
        FIELD_TYPE.NEWDECIMAL: 'DecimalField',
        FIELD_TYPE.DATE: 'DateField',
        FIELD_TYPE.DATETIME: 'DateTimeField',
        FIELD_TYPE.DOUBLE: 'FloatField',
        FIELD_TYPE.FLOAT: 'FloatField',
        FIELD_TYPE.INT24: 'IntegerField',
        FIELD_TYPE.LONG: 'IntegerField',
        FIELD_TYPE.LONGLONG: 'IntegerField',
        FIELD_TYPE.SHORT: 'IntegerField',
        FIELD_TYPE.STRING: 'CharField',
        FIELD_TYPE.TIMESTAMP: 'DateTimeField',
        FIELD_TYPE.TINY: 'IntegerField',
        FIELD_TYPE.TINY_BLOB: 'TextField',
        FIELD_TYPE.MEDIUM_BLOB: 'TextField',
        FIELD_TYPE.LONG_BLOB: 'TextField',
        FIELD_TYPE.VAR_STRING: 'CharField',
    }

    def get_table_list(self, cursor):
        """
        Returns a list of table names in the current database.

        When foreign key information can be read from information_schema,
        the list is ordered so that a table appears after the tables its
        foreign keys reference. If that query fails, the names are returned
        in the order reported by "SHOW TABLES".
        """
        cursor.execute("SHOW TABLES")
        table_names = [str(row[0]) for row in cursor.fetchall()]

        # Adjacency list: table name -> names of the tables it references.
        list_adj = {}
        for name in table_names:
            list_adj[name] = []

        try:
            # Rows with a non-NULL referenced table/column are foreign keys,
            # whatever the constraint happens to be called.
            cursor.execute("""
                SELECT referenced_table_name, table_name
                FROM information_schema.key_column_usage
                WHERE table_schema = DATABASE()
                    AND referenced_table_name IS NOT NULL
                    AND referenced_column_name IS NOT NULL""")
            references = cursor.fetchall()
        except (ProgrammingError, OperationalError):
            # No usable information_schema (same failure get_relations()
            # guards against): fall back to the unordered listing.
            return table_names

        for referenced, referencing in references:
            referenced, referencing = str(referenced), str(referencing)
            # Ignore edges touching tables that "SHOW TABLES" did not report
            # (e.g. a reference into another schema) rather than failing.
            if referencing in list_adj and referenced in list_adj:
                list_adj[referencing].append(referenced)

        return visitDFS(list_adj)

    def get_table_description(self, cursor, table_name):
        "Returns a description of the table, with the DB-API cursor.description interface."
        cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))
        return cursor.description

    def _name_to_index(self, cursor, table_name):
        """
        Returns a dictionary of {field_name: field_index} for the given table.
        Indexes are 0-based.
        """
        description = self.get_table_description(cursor, table_name)
        return dict([(column[0], index) for index, column in enumerate(description)])

    def get_relations(self, cursor, table_name):
        """
        Returns a dictionary of {field_index: (field_index_other_table, other_table)}
        representing all relationships to the given table. Indexes are 0-based.
        """
        my_field_dict = self._name_to_index(cursor, table_name)
        constraints = []
        try:
            # This should work for MySQL 5.0.
            cursor.execute("""
                SELECT column_name, referenced_table_name, referenced_column_name
                FROM information_schema.key_column_usage
                WHERE table_name = %s
                    AND table_schema = DATABASE()
                    AND referenced_table_name IS NOT NULL
                    AND referenced_column_name IS NOT NULL""", [table_name])
            constraints.extend(cursor.fetchall())
        except (ProgrammingError, OperationalError):
            # Fall back to "SHOW CREATE TABLE", for previous MySQL versions.
            # Collect every foreign key clause found in the CREATE statement
            # (second column of each returned row).
            cursor.execute("SHOW CREATE TABLE %s" % self.connection.ops.quote_name(table_name))
            for row in cursor.fetchall():
                constraints.extend(foreign_key_re.findall(row[1]))

        # Column lookups per table, so a table referenced by several
        # constraints (or by itself) is only described once.
        field_dicts = {table_name: my_field_dict}
        relations = {}
        for my_fieldname, other_table, other_field in constraints:
            if other_table not in field_dicts:
                field_dicts[other_table] = self._name_to_index(cursor, other_table)
            other_field_index = field_dicts[other_table][other_field]
            my_field_index = my_field_dict[my_fieldname]
            relations[my_field_index] = (other_field_index, other_table)

        return relations

    def get_indexes(self, cursor, table_name):
        """
        Returns a dictionary of fieldname -> infodict for the given table,
        where each infodict is in the format:
            {'primary_key': boolean representing whether it's the primary key,
             'unique': boolean representing whether it's a unique index}
        """
        cursor.execute("SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name))
        indexes = {}
        # Row layout used here: row[1] is the non-unique flag, row[2] the
        # key name and row[4] the column name.
        for row in cursor.fetchall():
            info = indexes.setdefault(row[4], {'primary_key': False, 'unique': False})
            # A column can appear in several indexes; accumulate the flags
            # instead of letting the last row overwrite earlier ones.
            if row[2] == 'PRIMARY':
                info['primary_key'] = True
            if not row[1]:
                info['unique'] = True
        return indexes
140
Back to Top