Skip to content

Commit 355bd56

Browse files
GuzikJakubFokko
authored and committed
[AIRFLOW-3888] HA for metastore connection (apache#4708)
* HA for Metastore * [AIRFLOW-3888] HA for metastore connection Creating a connection to a metastore with two hosts for high availability (e.g. connection 1, connection 2) is not possible because the entire value entered is taken as-is. For our needs, it is necessary to go through the hosts in turn and connect to the first one that works. This change allows you to check and then connect to a working metastore. * add function to base_hook * update webhdfs_hook * back to original version * back to original version * Update hive_hooks.py Thank you. I made a few changes because during the tests I detected several errors. I have a question: when I merge this into my pull request, will it still be possible to land it in the airflow main branch? * [AIRFLOW-3888] HA for metastore connection flake8 code repair * [AIRFLOW-3888] HA for metastore connection Flake8 repair * [AIRFLOW-3888] HA for metastore connection Code behavior improvements * [AIRFLOW-3888] HA for metastore connection Add test * [AIRFLOW-3888] HA for metastore connection test improvement * [AIRFLOW-3888] HA for metastore connection Add test [AIRFLOW-3888] HA for metastore connection test improvement * [AIRFLOW-3888] HA for metastore connection Add test [AIRFLOW-3888] HA for metastore connection test improvement [AIRFLOW-3888] HA for metastore connection test improvement * [AIRFLOW-3888] HA for metastore connection Fixing the typo in the variable name * [AIRFLOW-3888] HA for metastore connection Mock return_value edit * [AIRFLOW-3888] HA for metastore connection Flake8 repair * [AIRFLOW-3888] HA for metastore connection Test repair * [AIRFLOW-3888] HA for metastore connection Flake8 repair [AIRFLOW-3888] HA for metastore connection Test repair
1 parent f153bf5 commit 355bd56

File tree

2 files changed

+33
-5
lines changed

2 files changed

+33
-5
lines changed

airflow/hooks/hive_hooks.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import re
2323
import subprocess
2424
import time
25+
import socket
2526
from collections import OrderedDict
2627
from tempfile import NamedTemporaryFile
2728

@@ -471,7 +472,7 @@ class HiveMetastoreHook(BaseHook):
471472
MAX_PART_COUNT = 32767
472473

473474
def __init__(self, metastore_conn_id='metastore_default'):
474-
self.metastore_conn = self.get_connection(metastore_conn_id)
475+
self.conn_id = metastore_conn_id
475476
self.metastore = self.get_metastore_client()
476477

477478
def __getstate__(self):
@@ -492,13 +493,20 @@ def get_metastore_client(self):
492493
import hmsclient
493494
from thrift.transport import TSocket, TTransport
494495
from thrift.protocol import TBinaryProtocol
495-
ms = self.metastore_conn
496+
497+
ms = self._find_valid_server()
498+
499+
if ms is None:
500+
raise AirflowException("Failed to locate the valid server.")
501+
496502
auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL')
503+
497504
if configuration.conf.get('core', 'security') == 'kerberos':
498505
auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI')
499506
kerberos_service_name = ms.extra_dejson.get('kerberos_service_name', 'hive')
500507

501-
socket = TSocket.TSocket(ms.host, ms.port)
508+
conn_socket = TSocket.TSocket(ms.host, ms.port)
509+
502510
if configuration.conf.get('core', 'security') == 'kerberos' \
503511
and auth_mechanism == 'GSSAPI':
504512
try:
@@ -514,14 +522,26 @@ def sasl_factory():
514522
return sasl_client
515523

516524
from thrift_sasl import TSaslClientTransport
517-
transport = TSaslClientTransport(sasl_factory, "GSSAPI", socket)
525+
transport = TSaslClientTransport(sasl_factory, "GSSAPI", conn_socket)
518526
else:
519-
transport = TTransport.TBufferedTransport(socket)
527+
transport = TTransport.TBufferedTransport(conn_socket)
520528

521529
protocol = TBinaryProtocol.TBinaryProtocol(transport)
522530

523531
return hmsclient.HMSClient(iprot=protocol)
524532

533+
def _find_valid_server(self):
    """
    Return the first metastore connection that accepts a TCP connection.

    Every connection returned by ``self.get_connections(self.conn_id)`` is
    probed in order with a plain TCP connect to its ``host``/``port``.

    :return: the first connection whose probe succeeded, or ``None`` when
        no probe succeeded (the caller is expected to handle ``None``).
    """
    conns = self.get_connections(self.conn_id)
    for conn in conns:
        self.log.info("Trying to connect to %s:%s", conn.host, conn.port)
        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            reachable = host_socket.connect_ex((conn.host, conn.port)) == 0
        finally:
            # Release the probe socket on every path; previously it was only
            # closed after a successful connect, leaking one socket per
            # unreachable host.
            host_socket.close()

        if reachable:
            self.log.info("Connected to %s:%s", conn.host, conn.port)
            return conn

        self.log.info("Could not connect to %s:%s", conn.host, conn.port)

    # No candidate answered; make the "not found" result explicit.
    return None
544+
525545
def get_conn(self):
526546
return self.metastore
527547

tests/hooks/test_hive_hook.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from airflow import DAG, configuration
3434
from airflow.exceptions import AirflowException
3535
from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook, HiveServer2Hook
36+
from airflow.models.connection import Connection
3637
from airflow.operators.hive_operator import HiveOperator
3738
from airflow.utils import timezone
3839
from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING
@@ -283,6 +284,13 @@ def test_get_max_partition_from_valid_part_specs(self):
283284
def test_get_metastore_client(self):
284285
self.assertIsInstance(self.hook.get_metastore_client(), HMSClient)
285286

287+
@mock.patch("airflow.hooks.hive_hooks.HiveMetastoreHook.get_connections",
            return_value=[Connection(host="localhost", port=9802)])
@mock.patch("airflow.hooks.hive_hooks.socket")
def test_error_metastore_client(self, socket_mock, get_connections_mock):
    """
    Check that the metastore client is built from the first reachable server.

    ``get_connections`` (the method the hook actually calls to enumerate
    candidate servers) is patched to return a single connection, and the
    ``socket`` module is patched so the reachability probe reports success.
    """
    # connect_ex() == 0 is what the hook treats as "server reachable".
    socket_mock.socket.return_value.connect_ex.return_value = 0

    client = self.hook.get_metastore_client()

    get_connections_mock.assert_called_once_with(self.hook.conn_id)
    socket_mock.socket.return_value.connect_ex.assert_called_once_with(
        ("localhost", 9802))
    self.assertIsInstance(client, HMSClient)
293+
286294
def test_get_conn(self):
287295
self.assertIsInstance(self.hook.get_conn(), HMSClient)
288296

0 commit comments

Comments
 (0)