Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 4.2.0 bugs #541

Merged
merged 4 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions dtable_events/common_dataset/common_dataset_sync_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dtable_events.common_dataset.dtable_db_cell_validators import validate_table_db_cell_value
from dtable_events.utils import get_inner_dtable_server_url
from dtable_events.utils.constants import ColumnTypes
from dtable_events.utils.dtable_server_api import BaseExceedsLimitException, DTableServerAPI
from dtable_events.utils.dtable_server_api import BaseExceedsException, DTableServerAPI
from dtable_events.utils.dtable_db_api import DTableDBAPI

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -562,11 +562,11 @@ def create_dst_table_or_update_columns(dst_dtable_uuid, dst_table_id, dst_table_
try:
resp_json = dst_dtable_server_api.add_table(dst_table_name, lang, columns=columns)
dst_table_id = resp_json.get('_id')
except BaseExceedsLimitException:
except BaseExceedsException as e:
return None, {
'dst_table_id': None,
'error_msg': 'base exceeds limit',
'error_type': 'base_exceeds_limit',
'error_msg': e.error_msg,
'error_type': e.error_type,
'task_status_code': 400
}
except Exception as e:
Expand All @@ -587,11 +587,11 @@ def create_dst_table_or_update_columns(dst_dtable_uuid, dst_table_id, dst_table_
} for col in to_be_appended_columns]
try:
dst_dtable_server_api.batch_append_columns_by_table_id(dst_table_id, columns)
except BaseExceedsLimitException:
except BaseExceedsException as e:
return None, {
'dst_table_id': None,
'error_msg': 'base exceeds limit',
'error_type': 'base_exceeds_limit',
'error_msg': e.error_msg,
'error_type': e.error_type,
'task_status_code': 400
}
except Exception as e:
Expand All @@ -610,11 +610,11 @@ def create_dst_table_or_update_columns(dst_dtable_uuid, dst_table_id, dst_table_
} for col in to_be_updated_columns]
try:
dst_dtable_server_api.batch_update_columns_by_table_id(dst_table_id, columns)
except BaseExceedsLimitException:
except BaseExceedsException as e:
return None, {
'dst_table_id': None,
'error_msg': 'base exceeds limit',
'error_type': 'base_exceeds_limit',
'error_msg': e.error_msg,
'error_type': e.error_type,
'task_status_code': 400
}
except Exception as e:
Expand All @@ -632,11 +632,11 @@ def append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dt
for i in range(0, len(to_be_appended_rows), step):
try:
dst_dtable_server_api.batch_append_rows(dst_table_name, to_be_appended_rows[i: i+step], need_convert_back=False)
except BaseExceedsLimitException:
except BaseExceedsException as e:
return {
'dst_table_id': None,
'error_msg': 'base exceeds limit',
'error_type': 'base_exceeds_limit',
'error_msg': e.error_msg,
'error_type': e.error_type,
'task_status_code': 400
}
except Exception as e:
Expand All @@ -660,11 +660,11 @@ def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dta
})
try:
dst_dtable_server_api.batch_update_rows(dst_table_name, updates, need_convert_back=False)
except BaseExceedsLimitException:
except BaseExceedsException as e:
return {
'dst_table_id': None,
'error_msg': 'base exceeds limit',
'error_type': 'base_exceeds_limit',
'error_msg': e.error_msg,
'error_type': e.error_type,
'task_status_code': 400
}
except Exception as e:
Expand Down Expand Up @@ -770,11 +770,11 @@ def import_sync_CDS(context):
if server_only and (start + step) > SRC_ROWS_LIMIT:
step = SRC_ROWS_LIMIT - start
sql = sql_template + (" LIMIT {offset}, {limit}".format(offset=start, limit=step))
logger.debug('fetch src rows-id sql: %s', sql)
logger.debug('fetch src rows-id sql: %s', sql[:200])
try:
rows, _ = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
except Exception as e:
logger.error('fetch src rows id filter_conditions: %s sql: %s src columns: %s error: %s', filter_conditions, sql, src_table['columns'], e)
logger.error('fetch src rows id filter_conditions: %s sql: %s src columns: %s error: %s', filter_conditions, sql[:200], src_table['columns'], e)
return {
'dst_table_id': None,
'error_msg': 'fetch src rows id error: %s' % e,
Expand All @@ -795,11 +795,11 @@ def import_sync_CDS(context):
start, step = 0, 10000
while is_sync and True:
sql = f"SELECT `_id` FROM `{dst_table_name}` LIMIT {start}, {step}"
logger.debug('fetch dst rows-id sql: %s', sql)
logger.debug('fetch dst rows-id sql: %s', sql[:200])
try:
rows, _ = dst_dtable_db_api.query(sql, convert=False, server_only=True)
except Exception as e:
logger.error('fetch dst rows id sql: %s error: %s', sql, e)
logger.error('fetch dst rows id sql: %s error: %s', sql[:200], e)
return {
'dst_table_id': None,
'error_msg': 'fetch dst rows id error: %s' % e,
Expand Down Expand Up @@ -834,7 +834,7 @@ def import_sync_CDS(context):
try:
src_rows, _ = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
except Exception as e:
logger.error('fetch src to-be-updated-rows sql: %s error: %s', sql, e)
logger.error('fetch src to-be-updated-rows sql: %s error: %s', sql[:200], e)
return {
'dst_table_id': None,
'error_msg': 'fetch src to-be-updated-rows error: %s' % e,
Expand All @@ -846,7 +846,7 @@ def import_sync_CDS(context):
try:
dst_rows, _ = dst_dtable_db_api.query(sql, convert=False, server_only=True)
except Exception as e:
logger.error('fetch dst to-be-updated-rows sql: %s error: %s', sql, e)
logger.error('fetch dst to-be-updated-rows sql: %s error: %s', sql[:200], e)
return {
'dst_table_id': None,
'error_msg': 'fetch dst to-be-updated-rows error: %s' % e,
Expand Down Expand Up @@ -882,7 +882,7 @@ def import_sync_CDS(context):
try:
src_rows, _ = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
except Exception as e:
logger.error('fetch to-be-appended-rows sql: %s error: %s', sql, e)
logger.error('fetch to-be-appended-rows sql: %s error: %s', sql[:200], e)
return {
'dst_table_id': None,
'error_msg': 'fetch to-be-appended-rows error: %s' % e,
Expand Down
7 changes: 6 additions & 1 deletion dtable_events/common_dataset/common_dataset_syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ def check_common_dataset(session_class):
continue
else:
if result.get('error_msg'):
if result.get('error_type') in ('generate_synced_columns_error', 'base_exceeds_limit'):
if result.get('error_type') in (
'generate_synced_columns_error',
'base_exceeds_limit',
'exceed_columns_limit',
'exceed_rows_limit'
):
logging.warning('src_dtable_uuid: %s src_table_id: %s src_view_id: %s dst_dtable_uuid: %s dst_table_id: %s client error: %s',
src_dtable_uuid, src_table_id, src_view_id, dst_dtable_uuid, dst_table_id, result)
with session_class() as db_session:
Expand Down
2 changes: 1 addition & 1 deletion dtable_events/dtable_io/excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def parse_excel_columns(sheet_rows, head_index, max_column):

for index in range(max_column):
name = get_excel_cell_value(head_row, index)
column_name = str(name.replace('\ufeff', '').strip()) if name else 'Field' + str(index + 1)
column_name = str(name).replace('\ufeff', '').strip() if name else 'Field' + str(index + 1)

if column_name in column_name_set:
raise Exception('Duplicated column names are not supported')
Expand Down
25 changes: 18 additions & 7 deletions dtable_events/utils/dtable_server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ class WrongFilterException(Exception):
pass


class BaseExceedsLimitException(Exception):
pass
class BaseExceedsException(Exception):
    """Raised when dtable-server rejects an operation because a base limit
    is exceeded (rows, columns, or total base size).

    Attributes:
        error_type: machine-readable limit identifier, e.g. 'exceed_rows_limit',
            'exceed_columns_limit', 'base_exceeds_limit'.
        error_msg: human-readable description of the exceeded limit.
    """

    def __init__(self, error_type, error_msg):
        # Forward the message to Exception so str(e) and '%s' % e render
        # the actual error text instead of an empty string.
        super().__init__(error_msg)
        self.error_type = error_type
        self.error_msg = error_msg

class NotFoundException(Exception):
    """Raised when a dtable-server request returns HTTP 404 (resource not found)."""
    pass
Expand All @@ -29,15 +31,24 @@ def parse_response(response):
if response.status_code >= 400:
if response.status_code == 404:
raise NotFoundException()
error_type, error_msg = '', ''
try:
response_json = response.json()
except:
pass
error_msg = response.text
else:
if response_json.get('error_type') == 'wrong_filter_in_filters':
raise WrongFilterException()
if response_json.get('error_msg') == 'base_exceeds_limit':
raise BaseExceedsLimitException()
error_type = response_json.get('error_type')
error_msg = response_json.get('error_msg')

if error_type == 'wrong_filter_in_filters':
raise WrongFilterException()
if error_type == 'exceed_rows_limit' or error_msg == 'Exceed the rows limit':
raise BaseExceedsException('exceed_rows_limit', 'Exceed the rows limit')
if error_type == 'exceed_columns_limit' or error_msg == 'Exceed the columns limit':
raise BaseExceedsException('exceed_columns_limit', 'Exceed the columns limit')
if error_type == 'base_exceeds_limit' or error_msg == 'The base size exceeds the limit of 200MB, the operation cannot be performed.':
raise BaseExceedsException('base_exceeds_limit', 'The base size exceeds the limit of 200MB, the operation cannot be performed.')

raise ConnectionError(response.status_code, response.text)
else:
try:
Expand Down