Skip to content

Commit

Permalink
python-pydantic-v1: Handle response chartset in exception.
Browse files Browse the repository at this point in the history
  • Loading branch information
igorgatis committed Oct 14, 2024
1 parent 2354d40 commit 5e6b15c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ class ApiClient:
collection_formats)
url += "?" + url_query

def extract_charset(content_type):
match = None
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
return match.group(1) if match else "utf-8"

try:
# perform request and return response
response_data = {{#asyncio}}await {{/asyncio}}{{#tornado}}yield {{/tornado}}self.request(
Expand All @@ -231,7 +237,12 @@ class ApiClient:
_request_timeout=_request_timeout)
except ApiException as e:
if e.body:
e.body = e.body.decode('utf-8')
try:
charset = extract_charset((e.headers or {}).get('content-type'))
e.body = e.body.decode(charset)
except UnicodeDecodeError:
# Keep original body if charset is not recognized.
pass
raise e

self.last_response = response_data
Expand All @@ -247,12 +258,9 @@ class ApiClient:
if response_type == "bytearray":
response_data.data = response_data.data
else:
match = None
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_data.data = response_data.data.decode(encoding)
charset = extract_charset(content_type)
response_data.data = response_data.data.decode(charset)

# deserialize response data
if response_type == "bytearray":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ def __call_api(
collection_formats)
url += "?" + url_query

def extract_charset(content_type):
match = None
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
return match.group(1) if match else "utf-8"

try:
# perform request and return response
response_data = self.request(
Expand All @@ -218,7 +224,12 @@ def __call_api(
_request_timeout=_request_timeout)
except ApiException as e:
if e.body:
e.body = e.body.decode('utf-8')
try:
charset = extract_charset((e.headers or {}).get('content-type'))
e.body = e.body.decode(charset)
except UnicodeDecodeError:
# Keep original body if charset is not recognized.
pass
raise e

self.last_response = response_data
Expand All @@ -234,12 +245,9 @@ def __call_api(
if response_type == "bytearray":
response_data.data = response_data.data
else:
match = None
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_data.data = response_data.data.decode(encoding)
charset = extract_charset(content_type)
response_data.data = response_data.data.decode(charset)

# deserialize response data
if response_type == "bytearray":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ async def __call_api(
collection_formats)
url += "?" + url_query

def extract_charset(content_type):
match = None
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
return match.group(1) if match else "utf-8"

try:
# perform request and return response
response_data = await self.request(
Expand All @@ -198,7 +204,12 @@ async def __call_api(
_request_timeout=_request_timeout)
except ApiException as e:
if e.body:
e.body = e.body.decode('utf-8')
try:
charset = extract_charset((e.headers or {}).get('content-type'))
e.body = e.body.decode(charset)
except UnicodeDecodeError:
# Keep original body if charset is not recognized.
pass
raise e

self.last_response = response_data
Expand All @@ -214,12 +225,9 @@ async def __call_api(
if response_type == "bytearray":
response_data.data = response_data.data
else:
match = None
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_data.data = response_data.data.decode(encoding)
charset = extract_charset(content_type)
response_data.data = response_data.data.decode(charset)

# deserialize response data
if response_type == "bytearray":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ def __call_api(
collection_formats)
url += "?" + url_query

def extract_charset(content_type):
match = None
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
return match.group(1) if match else "utf-8"

try:
# perform request and return response
response_data = self.request(
Expand All @@ -217,7 +223,12 @@ def __call_api(
_request_timeout=_request_timeout)
except ApiException as e:
if e.body:
e.body = e.body.decode('utf-8')
try:
charset = extract_charset((e.headers or {}).get('content-type'))
e.body = e.body.decode(charset)
except UnicodeDecodeError:
# Keep original body if charset is not recognized.
pass
raise e

self.last_response = response_data
Expand All @@ -233,12 +244,9 @@ def __call_api(
if response_type == "bytearray":
response_data.data = response_data.data
else:
match = None
content_type = response_data.getheader('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_data.data = response_data.data.decode(encoding)
charset = extract_charset(content_type)
response_data.data = response_data.data.decode(charset)

# deserialize response data
if response_type == "bytearray":
Expand Down

0 comments on commit 5e6b15c

Please sign in to comment.