2222from __future__ import print_function
2323from __future__ import unicode_literals
2424
25+ import datetime
2526import os
27+ import pendulum
2628import time
2729import random
2830
2931from sqlalchemy import event , exc , select
32+ from sqlalchemy .types import DateTime , TypeDecorator
3033
3134from airflow .utils .log .logging_mixin import LoggingMixin
3235
3336log = LoggingMixin ().log
37+ utc = pendulum .timezone ('UTC' )
3438
3539
3640def setup_event_handlers (
@@ -101,13 +105,21 @@ def ping_connection(connection, branch):
101105 def connect (dbapi_connection , connection_record ):
102106 connection_record .info ['pid' ] = os .getpid ()
103107
104- @ event . listens_for ( engine , "connect" )
105- def set_sqlite_pragma ( dbapi_connection , connection_record ):
106- if 'sqlite3.Connection' in str ( type ( dbapi_connection ) ):
108+ if engine . dialect . name == "sqlite" :
109+ @ event . listens_for ( engine , "connect" )
110+ def set_sqlite_pragma ( dbapi_connection , connection_record ):
107111 cursor = dbapi_connection .cursor ()
108112 cursor .execute ("PRAGMA foreign_keys=ON" )
109113 cursor .close ()
110114
115+ # this ensures sanity in mysql when storing datetimes (not required for postgres)
116+ if engine .dialect .name == "mysql" :
117+ @event .listens_for (engine , "connect" )
118+ def set_mysql_timezone (dbapi_connection , connection_record ):
119+ cursor = dbapi_connection .cursor ()
120+ cursor .execute ("SET time_zone = '+00:00'" )
121+ cursor .close ()
122+
111123 @event .listens_for (engine , "checkout" )
112124 def checkout (dbapi_connection , connection_record , connection_proxy ):
113125 pid = os .getpid ()
@@ -117,3 +129,46 @@ def checkout(dbapi_connection, connection_record, connection_proxy):
117129 "Connection record belongs to pid {}, "
118130 "attempting to check out in pid {}" .format (connection_record .info ['pid' ], pid )
119131 )
132+
133+
134+ class UtcDateTime (TypeDecorator ):
135+ """
136+ Almost equivalent to :class:`~sqlalchemy.types.DateTime` with
137+ ``timezone=True`` option, but it differs from that by:
138+ - Never silently take naive :class:`~datetime.datetime`, instead it
139+ always raise :exc:`ValueError` unless time zone aware value.
140+ - :class:`~datetime.datetime` value's :attr:`~datetime.datetime.tzinfo`
141+ is always converted to UTC.
142+ - Unlike SQLAlchemy's built-in :class:`~sqlalchemy.types.DateTime`,
143+ it never return naive :class:`~datetime.datetime`, but time zone
144+ aware value, even with SQLite or MySQL.
145+ - Always returns DateTime in UTC
146+ """
147+
148+ impl = DateTime (timezone = True )
149+
150+ def process_bind_param (self , value , dialect ):
151+ if value is not None :
152+ if not isinstance (value , datetime .datetime ):
153+ raise TypeError ('expected datetime.datetime, not ' +
154+ repr (value ))
155+ elif value .tzinfo is None :
156+ raise ValueError ('naive datetime is disallowed' )
157+
158+ return value .astimezone (utc )
159+
160+ def process_result_value (self , value , dialect ):
161+ """
162+ Processes DateTimes from the DB making sure it is always
163+ returning UTC. Not using timezone.convert_to_utc as that
164+ converts to configured TIMEZONE while the DB might be
165+ running with some other setting. We assume UTC datetimes
166+ in the database.
167+ """
168+ if value is not None :
169+ if value .tzinfo is None :
170+ value = value .replace (tzinfo = utc )
171+ else :
172+ value = value .astimezone (utc )
173+
174+ return value
0 commit comments