Commit bdb50e39 authored by Verdan Mahmood's avatar Verdan Mahmood Committed by Tao Feng

Implements a new method to handle the request headers method (#143)

parent d34022c3
...@@ -11,12 +11,11 @@ from amundsen_application.log.action_log import action_logging ...@@ -11,12 +11,11 @@ from amundsen_application.log.action_log import action_logging
from amundsen_application.models.user import load_user, dump_user from amundsen_application.models.user import load_user, dump_user
from amundsen_application.api.utils.request_utils import get_query_param, request_wrapper from amundsen_application.api.utils.request_utils import get_query_param, request_metadata
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
REQUEST_SESSION_TIMEOUT_SEC = 3
metadata_blueprint = Blueprint('metadata', __name__, url_prefix='/api/metadata/v0') metadata_blueprint = Blueprint('metadata', __name__, url_prefix='/api/metadata/v0')
...@@ -69,13 +68,7 @@ def popular_tables() -> Response: ...@@ -69,13 +68,7 @@ def popular_tables() -> Response:
try: try:
url = app.config['METADATASERVICE_BASE'] + POPULAR_TABLES_ENDPOINT url = app.config['METADATASERVICE_BASE'] + POPULAR_TABLES_ENDPOINT
response = request_metadata(url=url)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -121,14 +114,6 @@ def get_table_metadata() -> Response: ...@@ -121,14 +114,6 @@ def get_table_metadata() -> Response:
return make_response(jsonify({'tableData': {}, 'msg': message}), HTTPStatus.INTERNAL_SERVER_ERROR) return make_response(jsonify({'tableData': {}, 'msg': message}), HTTPStatus.INTERNAL_SERVER_ERROR)
def _send_metadata_get_request(url: str) -> Response:
return request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
def _get_partition_data(watermarks: Dict) -> Dict: def _get_partition_data(watermarks: Dict) -> Dict:
if watermarks: if watermarks:
high_watermark = next(filter(lambda x: x['watermark_type'] == 'high_watermark', watermarks)) high_watermark = next(filter(lambda x: x['watermark_type'] == 'high_watermark', watermarks))
...@@ -157,7 +142,7 @@ def _get_table_metadata(*, table_key: str, index: int, source: str) -> Dict[str, ...@@ -157,7 +142,7 @@ def _get_table_metadata(*, table_key: str, index: int, source: str) -> Dict[str,
try: try:
table_endpoint = _get_table_endpoint() table_endpoint = _get_table_endpoint()
url = '{0}/{1}'.format(table_endpoint, table_key) url = '{0}/{1}'.format(table_endpoint, table_key)
response = _send_metadata_get_request(url) response = request_metadata(url=url)
except ValueError as e: except ValueError as e:
# envoy client BadResponse is a subclass of ValueError # envoy client BadResponse is a subclass of ValueError
message = 'Encountered exception: ' + str(e) message = 'Encountered exception: ' + str(e)
...@@ -235,12 +220,7 @@ def _update_table_owner(*, table_key: str, method: str, owner: str) -> Dict[str, ...@@ -235,12 +220,7 @@ def _update_table_owner(*, table_key: str, method: str, owner: str) -> Dict[str,
try: try:
table_endpoint = _get_table_endpoint() table_endpoint = _get_table_endpoint()
url = '{0}/{1}/owner/{2}'.format(table_endpoint, table_key, owner) url = '{0}/{1}/owner/{2}'.format(table_endpoint, table_key, owner)
request_metadata(url=url, method=method)
request_wrapper(method=method,
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
# TODO: Figure out a way to get this payload from flask.jsonify which wraps with app's response_class # TODO: Figure out a way to get this payload from flask.jsonify which wraps with app's response_class
return {'msg': 'Updated owner'} return {'msg': 'Updated owner'}
...@@ -273,12 +253,7 @@ def get_last_indexed() -> Response: ...@@ -273,12 +253,7 @@ def get_last_indexed() -> Response:
try: try:
url = app.config['METADATASERVICE_BASE'] + LAST_INDEXED_ENDPOINT url = app.config['METADATASERVICE_BASE'] + LAST_INDEXED_ENDPOINT
response = request_wrapper(method='GET', response = request_metadata(url=url)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -303,12 +278,7 @@ def get_table_description() -> Response: ...@@ -303,12 +278,7 @@ def get_table_description() -> Response:
url = '{0}/{1}/description'.format(table_endpoint, table_key) url = '{0}/{1}/description'.format(table_endpoint, table_key)
response = request_wrapper(method='GET', response = request_metadata(url=url)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -335,12 +305,7 @@ def get_column_description() -> Response: ...@@ -335,12 +305,7 @@ def get_column_description() -> Response:
url = '{0}/{1}/column/{2}/description'.format(table_endpoint, table_key, column_name) url = '{0}/{1}/column/{2}/description'.format(table_endpoint, table_key, column_name)
response = request_wrapper(method='GET', response = request_metadata(url=url)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -377,12 +342,7 @@ def put_table_description() -> Response: ...@@ -377,12 +342,7 @@ def put_table_description() -> Response:
url = '{0}/{1}/description/{2}'.format(table_endpoint, table_key, description) url = '{0}/{1}/description/{2}'.format(table_endpoint, table_key, description)
_log_put_table_description(table_key=table_key, description=description, source=src) _log_put_table_description(table_key=table_key, description=description, source=src)
response = request_wrapper(method='PUT', response = request_metadata(url=url, method='PUT')
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -419,12 +379,7 @@ def put_column_description() -> Response: ...@@ -419,12 +379,7 @@ def put_column_description() -> Response:
url = '{0}/{1}/column/{2}/description/{3}'.format(table_endpoint, table_key, column_name, description) url = '{0}/{1}/column/{2}/description/{3}'.format(table_endpoint, table_key, column_name, description)
_log_put_column_description(table_key=table_key, column_name=column_name, description=description, source=src) _log_put_column_description(table_key=table_key, column_name=column_name, description=description, source=src)
response = request_wrapper(method='PUT', response = request_metadata(url=url, method='PUT')
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -449,13 +404,7 @@ def get_tags() -> Response: ...@@ -449,13 +404,7 @@ def get_tags() -> Response:
""" """
try: try:
url = app.config['METADATASERVICE_BASE'] + TAGS_ENDPOINT url = app.config['METADATASERVICE_BASE'] + TAGS_ENDPOINT
response = request_metadata(url=url)
response = request_wrapper(method='GET',
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -495,12 +444,7 @@ def update_table_tags() -> Response: ...@@ -495,12 +444,7 @@ def update_table_tags() -> Response:
_log_update_table_tags(table_key=table_key, method=method, tag=tag) _log_update_table_tags(table_key=table_key, method=method, tag=tag)
response = request_wrapper(method=method, response = request_metadata(url=url, method=method)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
...@@ -531,12 +475,7 @@ def get_user() -> Response: ...@@ -531,12 +475,7 @@ def get_user() -> Response:
_log_get_user(user_id=user_id) _log_get_user(user_id=user_id)
response = request_wrapper(method='GET', response = request_metadata(url=url)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
......
...@@ -9,7 +9,7 @@ from flask import current_app as app ...@@ -9,7 +9,7 @@ from flask import current_app as app
from flask.blueprints import Blueprint from flask.blueprints import Blueprint
from amundsen_application.log.action_log import action_logging from amundsen_application.log.action_log import action_logging
from amundsen_application.api.utils.request_utils import get_query_param, request_wrapper from amundsen_application.api.utils.request_utils import get_query_param, request_search
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -197,12 +197,7 @@ def _search_table(*, search_term: str, page_index: int) -> Dict[str, Any]: ...@@ -197,12 +197,7 @@ def _search_table(*, search_term: str, page_index: int) -> Dict[str, Any]:
search_term, search_term,
page_index) page_index)
response = request_wrapper(method='GET', response = request_search(url=url)
url=url,
client=app.config['SEARCHSERVICE_REQUEST_CLIENT'],
headers=app.config['SEARCHSERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
......
from typing import Dict from typing import Dict
import requests import requests
from flask import current_app as app
REQUEST_SESSION_TIMEOUT_SEC = 3
def get_query_param(args: Dict, param: str, error_msg: str = None) -> str: def get_query_param(args: Dict, param: str, error_msg: str = None) -> str:
...@@ -10,6 +14,52 @@ def get_query_param(args: Dict, param: str, error_msg: str = None) -> str: ...@@ -10,6 +14,52 @@ def get_query_param(args: Dict, param: str, error_msg: str = None) -> str:
return value return value
def request_metadata(*, # type: ignore
url: str,
method: str = 'GET',
timeout_sec: int = REQUEST_SESSION_TIMEOUT_SEC):
"""
Helper function to make a request to metadata service.
Sets the client and header information based on the configuration
:param method: DELETE | GET | POST | PUT
:param url: The request URL
:param timeout_sec: Number of seconds before timeout is triggered.
:return:
"""
if app.config['REQUEST_HEADERS_METHOD']:
headers = app.config['REQUEST_HEADERS_METHOD'](app)
else:
headers = app.config['METADATASERVICE_REQUEST_HEADERS']
return request_wrapper(method=method,
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=headers,
timeout_sec=timeout_sec)
def request_search(*, # type: ignore
url: str,
method: str = 'GET',
timeout_sec: int = REQUEST_SESSION_TIMEOUT_SEC):
"""
Helper function to make a request to search service.
Sets the client and header information based on the configuration
:param method: DELETE | GET | POST | PUT
:param url: The request URL
:param timeout_sec: Number of seconds before timeout is triggered.
:return:
"""
if app.config['REQUEST_HEADERS_METHOD']:
headers = app.config['REQUEST_HEADERS_METHOD'](app)
else:
headers = app.config['SEARCHSERVICE_REQUEST_HEADERS']
return request_wrapper(method=method,
url=url,
client=app.config['SEARCHSERVICE_REQUEST_CLIENT'],
headers=headers,
timeout_sec=timeout_sec)
# TODO: Define an interface for envoy_client # TODO: Define an interface for envoy_client
def request_wrapper(method: str, url: str, client, headers, timeout_sec: int): # type: ignore def request_wrapper(method: str, url: str, client, headers, timeout_sec: int): # type: ignore
""" """
......
...@@ -7,10 +7,9 @@ from flask import current_app as app ...@@ -7,10 +7,9 @@ from flask import current_app as app
from flask.blueprints import Blueprint from flask.blueprints import Blueprint
from amundsen_application.api.metadata.v0 import USER_ENDPOINT from amundsen_application.api.metadata.v0 import USER_ENDPOINT
from amundsen_application.api.utils.request_utils import request_wrapper from amundsen_application.api.utils.request_utils import request_metadata
from amundsen_application.models.user import load_user, dump_user from amundsen_application.models.user import load_user, dump_user
REQUEST_SESSION_TIMEOUT_SEC = 3
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -27,12 +26,7 @@ def current_user() -> Response: ...@@ -27,12 +26,7 @@ def current_user() -> Response:
url = '{0}{1}/{2}'.format(app.config['METADATASERVICE_BASE'], USER_ENDPOINT, user.user_id) url = '{0}{1}/{2}'.format(app.config['METADATASERVICE_BASE'], USER_ENDPOINT, user.user_id)
response = request_wrapper(method='GET', response = request_metadata(url=url)
url=url,
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=app.config['METADATASERVICE_REQUEST_HEADERS'],
timeout_sec=REQUEST_SESSION_TIMEOUT_SEC)
status_code = response.status_code status_code = response.status_code
if status_code == HTTPStatus.OK: if status_code == HTTPStatus.OK:
message = 'Success' message = 'Success'
......
...@@ -44,6 +44,12 @@ class LocalConfig(Config): ...@@ -44,6 +44,12 @@ class LocalConfig(Config):
PORT=METADATA_PORT) PORT=METADATA_PORT)
) )
# If specified, will be used to generate headers for service-to-service communication
# Please note that if specified, this will ignore following config properties:
# 1. METADATASERVICE_REQUEST_HEADERS
# 2. SEARCHSERVICE_REQUEST_HEADERS
REQUEST_HEADERS_METHOD = None
AUTH_USER_METHOD = None AUTH_USER_METHOD = None
GET_PROFILE_URL = None GET_PROFILE_URL = None
......
...@@ -34,7 +34,7 @@ requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'r ...@@ -34,7 +34,7 @@ requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'r
with open(requirements_path) as requirements_file: with open(requirements_path) as requirements_file:
requirements = requirements_file.readlines() requirements = requirements_file.readlines()
__version__ = '1.0.2' __version__ = '1.0.3'
setup( setup(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment