Skip to content

Commit

Permalink
allow str as a port in rest_connector (#5451)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: aebb4828f2ab051e365e3e6f3bc913ce30b3a6e0
  • Loading branch information
zxqfd555-pw authored and Manul from Pathway committed Jan 18, 2024
1 parent 4b135f8 commit 2801085
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Changed

- `pw.io.http.rest_connector` now also accepts port as a string for backwards compatibility.


## [0.7.8] - 2024-01-18

### Added
Expand Down
56 changes: 49 additions & 7 deletions integration_tests/webserver/test_rest_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,42 @@
)


def test_server(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10000
class UniquePortDispenser:
"""
Tests are run simultaneously by several workers.
Since they involve running a web server, they shouldn't interfere, so they
should occupy distinct ports.
This class automates unique port assignments for different tests.
"""

def __init__(self, range_start: int = 12345, worker_range_size: int = 1000):
pytest_worker_id = os.environ["PYTEST_XDIST_WORKER"]
if pytest_worker_id == "master":
worker_id = 0
elif pytest_worker_id.startswith("gw"):
worker_id = int(pytest_worker_id[2:])
else:
raise ValueError(f"Unknown xdist worker id: {pytest_worker_id}")

self._next_available_port = range_start + worker_id * worker_range_size
self._lock = threading.Lock()

def get_unique_port(self) -> int:
self._lock.acquire()
result = self._next_available_port
self._next_available_port += 1
self._lock.release()

return result


PORT_DISPENSER = UniquePortDispenser()


def _test_server_basic(tmp_path: pathlib.Path, port_is_str: bool = False):
port = PORT_DISPENSER.get_unique_port()
if port_is_str:
port = str(port)
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down Expand Up @@ -56,8 +90,16 @@ def target():
wait_result_with_checker(CsvLinesNumberChecker(output_path, 4), 30)


def test_server(tmp_path: pathlib.Path):
_test_server_basic(tmp_path)


def test_server_str_port_compatibility(tmp_path: pathlib.Path):
_test_server_basic(tmp_path, port_is_str=True)


def test_server_customization(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10001
port = PORT_DISPENSER.get_unique_port()
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down Expand Up @@ -97,7 +139,7 @@ def target():


def test_server_schema_customization(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10002
port = PORT_DISPENSER.get_unique_port()
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down Expand Up @@ -133,7 +175,7 @@ def target():


def test_server_keep_queries(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10003
port = PORT_DISPENSER.get_unique_port()
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down Expand Up @@ -182,7 +224,7 @@ def target():


def test_server_fail_on_duplicate_port(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10003
port = PORT_DISPENSER.get_unique_port()
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down Expand Up @@ -214,7 +256,7 @@ class InputSchema(pw.Schema):


def test_server_two_endpoints(tmp_path: pathlib.Path):
port = int(os.environ.get("PATHWAY_MONITORING_HTTP_PORT", "20000")) + 10004
port = PORT_DISPENSER.get_unique_port()
output_path = tmp_path / "output.csv"

class InputSchema(pw.Schema):
Expand Down
16 changes: 9 additions & 7 deletions python/pathway/io/http/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _is_finite(self):
@check_arg_types
def rest_connector(
host: str | None = None,
port: int | None = None,
port: int | str | None = None,
*,
webserver: PathwayWebserver | None = None,
route: str = "/",
Expand Down Expand Up @@ -252,12 +252,14 @@ def rest_connector(
raise ValueError(
"If webserver object isn't specified, host and port must be present"
)
warn(
"The `host` and `port` arguments are deprecated. Please use `webserver` "
"instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(port, str):
port = int(port)
# warn(
# "The `host` and `port` arguments are deprecated. Please use `webserver` "
# "instead.",
# DeprecationWarning,
# stacklevel=2,
# )
webserver = PathwayWebserver(host, port)
else:
if host is not None or port is not None:
Expand Down
16 changes: 15 additions & 1 deletion python/pathway/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2731,7 +2731,21 @@ class InputSchema(pw.Schema):
with pytest.raises(TypeError):
pw.io.http.rest_connector(
host="127.0.0.1",
port="8080",
port=("8080",),
schema=InputSchema,
delete_completed_queries=False,
)


def test_server_fail_on_unparsable_port():
class InputSchema(pw.Schema):
k: int
v: int

with pytest.raises(ValueError):
pw.io.http.rest_connector(
host="127.0.0.1",
port="abc",
schema=InputSchema,
delete_completed_queries=False,
)
Expand Down

0 comments on commit 2801085

Please sign in to comment.