-
Notifications
You must be signed in to change notification settings - Fork 379
Expand file tree
/
Copy pathtest_magic.py
More file actions
134 lines (111 loc) · 4.97 KB
/
test_magic.py
File metadata and controls
134 lines (111 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from nose import with_setup
from sql.magic import SqlMagic
from textwrap import dedent
import re
ip = get_ipython()
def setup():
sqlmagic = SqlMagic(shell=ip)
ip.register_magics(sqlmagic)
def _setup():
ip.run_line_magic('sql', 'sqlite:// CREATE TABLE test (n INT, name TEXT)')
ip.run_line_magic('sql', "sqlite:// INSERT INTO test VALUES (1, 'foo');")
ip.run_line_magic('sql', "sqlite:// INSERT INTO test VALUES (2, 'bar');")
def _teardown():
ip.run_line_magic('sql', 'sqlite:// DROP TABLE test')
@with_setup(_setup, _teardown)
def test_memory_db():
assert ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")[0][0] == 1
assert ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")[1]['name'] == 'bar'
@with_setup(_setup, _teardown)
def test_html():
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")
assert '<td>foo</td>' in result._repr_html_().lower()
@with_setup(_setup, _teardown)
def test_print():
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")
assert re.search(r'1\s+\|\s+foo', str(result))
@with_setup(_setup, _teardown)
def test_plain_style():
ip.run_line_magic('config', "SqlMagic.style = 'PLAIN_COLUMNS'")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")
assert re.search(r'1\s+foo', str(result))
def _setup_writer():
ip.run_line_magic('sql', 'sqlite:// CREATE TABLE writer (first_name, last_name, year_of_death)')
ip.run_line_magic('sql', "sqlite:// INSERT INTO writer VALUES ('William', 'Shakespeare', 1616)")
ip.run_line_magic('sql', "sqlite:// INSERT INTO writer VALUES ('Bertold', 'Brecht', 1956)")
def _teardown_writer():
ip.run_line_magic('sql', "sqlite:// DROP TABLE writer")
@with_setup(_setup_writer, _teardown_writer)
def test_multi_sql():
result = ip.run_cell_magic('sql', '', """
sqlite://
SELECT last_name FROM writer;
""")
assert 'Shakespeare' in str(result) and 'Brecht' in str(result)
@with_setup(_setup_writer, _teardown_writer)
def test_access_results_by_keys():
assert ip.run_line_magic('sql', "sqlite:// SELECT * FROM writer;")['William'] == (u'William', u'Shakespeare', 1616)
@with_setup(_setup_writer, _teardown_writer)
def test_duplicate_column_names_accepted():
result = ip.run_cell_magic('sql', '', """
sqlite://
SELECT last_name, last_name FROM writer;
""")
assert (u'Brecht', u'Brecht') in result
@with_setup(_setup, _teardown)
def test_autolimit():
ip.run_line_magic('config', "SqlMagic.autolimit = 0")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")
assert len(result) == 2
ip.run_line_magic('config', "SqlMagic.autolimit = 1")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM test;")
assert len(result) == 1
@with_setup(_setup, _teardown)
def test_persist():
ip.run_cell("results = %sql SELECT * FROM test;")
ip.runcode("results_dframe = results.DataFrame()")
ip.run_line_magic('sql', 'PERSIST results_dframe')
persisted = ip.run_line_magic('sql', 'SELECT * FROM results_dframe')
assert 'foo' in str(persisted)
@with_setup(_setup_writer, _teardown_writer)
def test_unnamed_persist():
ip.run_cell("results = %sql SELECT * FROM writer;")
ip.run_line_magic('sql', 'PERSIST results.DataFrame()')
persisted = ip.run_line_magic('sql', 'SELECT * FROM results')
assert 'Shakespeare' in str(persisted)
@with_setup(_setup_writer, _teardown_writer)
def test_displaylimit():
ip.run_line_magic('config', "SqlMagic.autolimit = 0")
ip.run_line_magic('config', "SqlMagic.displaylimit = 0")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM writer;")
assert result._repr_html_().count("<tr>") == 3
ip.run_line_magic('config', "SqlMagic.displaylimit = 1")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM writer;")
assert result._repr_html_().count("<tr>") == 2
@with_setup(_setup_writer, _teardown_writer)
def test_column_local_vars():
ip.run_line_magic('config', "SqlMagic.column_local_vars = True")
result = ip.run_line_magic('sql', "sqlite:// SELECT * FROM writer;")
assert result is None
assert 'William' in ip.user_global_ns['first_name']
assert 'Shakespeare' in ip.user_global_ns['last_name']
assert len(ip.user_global_ns['first_name']) == 2
ip.run_line_magic('config', "SqlMagic.column_local_vars = False")
@with_setup(_setup, _teardown)
def test_userns_not_changed():
ip.run_cell(dedent("""
def function():
local_var = 'local_val'
%sql sqlite:// INSERT INTO test VALUES (2, 'bar');
function()"""))
assert 'local_var' not in ip.user_ns
def test_bind_vars():
ip.user_global_ns['x'] = 22
result = ip.run_line_magic('sql', "sqlite:// SELECT :x")
assert result[0][0] == 22
@with_setup(_setup, _teardown)
def test_autopandas():
ip.run_line_magic('config', "SqlMagic.autopandas = True")
dframe = ip.run_cell("%sql SELECT * FROM test;")
assert dframe.success
assert dframe.result.name[0] == 'foo'