Unverified Commit 620ed793 authored by Daniel's avatar Daniel Committed by GitHub

Refactor call pattern for switching between clients (#81)

* Added a 'request_wrapper' that abstracts away the logic to switch between Envoy clients and basic HTTP requests.
parent bd580b07
import logging
import requests
from http import HTTPStatus
from typing import Any, Dict
......@@ -12,7 +11,7 @@ from amundsen_application.log.action_log import action_logging
from amundsen_application.models.user import load_user, dump_user
from amundsen_application.api.utils.request_utils import get_query_param
from amundsen_application.api.utils.request_utils import get_query_param, request_wrapper
LOGGER = logging.getLogger(__name__)
......@@ -69,14 +68,11 @@ def popular_tables() -> Response:
try:
url = app.config['METADATASERVICE_BASE'] + POPULAR_TABLES_ENDPOINT
# TODO: Create an abstraction for this logic that is reused many times
if app.config['METADATASERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -124,14 +120,11 @@ def get_table_metadata() -> Response:
def _send_metadata_get_request(url: str) -> Response:
# TODO: Create an abstraction for this logic that is reused many times
if app.config['METADATASERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
return envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
return s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
return request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
def _get_partition_data(watermarks: Dict) -> Dict:
......@@ -240,20 +233,11 @@ def _update_table_owner(*, table_key: str, method: str, owner: str) -> Dict[str,
table_endpoint = _get_table_endpoint()
url = '{0}/{1}/owner/{2}'.format(table_endpoint, table_key, owner)
# TODO: Create an abstraction for this logic that is reused many times
if app.config['METADATASERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
if method == 'PUT':
envoy_client.put(url, headers=envoy_headers)
else:
envoy_client.delete(url, headers=envoy_headers)
else:
with requests.Session() as s:
if method == 'PUT':
s.put(url, timeout=REQUEST_SESSION_TIMEOUT)
else:
s.delete(url, timeout=REQUEST_SESSION_TIMEOUT)
request_wrapper(method=method,
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
# TODO: Figure out a way to get this payload from flask.jsonify which wraps with app's response_class
return {'msg': 'Updated owner'}
......@@ -286,14 +270,11 @@ def get_last_indexed() -> Response:
try:
url = app.config['METADATASERVICE_BASE'] + LAST_INDEXED_ENDPOINT
# TODO: Create an abstraction for this logic that is reused many times
if app.config['METADATASERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -319,14 +300,11 @@ def get_table_description() -> Response:
url = '{0}/{1}/description'.format(table_endpoint, table_key)
# TODO: Create an abstraction for this logic that is reused many times
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
if envoy_client is not None:
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -354,14 +332,11 @@ def get_column_description() -> Response:
url = '{0}/{1}/column/{2}/description'.format(table_endpoint, table_key, column_name)
# TODO: Create an abstraction for this logic that is reused many times
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
if envoy_client is not None:
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -399,14 +374,11 @@ def put_table_description() -> Response:
url = '{0}/{1}/description/{2}'.format(table_endpoint, table_key, description)
_log_put_table_description(table_key=table_key, description=description, source=src)
# TODO: Create an abstraction for this logic that is reused many times
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
if envoy_client is not None:
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.put(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.put(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='PUT',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -444,14 +416,11 @@ def put_column_description() -> Response:
url = '{0}/{1}/column/{2}/description/{3}'.format(table_endpoint, table_key, column_name, description)
_log_put_column_description(table_key=table_key, column_name=column_name, description=description, source=src)
# TODO: Create an abstraction for this logic that is reused many times
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
if envoy_client is not None:
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.put(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.put(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='PUT',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -478,14 +447,11 @@ def get_tags() -> Response:
try:
url = app.config['METADATASERVICE_BASE'] + TAGS_ENDPOINT
# TODO: Create an abstraction for this logic that is reused many times
if app.config['METADATASERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......@@ -526,20 +492,11 @@ def update_table_tags() -> Response:
_log_update_table_tags(table_key=table_key, method=method, tag=tag)
# TODO: Create an abstraction for this logic that is reused many times
envoy_client = app.config['METADATASERVICE_REQUEST_CLIENT']
if envoy_client is not None:
envoy_headers = app.config['METADATASERVICE_REQUEST_HEADERS']
if method == 'PUT':
response = envoy_client.put(url, headers=envoy_headers, raw_response=True)
else:
response = envoy_client.delete(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
if method == 'PUT':
response = s.put(url, timeout=REQUEST_SESSION_TIMEOUT)
else:
response = s.delete(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method=method,
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......
import logging
import requests
from http import HTTPStatus
......@@ -10,7 +9,7 @@ from flask import current_app as app
from flask.blueprints import Blueprint
from amundsen_application.log.action_log import action_logging
from amundsen_application.api.utils.request_utils import get_query_param
from amundsen_application.api.utils.request_utils import get_query_param, request_wrapper
LOGGER = logging.getLogger(__name__)
......@@ -198,14 +197,11 @@ def _search_table(*, search_term: str, page_index: int) -> Dict[str, Any]:
search_term,
page_index)
# TODO: Create an abstraction for this logic that is reused many times
if app.config['SEARCHSERVICE_REQUEST_CLIENT'] is not None:
envoy_client = app.config['SEARCHSERVICE_REQUEST_CLIENT']
envoy_headers = app.config['SEARCHSERVICE_REQUEST_HEADERS']
response = envoy_client.get(url, headers=envoy_headers, raw_response=True)
else:
with requests.Session() as s:
response = s.get(url, timeout=REQUEST_SESSION_TIMEOUT)
response = request_wrapper(method='GET',
url=url,
client=app.config['SEARCHSERVICE_REQUEST_CLIENT'],
headers=app.config['SEARCHSERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT)
status_code = response.status_code
......
from typing import Dict
import requests
def get_query_param(args: Dict, param: str, error_msg: str = None) -> str:
......@@ -7,3 +8,39 @@ def get_query_param(args: Dict, param: str, error_msg: str = None) -> str:
msg = 'A {0} parameter must be provided'.format(param) if error_msg is not None else error_msg
raise Exception(msg)
return value
# TODO: Define an interface for envoy_client
def request_wrapper(method: str, url: str, client, headers, timeout_sec: int): # type: ignore
"""
Wraps a request to use Envoy client and headers, if available
:param method: DELETE | GET | POST | PUT
:param url: The request URL
:param client: Optional Envoy client
:param headers: Optional Envoy request headers
:param timeout_sec: Number of seconds before timeout is triggered. Not used with Envoy
:return:
"""
if client is not None:
if method == 'DELETE':
return client.delete(url, headers=headers, raw_response=True)
elif method == 'GET':
return client.get(url, headers=headers, raw_response=True)
elif method == 'POST':
return client.post(url, headers=headers, raw_response=True)
elif method == 'PUT':
return client.put(url, headers=headers, raw_response=True)
else:
raise Exception('Method not allowed: {}'.format(method))
else:
with requests.Session() as s:
if method == 'DELETE':
return s.delete(url, timeout=timeout_sec)
elif method == 'GET':
return s.get(url, timeout=timeout_sec)
elif method == 'POST':
return s.post(url, timeout=timeout_sec)
elif method == 'PUT':
return s.put(url, timeout=timeout_sec)
else:
raise Exception('Method not allowed: {}'.format(method))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment