-
Notifications
You must be signed in to change notification settings - Fork 849
Expand file tree
/
Copy pathconnection.py
More file actions
1964 lines (1686 loc) · 68.4 KB
/
connection.py
File metadata and controls
1964 lines (1686 loc) · 68.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Implementation of Connection.
It is a wrapper around the actual psycopg3 driver, and connection
object.
"""
import os
import secrets
import datetime
import asyncio
from collections import deque
from pathlib import Path
import psycopg
from flask import g, current_app
from flask_babel import gettext
from flask_security import current_user
from pgadmin.utils.crypto import decrypt
from psycopg._encodings import py_codecs as encodings
import config
from pgadmin.model import User
from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing
from pgadmin.utils import get_complete_file_path
from pgadmin.utils.ajax import internal_server_error
from ..abstract import BaseConnection
from .cursor import DictCursor, AsyncDictCursor, AsyncDictServerCursor
from .typecast import register_binary_data_typecasters,\
register_global_typecasters, register_string_typecasters,\
register_binary_typecasters, register_array_to_string_typecasters,\
register_numeric_typecasters, ALL_JSON_TYPES
from .encoding import get_encoding, configure_driver_encodings
from pgadmin.utils import csv_lib as csv
from pgadmin.utils.master_password import get_crypt_key
from io import StringIO
from pgadmin.utils.locker import ConnectionLocker
from pgadmin.utils.driver import get_driver
# On Windows, Psycopg is not compatible with the default ProactorEventLoop.
# So, setting to SelectorEventLoop.
if os.name == 'nt':
asyncio.set_event_loop_policy(
asyncio.WindowsSelectorEventLoopPolicy()
)
_ = gettext
# Register global type caster which will be applicable to all connections.
register_global_typecasters()
configure_driver_encodings(encodings)
class Connection(BaseConnection):
"""
class Connection(object)
A wrapper class, which wraps the psycopg3 connection object, and
delegate the execution to the actual connection object, when required.
Methods:
-------
* connect(**kwargs)
- Connect the PostgreSQL/EDB Postgres Advanced Server using the psycopg3
driver
* execute_scalar(query, params, formatted_exception_msg)
- Execute the given query and returns single datum result
* execute_async(query, params, formatted_exception_msg)
- Execute the given query asynchronously and returns result.
* execute_void(query, params, formatted_exception_msg)
- Execute the given query with no result.
* execute_2darray(query, params, formatted_exception_msg)
- Execute the given query and returns the result as a 2 dimensional
array.
* execute_dict(query, params, formatted_exception_msg)
- Execute the given query and returns the result as an array of dict
(column name -> value) format.
* connected()
- Get the status of the connection.
Returns True if connected, otherwise False.
* reset()
- Reconnect the database server (if possible)
* transaction_status()
- Transaction Status
* ping()
- Ping the server.
* _release()
- Release the connection object of psycopg3
* _reconnect()
- Attempt to reconnect to the database
* _wait(conn)
- This method is used to wait for asynchronous connection. This is a
blocking call.
* _wait_timeout(conn)
- This method is used to wait for asynchronous connection with timeout.
This is a non blocking call.
* poll(formatted_exception_msg)
- This method is used to poll the data of query running on asynchronous
connection.
* status_message()
- Returns the status message returned by the last command executed on
the server.
* rows_affected()
- Returns the no of rows affected by the last command executed on
the server.
* cancel_transaction(conn_id, did=None)
- This method is used to cancel the transaction for the
specified connection id and database id.
* messages()
- Returns the list of messages/notices sends from the PostgreSQL database
server.
* _formatted_exception_msg(exception_obj, formatted_msg)
- This method is used to parse the psycopg.Error object and returns the
formatted error message if flag is set to true else return
normal error message.
* check_notifies(required_polling)
- Check for the notify messages by polling the connection or after
execute is there in notifies.
* get_notifies()
- This function will returns list of notifies received from database
server.
* pq_encrypt_password_conn()
- This function will return the encrypted password for database server
- greater than or equal to 10.
"""
UNAUTHORIZED_REQUEST = gettext("Unauthorized request.")
CURSOR_NOT_FOUND = \
gettext("Cursor could not be found for the async connection.")
ARGS_STR = "{0}#{1}"
def __init__(self, manager, conn_id, db, **kwargs):
assert (manager is not None)
assert (conn_id is not None)
auto_reconnect = kwargs.get('auto_reconnect', True)
async_ = kwargs.get('async_', 0)
use_binary_placeholder = kwargs.get('use_binary_placeholder', False)
array_to_string = kwargs.get('array_to_string', False)
self.conn_id = conn_id
self.manager = manager
self.db = db if db is not None else manager.db
self.conn = None
self.auto_reconnect = auto_reconnect
self.async_ = async_
self.__async_cursor = None
self.__async_query_id = None
self.__async_query_error = None
self.__backend_pid = None
self.execution_aborted = False
self.row_count = 0
self.__notices = None
self.__notifies = None
self.password = None
# This flag indicates the connection status (connected/disconnected).
self.wasConnected = False
# This flag indicates the connection reconnecting status.
self.reconnecting = False
self.use_binary_placeholder = use_binary_placeholder
self.array_to_string = array_to_string
self.qtLiteral = get_driver(config.PG_DEFAULT_DRIVER).qtLiteral
self._autocommit = True
super(Connection, self).__init__()
def as_dict(self):
"""
Returns the dictionary object representing this object.
"""
# In case, it cannot be auto reconnectable, or already been released,
# then we will return None.
if not self.auto_reconnect and not self.conn:
return None
res = dict()
res['conn_id'] = self.conn_id
res['database'] = self.db
res['async_'] = self.async_
res['wasConnected'] = self.wasConnected
res['auto_reconnect'] = self.auto_reconnect
res['use_binary_placeholder'] = self.use_binary_placeholder
res['array_to_string'] = self.array_to_string
return res
def __repr__(self):
return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
self.conn_id, self.db,
'Connected' if self.conn and not self.conn.closed else
"Disconnected",
self.async_
)
def __str__(self):
return self.__repr__()
def _check_user_password(self, kwargs):
"""
Check user and password.
"""
password = None
encpass = None
is_update_password = True
if 'user' in kwargs and kwargs['password']:
password = kwargs['password']
kwargs.pop('password')
is_update_password = False
else:
if 'encpass' in kwargs:
encpass = kwargs['encpass']
else:
encpass = kwargs['password'] if 'password' in kwargs else None
return password, encpass, is_update_password
def _decode_password(self, encpass, manager, password, crypt_key):
if encpass:
# Fetch Logged in User Details.
user = User.query.filter_by(id=current_user.id).first()
if user is None:
return True, self.UNAUTHORIZED_REQUEST, password
try:
password = decrypt(encpass, crypt_key)
# password is in bytes, for python3 we need it in string
if isinstance(password, bytes):
password = password.decode()
except Exception as e:
manager.stop_ssh_tunnel()
current_app.logger.exception(e)
return True, \
_(
"Failed to decrypt the saved password.\nError: {0}"
).format(str(e)), password
return False, '', password
def connect(self, **kwargs):
if self.conn:
if self.conn.closed:
self.conn = None
else:
return True, None
manager = self.manager
crypt_key_present, crypt_key = get_crypt_key()
if not crypt_key_present:
raise CryptKeyMissing()
password, encpass, is_update_password = \
self._check_user_password(kwargs)
tunnel_password = kwargs['tunnel_password'] if 'tunnel_password' in \
kwargs else ''
# Check SSH Tunnel needs to be created
if manager.use_ssh_tunnel == 1 and not manager.tunnel_created:
status, error = manager.create_ssh_tunnel(tunnel_password)
if not status:
return False, error
# Check SSH Tunnel is alive or not.
if manager.use_ssh_tunnel == 1:
manager.check_ssh_tunnel_alive()
if is_update_password:
if encpass is None:
encpass = self.password or getattr(manager, 'password', None)
self.password = encpass
# Reset the existing connection password
if self.reconnecting is not False:
self.password = None
if not crypt_key_present:
raise CryptKeyMissing()
is_error, errmsg, password = self._decode_password(
encpass, manager, password, crypt_key)
if is_error:
return False, errmsg
# Retrieve password from passfile, if one has not been set yet.
passfile = Path(kwargs['passfile']) if 'passfile' in kwargs else None
if not password and passfile:
try:
password = passfile.read_text(encoding='utf-8').strip()
except (OSError, UnicodeDecodeError) as e:
error = _("Failed to read PassFile.\nError: {0}").format(str(e))
return False, error
# If no password credential is found then connect request might
# come from Query tool, ViewData grid, debugger etc tools.
# we will check for pgpass file availability from connection manager
# if it's present then we will use it
if not password and not encpass and not passfile:
passfile = manager.get_connection_param_value('passfile')
if manager.passexec:
password = manager.passexec.get()
try:
database = self.db
if 'user' in kwargs and kwargs['user']:
user = kwargs['user']
else:
user = manager.user
conn_id = self.conn_id
import os
os.environ['PGAPPNAME'] = '{0} - {1}'.format(
config.APP_NAME, conn_id)
ssl_key = get_complete_file_path(
manager.get_connection_param_value('sslkey'))
sslmode = manager.get_connection_param_value('sslmode')
if ssl_key and sslmode in \
['require', 'verify-ca', 'verify-full']:
ssl_key_file_permission = \
int(oct(os.stat(ssl_key).st_mode)[-3:])
if ssl_key_file_permission > 600:
os.chmod(ssl_key, 0o600)
with ConnectionLocker(manager.kerberos_conn):
# Create the connection string
connection_string = manager.create_connection_string(
database, user, password)
if self.async_:
autocommit = True
if 'auto_commit' in kwargs:
autocommit = kwargs['auto_commit']
async def connectdbserver():
return await psycopg.AsyncConnection.connect(
connection_string,
cursor_factory=AsyncDictCursor,
autocommit=autocommit,
prepare_threshold=manager.prepare_threshold
)
pg_conn = asyncio.run(connectdbserver())
pg_conn.server_cursor_factory = AsyncDictServerCursor
else:
pg_conn = psycopg.Connection.connect(
connection_string,
cursor_factory=DictCursor,
prepare_threshold=manager.prepare_threshold)
except psycopg.Error as e:
manager.stop_ssh_tunnel()
if hasattr(e, 'pgerror'):
msg = e.pgerror
elif e.diag.message_detail:
msg = e.diag.message_detail
else:
msg = str(e)
current_app.logger.info(
"Failed to connect to the database server(#{server_id}) for "
"connection ({conn_id}) with error message as below"
":{msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=msg
)
)
return False, msg
# Overwrite connection notice attr to support
# more than 50 notices at a time
pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
pg_conn.add_notify_handler(self.check_notifies)
pg_conn.add_notice_handler(self.get_notices)
self.conn = pg_conn
self.wasConnected = True
try:
status, msg = self._initialize(conn_id, **kwargs)
except Exception as e:
manager.stop_ssh_tunnel()
current_app.logger.exception(e)
self.conn = None
if not self.reconnecting:
self.wasConnected = False
raise e
if status and is_update_password:
manager._update_password(encpass)
else:
if not self.reconnecting and is_update_password:
self.wasConnected = False
return status, msg
def _set_auto_commit(self, kwargs):
"""
autocommit flag does not work with asynchronous connections.
By default asynchronous connection runs in autocommit mode.
:param kwargs:
:return:
"""
if self.async_ == 0:
if 'autocommit' in kwargs and kwargs['autocommit'] is False:
self.conn.autocommit = False
else:
self.conn.autocommit = True
def _set_role(self, manager, cur, conn_id, **kwargs):
"""
Set role
:param manager:
:param cur:
:param conn_id:
:return:
"""
is_set_role = False
role = None
status = None
if 'role' in kwargs and kwargs['role']:
is_set_role = True
role = kwargs['role']
elif manager.role:
is_set_role = True
role = manager.role
if is_set_role:
_query = "SELECT rolname from pg_roles WHERE rolname = {0}" \
"".format(self.qtLiteral(role, self.conn))
_status, res = self.execute_scalar(_query)
if res:
status = self._execute(cur, "SET ROLE TO {0}".format(
self.qtLiteral(role, self.conn)))
else:
# If role is not found then set the status to role
# for showing the proper error message
status = role
if status is not None:
self.conn.close()
self.conn = None
current_app.logger.error(
"Connect to the database server (#{server_id}) for "
"connection ({conn_id}), but - failed to setup the role "
" {msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=status
)
)
return True, \
_(
"Failed to setup the role \n{0}"
).format(status)
return False, ''
def _execute(self, cur, query, params=None):
formatted_exception_msg = self._formatted_exception_msg
try:
self.__internal_blocking_execute(cur, query, params)
except psycopg.Error as pe:
cur.close_cursor()
return formatted_exception_msg(pe, False)
return None
def _initialize(self, conn_id, **kwargs):
self.execution_aborted = False
self.__backend_pid = self.conn.info.backend_pid
setattr(g, self.ARGS_STR.format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
register_string_typecasters(self.conn)
manager = self.manager
# autocommit flag does not work with asynchronous connections.
# By default asynchronous connection runs in autocommit mode.
self._set_auto_commit(kwargs)
if self.array_to_string:
register_array_to_string_typecasters(self.conn)
# Register type casters for binary data only after registering array to
# string type casters.
if self.use_binary_placeholder:
register_binary_typecasters(self.conn)
#
postgres_encoding, self.python_encoding = \
get_encoding(self.conn.info.encoding)
status, cur = self.__cursor()
# Note that we use 'UPDATE pg_settings' for setting bytea_output as a
# convenience hack for those running on old, unsupported versions of
# PostgreSQL 'cos we're nice like that.
status = self._execute(
cur,
"SET DateStyle=ISO; "
"SET client_min_messages=notice; "
"SELECT set_config('bytea_output','hex',false)"
" FROM pg_show_all_settings()"
" WHERE name = 'bytea_output'; "
"SET client_encoding='{0}';".format(postgres_encoding)
)
if status is not None:
self.conn.close()
self.conn = None
return False, status
is_error, errmsg = self._set_role(manager, cur, conn_id, **kwargs)
if is_error:
return False, errmsg
# Check database version every time on reconnection
status = self._execute(cur, "SELECT version()")
if status is not None:
self.conn.close()
self.conn = None
self.wasConnected = False
current_app.logger.error(
"Failed to fetch the version information on the "
"established connection to the database server "
"(#{server_id}) for '{conn_id}' with below error "
"message:{msg}".format(
server_id=self.manager.sid,
conn_id=conn_id,
msg=status)
)
return False, status
if cur.rowcount > 0:
row = cur.fetchmany(1)[0]
manager.ver = row['version']
manager.sversion = self.conn.info.server_version
status = self._execute(cur, """
SELECT
db.oid as did, db.datname, db.datallowconn,
pg_encoding_to_char(db.encoding) AS serverencoding,
has_database_privilege(db.oid, 'CREATE') as cancreate,
datistemplate
FROM
pg_catalog.pg_database db
WHERE db.datname = current_database()""")
if status is None:
manager.db_info = manager.db_info or dict()
if cur.rowcount > 0:
res = cur.fetchmany(1)[0]
manager.db_info[res['did']] = res.copy()
# We do not have database oid for the maintenance database.
if len(manager.db_info) == 1:
manager.did = res['did']
if manager.sversion >= 120000:
status = self._execute(cur, """
SELECT
gss_authenticated, encrypted
FROM
pg_catalog.pg_stat_gssapi
WHERE pid = pg_backend_pid()""")
if status is None and cur.get_rowcount() > 0:
res_enc = cur.fetchmany(1)[0]
manager.db_info[res['did']]['gss_authenticated'] =\
res_enc['gss_authenticated']
manager.db_info[res['did']]['gss_encrypted'] = \
res_enc['encrypted']
if len(manager.db_info) == 1:
manager.gss_authenticated = \
res_enc['gss_authenticated']
manager.gss_encrypted = res_enc['encrypted']
self._set_user_info(cur, manager, **kwargs)
self._set_server_type_and_password(kwargs, manager)
ret_msg = self.execute_post_connection_sql(cur, manager)
manager.update_session()
return True, ret_msg
def _set_user_info(self, cur, manager, **kwargs):
"""
Set user info.
:param cur:
:param manager:
:return:
"""
status = self._execute(cur, """
SELECT
roles.oid as id, roles.rolname as name,
roles.rolsuper as is_superuser,
CASE WHEN roles.rolsuper THEN true ELSE roles.rolcreaterole END as
can_create_role,
CASE WHEN roles.rolsuper THEN true
ELSE roles.rolcreatedb END as can_create_db,
CASE WHEN 'pg_signal_backend'=ANY(ARRAY(WITH RECURSIVE cte AS (
SELECT pg_roles.oid,pg_roles.rolname FROM pg_roles
WHERE pg_roles.oid = roles.oid
UNION ALL
SELECT m.roleid,pgr.rolname FROM cte cte_1
JOIN pg_auth_members m ON m.member = cte_1.oid
JOIN pg_roles pgr ON pgr.oid = m.roleid)
SELECT rolname FROM cte)) THEN True
ELSE False END as can_signal_backend
FROM
pg_catalog.pg_roles as roles
WHERE
rolname = current_user""")
if status is None and 'user' not in kwargs:
manager.user_info = dict()
if cur.get_rowcount() > 0:
manager.user_info = cur.fetchmany(1)[0]
def _set_server_type_and_password(self, kwargs, manager):
"""
Set server type
:param kwargs:
:param manager:
:return:
"""
if 'password' in kwargs:
manager.password = kwargs['password']
server_types = None
if 'server_types' in kwargs and isinstance(
kwargs['server_types'], list):
server_types = manager.server_types = kwargs['server_types']
if server_types is None:
from pgadmin.browser.server_groups.servers.types import ServerType
server_types = ServerType.types()
for st in server_types:
if st.stype == 'ppas':
if st.instance_of(manager.ver):
manager.server_type = st.stype
manager.server_cls = st
break
else:
if st.instance_of():
manager.server_type = st.stype
manager.server_cls = st
break
def execute_post_connection_sql(self, cur, manager):
# Execute post connection SQL if provided in the server dialog
errmsg = None
if manager.post_connection_sql and manager.post_connection_sql != '':
status = self._execute(cur, manager.post_connection_sql)
if status is not None:
errmsg = gettext(("Failed to execute the post connection SQL "
"with below error message:\n{msg}").format(
msg=status))
current_app.logger.error(errmsg)
return errmsg
def __cursor(self, server_cursor=False, scrollable=False):
if not get_crypt_key()[0] and config.SERVER_MODE:
raise CryptKeyMissing()
# Check SSH Tunnel is alive or not. If used by the database
# server for the connection.
if self.manager.use_ssh_tunnel == 1:
self.manager.check_ssh_tunnel_alive()
if self.wasConnected is False:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
)
cur = getattr(g, self.ARGS_STR.format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
if self.connected() and cur and not cur.closed:
if not server_cursor or (
server_cursor and type(cur) is AsyncDictServerCursor):
return True, cur
if not self.connected():
errmsg = ""
current_app.logger.warning(
"Connection to database server (#{server_id}) for the "
"connection - '{conn_id}' has been lost.".format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
if self.auto_reconnect and not self.reconnecting:
self.__attempt_execution_reconnect(None)
else:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:]
)
try:
if server_cursor:
# Providing name to cursor will create server side cursor.
cursor_name = "CURSOR:{0}".format(self.conn_id)
self.conn.server_cursor_factory = AsyncDictServerCursor
cur = self.conn.cursor(
name=cursor_name,
scrollable=scrollable
)
else:
cur = self.conn.cursor(scrollable=scrollable)
except psycopg.Error as pe:
current_app.logger.exception(pe)
errmsg = gettext(
"Failed to create cursor for psycopg3 connection with error "
"message for the server#{1}:{2}:\n{0}"
).format(
str(pe), self.manager.sid, self.db
)
current_app.logger.error(errmsg)
if self.conn.closed:
self.conn = None
if self.auto_reconnect and not self.reconnecting:
current_app.logger.info(
gettext(
"Attempting to reconnect to the database server "
"(#{server_id}) for the connection - '{conn_id}'."
).format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
return self.__attempt_execution_reconnect(
self.__cursor, server_cursor
)
else:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == 'DB:'
else self.conn_id[5:]
)
setattr(
g, self.ARGS_STR.format(
self.manager.sid, self.conn_id.encode('utf-8')
), cur
)
return True, cur
def reset_cursor_at(self, position):
"""
This function is used to reset the cursor at the given position
"""
cur = self.__async_cursor
if not cur:
current_app.logger.log(
25,
'Cursor not found in reset_cursor_at method')
try:
cur.scroll(position, mode='absolute')
except psycopg.Error:
# bypassing the error as cursor tried to scroll on the
# specified position, but end of records found
current_app.logger.log(
25,
'Failed to reset cursor in reset_cursor_at method')
except IndexError as e:
current_app.logger.log(
25,
'Psycopg3 Cursor: {0}'.format(str(e)))
def __internal_blocking_execute(self, cur, query, params):
"""
This function executes the query using cursor's execute function,
but in case of asynchronous connection we need to wait for the
transaction to be completed. If self.async_ is 1 then it is a
blocking call.
Args:
cur: Cursor object
query: SQL query to run.
params: Extra parameters
"""
query = query.encode(self.python_encoding)
cur.execute(query, params)
def execute_on_server_as_csv(self, records=2000):
"""
To fetch query result and generate CSV output
Args:
params: Additional parameters
records: Number of initial records
Returns:
Generator response
"""
cur = self.__async_cursor
if not cur:
return False, self.CURSOR_NOT_FOUND
if self.conn.pgconn.connect_poll() != 3:
return False, gettext(
"Asynchronous query execution/operation underway."
)
encoding = self.python_encoding
query = None
try:
query = str(cur.query, encoding) \
if cur and cur.query is not None else None
except Exception:
current_app.logger.warning('Error encoding query with {0}'.format(
encoding))
current_app.logger.log(
25,
"Execute (with server cursor) by {pga_user} on "
"{db_user}@{db_host}/{db_name} #{server_id} - "
"{conn_id} (Query-id: {query_id}):\n{query}".format(
pga_user=current_user.email,
db_user=self.conn.info.user,
db_host=self.conn.info.host,
db_name=self.conn.info.dbname,
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
query_id=self.__async_query_id
)
)
# http://initd.org/psycopg/docs/cursor.html#cursor.description
# to avoid no-op
if cur.description is None:
return False, \
gettext('The query executed did not return any data.')
def handle_null_values(results, replace_nulls_with):
"""
This function is used to replace null values with the given string
:param results:
:param replace_nulls_with: null values will be replaced by this
string.
:return: modified result
"""
temp_results = []
for row in results:
res = dict()
for k, v in row.items():
if v is None:
res[k] = replace_nulls_with
else:
res[k] = v
temp_results.append(res)
results = temp_results
return results
def gen(conn_obj, trans_obj, quote='strings', quote_char="'",
field_separator=',', replace_nulls_with=None):
try:
cur.scroll(0, mode='absolute')
except Exception as e:
print(str(e))
# Make sure numeric values will be fetched without quoting
register_numeric_typecasters(cur)
results = cur.fetchmany(records)
if not results:
yield gettext('The query executed did not return any data.')
return
header = []
json_columns = []
for c in cur.ordered_description():
# This is to handle the case in which column name is non-ascii
column_name = c.to_dict()['name']
header.append(column_name)
if c.to_dict()['type_code'] in ALL_JSON_TYPES:
json_columns.append(column_name)
res_io = StringIO()
if quote == 'strings':
quote = csv.QUOTE_NONNUMERIC
elif quote == 'all':
quote = csv.QUOTE_ALL
else:
quote = csv.QUOTE_NONE
csv_writer = csv.DictWriter(
res_io, fieldnames=header, delimiter=field_separator,
quoting=quote,
quotechar=quote_char,
replace_nulls_with=replace_nulls_with
)
csv_writer.writeheader()
# Replace the null values with given string if configured.
if replace_nulls_with is not None:
results = handle_null_values(results, replace_nulls_with)
csv_writer.writerows(results)
yield res_io.getvalue()
while True:
results = cur.fetchmany(records)
if not results:
break
res_io = StringIO()
csv_writer = csv.DictWriter(
res_io, fieldnames=header, delimiter=field_separator,
quoting=quote,
quotechar=quote_char,
replace_nulls_with=replace_nulls_with
)
# Replace the null values with given string if configured.
if replace_nulls_with is not None:
results = handle_null_values(results, replace_nulls_with)
csv_writer.writerows(results)
yield res_io.getvalue()
try:
# try to reset the cursor scroll back to where it was,
# bypass error, if cannot scroll back
rows_fetched_from = trans_obj.get_fetched_row_cnt()
cur.scroll(rows_fetched_from, mode='absolute')
except psycopg.Error:
# bypassing the error as cursor tried to scroll on the
# specified position, but end of records found
pass
except Exception:
pass
# Registering back type caster for large size data types to string
# which was unregistered at starting
register_string_typecasters(self.conn)
return True, gen, self
def execute_scalar(self, query, params=None,
formatted_exception_msg=False):
status, cur = self.__cursor()
self.row_count = 0
if not status:
return False, str(cur)