Skip to content

Inconsistency JSON results among PostgreSQL drivers #5584

@fantix

Description

@fantix

Describe the bug

By default, different PostgreSQL drivers may return different Python types for JSON(B) values. For example, for the same JSON dict, psycopg2 returns dict while asyncpg returns str. SQLAlchemy should eliminate the difference and return consistent results. The issue is, the asyncpg dialect/driver cannot parse JSON into dict if the SA type is unspecified (when executing raw SQL, for example); please see below.

To Reproduce

import asyncio

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import create_async_engine


async def main():
    """Compare the Python types returned for JSON(B) values by the sync
    psycopg2 driver versus the async asyncpg driver, for both a Core
    select() clause and a raw text() statement."""
    sync_engine = sa.create_engine("postgresql+psycopg2:///", future=True)
    async_engine = create_async_engine("postgresql+asyncpg:///")

    for json_type in (sa.JSON, JSONB):
        print(json_type.__visit_name__)

        # Fresh table per JSON variant so each run is isolated.
        metadata = sa.MetaData()
        users = sa.Table("users", metadata, sa.Column("profile", json_type))
        metadata.create_all(sync_engine)

        # Same logical query expressed two ways: with a typed Core clause
        # and as untyped raw SQL (the case where asyncpg misbehaves).
        queries = [
            ("clause", sa.select([users.c.profile])),
            ("text  ", sa.text("SELECT profile FROM users")),
        ]

        sample = {"a": 1, "b": 2}

        with sync_engine.connect() as conn:
            conn.execute(users.insert().values(profile=sample))
            for label, stmt in queries:
                print("sync ", label, type(conn.scalar(stmt)).__name__)
            conn.execute(users.delete())

        async with async_engine.connect() as conn:
            await conn.execute(users.insert().values(profile=sample))
            for label, stmt in queries:
                print("async", label, type(await conn.scalar(stmt)).__name__)
            await conn.execute(users.delete())

        metadata.drop_all(sync_engine)


asyncio.run(main())

Expected results

JSON
sync  clause dict
sync  text   dict
async clause dict
async text   dict
JSONB
sync  clause dict
sync  text   dict
async clause dict
async text   dict

Actual results

JSON
sync  clause dict
sync  text   dict
async clause dict
async text   str
JSONB
sync  clause dict
sync  text   dict
async clause dict
async text   str

Versions

  • OS: Darwin Kernel Version 19.5.0
  • Python: Python 3.8.2
  • SQLAlchemy: 1.4.0b1 @ 4d17fe4
  • Database: PostgreSQL 12.4
  • DBAPI: asyncpg 0.21.0, psycopg2 2.8.5 (dt dec pq3 ext lo64)

Additional context

I have 2 candidate solutions:

  1. Override NullType.result_processor(). This is simpler but slightly hacky:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..a62db1a5e 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -70,6 +70,9 @@ try:
 except ImportError:
     _python_UUID = None

+JSON_COLTYPE = 114
+JSONB_COLTYPE = 3802
+

 class AsyncpgTime(sqltypes.Time):
     def get_dbapi_type(self, dbapi):
@@ -218,6 +221,15 @@ class AsyncpgOID(OID):
         return dbapi.INTEGER


+class AsyncpgNullType(sqltypes.NullType):
+    def result_processor(self, dialect, coltype):
+        if coltype == JSON_COLTYPE:
+            return json.JSON().result_processor(dialect, coltype)
+        if coltype == JSONB_COLTYPE:
+            return json.JSONB().result_processor(dialect, coltype)
+        return super().result_processor(dialect, coltype)
+
+
 class PGExecutionContext_asyncpg(PGExecutionContext):
     def pre_exec(self):
         if self.isddl:
@@ -720,6 +732,7 @@ class PGDialect_asyncpg(PGDialect):
             sqltypes.Enum: AsyncPgEnum,
             OID: AsyncpgOID,
             REGCLASS: AsyncpgREGCLASS,
+            sqltypes.NullType: AsyncpgNullType,
         },
     )

(This was the solution in GINO, see python-gino/gino#305, python-gino/gino#403)

  2. Use asyncpg-native set_type_codec(). This is more consistent with the psycopg2 implementation but longer. Moreover, it revealed another issue, please see TODO below:
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 780e23844..330e71732 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -41,6 +41,7 @@ in conjunction with :func:`_sa.craete_engine`::
 import collections
 import decimal
 import itertools
+import json as json_lib
 import re

 from . import json
@@ -123,11 +124,17 @@ class AsyncpgJSON(json.JSON):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSON

+    def result_processor(self, dialect, coltype):
+        return None
+

 class AsyncpgJSONB(json.JSONB):
     def get_dbapi_type(self, dbapi):
         return dbapi.JSONB

+    def result_processor(self, dialect, coltype):
+        return None
+

 class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
     def get_dbapi_type(self, dbapi):
@@ -500,6 +507,7 @@ class AsyncAdapt_asyncpg_connection:

         """

+        # TODO: this causes failure in future set_type_codec() below
         await self._connection.set_type_codec(
             "char",
             schema="pg_catalog",
@@ -508,6 +516,24 @@ class AsyncAdapt_asyncpg_connection:
             format="text",
         )

+    def setup_json_handlers(self, serializer, deserializer):
+        self.await_(
+            self._connection.set_type_codec(
+                "json",
+                encoder=serializer,
+                decoder=deserializer,
+                schema="pg_catalog",
+            )
+        )
+        self.await_(
+            self._connection.set_type_codec(
+                "jsonb",
+                encoder=serializer,
+                decoder=deserializer,
+                schema="pg_catalog",
+            )
+        )
+
     def _handle_exception(self, error):
         if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
             exception_mapping = self.dbapi._asyncpg_error_translate
@@ -796,5 +822,34 @@ class PGDialect_asyncpg(PGDialect):
                 e, self.dbapi.InterfaceError
             ) and "connection is closed" in str(e)

+    def on_connect(self):
+        fns = []
+        if self.isolation_level is not None:
+
+            def on_connect(conn):
+                self.set_isolation_level(conn, self.isolation_level)
+
+            fns.append(on_connect)
+
+        if self.dbapi:
+
+            def on_connect(conn):
+                conn.setup_json_handlers(
+                    lambda x: x,
+                    self._json_deserializer or json_lib.loads,
+                )
+
+            fns.append(on_connect)
+
+        if fns:
+
+            def on_connect(conn):
+                for fn in fns:
+                    fn(conn)
+
+            return on_connect
+        else:
+            return None
+

 dialect = PGDialect_asyncpg

Have a nice day!

Metadata

Metadata

Assignees

No one assigned

    Labels

    datatypes — things to do with database types, like VARCHAR and others; postgresql; use case — not really a feature or a bug; can be support for new DB features or user use cases not anticipated

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions