diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..9a540779e --- /dev/null +++ b/.gitignore @@ -0,0 +1,112 @@ +.idea + +# VS Code files +.vscode + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Result files +results/ + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..e79f53ee8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,11 @@ +language: python +python: + - "3.6" + +install: + - pip install -r requirements.txt + +script: + - flake8 src + - nose2 + - coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml diff --git a/README.md b/README.md new file mode 100644 index 000000000..81023f7a1 --- /dev/null +++ b/README.md @@ -0,0 +1,110 @@ +[![Build Status](https://travis-ci.com/RealVNF/coord-sim.svg?branch=master)](https://travis-ci.com/RealVNF/coord-sim) + +# Simulation: Coordination of chained virtual network functions + +Simulate flow-level, inter-node network coordination including scaling and placement of services and scheduling/balancing traffic between them. + +


+
+**Features**:
+
+- Simulate any given network topology with arbitrary node and link capacities and link delays
+- Simulate any given network service consisting of linearly chained SFs/VNFs
+- VNFs can specify arbitrary resource consumption as a function of their load using Python modules. VNF delays can also be specified individually and may be normally distributed.
+- Simulate network traffic in the form of flow arrivals at various ingress nodes with varying arrival rate, flow length, volume, etc. according to stochastic distributions
+- Simple and clear interface to run algorithms for scaling, placement, and scheduling/load balancing of these incoming flows across the nodes in the network. Coordination within each node is out of scope.
+- Interface allows easy integration with OpenAI Gym to enable training and evaluating reinforcement learning algorithms
+- Collection of metrics like successful/dropped flows, end-to-end delay, resource consumption, etc. over time. Easily extensible.
+- Discrete event simulation to evaluate coordination over time with SimPy
+- Graceful adjustment of placements: When an algorithm removes VNFs from a placement, flows that are currently being processed are allowed to finish before the VNF is completely removed (see PR [#78](https://github.com/RealVNF/coordination-simulation/pull/78) and [#81](https://github.com/RealVNF/coordination-simulation/pull/81)).
+
+## Setup
+
+Requires Python 3.6. Install with (ideally using [virtualenv](https://virtualenv.pypa.io/en/stable/)):
+
+```bash
+pip install -r requirements.txt
+```
+
+## Usage
+
+Type `coord-sim -h` for help using the simulator. For now, this should print:
+
+```
+$ coord-sim -h
+usage: coord-sim [-h] -d DURATION -sf SF [-sfr SFR] -n NETWORK -c CONFIG
+                 [-t TRACE] [-s SEED]
+
+Coordination-Simulation tool
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -d DURATION, --duration DURATION
+                        The duration of the simulation (simulates
+                        milliseconds).
+  -sf SF, --sf SF       VNF file which contains the SFCs and their respective
+                        SFs and their properties.
+  -sfr SFR, --sfr SFR   Path which contains the SF resource consumption
+                        functions.
+  -n NETWORK, --network NETWORK
+                        The GraphML network file that specifies the nodes and
+                        edges of the network.
+  -c CONFIG, --config CONFIG
+                        Path to the simulator config file.
+  -t TRACE, --trace TRACE
+                        Provide a CSV trace file to configure the traffic the
+                        simulator is generating.
+  -s SEED, --seed SEED  Random seed
+```
+
+You can use the following command as an example (run from the root project folder):
+
+```bash
+coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
+```
+
+This will run a simulation on the provided GraphML network file and YAML service file for a duration of 20 timesteps.
+
+### Dynamic SF resource consumption
+
+By default, all SFs have a node resource consumption that exactly equals the aggregated traffic they have to handle.
+
+It is possible to specify arbitrary other resource consumption models simply by implementing a Python module with a
+function `resource_function(load)` (see examples [here](https://github.com/RealVNF/coordination-simulation/tree/master/params/services/resource_functions)).
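+
+As an illustration, such a module could also model a non-linear consumption. The sketch below is hypothetical and not one of the shipped examples (the shipped `A.py` and `B.py` simply return the load unchanged); only the `resource_function(load)` interface is given:
+
+```python
+# my_resource_function.py (hypothetical example module)
+def resource_function(load):
+    # fixed base cost per active SF instance plus slightly super-linear growth with load
+    return 0.5 + load ** 1.2
+```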
+
+To use these modules, they need to be referenced in the service file:
+
+```yaml
+sf_list:
+  a:
+    processing_delay_mean: 5.0
+    processing_delay_stdev: 0.0
+    resource_function_id: A
+```
+
+The path to the folder with the Python modules then needs to be passed via the `-sfr` argument.
+
+See PR https://github.com/RealVNF/coordination-simulation/pull/78 for details.
+
+## Tests
+
+```bash
+# style check
+flake8 src
+
+# tests
+nose2
+```
+
+## Acknowledgement
+
+This project has received funding from the German Federal Ministry of Education and Research ([BMBF](https://www.bmbf.de/)) through Software Campus grant 01IS17046 ([RealVNF](https://realvnf.github.io/)).
+


diff --git a/docs/BMBF_sponsored_by.jpg b/docs/BMBF_sponsored_by.jpg new file mode 100644 index 000000000..97913aaca Binary files /dev/null and b/docs/BMBF_sponsored_by.jpg differ diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 000000000..d0c3cbf10 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/huawei_horizontal.png b/docs/huawei_horizontal.png new file mode 100644 index 000000000..12dd2c403 Binary files /dev/null and b/docs/huawei_horizontal.png differ diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 000000000..6247f7e23 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=source +set BUILDDIR=build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/realvnf_logo.png b/docs/realvnf_logo.png new file mode 100644 index 000000000..19f85e9f3 Binary files /dev/null and b/docs/realvnf_logo.png differ diff --git a/docs/simulator.md b/docs/simulator.md new file mode 100644 index 000000000..b4ad603b6 --- /dev/null +++ b/docs/simulator.md @@ -0,0 +1,87 @@ +# Coordination Simulator +Disclaimer: This document is currently in **draft** mode and includes the documentation for the project and is updated as features are added or modified. + +Last update: 20.06.2019 by Haydar Qarawlus +___ + +Table of contents: +- Introduction +- Application structure +- Operation flow +- Input parameters +- Running the simulator + +## Introduction +This discrete-event flow-based simulator is a fast testbed for VNF coordination algorithms. It is designed to simulate small to medium sized networks with relatively close-to-reality accuracy. It can interact with coordination algorithms via an interface that can be adapted to by the developers of the algorithms. + +The simulator is based on SimPy and is tested with Python 3.6. + +Project owner: Stefan Schneider, MSc, University of Paderborn. + +Developers: +- Haydar Qarawlus, MSc. CS student, University of Paderborn. +- Adnan Manzoor, MSc. CS student, University of Paderborn. +- Sven Uthe, MSc, University of Paderbron. 
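+
+As background, the discrete-event pattern the simulator builds on (SimPy) can be illustrated with a minimal, self-contained sketch; the process function and interval below are purely illustrative and not part of the simulator code:
+
+```python
+import simpy
+
+def flow_arrivals(env, interval):
+    """Toy process: emit one arrival event every `interval` simulated milliseconds."""
+    while True:
+        yield env.timeout(interval)
+        print(f"flow arrived at t={env.now}")
+
+env = simpy.Environment()
+env.process(flow_arrivals(env, interval=10))
+env.run(until=100)  # advance the simulation for 100 simulated time units
+```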
+
+## Application Structure
+The file structure of the simulator is as follows:
+- docs (Folder): Contains the documentation files of the project.
+- params (Folder): Contains sample parameter files (network file and VNF file) for testing purposes.
+- src (Folder): Contains the source code for the simulator and the interface.
+  - coordsim (Folder): contains the source code of the simulator
+    - metrics (Folder): contains the metrics module.
+    - network (Folder): contains the network module.
+    - reader (Folder): contains the params file reader module.
+    - simulation (Folder): main simulator module.
+    - main (Python): main executable for running the simulator from the CLI
+  - siminterface (Folder): contains the interface source code
+- tests (Folder): contains the unit tests for the simulator.
+
+The main modules of the application are the following:
+
+1. Simulation Module: This module contains the FlowSimulator and SimulatorParams files. FlowSimulator is the main simulator that imports and utilizes the other modules. SimulatorParams is the class used to hold all the simulator parameters; this allows the parameters to be changed mid-simulation by the calling coordination algorithm.
+2. Network Module: This module contains the Flow descriptor class. This class holds the main properties of the flow, such as the data rate, the requested SFC, etc.
+3. Reader Module: This module holds the network reader file that parses the parameter YAML and GraphML files that identify the network and the available SFCs and SFs.
+4. Simulation Module: This module contains the main flow simulator. The flow simulator obtains its parameters from the SimulatorParams class. The params class is created by the calling coordination algorithm.
+
+
+## Operation Flow
+The simulator works as follows:
+The user (coordination algorithm or CLI) provides two main inputs for the simulator:
+- Network file: GraphML file using the Topology Zoo format. This file contains the network nodes and the edges.
+- VNF file: YAML file that includes the list of SFCs and the list of SFs under each SFC in an ordered manner. The file can also include a specified placement that can be used as a default placement. The SFs must include processing delay mean and standard deviation values so that processing delays can be calculated for each flow passing through that SF.
+
+Once the parameters are provided, the flow of data through the simulator is as follows:
+
+1. The input network and VNF files are parsed, producing a NetworkX object containing the list of nodes and edges, and the shortest paths of the network (using Floyd-Warshall). The parsing also produces dictionaries that contain the list of SFCs and the list of SFs and their respective values. Additionally, the list of ingress nodes (nodes at which flows arrive) is also derived from the GraphML file. These parameters are then passed to a SimulatorParams object, which holds all the parameters of the simulator. The simulator is then started using the FlowSimulator object's `start()` function.
+2. At each ingress node, the function `generate_flow()` is called as a SimPy process; this function creates `Flow` objects with exponentially distributed random inter-arrival times. The flow's data rate and size are generated using normally distributed random variables. All of the inter-arrival time, data rate, and flow size parameters are user configurable. The flow is also assigned a random SFC chosen from the list of available SFCs given in the VNF file.
+3. Once the flow is generated, `init_flow()` is called as a SimPy process, which initializes the handling of the flow within the simulator. The function then calls `pass_flow()`, which handles the scheduling of the flow according to the defined load balancing rules (flow schedule). Once the next node has been determined, the forwarding of the flow is simulated by halting the flow for the path delay duration using the `forward_flow()` function. Once that is done, the processing of the flow is simulated by calling `process_flow()` as a SimPy process. If the requested SF was not found at the next node, the flow is dropped.
+4. In `process_flow()`, the processing delay for that particular SF is generated from the given mean and standard deviation values using a normal distribution. The simulator checks the node's remaining processing capacity to determine whether the node can handle the data rate requested by the SF; if there is not enough capacity, the flow is dropped. For the duration that the flow is being processed by the SF, the flow's data rate is deducted from the node's capacity and returned after the flow has finished processing completely.
+5. Once the flow has been processed completely at each SF, `depart_flow()` is called to register the flow's departure from the network. If the flow still has other SFs to be processed in the network, `process_flow()` calls `pass_flow()` again in a mutually recursive manner. This allows the flow to stay at the SF for processing while the parts of the flow that were already processed are sent to the next SF.
+
+
+## Input Parameters
+The available input parameters that are configurable by the user are:
+- d: The duration of the simulation (simulates milliseconds).
+- s: The seed to use for the random number generator.
+- n: The GraphML network file that specifies the nodes and edges of the network.
+- sf: VNF file which contains the SFCs and their respective SFs and their properties.
+- iam: Inter arrival mean of the flows' arrival at ingress nodes.
+- fdm: The mean value for the generation of data rate values for each flow.
+- fds: The standard deviation value for the generation of data rate values for each flow.
+- fss: The shape of the Pareto distribution for the generation of the flow size values.
+
+## Running the simulator
+
+The simulator application is called `coord-sim`.
+To run the simulator, the following call may be executed:
+
+```bash
+coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -c params/config/sim_config.yaml
+```
+
+This will run the coord-sim simulator to simulate the given network and VNF parameter files for 20 ms (environment time).
+
diff --git a/docs/software_campus.png b/docs/software_campus.png
new file mode 100644
index 000000000..977bb2c25
Binary files /dev/null and b/docs/software_campus.png differ
diff --git a/docs/source/conf.py b/docs/source/conf.py
new file mode 100644
index 000000000..48d9b5c44
--- /dev/null
+++ b/docs/source/conf.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+#
+# Configuration file for the Sphinx documentation builder.
+#
+# This file does only contain a selection of the most common options. For a
+# full list see the documentation:
+# http://www.sphinx-doc.org/en/master/config
+
+# -- Path setup --------------------------------------------------------------
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here.
If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +# import os +# import sys +# sys.path.insert(0, os.path.abspath('.')) + + +# -- Project information ----------------------------------------------------- + +project = u'Coordination Simulation' +copyright = u'2019, Stefan Schneider' +author = u'Stefan Schneider' + +# The short X.Y version +version = u'' +# The full version, including alpha/beta/rc tags +release = u'0.9.1' + + +# -- General configuration --------------------------------------------------- + +# If your documentation needs a minimal Sphinx version, state it here. +# +# needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.doctest', + 'sphinx.ext.todo', + 'sphinx.ext.githubpages', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix(es) of source filenames. +# You can specify multiple suffix as a list of string: +# +# source_suffix = ['.rst', '.md'] +source_suffix = '.rst' + +# The master toctree document. +master_doc = 'index' + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +# +# This is also used if you do content translation via gettext catalogs. +# Usually you set "language" from the command line for these cases. +language = None + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = [] + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = None + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = 'alabaster' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +# +# html_theme_options = {} + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Custom sidebar templates, must be a dictionary that maps document names +# to template names. +# +# The default sidebars (for documents that don't match any pattern) are +# defined by theme itself. Builtin themes are using these templates by +# default: ``['localtoc.html', 'relations.html', 'sourcelink.html', +# 'searchbox.html']``. +# +# html_sidebars = {} + + +# -- Options for HTMLHelp output --------------------------------------------- + +# Output file base name for HTML help builder. +htmlhelp_basename = 'CoordinationSimulationdoc' + + +# -- Options for LaTeX output ------------------------------------------------ + +latex_elements = { + # The paper size ('letterpaper' or 'a4paper'). + # + # 'papersize': 'letterpaper', + + # The font size ('10pt', '11pt' or '12pt'). + # + # 'pointsize': '10pt', + + # Additional stuff for the LaTeX preamble. + # + # 'preamble': '', + + # Latex figure (float) alignment + # + # 'figure_align': 'htbp', +} + +# Grouping the document tree into LaTeX files. 
List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + (master_doc, 'CoordinationSimulation.tex', u'Coordination Simulation Documentation', + u'Stefan Schneider', 'manual'), +] + + +# -- Options for manual page output ------------------------------------------ + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + (master_doc, 'coordinationsimulation', u'Coordination Simulation Documentation', + [author], 1) +] + + +# -- Options for Texinfo output ---------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + (master_doc, 'CoordinationSimulation', u'Coordination Simulation Documentation', + author, 'CoordinationSimulation', 'One line description of project.', + 'Miscellaneous'), +] + + +# -- Options for Epub output ------------------------------------------------- + +# Bibliographic Dublin Core info. +epub_title = project + +# The unique identifier of the text. This can be a ISBN number +# or the project homepage. +# +# epub_identifier = '' + +# A unique identification for the text. +# +# epub_uid = '' + +# A list of files that should not be packed into the epub file. +epub_exclude_files = ['search.html'] + + +# -- Extension configuration ------------------------------------------------- + +# -- Options for todo extension ---------------------------------------------- + +# If true, `todo` and `todoList` produce output, else they produce nothing. +todo_include_todos = True diff --git a/docs/source/coordsim.metrics.rst b/docs/source/coordsim.metrics.rst new file mode 100644 index 000000000..47778b3e8 --- /dev/null +++ b/docs/source/coordsim.metrics.rst @@ -0,0 +1,22 @@ +coordsim.metrics package +======================== + +Submodules +---------- + +coordsim.metrics.metrics module +------------------------------- + +.. automodule:: coordsim.metrics.metrics + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: coordsim.metrics + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/coordsim.network.rst b/docs/source/coordsim.network.rst new file mode 100644 index 000000000..a8de7ef11 --- /dev/null +++ b/docs/source/coordsim.network.rst @@ -0,0 +1,30 @@ +coordsim.network package +======================== + +Submodules +---------- + +coordsim.network.dummy\_data module +----------------------------------- + +.. automodule:: coordsim.network.dummy_data + :members: + :undoc-members: + :show-inheritance: + +coordsim.network.flow module +---------------------------- + +.. automodule:: coordsim.network.flow + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: coordsim.network + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/coordsim.reader.rst b/docs/source/coordsim.reader.rst new file mode 100644 index 000000000..548cd9400 --- /dev/null +++ b/docs/source/coordsim.reader.rst @@ -0,0 +1,22 @@ +coordsim.reader package +======================= + +Submodules +---------- + +coordsim.reader.reader module +----------------------------- + +.. automodule:: coordsim.reader.reader + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. 
automodule:: coordsim.reader + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/coordsim.rst b/docs/source/coordsim.rst new file mode 100644 index 000000000..ff892eb5c --- /dev/null +++ b/docs/source/coordsim.rst @@ -0,0 +1,32 @@ +coordsim package +================ + +Subpackages +----------- + +.. toctree:: + + coordsim.metrics + coordsim.network + coordsim.reader + coordsim.simulation + +Submodules +---------- + +coordsim.main module +-------------------- + +.. automodule:: coordsim.main + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: coordsim + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/coordsim.simulation.rst b/docs/source/coordsim.simulation.rst new file mode 100644 index 000000000..9f0213d16 --- /dev/null +++ b/docs/source/coordsim.simulation.rst @@ -0,0 +1,30 @@ +coordsim.simulation package +=========================== + +Submodules +---------- + +coordsim.simulation.flowsimulator module +---------------------------------------- + +.. automodule:: coordsim.simulation.flowsimulator + :members: + :undoc-members: + :show-inheritance: + +coordsim.simulation.simulatorparams module +------------------------------------------ + +.. automodule:: coordsim.simulation.simulatorparams + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: coordsim.simulation + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 000000000..017be6be1 --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,154 @@ +.. Coordination Simulation documentation master file, created by + sphinx-quickstart on Thu Jul 11 22:55:56 2019. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to Coordination Simulation's documentation! +=================================================== + +Introduction +------------ + +This discrete-event flow-based simulator is a fast testbed for VNF +coordination algorithms. It is designed to simulate small to medium +sized networks with relatively close-to-reality accuracy. It can +interact with coordination algorithms via an interface that can be +adapted to by the developers of the algorithms. + +The simulator is based on SimPy and is tested with Python 3.6. + +Project owner: Stefan Schneider, MSc, University of Paderborn. + +Developers: - Haydar Qarawlus, MSc. CS student, University of Paderborn. +- Adnan Manzoor, MSc. CS student, University of Paderborn. - Sven Uthe, +MSc, University of Paderbron. + +Application Structure +--------------------- + +The file structure of the simulator is as follows: - docs (Folder): +Contains the doumentation files of the project. - params (Folder): +Contains sample parameter files (network file and VNF file) for testing +purposes. - src (Folder): Contains the source code for the simulator and +the interface. - coordsim (Folder): contains the source code of the +simulator - metrics (Folder): contains the metrics module. - network +(Folder): contains the network module. - reader (Folder): contains the +params file reader module. - simulation (Folder): main simulator module. +- main (Python): main executable for running the simulator from CLI - +siminterface (Folder): contains the interface source code - tests +(Folder): contains the unit tests for the simulator. + +The main modules of the application are the following: + +1. 
Simulation Module: This module contains FlowSimulator and + SimulatorParams files. FlowSimulator is the main simulator that + imports and utilizes the other modules. SimulatorParams is the class + used to hold all the simulator parameters, this allows the parameters + to be changed mid-simulation by the calling coordination algorithm. +2. Network Module: This module contains the Flow descriptor class. this + class holds the main properties of the flow such as the data rate and + the requested SFC, etc. +3. Reader Module: This module holds the network reader file that parses + the parameter YAML and GraphML files that identify the network and + the available SFCs and SFs. +4. Simulation Module: This module contains the main flow simulator. The + flow simulator obtains its parameters from the SimulatotParams class. + The params class is created by the calling coordination algorithm. + +Operation Flow +-------------- + +The simulator works as follows: The user (coordination algorithm or cli) +provide two main inputs for the simulator: - Network file: GraphML file +using the Zoo format. This file contains the network nodes and the +edges. - VNF file: YAML file that includes the list of SFCs and the list +of SFs under each SFC in an ordered manner. The file can also include a +specifiecd placement that can be used as a default placement. The SFs +must include a processing delay mean and standard deviation values so +that processing delays can be calculated for each flow passing through +that SF. + +Once the parameters are provided, the flow of data through the simulator +is as follows: + +1. The input network and VNF files are parsed producing a NetworkX + object containing the list of nodes and edges, and the shortest paths + of the network (using Floyd-Warshall). The parsing also produces + dictionaries that contain the list of SFCs and the list of SFs and + their respective values. Additionally, the list of ingress nodes + (nodes at which flows arrive) are also calculated from the GraphML + file. These parameters are then passed to a SimulatorParams object, + which holds all the parameters of the simulator, the simulator is + then started using the FlowSimulator object's ``start()`` function +2. At each ingress node, the function ``generate_flow()`` is called as a + SimPy process, this function creates ``Flow`` objects with + exponentially distributed random inter arrival times. The flow's data + rate and size are generated using normally distributed random + variables. All of the inter arrival time, data rate, and flow size + parameters are user configurable. The flow is also assigned a random + SFC chosen from the list of available SFC given in the VNF file. +3. Once the flow is generated, ``init_flow()`` is called as a SimPy + process which initializes the handling of the flow within the + simulator. The function then calls ``pass_flow()``, which then + handles the scheduling of the flow according to the defined load + balancing rules (flow schedule). Once the next node has been + determined, the forwarding of the flow is simulated by halting the + flow for the path delay duration using the ``forward_flow()`` + function. Once that is done, the processing of the flow is simulated + by calling ``process_flow()`` as a SimPy process. If the requested SF + was not found at the next node, the flow is then dropped. +4. In ``process_flow()``, the processing delay for that particular SF is + generated using given mean and standard deviation values using a + normal distribution. 
The simulator checks the node's remaining + processing capacity to check if the node can handle the data rate + requested by the SF, if there is not enough capacity, then the flow + is dropped. For the duration that the flow is being processed by the + SF, the flow's data rate is deducted from the node's capacity, and + returned after the flow finished processing completely. +5. Once the flow was processed completely at each SF, ``depart_flow()`` + is called to register the flow's departure from the network. If the + flow still has other SFs to be processed at in the network, + ``process_flow()`` calls ``pass_flow()`` again in a mutually + recursive manner. This allows the flow to stay in the SF for + processing, while the parts of the flow that were processed already + to be sent to the next SF. + +Input Parameters +---------------- + +The available input parameters that are configurable by the user are: - +d: The duration of the simulation (simulates milliseconds). - s: The +seed to use for the random number generator. - n: The GraphML network +file that specifies the nodes and edges of the network. - sf: VNF file +which contains the SFCs and their respective SFs and their properties. - +iam: Inter arrival mean of the flows' arrival at ingress nodes. - fdm: +The mean value for the generation of data rate values for each flow. - +fds: The standard deviation value for the generation of data rate values +for each flow. - fss: The shape of the Pareto distribution for the +generation of the flow size values. + +Running the simulator +--------------------- + +The simulator application is called ``coord-sim`` To run the simulator, +the following call may be executed: + +.. code:: bash + + coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -c params/config/sim_config.yaml + +This will run the coord-sim simulator to simulate the given network and +vnf parameter files for 10ms (environment time). + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/source/modules.rst b/docs/source/modules.rst new file mode 100644 index 000000000..429519e49 --- /dev/null +++ b/docs/source/modules.rst @@ -0,0 +1,7 @@ +coordsim +======== + +.. toctree:: + :maxdepth: 4 + + coordsim diff --git a/docs/source/siminterface.rst b/docs/source/siminterface.rst new file mode 100644 index 000000000..e4b29542e --- /dev/null +++ b/docs/source/siminterface.rst @@ -0,0 +1,22 @@ +siminterface package +==================== + +Submodules +---------- + +siminterface.simulator module +----------------------------- + +.. automodule:: siminterface.simulator + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. 
automodule:: siminterface + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/upb.png b/docs/upb.png new file mode 100644 index 000000000..e69ae7528 Binary files /dev/null and b/docs/upb.png differ diff --git a/params/config/sim_config.yaml b/params/config/sim_config.yaml new file mode 100644 index 000000000..e0d9370a7 --- /dev/null +++ b/params/config/sim_config.yaml @@ -0,0 +1,35 @@ +# module for configuring the simulator +# configuration parameters are loaded and used both when using the simulator via the CLI and via the interface +# all parameters are required, defaults are in comments + +inter_arrival_mean: 10.0 # default: 10.0 +deterministic_arrival: True # default: True +flow_dr_mean: 1.0 # default: 1.0 +flow_dr_stdev: 0.0 # default: 0.0 +flow_size_shape: 0.001 # default: 0.001 (for deterministic!) +deterministic_size: True # default: True +# if deterministic = True, the simulator reinterprets and uses inter_arrival_mean and flow_size_shape as fixed +# deterministic values rather than means of a random distribution +# can be overridden by deterministic_arrival and deterministic_size +#deterministic: True # default: False +run_duration: 100 # default: 100 + +# Optional: Trace file trace relative to the CWD. +# Until values start in the trace file, the defaults from this file are used +# trace_path: params/traces/default_trace.csv + +# Optional: Write Scheduling results +write_schedule: True + +# States (two state markov arrival) +# Optional param: states: True | False +use_states: True +init_state: state_1 + +states: + state_1: + inter_arr_mean: 10.0 + switch_p: 0.8 + state_2: + inter_arr_mean: 2.0 + switch_p: 0.3 \ No newline at end of file diff --git a/params/networks/triangle.graphml b/params/networks/triangle.graphml new file mode 100644 index 000000000..59f0fe11e --- /dev/null +++ b/params/networks/triangle.graphml @@ -0,0 +1,128 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 3/02/11 + US + Country + Abilene + Primary + 0 + http://www.internet2.edu/pubs/200502-IS-AN.pdf + 1.0 + REN + Historic + 1 + 0 + Abilene + 0.3.34dev-20120328 + 0 + 0 + e278b1b + = + 02 + 3/02/11 + IP + Topology Zoo Toolset + 0 + 0 + 2005_02 + 2005 + 2011_09_01 + 0 + + 1 + 40.71427 + United States + 0 + -74.00597 + New York + Ingress + 10 + + + 1 + 41.85003 + United States + 1 + -87.65005 + Chicago + Normal + 10 + + + 1 + 38.89511 + United States + 2 + -77.03637 + Washington DC + Normal + 10 + + + OC-192 + OC-192c + c + 0 + 1024 + + + OC-192 + OC-192c + c + 0 + 1024 + 512 + + + OC-192 + OC-192c + c + 0 + 15 + 1024 + + + diff --git a/params/services/3sfcs.yaml b/params/services/3sfcs.yaml new file mode 100644 index 000000000..356a0222e --- /dev/null +++ b/params/services/3sfcs.yaml @@ -0,0 +1,28 @@ +# simple chain of 3 SFs a->b->c with deterministic processing delays + +# list of SFCs and involved SFs (order of SFs matters). 
names need to match dummy schedule and placement (dummy_data.py) +sfc_list: + sfc_1: + - a + - b + - c + sfc_2: + - a + - b + - c + sfc_3: + - b + - a + - c + +# SF attributes +sf_list: + a: + processing_delay_mean: 6.0 + processing_delay_stdev: 1.0 + b: + processing_delay_mean: 4.5 + processing_delay_stdev: 1.0 + c: + processing_delay_mean: 3.0 +processing_delay_stdev: 1.0 diff --git a/params/services/abc.yaml b/params/services/abc.yaml new file mode 100644 index 000000000..6c1ca3d91 --- /dev/null +++ b/params/services/abc.yaml @@ -0,0 +1,24 @@ +# simple chain of 3 SFs a->b->c with deterministic processing delays + +# list of SFCs and involved SFs (order of SFs matters). names need to match dummy schedule and placement (dummy_data.py) +sfc_list: + sfc_1: + - a + - b + - c + +# SF attributes (for now, processing delay) +sf_list: + a: + processing_delay_mean: 5.0 + processing_delay_stdev: 0.0 + # reference to module A.py in 'resource_functions' (specified via -sfr arg) + resource_function_id: A + b: + processing_delay_mean: 5.0 + processing_delay_stdev: 0.0 + resource_function_id: B + c: + processing_delay_mean: 5.0 + processing_delay_stdev: 0.0 + # no resource_function_id defaults to linear resource consumption, equal to the load diff --git a/params/services/resource_functions/A.py b/params/services/resource_functions/A.py new file mode 100644 index 000000000..a01d98c50 --- /dev/null +++ b/params/services/resource_functions/A.py @@ -0,0 +1,2 @@ +def resource_function(load): + return load diff --git a/params/services/resource_functions/B.py b/params/services/resource_functions/B.py new file mode 100644 index 000000000..a01d98c50 --- /dev/null +++ b/params/services/resource_functions/B.py @@ -0,0 +1,2 @@ +def resource_function(load): + return load diff --git a/params/traces/default_trace.csv b/params/traces/default_trace.csv new file mode 100644 index 000000000..38c52ff65 --- /dev/null +++ b/params/traces/default_trace.csv @@ -0,0 +1,8 @@ +time,inter_arrival_mean +0,10 +100,50 +200,40 +300,60 +400,20 +500,5 +1000,45 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..c7aee540e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +--editable git://github.com/RealVNF/common-utils#egg=common-utils +--editable . 
\ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 000000000..79a16af7e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 120 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..6090ade56 --- /dev/null +++ b/setup.py @@ -0,0 +1,39 @@ +from setuptools import setup, find_packages +requirements = [ + 'simpy==3.0.13', + 'networkx', + 'geopy', + 'pyyaml>=5.1', + 'numpy', + 'common-utils' +] + +test_requirements = [ + 'flake8', + 'nose2' +] + +dependency_links = [ + 'git+https://github.com/RealVNF/common-utils' +] + +setup( + name='coord-sim', + version='1.0.0', + description='Simulate flow-level, inter-node network coordination including scaling and placement of services and ' + 'scheduling/balancing traffic between them.', + url='https://github.com/RealVNF/coord-sim', + author='Stefan Schneider', + dependency_links=dependency_links, + author_email='stefan.schneider@upb.de', + package_dir={'': 'src'}, + packages=find_packages('src'), + install_requires=requirements + test_requirements, + tests_require=test_requirements, + zip_safe=False, + entry_points={ + 'console_scripts': [ + 'coord-sim=coordsim.main:main', + ], + }, +) diff --git a/src/coordsim/__init__.py b/src/coordsim/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/main.py b/src/coordsim/main.py new file mode 100644 index 000000000..e20d9d4a8 --- /dev/null +++ b/src/coordsim/main.py @@ -0,0 +1,93 @@ +import argparse +import simpy +import random +import numpy +from coordsim.simulation.flowsimulator import FlowSimulator +from coordsim.reader import reader +from coordsim.metrics.metrics import Metrics +from coordsim.simulation.simulatorparams import SimulatorParams +import coordsim.network.dummy_data as dummy_data +from coordsim.trace_processor.trace_processor import TraceProcessor +import logging +import time +import os + + +log = logging.getLogger(__name__) + + +def main(): + args = parse_args() + start_time = time.time() + logging.basicConfig(level=logging.INFO) + + # Create a SimPy environment + env = simpy.Environment() + + # Seed the random generator + random.seed(args.seed) + numpy.random.seed(args.seed) + + # Parse network and get NetworkX object and ingress network list + network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10) + + # use dummy placement and schedule for running simulator without algorithm + # TODO: make configurable via CLI + sf_placement = dummy_data.triangle_placement + schedule = dummy_data.triangle_schedule + + # Getting current SFC list, and the SF list of each SFC, and config + sfc_list = reader.get_sfc(args.sf) + sf_list = reader.get_sf(args.sf, args.sfr) + config = reader.get_config(args.config) + + metrics = Metrics(network, sf_list) + + # Create the simulator parameters object with the provided args + params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement, + schedule=schedule) + log.info(params) + + if 'trace_path' in config: + trace_path = os.path.join(os.getcwd(), config['trace_path']) + trace = reader.get_trace(trace_path) + TraceProcessor(params, env, trace) + log.info("Using trace "+config['trace_path']) + # Create a FlowSimulator object, pass the SimPy environment and params objects + simulator = FlowSimulator(env, params) + + # Start the simulation + simulator.start() + + # Run the simpy environment for the specified duration + env.run(until=args.duration) + + # Record endtime 
and running_time metrics + end_time = time.time() + metrics.running_time(start_time, end_time) + + # dump all metrics + log.info(metrics.metrics) + + +# parse CLI args (when using simulator as stand-alone, not triggered through the interface) +def parse_args(): + parser = argparse.ArgumentParser(description="Coordination-Simulation tool") + parser.add_argument('-d', '--duration', required=True, dest="duration", type=int, + help="The duration of the simulation (simulates milliseconds).") + parser.add_argument('-sf', '--sf', required=True, dest="sf", + help="VNF file which contains the SFCs and their respective SFs and their properties.") + parser.add_argument('-sfr', '--sfr', required=False, default='', dest='sfr', + help="Path which contains the SF resource consumption functions.") + parser.add_argument('-n', '--network', required=True, dest='network', + help="The GraphML network file that specifies the nodes and edges of the network.") + parser.add_argument('-c', '--config', required=True, dest='config', help="Path to the simulator config file.") + parser.add_argument('-t', '--trace', required=False, dest='trace', default=None, + help="Provide a CSV trace file to configure the traffic the simulator is generating.") + parser.add_argument('-s', '--seed', required=False, default=random.randint(0, 9999), dest='seed', type=int, + help="Random seed") + return parser.parse_args() + + +if __name__ == '__main__': + main() diff --git a/src/coordsim/metrics/__init__.py b/src/coordsim/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/metrics/metrics.py b/src/coordsim/metrics/metrics.py new file mode 100644 index 000000000..338f9b581 --- /dev/null +++ b/src/coordsim/metrics/metrics.py @@ -0,0 +1,198 @@ +""" + +Metrics collection module + +""" +import numpy as np +from collections import defaultdict +import logging +logger = logging.getLogger(__name__) + +# Metrics global dict +# metrics = {} + + +class Metrics: + def __init__(self, network, sfs): + self.metrics = {} + self.network = network + self.sfs = sfs + self.reset_metrics() + + def reset_metrics(self): + """Set/Reset all metrics""" + + # successful flows + self.metrics['generated_flows'] = 0 + self.metrics['processed_flows'] = 0 + self.metrics['dropped_flows'] = 0 + self.metrics['total_active_flows'] = 0 + # number of dropped flows per node and SF (locations) + self.metrics['dropped_flows_locs'] = {v: {sf: 0 for sf in self.sfs.keys()} for v in self.network.nodes.keys()} + # number of dropped flow per node - reset every run + self.metrics['run_dropped_flows_per_node'] = {v: 0 for v in self.network.nodes.keys()} + + # delay + self.metrics['total_processing_delay'] = 0.0 + self.metrics['num_processing_delays'] = 0 + self.metrics['avg_processing_delay'] = 0.0 + + self.metrics['total_path_delay'] = 0.0 + self.metrics['num_path_delays'] = 0 + # avg path delay per used path, not per entire service chain + self.metrics['avg_path_delay'] = 0.0 + + self.metrics['total_end2end_delay'] = 0.0 + self.metrics['avg_end2end_delay'] = 0.0 + self.metrics['avg_total_delay'] = 0.0 + + self.metrics['running_time'] = 0.0 + + # Current number of active flows per each node + self.metrics['current_active_flows'] = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + self.metrics['current_traffic'] = defaultdict(lambda: defaultdict(lambda: defaultdict(float))) + + self.reset_run_metrics() + + def reset_run_metrics(self): + """Set/Reset metrics belonging to one run""" + self.metrics['run_dropped_flows_per_node'] = {v: 0 
for v in self.network.nodes.keys()} + + self.metrics['run_end2end_delay'] = 0 + self.metrics['run_avg_end2end_delay'] = 0.0 + self.metrics['run_max_end2end_delay'] = 0.0 + self.metrics['run_total_path_delay'] = 0 + # path delay averaged over all generated flows in the run + # not 100% accurate due to flows still in the network from previous runs + self.metrics['run_avg_path_delay'] = 0 + self.metrics['run_generated_flows'] = 0 + self.metrics['run_in_network_flows'] = 0 + self.metrics['run_processed_flows'] = 0 + self.metrics['run_max_node_usage'] = defaultdict(float) + + # total requested traffic: increased whenever a flow is requesting processing before scheduling or processing + self.metrics['run_total_requested_traffic'] = defaultdict(lambda: defaultdict(lambda: defaultdict(float))) + # total generated traffic. traffic generate on ingress nodes is recorded + # this value could also be extracted from network and sim config file. + self.metrics['run_total_requested_traffic_node'] = defaultdict(float) + # total processed traffic (aggregated data rate) per node per SF within one run + self.metrics['run_total_processed_traffic'] = defaultdict(lambda: defaultdict(float)) + + def calc_max_node_usage(self, node_id, current_usage): + """ + Calculate the run's max node usage + """ + if current_usage > self.metrics['run_max_node_usage'][node_id]: + self.metrics['run_max_node_usage'][node_id] = current_usage + + def add_requesting_flow(self, flow): + self.metrics['run_total_requested_traffic'][flow.current_node_id][flow.sfc][flow.current_sf] += flow.dr + + # call when new flows starts processing at an SF + def add_active_flow(self, flow, current_node_id, current_sf): + self.metrics['current_active_flows'][current_node_id][flow.sfc][current_sf] += 1 + self.metrics['current_traffic'][current_node_id][flow.sfc][current_sf] += flow.dr + self.metrics['run_total_processed_traffic'][current_node_id][current_sf] += flow.dr + + def remove_active_flow(self, flow, current_node_id, current_sf): + self.metrics['current_active_flows'][current_node_id][flow.sfc][current_sf] -= 1 + self.metrics['current_traffic'][current_node_id][flow.sfc][current_sf] -= flow.dr + + try: + assert self.metrics['current_active_flows'][flow.current_node_id][flow.sfc][flow.current_sf] >= 0, "\ + Nodes cannot have negative current active flows" + + assert self.metrics['total_active_flows'] >= 0, "\ + Nodes cannot have negative active flows" + + assert self.metrics['current_traffic'][flow.current_node_id][flow.sfc][flow.current_sf] >= 0.00, "\ + Nodes cannot have negative traffic" + + except Exception as e: + logger.critical(e) + + def generated_flow(self, flow, current_node): + self.metrics['generated_flows'] += 1 + self.metrics['run_generated_flows'] += 1 + self.metrics['total_active_flows'] += 1 + self.metrics['run_total_requested_traffic_node'][current_node] += flow.dr + + # call when flow was successfully completed, ie, processed by all required SFs + def completed_flow(self): + self.metrics['processed_flows'] += 1 + self.metrics['run_processed_flows'] += 1 + self.metrics['total_active_flows'] -= 1 + assert self.metrics['total_active_flows'] >= 0, "Cannot have negative active flows" + + def dropped_flow(self, flow): + self.metrics['dropped_flows'] += 1 + self.metrics['total_active_flows'] -= 1 + self.metrics['dropped_flows_locs'][flow.current_node_id][flow.current_sf] += 1 + self.metrics['run_dropped_flows_per_node'][flow.current_node_id] += 1 + assert self.metrics['total_active_flows'] >= 0, "Cannot have negative active flows" + + 
def add_processing_delay(self, delay): + self.metrics['num_processing_delays'] += 1 + self.metrics['total_processing_delay'] += delay + + def add_path_delay(self, delay): + self.metrics['num_path_delays'] += 1 + self.metrics['total_path_delay'] += delay + + # calc path delay per run; average over num generated flows in run + self.metrics['run_total_path_delay'] += delay + if self.metrics['run_generated_flows'] > 0: + self.metrics[ + 'run_avg_path_delay' + ] = self.metrics['run_total_path_delay'] / self.metrics['run_generated_flows'] + + def add_end2end_delay(self, delay): + self.metrics['total_end2end_delay'] += delay + self.metrics['run_end2end_delay'] += delay + if delay > self.metrics['run_max_end2end_delay']: + self.metrics['run_max_end2end_delay'] = delay + + def running_time(self, start_time, end_time): + self.metrics['running_time'] = end_time - start_time + + def calc_avg_processing_delay(self): + if self.metrics['num_processing_delays'] > 0: + self.metrics['avg_processing_delay'] \ + = self.metrics['total_processing_delay'] / self.metrics['num_processing_delays'] + else: + self.metrics['avg_processing_delay'] = 0 + + def calc_avg_path_delay(self): + if self.metrics['num_path_delays'] > 0: + self.metrics['avg_path_delay'] = self.metrics['total_path_delay'] / self.metrics['num_path_delays'] + else: + self.metrics['avg_path_delay'] = 0 + + def calc_avg_end2end_delay(self): + # We divide by number of processed flows to get end2end delays for processed flows only + if self.metrics['processed_flows'] > 0: + self.metrics['avg_end2end_delay'] = self.metrics['total_end2end_delay'] / self.metrics['processed_flows'] + else: + self.metrics['avg_end2end_delay'] = 0 # No avg end2end delay yet (no processed flows yet) + + if self.metrics['run_processed_flows'] > 0: + self.metrics[ + 'run_avg_end2end_delay' + ] = self.metrics['run_end2end_delay'] / self.metrics['run_processed_flows'] + else: + self.metrics['run_avg_end2end_delay'] = 0 # No run avg end2end delay yet (no processed flows yet) + + def calc_avg_total_delay(self): + avg_processing_delay = self.metrics['avg_processing_delay'] + avg_path_delay = self.metrics['avg_path_delay'] + self.metrics['avg_total_delay'] = np.mean([avg_path_delay, avg_processing_delay]) + + def get_active_flows(self): + return self.metrics['current_active_flows'] + + def get_metrics(self): + self.calc_avg_processing_delay() + self.calc_avg_path_delay() + self.calc_avg_total_delay() + self.calc_avg_end2end_delay() + return self.metrics diff --git a/src/coordsim/network/__init__.py b/src/coordsim/network/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/network/dummy_data.py b/src/coordsim/network/dummy_data.py new file mode 100644 index 000000000..372784912 --- /dev/null +++ b/src/coordsim/network/dummy_data.py @@ -0,0 +1,238 @@ +# dummy placements and schedules for testing the simulator without an algorithm + +# placements +placement = { + 'pop0': ['a', 'b', 'c'], + 'pop1': ['a', 'b', 'c'], + 'pop2': ['a', 'b'] +} + +triangle_placement = { + 'pop0': ['a'], + 'pop1': ['b'], + 'pop2': ['c'] +} + +# schedules +schedule = { + 'pop0': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 
'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, + 'pop1': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, + 'pop2': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, +} + +# simple schedule sending flows along the triangle in deterministic way +triangle_schedule = { + 'pop0': { + 'sfc_1': { + 'a': { + 'pop0': 1, + 'pop1': 0, + 'pop2': 0 + }, + 'b': { + 'pop0': 0, + 'pop1': 1, + 'pop2': 0 + }, + 'c': { + 'pop0': 0, + 'pop1': 0, + 'pop2': 1 + } + } + }, + 'pop1': { + 'sfc_1': { + 'a': { + 'pop0': 1, + 'pop1': 0, + 'pop2': 0 + }, + 'b': { + 'pop0': 0, + 'pop1': 1, + 'pop2': 0 + }, + 'c': { + 'pop0': 0, + 'pop1': 0, + 'pop2': 1 + } + } + }, + 'pop2': { + 'sfc_1': { + 'a': { + 'pop0': 1, + 'pop1': 0, + 'pop2': 0 + }, + 'b': { + 'pop0': 0, + 'pop1': 1, + 'pop2': 0 + }, + 'c': { + 'pop0': 0, + 'pop1': 0, + 'pop2': 1 + } + } + } +} diff --git a/src/coordsim/network/flow.py b/src/coordsim/network/flow.py new file mode 100644 index 000000000..5b75b8870 --- /dev/null +++ b/src/coordsim/network/flow.py @@ -0,0 +1,34 @@ +""" + +Flow class. +This identifies the flow and its parameters. +TODO: Add get/set methods + +""" + + +class Flow: + + def __init__(self, flow_id, sfc, dr, size, creation_time, + destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0): + + # Flow ID: Unique ID string + self.flow_id = flow_id + # The requested SFC + self.sfc = sfc + # The requested data rate in Megabits per second (Mbit/s) + self.dr = dr + # The size of the flow in Megabit (Mb) + self.size = size + # The current SF that the flow is being processed in. + self.current_sf = current_sf + # The current node that the flow is being processed in + self.current_node_id = current_node_id + # The duration of the flow calculated in ms. 
+ self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms + # Current flow position within the SFC + self.current_position = current_position + # End to end delay of the flow, used for metrics + self.end2end_delay = end2end_delay + # FLow creation time + self.creation_time = creation_time diff --git a/src/coordsim/reader/__init__.py b/src/coordsim/reader/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/reader/reader.py b/src/coordsim/reader/reader.py new file mode 100644 index 000000000..34376af84 --- /dev/null +++ b/src/coordsim/reader/reader.py @@ -0,0 +1,238 @@ +import networkx as nx +from geopy.distance import distance as dist +import numpy as np +import logging +import yaml +import math +from collections import defaultdict +import importlib +import csv + +log = logging.getLogger(__name__) + +# Disclaimer: Some snippets of the following file were imported/modified from B-JointSP on GitHub. +# Original code can be found on https://github.com/CN-UPB/B-JointSP + +""" +Network parsing module. +- Reads and parses network files into NetworkX. +- Reads and parses network yaml files and gets placement and SFC and SFs. +""" + + +def get_trace(trace_file): + """ + Parse the trace file that the simulator will use to generate traffic. + """ + with open(trace_file) as f: + trace_rows = csv.DictReader(f) + traces = [] + for row in trace_rows: + traces.append(dict(row)) + return traces + + +def get_config(config_file): + """ + Parse simulator config params in specified yaml file and return as Python dict + """ + # TODO: specify defaults as fall back if param is not set in config + with open(config_file) as f: + config = yaml.load(f, Loader=yaml.FullLoader) + return config + + +def get_sfc(sfc_file): + """ + Get the list of SFCs from the yaml data. + """ + with open(sfc_file) as yaml_stream: + sfc_data = yaml.load(yaml_stream, Loader=yaml.FullLoader) + + sfc_list = defaultdict(None) + for sfc_name, sfc_sf in sfc_data['sfc_list'].items(): + sfc_list[sfc_name] = sfc_sf + return sfc_list + + +def load_resource_function(name, path): + try: + spec = importlib.util.spec_from_file_location(name, path + '/' + name + '.py') + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + except Exception: + raise Exception(f'Cannot load file "{name}.py" from specified location "{path}".') + + try: + return getattr(module, 'resource_function') + except Exception: + raise Exception(f'There is no "resource_function" defined in file "{name}.py."') + + +def get_sf(sf_file, resource_functions_path=''): + """ + Get the list of SFs and their properties from the yaml data. 
+ """ + with open(sf_file) as yaml_stream: + sf_data = yaml.load(yaml_stream, Loader=yaml.FullLoader) + + # Configurable default mean and stddev defaults + default_processing_delay_mean = 1.0 + default_processing_delay_stdev = 1.0 + def default_resource_function(x): return x + sf_list = defaultdict(None) + for sf_name, sf_details in sf_data['sf_list'].items(): + sf_list[sf_name] = sf_details + # Set defaults (currently processing delay mean and stdev) + sf_list[sf_name]["processing_delay_mean"] = sf_list[sf_name].get("processing_delay_mean", + default_processing_delay_mean) + sf_list[sf_name]["processing_delay_stdev"] = sf_list[sf_name].get("processing_delay_stdev", + default_processing_delay_stdev) + if 'resource_function_id' in sf_list[sf_name]: + try: + sf_list[sf_name]['resource_function'] = load_resource_function(sf_list[sf_name]['resource_function_id'], + resource_functions_path) + except Exception as ex: + sf_list[sf_name]['resource_function_id'] = 'default' + sf_list[sf_name]['resource_function'] = default_resource_function + log.warning(f'{str(ex)} SF {sf_name} will use default resource function instead.') + else: + sf_list[sf_name]["resource_function_id"] = 'default' + sf_list[sf_name]["resource_function"] = default_resource_function + log.debug(f'No resource function specified for SF {sf_name}. Default resource function will be used.') + return sf_list + + +def weight(edge_cap, edge_delay): + """ + edge weight = 1 / (cap + 1/delay) => prefer high cap, use smaller delay as additional influence/tie breaker + if cap = None, set it to 0 use edge_delay as weight + """ + assert edge_delay is not None + if edge_cap is None: + return edge_delay + if edge_cap == 0: + return math.inf + elif edge_delay == 0: + return 0 + return 1 / (edge_cap + 1 / edge_delay) + + +def network_diameter(nx_network): + """Return the network diameter, ie, delay of longest shortest path""" + if 'shortest_paths' not in nx_network.graph: + shortest_paths(nx_network) + return max([path[1] for path in nx_network.graph['shortest_paths'].values()]) + + +def shortest_paths(networkx_network): + """ + finds the all pairs shortest paths using Johnson Algo + sets a dictionary, keyed by source and target, of all pairs shortest paths with path_delays in the network as an + attr. + key: (src, dest) , value: ([nodes_on_the_shortest_path], path_delay) + path delays are the sum of individual edge_delays of the edges in the shortest path from source to destination + """ + # in-built implementation of Johnson Algo, just returns a list of shortest paths + # returns a dict with : key: source, value: dict with key: dest and value: shortest path as list of nodes + all_pair_shortest_paths = dict(nx.johnson(networkx_network, weight='weight')) + # contains shortest paths with path_delays + # key: (src, dest) , value: ([nodes_on_the_shortest_path], path_delay) + shortest_paths_with_delays = {} + for source, v in all_pair_shortest_paths.items(): + for destination, shortest_path_list in v.items(): + path_delay = 0 + # only if the source and destination are different, path_delays need to be calculated, otherwise 0 + if source != destination: + # shortest_path_list only contains ordered nodes [node1,node2,node3....] in the shortest path + # here we take ordered pair of nodes (src, dest) to cal. 
the path_delay of the edge between them + for i in range(len(shortest_path_list) - 1): + path_delay += networkx_network[shortest_path_list[i]][shortest_path_list[i + 1]]['delay'] + shortest_paths_with_delays[(source, destination)] = (shortest_path_list, path_delay) + networkx_network.graph['shortest_paths'] = shortest_paths_with_delays + + +def read_network(file, node_cap=None, link_cap=None): + """ + Read the GraphML file and return list of nodes and edges. + """ + SPEED_OF_LIGHT = 299792458 # meter per second + PROPAGATION_FACTOR = 0.77 # https://en.wikipedia.org/wiki/Propagation_delay + + if not file.endswith(".graphml"): + raise ValueError("{} is not a GraphML file".format(file)) + graphml_network = nx.read_graphml(file, node_type=int) + networkx_network = nx.Graph() + + # Setting the nodes of the NetworkX Graph + for n in graphml_network.nodes(data=True): + node_id = "pop{}".format(n[0]) + cap = n[1].get("NodeCap", None) + if cap is None: + cap = node_cap + log.warning("NodeCap not set in the GraphML file, now using default NodeCap for node: {}".format(n)) + node_type = n[1].get("NodeType", "Normal") + node_name = n[1].get("label", None) + if cap is None: + raise ValueError("No NodeCap. set for node{} in file {} (as cmd argument or in graphml)".format(n, file)) + # Adding a Node in the NetworkX Graph + # {"id": node_id, "name": node_name, "type": node_type, "cap": cpu}) + # Type of node. For now it is either "Normal" or "Ingress" + # Init 'remaining_resources' to the node capacity + networkx_network.add_node(node_id, name=node_name, type=node_type, cap=cap, available_sf={}, + remaining_cap=cap) + + # set links + # calculate link delay based on geo positions of nodes; + + for e in graphml_network.edges(data=True): + # Check whether LinkDelay value is set, otherwise default to None + source = "pop{}".format(e[0]) + target = "pop{}".format(e[1]) + link_delay = e[2].get("LinkDelay", None) + # As edges are undirectional, only LinkFwdCap determines the available data rate + link_fwd_cap = e[2].get("LinkFwdCap", link_cap) + if e[2].get("LinkFwdCap") is None: + log.warning(f"Link {(e[0], e[1])} has no capacity defined. Using the default capacity {link_cap} instead.") + # Setting a default delay of 3 incase no delay specified in GraphML file + # and we are unable to set it based on Geo location + delay = 3 + if link_delay is None: + n1 = graphml_network.nodes(data=True)[e[0]] + n2 = graphml_network.nodes(data=True)[e[1]] + n1_lat, n1_long = n1.get("Latitude", None), n1.get("Longitude", None) + n2_lat, n2_long = n2.get("Latitude", None), n2.get("Longitude", None) + if n1_lat is None or n1_long is None or n2_lat is None or n2_long is None: + log.warning("Link Delay not set in the GraphML file and unable to calc based on Geo Location," + "Now using default delay for edge: ({},{})".format(source, target)) + else: + distance = dist((n1_lat, n1_long), (n2_lat, n2_long)).meters # in meters + # round delay to int using np.around for consistency with emulator + delay = int(np.around((distance / SPEED_OF_LIGHT * 1000) * PROPAGATION_FACTOR)) # in milliseconds + else: + delay = link_delay + + # Adding the undirected edges for each link defined in the network. 
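When `LinkDelay` is missing but both endpoints carry `Latitude`/`Longitude`, the delay is derived from the geodesic distance as shown above. A standalone sketch of the same arithmetic with an assumed example distance:

```python
import numpy as np

SPEED_OF_LIGHT = 299792458   # meters per second
PROPAGATION_FACTOR = 0.77    # signals propagate slower than c in real links

distance = 1_000_000         # assumed example distance in meters (~1000 km)
delay_ms = int(np.around((distance / SPEED_OF_LIGHT * 1000) * PROPAGATION_FACTOR))
print(delay_ms)              # -> 3 ms for this example distance
```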
+ # delay = edge delay , cap = edge capacity + networkx_network.add_edge(source, target, delay=delay, cap=link_fwd_cap, remaining_cap=link_fwd_cap) + + # setting the weight property for each edge in the NetworkX Graph + # weight attribute is used to find the shortest paths + for edge in networkx_network.edges.values(): + edge['weight'] = weight(edge['cap'], edge['delay']) + # Setting the all-pairs shortest path in the NetworkX network as a graph attribute + shortest_paths(networkx_network) + + # Filter ingress nodes + ing_nodes = [] + for node in networkx_network.nodes.items(): + if node[1]["type"] == "Ingress": + ing_nodes.append(node) + + return networkx_network, ing_nodes + + +def reset_cap(network): + for node in network.nodes.keys(): + network.nodes[node]['remaining_cap'] = network.nodes[node]['cap'] + network.nodes[node]['available_sf'] = {} diff --git a/src/coordsim/simulation/__init__.py b/src/coordsim/simulation/__init__.py new file mode 100644 index 000000000..bfa4d6071 --- /dev/null +++ b/src/coordsim/simulation/__init__.py @@ -0,0 +1,2 @@ +from coordsim.simulation.flowsimulator import FlowSimulator +__all__ = ['FlowSimulator', ] diff --git a/src/coordsim/simulation/flowsimulator.py b/src/coordsim/simulation/flowsimulator.py new file mode 100644 index 000000000..58a71a882 --- /dev/null +++ b/src/coordsim/simulation/flowsimulator.py @@ -0,0 +1,290 @@ +import logging +import random +import numpy as np +from coordsim.network.flow import Flow +# from coordsim.metrics import metrics + +log = logging.getLogger(__name__) + +""" +Flow Simulator class +This class holds the flow simulator and its internal flow handling functions. +Flow of data through the simulator (abstract): + +start() -> generate_flow() -> init_flow() -> pass_flow() -> process_flow() +and forward_flow() -> depart_flow() or pass_flow() + +""" + + +class FlowSimulator: + def __init__(self, env, params): + self.env = env + self.params = params + self.total_flow_count = 0 + + def start(self): + """ + Start the simulator. + """ + log.info("Starting simulation") + # Setting the all-pairs shortest path in the NetworkX network as a graph attribute + log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys()))) + log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes))) + for node in self.params.ing_nodes: + node_id = node[0] + self.env.process(self.generate_flow(node_id)) + + def generate_flow(self, node_id): + """ + Generate flows at the ingress nodes. 
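Depending on the simulator config, inter-arrival times are either deterministic or exponentially distributed (Poisson arrivals), and flow sizes are either deterministic or Pareto distributed, as the loop below shows. A standalone sketch of the random case (the parameter values are illustrative assumptions, not config defaults):

```python
import random
import numpy as np

inter_arr_mean = 10.0    # assumed mean inter-arrival time at this ingress node
flow_size_shape = 1.5    # assumed Pareto shape parameter

# Poisson arrivals -> exponentially distributed inter-arrival times
inter_arr_time = random.expovariate(lambd=1.0 / inter_arr_mean)

# heavy-tailed flow size in Mbit, shifted so the minimum size is 1
flow_size = np.random.pareto(flow_size_shape) + 1
```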
+ """ + while self.params.inter_arr_mean[node_id] is not None: + self.total_flow_count += 1 + + # set normally distributed flow data rate + flow_dr = np.random.normal(self.params.flow_dr_mean, self.params.flow_dr_stdev) + + # set deterministic or random flow arrival times and flow sizes according to config + if self.params.deterministic_arrival: + inter_arr_time = self.params.inter_arr_mean[node_id] + else: + # Poisson arrival -> exponential distributed inter-arrival time + inter_arr_time = random.expovariate(lambd=1.0/self.params.inter_arr_mean[node_id]) + + if self.params.deterministic_size: + flow_size = self.params.flow_size_shape + else: + # heavy-tail flow size + flow_size = np.random.pareto(self.params.flow_size_shape) + 1 + + # Skip flows with negative flow_dr or flow_size values + if flow_dr <= 0.00 or flow_size <= 0.00: + continue + + # Assign a random SFC to the flow + flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()]) + # Get the flow's creation time (current environment time) + creation_time = self.env.now + # Generate flow based on given params + flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time, + current_node_id=node_id) + # Update metrics for the generated flow + self.params.metrics.generated_flow(flow, node_id) + # Generate flows and schedule them at ingress node + self.env.process(self.init_flow(flow)) + yield self.env.timeout(inter_arr_time) + + def init_flow(self, flow): + """ + Initialize flows within the network. This function takes the generated flow object at the ingress node + and handles it according to the requested SFC. We check if the SFC that is being requested is indeed + within the schedule, otherwise we log a warning and drop the flow. + The algorithm will check the flow's requested SFC, and will forward the flow through the network using the + SFC's list of SFs based on the LB rules that are provided through the scheduler's 'flow_schedule()' + function. + """ + log.info( + "Flow {} generated. arrived at node {} Requesting {} - flow duration: {}ms, " + "flow dr: {}. Time: {}".format(flow.flow_id, flow.current_node_id, flow.sfc, flow.duration, flow.dr, + self.env.now)) + sfc = self.params.sfc_list[flow.sfc] + # Check to see if requested SFC exists + if sfc is not None: + # Iterate over the SFs and process the flow at each SF. + yield self.env.process(self.pass_flow(flow, sfc)) + else: + log.info(f"Requested SFC was not found. Dropping flow {flow.flow_id}") + # Update metrics for the dropped flow + self.params.metrics.dropped_flow(flow) + self.env.exit() + + def pass_flow(self, flow, sfc): + """ + Passes the flow to the next node to begin processing. + The flow might still be arriving at a previous node or SF. + This function is used in a mutual recursion alongside process_flow() function to allow flows to arrive and begin + processing without waiting for the flow to completely arrive. + The mutual recursion is as follows: + pass_flow() -> process_flow() -> pass_flow() and so on... + Breaking condition: Flow reaches last position within the SFC, then process_flow() calls depart_flow() + instead of pass_flow(). The position of the flow within the SFC is determined using current_position + attribute of the flow object. 
+ """ + + # set current sf of flow + sf = sfc[flow.current_position] + flow.current_sf = sf + self.params.metrics.add_requesting_flow(flow) + + next_node = self.get_next_node(flow, sf) + yield self.env.process(self.forward_flow(flow, next_node)) + + log.info("Flow {} STARTED ARRIVING at node {} for processing. Time: {}" + .format(flow.flow_id, flow.current_node_id, self.env.now)) + yield self.env.process(self.process_flow(flow, sfc)) + + def get_next_node(self, flow, sf): + """ + Get next node using weighted probabilites from the scheduler + """ + schedule = self.params.schedule + # Check if scheduling rule exists + if (flow.current_node_id in schedule) and flow.sfc in schedule[flow.current_node_id]: + schedule_node = schedule[flow.current_node_id] + schedule_sf = schedule_node[flow.sfc][sf] + sf_nodes = [sch_sf for sch_sf in schedule_sf.keys()] + sf_probability = [prob for name, prob in schedule_sf.items()] + try: + next_node = np.random.choice(sf_nodes, p=sf_probability) + return next_node + + except Exception as ex: + + # Scheduling rule does not exist: drop flow + log.warning(f'Flow {flow.flow_id}: Scheduling rule at node {flow.current_node_id} not correct' + f'Dropping flow!') + log.warning(ex) + self.params.metrics.dropped_flow(flow) + self.env.exit() + else: + # Scheduling rule does not exist: drop flow + log.warning(f'Flow {flow.flow_id}: Scheduling rule not found at {flow.current_node_id}. Dropping flow!') + self.params.metrics.dropped_flow(flow) + self.env.exit() + + def forward_flow(self, flow, next_node): + """ + Calculates the path delays occurring when forwarding a node + Path delays are calculated using the Shortest path + The delay is simulated by timing out for the delay amount of duration + """ + path_delay = 0 + if flow.current_node_id != next_node: + path_delay = self.params.network.graph['shortest_paths'][(flow.current_node_id, next_node)][1] + + # Metrics calculation for path delay. Flow's end2end delay is also incremented. + self.params.metrics.add_path_delay(path_delay) + flow.end2end_delay += path_delay + if flow.current_node_id == next_node: + assert path_delay == 0, "While Forwarding the flow, the Current and Next node same, yet path_delay != 0" + log.info("Flow {} will stay in node {}. Time: {}.".format(flow.flow_id, flow.current_node_id, self.env.now)) + else: + log.info("Flow {} will leave node {} towards node {}. Time {}" + .format(flow.flow_id, flow.current_node_id, next_node, self.env.now)) + yield self.env.timeout(path_delay) + flow.current_node_id = next_node + + def process_flow(self, flow, sfc): + """ + Process the flow at the requested SF of the current node. + """ + # Generate a processing delay for the SF + current_node_id = flow.current_node_id + sf = sfc[flow.current_position] + flow.current_sf = sf + + log.info("Flow {} STARTED PROCESSING at node {} for processing. 
Time: {}" + .format(flow.flow_id, flow.current_node_id, self.env.now)) + + if sf in self.params.sf_placement[current_node_id]: + current_sf = flow.current_sf + vnf_delay_mean = self.params.sf_list[flow.current_sf]["processing_delay_mean"] + vnf_delay_stdev = self.params.sf_list[flow.current_sf]["processing_delay_stdev"] + processing_delay = np.absolute(np.random.normal(vnf_delay_mean, vnf_delay_stdev)) + # Update metrics for the processing delay + # Add the delay to the flow's end2end delay + self.params.metrics.add_processing_delay(processing_delay) + flow.end2end_delay += processing_delay + + # Calculate the demanded capacity when the flow is processed at this node + demanded_total_capacity = 0.0 + for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items(): + if sf == sf_i: + # Include flows data rate in requested sf capacity calculation + demanded_total_capacity += self.params.sf_list[sf]['resource_function'](sf_data['load'] + flow.dr) + else: + demanded_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load']) + + # Get node capacities + node_cap = self.params.network.nodes[current_node_id]["cap"] + node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] + assert node_remaining_cap >= 0, "Remaining node capacity cannot be less than 0 (zero)!" + if demanded_total_capacity <= node_cap: + log.info("Flow {} started processing at sf {} at node {}. Time: {}, Processing delay: {}" + .format(flow.flow_id, current_sf, current_node_id, self.env.now, processing_delay)) + + # Metrics: Add active flow to the SF once the flow has begun processing. + self.params.metrics.add_active_flow(flow, current_node_id, current_sf) + + # Add load to sf + self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] += flow.dr + # Set remaining node capacity + self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - demanded_total_capacity + # Set max node usage + self.params.metrics.calc_max_node_usage(current_node_id, demanded_total_capacity) + # Just for the sake of keeping lines small, the node_remaining_cap is updated again. + node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] + + yield self.env.timeout(processing_delay) + log.info("Flow {} started departing sf {} at node {}. Time {}" + .format(flow.flow_id, current_sf, current_node_id, self.env.now)) + + # Check if flow is currently in last SF, if so, then depart flow. + if (flow.current_position == len(sfc) - 1): + yield self.env.timeout(flow.duration) + self.depart_flow(flow) + else: + # Increment the position of the flow within SFC + flow.current_position += 1 + self.env.process(self.pass_flow(flow, sfc)) + yield self.env.timeout(flow.duration) + # before departing the SF. + # print(metrics.get_metrics()['current_active_flows']) + log.info("Flow {} FINISHED ARRIVING at SF {} at node {} for processing. Time: {}" + .format(flow.flow_id, current_sf, current_node_id, self.env.now)) + # Remove the active flow from the SF after it departed the SF + self.params.metrics.remove_active_flow(flow, current_node_id, current_sf) + + # Remove load from sf + self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] -= flow.dr + assert self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] >= 0, \ + 'SF load cannot be less than 0!' + # Check if SF is not processing any more flows AND if SF is removed from placement. If so the SF will + # be removed from the load recording. 
This allows SFs to be handed gracefully. + if (self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] == 0) and ( + sf not in self.params.sf_placement[current_node_id]): + del self.params.network.nodes[current_node_id]['available_sf'][sf] + + # Recalculation is necessary because other flows could have already arrived or departed at the node + used_total_capacity = 0.0 + for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items(): + used_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load']) + # Set remaining node capacity + self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - used_total_capacity + # Just for the sake of keeping lines small, the node_remaining_cap is updated again. + node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"] + + # We assert that remaining capacity must at all times be less than the node capacity so that + # nodes dont put back more capacity than the node's capacity. + assert node_remaining_cap <= node_cap, "Node remaining capacity cannot be more than node capacity!" + else: + log.info(f"Not enough capacity for flow {flow.flow_id} at node {flow.current_node_id}. Dropping flow.") + # Update metrics for the dropped flow + self.params.metrics.dropped_flow(flow) + self.env.exit() + else: + log.info(f"SF {sf} was not found at {current_node_id}. Dropping flow {flow.flow_id}") + self.params.metrics.dropped_flow(flow) + self.env.exit() + + def depart_flow(self, flow): + """ + Process the flow at the requested SF of the current node. + """ + # Update metrics for the processed flow + self.params.metrics.completed_flow() + self.params.metrics.add_end2end_delay(flow.end2end_delay) + self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf) + log.info("Flow {} was processed and departed the network from {}. Time {}" + .format(flow.flow_id, flow.current_node_id, self.env.now)) diff --git a/src/coordsim/simulation/simulatorparams.py b/src/coordsim/simulation/simulatorparams.py new file mode 100644 index 000000000..d2ca283ef --- /dev/null +++ b/src/coordsim/simulation/simulatorparams.py @@ -0,0 +1,109 @@ +""" + +Flow Simulator parameters. +- Allows for clean and quick access to parameters from the flow simulator. +- Facilitates the quick changing of schedule decisions and +other parameters for the simulator. + +""" +import numpy as np + + +class SimulatorParams: + def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, schedule=None, sf_placement=None): + # NetworkX network object: DiGraph + self.network = network + # Ingress nodes of the network (nodes at which flows arrive): list + self.ing_nodes = ing_nodes + # List of available SFCs and their child SFs: defaultdict(None) + self.sfc_list = sfc_list + # List of every SF and it's properties (e.g. 
processing_delay): defaultdict(None) + self.sf_list = sf_list + self.metrics = metrics + self.use_trace = False + if 'trace_path' in config: + self.use_trace = True + + if schedule is None: + schedule = {} + if sf_placement is None: + sf_placement = {} + # read dummy placement and schedule if specified + # Flow forwarding schedule: dict + self.schedule = schedule + # Placement of SFs in each node: defaultdict(list) + self.sf_placement = sf_placement + # Update which sf is available at which node + for node_id, placed_sf_list in sf_placement.items(): + for sf in placed_sf_list: + self.network.nodes[node_id]['available_sf'][sf] = self.network.nodes[node_id]['available_sf'].get(sf, { + 'load': 0.0}) + + # Flow data rate normal distribution mean: float + self.flow_dr_mean = config['flow_dr_mean'] + # Flow data rate normal distribution standard deviation: float + self.flow_dr_stdev = config['flow_dr_stdev'] + # Flow size Pareto heavy-tail distribtution shape: float + self.flow_size_shape = config['flow_size_shape'] + # if deterministic = True, the simulator reinterprets and uses inter_arrival_mean and flow_size_shape as fixed + # deterministic values rather than means of a random distribution + self.deterministic_arrival = None + self.deterministic_size = None + if 'deterministic' in config: + self.deterministic_arrival = config['deterministic'] + self.deterministic_size = config['deterministic'] + # deterministic_arrival/size override 'deterministic' + if 'deterministic_arrival' in config: + self.deterministic_arrival = config['deterministic_arrival'] + if 'deterministic_size' in config: + self.deterministic_size = config['deterministic_size'] + if self.deterministic_arrival is None or self.deterministic_size is None: + raise ValueError("'deterministic_arrival' or 'deterministic_size' are not set in simulator config.") + + # also allow to set determinism for inter-arrival times and flow size separately + # The duration of a run in the simulator's interface + self.run_duration = config['run_duration'] + + self.use_states = False + self.states = {} + self.in_init_state = True + + if 'use_states' in config and config['use_states']: + self.use_states = True + self.init_state = config['init_state'] + self.states = config['states'] + if self.in_init_state: + self.current_state = self.init_state + state_inter_arr_mean = self.states[self.current_state]['inter_arr_mean'] + self.update_single_inter_arr_mean(state_inter_arr_mean) + else: + inter_arr_mean = config['inter_arrival_mean'] + self.update_single_inter_arr_mean(inter_arr_mean) + + def update_state(self): + switch = [False, True] + change_prob = self.states[self.current_state]['switch_p'] + remain_prob = 1 - change_prob + switch_decision = np.random.choice(switch, p=[remain_prob, change_prob]) + if switch_decision: + state_names = list(self.states.keys()) + if self.current_state == state_names[0]: + self.current_state = state_names[1] + else: + self.current_state = state_names[0] + state_inter_arr_mean = self.states[self.current_state]['inter_arr_mean'] + self.update_single_inter_arr_mean(state_inter_arr_mean) + + def update_single_inter_arr_mean(self, new_mean): + self.inter_arr_mean = {node_id: new_mean for node_id in self.network.nodes} + + # string representation for logging + def __str__(self): + params_str = "Simulator parameters: \n" + params_str += "inter_arr_mean: {}\n".format(self.inter_arr_mean) + params_str += f"deterministic_arrival: {self.deterministic_arrival}\n" + params_str += "flow_dr_mean: {}\n".format(self.flow_dr_mean) + 
params_str += "flow_dr_stdv: {}\n".format(self.flow_dr_stdev) + params_str += "flow_size_shape: {}\n".format(self.flow_size_shape) + params_str += f"deterministic_size: {self.deterministic_size}\n" + return params_str diff --git a/src/coordsim/trace_processor/__init__.py b/src/coordsim/trace_processor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/trace_processor/trace_processor.py b/src/coordsim/trace_processor/trace_processor.py new file mode 100644 index 000000000..e108d74d5 --- /dev/null +++ b/src/coordsim/trace_processor/trace_processor.py @@ -0,0 +1,47 @@ +from coordsim.simulation.simulatorparams import SimulatorParams +from coordsim.simulation.flowsimulator import FlowSimulator +from simpy import Environment +import logging +log = logging.getLogger(__name__) + + +class TraceProcessor(): + """ + Trace processor class + """ + + def __init__(self, params: SimulatorParams, env: Environment, trace: list, simulator: FlowSimulator): + self.params = params + self.env = env + self.trace_index = 0 + self.trace = trace + self.simulator = simulator + self.env.process(self.process_trace()) + + def process_trace(self): + """ + Changes the inter arrival mean during simulation + The initial time is read from the the config file, so if the inter_arrival_time set in the trace CSV + file does not start from 0, then the simulator will use the value set in sim_config + + """ + self.timeout = float(self.trace[self.trace_index]['time']) - self.env.now + inter_arrival_mean = self.trace[self.trace_index]['inter_arrival_mean'] + yield self.env.timeout(self.timeout) + log.debug(f"Inter arrival mean changed to {inter_arrival_mean} at {self.env.now}") + if 'node' in self.trace[self.trace_index]: + node_id = self.trace[self.trace_index]['node'] + if inter_arrival_mean == 'None': + self.params.inter_arr_mean[node_id] = None + else: + inter_arrival_mean = float(inter_arrival_mean) + old_mean = self.params.inter_arr_mean[node_id] + self.params.inter_arr_mean[node_id] = inter_arrival_mean + if old_mean is None: + self.env.process(self.simulator.generate_flow(node_id)) + else: + inter_arrival_mean = float(inter_arrival_mean) + self.params.update_single_inter_arr_mean(inter_arrival_mean) + if self.trace_index < len(self.trace)-1: + self.trace_index += 1 + self.env.process(self.process_trace()) diff --git a/src/coordsim/writer/__init__.py b/src/coordsim/writer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/coordsim/writer/writer.py b/src/coordsim/writer/writer.py new file mode 100644 index 000000000..d17c73968 --- /dev/null +++ b/src/coordsim/writer/writer.py @@ -0,0 +1,148 @@ +""" +Simulator file writer module +""" + +import csv +import os +import yaml +from spinterface import SimulatorAction, SimulatorState + + +class ResultWriter(): + """ + Result Writer + Helper class to write results to CSV files. 
+ """ + def __init__(self, test_mode: bool, test_dir, write_schedule=False): + """ + If the simulator is in test mode, create result folder and CSV files + """ + self.write_schedule = write_schedule + self.test_mode = test_mode + if self.test_mode: + if self.write_schedule: + self.scheduling_file_name = f"{test_dir}/scheduling.csv" + self.placement_file_name = f"{test_dir}/placements.csv" + self.resources_file_name = f"{test_dir}/resources.csv" + self.metrics_file_name = f"{test_dir}/metrics.csv" + self.dropped_flows_file_name = f"{test_dir}/dropped_flows.yaml" + self.rl_state_file_name = f"{test_dir}/rl_state.csv" + self.runtimes_file_name = f"{test_dir}/runtimes.csv" + + # Create the results directory if not exists + os.makedirs(os.path.dirname(self.placement_file_name), exist_ok=True) + + self.placement_stream = open(self.placement_file_name, 'a+', newline='') + self.resources_stream = open(self.resources_file_name, 'a+', newline='') + self.metrics_stream = open(self.metrics_file_name, 'a+', newline='') + self.rl_state_stream = open(self.rl_state_file_name, 'a+', newline='') + self.runtimes_stream = open(self.runtimes_file_name, 'a+', newline='') + + if self.write_schedule: + self.scheduleing_stream = open(self.scheduling_file_name, 'a+', newline='') + self.scheduling_writer = csv.writer(self.scheduleing_stream) + # Create CSV writers + self.placement_writer = csv.writer(self.placement_stream) + self.resources_writer = csv.writer(self.resources_stream) + self.metrics_writer = csv.writer(self.metrics_stream) + self.rl_state_writer = csv.writer(self.rl_state_stream) + self.runtimes_writer = csv.writer(self.runtimes_stream) + + # Write the headers to the files + self.create_csv_headers() + + def __del__(self): + # Close all writer streams + if self.test_mode: + self.placement_stream.close() + if self.write_schedule: + self.scheduleing_stream.close() + self.resources_stream.close() + self.metrics_stream.close() + self.rl_state_stream.close() + self.runtimes_stream.close() + + def create_csv_headers(self): + """ + Creates statistics CSV headers and writes them to their files + """ + + # Create CSV headers + if self.write_schedule: + scheduling_output_header = ['episode', 'time', 'origin_node', 'sfc', 'sf', 'schedule_node', 'schedule_prob'] + self.scheduling_writer.writerow(scheduling_output_header) + placement_output_header = ['episode', 'time', 'node', 'sf'] + resources_output_header = ['episode', 'time', 'node', 'node_capacity', 'used_resources'] + metrics_output_header = ['episode', 'time', 'total_flows', 'successful_flows', 'dropped_flows', + 'in_network_flows', 'avg_end2end_delay'] + runtimes_output_header = ['run', 'runtime'] + + # Write headers to CSV files + self.placement_writer.writerow(placement_output_header) + self.resources_writer.writerow(resources_output_header) + self.metrics_writer.writerow(metrics_output_header) + self.runtimes_writer.writerow(runtimes_output_header) + + def write_runtime(self, run, time): + """ + Write runtime results to output file + """ + if self.test_mode: + self.runtimes_writer.writerow([run, time]) + + def write_action_result(self, episode, time, action: SimulatorAction): + """ + Write simulator actions to CSV files for statistics purposes + """ + if self.test_mode: + placement = action.placement + placement_output = [] + scheduling_output = [] + + for node_id, sfs in placement.items(): + for sf in sfs: + placement_output_row = [episode, time, node_id, sf] + placement_output.append(placement_output_row) + if self.write_schedule: + scheduling = 
action.scheduling + for node, sfcs in scheduling.items(): + for sfc, sfs in sfcs.items(): + for sf, scheduling in sfs.items(): + for schedule_node, schedule_prob in scheduling.items(): + scheduling_output_row = [episode, time, node, sfc, sf, schedule_node, schedule_prob] + scheduling_output.append(scheduling_output_row) + self.scheduling_writer.writerows(scheduling_output) + + self.placement_writer.writerows(placement_output) + + def write_state_results(self, episode, time, state: SimulatorState): + """ + Write node resource consumption to CSV file + """ + if self.test_mode: + network = state.network + stats = state.network_stats + + metrics_output = [episode, time, stats['total_flows'], stats['successful_flows'], stats['dropped_flows'], + stats['in_network_flows'], stats['avg_end2end_delay']] + + resource_output = [] + for node in network['nodes']: + node_id = node['id'] + node_cap = node['resource'] + used_resources = node['used_resources'] + resource_output_row = [episode, time, node_id, node_cap, used_resources] + resource_output.append(resource_output_row) + + self.metrics_writer.writerow(metrics_output) + self.resources_writer.writerows(resource_output) + + def write_dropped_flow_locs(self, dropped_flow_locs): + """Dump dropped flow counters into yaml file. Called at end of simulation""" + if self.test_mode: + with open(self.dropped_flows_file_name, 'w') as f: + yaml.dump(dropped_flow_locs, f, default_flow_style=False) + + def write_rl_state(self, rl_state): + if self.test_mode: + self.rl_state_writer.writerow(rl_state) diff --git a/src/siminterface/__init__.py b/src/siminterface/__init__.py new file mode 100644 index 000000000..051b50591 --- /dev/null +++ b/src/siminterface/__init__.py @@ -0,0 +1,2 @@ +from siminterface.simulator import Simulator +__all__ = ['Simulator', ] diff --git a/src/siminterface/simulator.py b/src/siminterface/simulator.py new file mode 100644 index 000000000..c098f854c --- /dev/null +++ b/src/siminterface/simulator.py @@ -0,0 +1,230 @@ +import logging +import random +import time +import os +from coordsim.metrics.metrics import Metrics +import coordsim.reader.reader as reader +from coordsim.simulation.flowsimulator import FlowSimulator +from coordsim.simulation.simulatorparams import SimulatorParams +import numpy +import simpy +from spinterface import SimulatorAction, SimulatorInterface, SimulatorState +from coordsim.writer.writer import ResultWriter +from coordsim.trace_processor.trace_processor import TraceProcessor + +logger = logging.getLogger(__name__) + + +class Simulator(SimulatorInterface): + def __init__(self, network_file, service_functions_file, config_file, resource_functions_path="", + test_mode=False, test_dir=None): + # SimulatorInterface.__init__(self, test_mode=test_mode) + # Number of time the simulator has run. 
Necessary to correctly calculate env run time of apply function + self.run_times = int(1) + self.network_file = network_file + self.test_mode = test_mode + self.test_dir = test_dir + # init network, sfc, sf, and config files + self.network, self.ing_nodes = reader.read_network(self.network_file) + self.sfc_list = reader.get_sfc(service_functions_file) + self.sf_list = reader.get_sf(service_functions_file, resource_functions_path) + self.config = reader.get_config(config_file) + self.metrics = Metrics(self.network, self.sf_list) + write_schedule = False + if 'write_schedule' in self.config and self.config['write_schedule']: + write_schedule = True + # Create CSV writer + self.writer = ResultWriter(self.test_mode, self.test_dir, write_schedule) + + self.episode = 0 + + self.last_apply_time = None + + def __del__(self): + # write dropped flow locs to yaml + self.writer.write_dropped_flow_locs(self.metrics.metrics['dropped_flows_locs']) + + def init(self, seed): + # increment episode count + self.episode += 1 + # reset network caps and available SFs: + reader.reset_cap(self.network) + # Initialize metrics, record start time + self.run_times = int(1) + self.start_time = time.time() + + # Generate SimPy simulation environment + self.env = simpy.Environment() + self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config, + self.metrics) + self.params.metrics.reset_metrics() + + # Instantiate the parameter object for the simulator. + if self.params.use_states and 'trace_path' in self.config: + logger.warning('Two state model and traces are both activated, thi will cause unexpected behaviour!') + + if self.params.use_states: + if self.params.in_init_state: + self.params.in_init_state = False + else: + self.params.update_state() + + self.duration = self.params.run_duration + # Get and plant random seed + self.seed = seed + random.seed(self.seed) + numpy.random.seed(self.seed) + + # Instantiate a simulator object, pass the environment and params + self.simulator = FlowSimulator(self.env, self.params) + + # Start the simulator + self.simulator.start() + # Trace handling + if 'trace_path' in self.config: + trace_path = os.path.join(os.getcwd(), self.config['trace_path']) + trace = reader.get_trace(trace_path) + TraceProcessor(self.params, self.env, trace, self.simulator) + + # Run the environment for one step to get initial stats. + self.env.step() + + # Parse the NetworkX object into a dict format specified in SimulatorState. This is done to account + # for changing node remaining capacities. + # Also, parse the network stats and prepare it in SimulatorState format. + self.parse_network() + self.network_metrics() + + # Record end time and running time metrics + self.end_time = time.time() + self.params.metrics.running_time(self.start_time, self.end_time) + simulator_state = SimulatorState(self.network_dict, self.simulator.params.sf_placement, self.sfc_list, + self.sf_list, self.traffic, self.network_stats) + logger.debug(f"t={self.env.now}: {simulator_state}") + + # set time stamp to calculate runtime of next apply call + self.last_apply_time = time.time() + + return simulator_state + + def apply(self, actions: SimulatorAction): + logger.debug(f"t={self.env.now}: {actions}") + + # calc runtime since last apply (or init): that's the algorithm's runtime without simulation + alg_runtime = time.time() - self.last_apply_time + # TODO: save and save to file instead of printing + # configurable: eg, based on test_mode? should only write when testing, not training. 
but also work with non RL + # print(f"Alg. runtime: {alg_runtime}") + + self.writer.write_runtime(self.run_times, alg_runtime) + self.writer.write_action_result(self.episode, self.env.now, actions) + + # Get the new placement from the action passed by the RL agent + # Modify and set the placement parameter of the instantiated simulator object. + self.simulator.params.sf_placement = actions.placement + # Update which sf is available at which node + for node_id, placed_sf_list in actions.placement.items(): + available = {} + # Keep only SFs which still process + for sf, sf_data in self.simulator.params.network.nodes[node_id]['available_sf'].items(): + if sf_data['load'] != 0: + available[sf] = sf_data + # Add all SFs which are in the placement + for sf in placed_sf_list: + available[sf] = available.get(sf, {'load': 0.0}) + self.simulator.params.network.nodes[node_id]['available_sf'] = available + + # Get the new schedule from the SimulatorAction + # Set it in the params of the instantiated simulator object. + self.simulator.params.schedule = actions.scheduling + + # reset metrics for steps + self.params.metrics.reset_run_metrics() + + # Run the simulation again with the new params for the set duration. + # Due to SimPy restraints, we multiply the duration by the run times because SimPy does not reset when run() + # stops and we must increase the value of "until=" to accomodate for this. e.g.: 1st run call runs for 100 time + # uniits (1 run time), 2nd run call will also run for 100 more time units but value of "until=" is now 200. + runtime_steps = self.duration * self.run_times + logger.debug("Running simulator until time step %s", runtime_steps) + self.env.run(until=runtime_steps) + + # Parse the NetworkX object into a dict format specified in SimulatorState. This is done to account + # for changing node remaining capacities. + # Also, parse the network stats and prepare it in SimulatorState format. + self.parse_network() + self.network_metrics() + + # Increment the run times variable + self.run_times += 1 + + # Record end time of the apply round, doesn't change start time to show the running time of the entire + # simulation at the end of the simulation. + self.end_time = time.time() + self.params.metrics.running_time(self.start_time, self.end_time) + + # Create a new SimulatorState object to pass to the RL Agent + simulator_state = SimulatorState(self.network_dict, self.simulator.params.sf_placement, self.sfc_list, + self.sf_list, self.traffic, self.network_stats) + self.writer.write_state_results(self.episode, self.env.now, simulator_state) + logger.debug(f"t={self.env.now}: {simulator_state}") + if self.params.use_states: + self.params.update_state() + + # set time stamp to calculate runtime of next apply call + self.last_apply_time = time.time() + + return simulator_state + + def parse_network(self) -> dict: + """ + Converts the NetworkX network in the simulator to a dict in a format specified in the SimulatorState class. + """ + max_node_usage = self.params.metrics.get_metrics()['run_max_node_usage'] + self.network_dict = {'nodes': [], 'edges': []} + for node in self.params.network.nodes(data=True): + node_cap = node[1]['cap'] + run_max_node_usage = max_node_usage[node[0]] + # 'used_resources' here is the max usage for the run. 
+ self.network_dict['nodes'].append({'id': node[0], 'resource': node_cap, + 'used_resources': run_max_node_usage}) + for edge in self.network.edges(data=True): + edge_src = edge[0] + edge_dest = edge[1] + edge_delay = edge[2]['delay'] + edge_dr = edge[2]['cap'] + # We use a fixed user data rate for the edges here as the functionality is not yet incorporated in the + # simulator. + # TODO: Implement used edge data rates in the simulator. + edge_used_dr = 0 + self.network_dict['edges'].append({ + 'src': edge_src, + 'dst': edge_dest, + 'delay': edge_delay, + 'data_rate': edge_dr, + 'used_data_rate': edge_used_dr + }) + + def network_metrics(self): + """ + Processes the metrics and parses them in a format specified in the SimulatorState class. + """ + stats = self.params.metrics.get_metrics() + self.traffic = stats['run_total_requested_traffic'] + self.network_stats = { + 'processed_traffic': stats['run_total_processed_traffic'], + 'total_flows': stats['generated_flows'], + 'successful_flows': stats['processed_flows'], + 'dropped_flows': stats['dropped_flows'], + 'in_network_flows': stats['total_active_flows'], + 'avg_end2end_delay': stats['avg_end2end_delay'], + 'run_avg_end2end_delay': stats['run_avg_end2end_delay'], + 'run_max_end2end_delay': stats['run_max_end2end_delay'], + 'run_avg_path_delay': stats['run_avg_path_delay'], + 'run_total_processed_traffic': stats['run_total_processed_traffic'], + 'run_dropped_flows_per_node': stats['run_dropped_flows_per_node'] + } + + def get_active_ingress_nodes(self): + """Return names of all ingress nodes that are currently active, ie, produce flows.""" + return [ing[0] for ing in self.ing_nodes if self.params.inter_arr_mean[ing[0]] is not None] diff --git a/tests/test_simulator.py b/tests/test_simulator.py new file mode 100644 index 000000000..4d1b30fdf --- /dev/null +++ b/tests/test_simulator.py @@ -0,0 +1,62 @@ +from unittest import TestCase +from coordsim.simulation.flowsimulator import FlowSimulator +from coordsim.simulation.simulatorparams import SimulatorParams +from coordsim.network import dummy_data +from coordsim.reader import reader +import simpy +import logging +from coordsim.metrics.metrics import Metrics + +NETWORK_FILE = "params/networks/triangle.graphml" +SERVICE_FUNCTIONS_FILE = "params/services/abc.yaml" +RESOURCE_FUNCTION_PATH = "params/services/resource_functions" +CONFIG_FILE = "params/config/sim_config.yaml" +SIMULATION_DURATION = 1000 +SEED = 1234 + + +class TestFlowSimulator(TestCase): + flow_simulator = None + simulator_params = None + + def setUp(self): + """ + Setup test environment + """ + logging.basicConfig(level=logging.ERROR) + + self.env = simpy.Environment() + # Configure simulator parameters + network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10) + sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE) + sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH) + config = reader.get_config(CONFIG_FILE) + + self.metrics = Metrics(network, sf_list) + + sf_placement = dummy_data.triangle_placement + schedule = dummy_data.triangle_schedule + + # Initialize Simulator and SimulatoParams objects + self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics, + sf_placement=sf_placement, schedule=schedule) + self.flow_simulator = FlowSimulator(self.env, self.simulator_params) + self.flow_simulator.start() + self.env.run(until=SIMULATION_DURATION) + + def test_simulator(self): + """ + Test the simulator + """ + # Collect metrics + self.metric_collection 
= self.metrics.get_metrics() + # Check if Simulator is initiated correctly + self.assertIsInstance(self.flow_simulator, FlowSimulator) + # Check if Params are set correctly + self.assertIsInstance(self.simulator_params, SimulatorParams) + # Check if generated flows are equal to processed flow + dropped + active flows + gen_flow_check = self.metric_collection['generated_flows'] == (self.metric_collection['processed_flows'] + + self.metric_collection['dropped_flows'] + + self.metric_collection['total_active_flows']) + self.assertIs(gen_flow_check, True) + # More tests are to come diff --git a/tests/test_simulatorInterface.py b/tests/test_simulatorInterface.py new file mode 100644 index 000000000..326aec177 --- /dev/null +++ b/tests/test_simulatorInterface.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- +""" +Simulator interface tests +""" +from unittest import TestCase + +from spinterface import SimulatorInterface, SimulatorAction, SimulatorState + +NETWORK_FILE = "params/networks/triangle.graphml" +SERVICE_FUNCTIONS_FILE = "params/services/3sfcs.yaml" +RESOURCE_FUNCTION_PATH = "params/services/resource_functions" +CONFIG_FILE = "params/config/sim_config.yaml" +TRACE_FILE = "params/traces/simple_trace.csv" + +SIMULATOR_MODULE_NAME = "siminterface.simulator" +SIMULATOR_CLS_NAME = "Simulator" +SIMULATOR_MODULE = __import__(SIMULATOR_MODULE_NAME) +SIMULATOR_CLS = getattr(SIMULATOR_MODULE, SIMULATOR_CLS_NAME) +TEST_MODE = False + + +class TestSimulatorInterface(TestCase): + + simulator = None # type: SimulatorInterface + + def setUp(self): + """ + create simulator for test cases + """ + # TODO: replace SimulatorInterface with implementation + self.simulator = SIMULATOR_CLS(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, test_mode=TEST_MODE, + resource_functions_path=RESOURCE_FUNCTION_PATH) + self.simulator.init(1234) + + def test_apply(self): + # test if placement and schedule can be applied + placement = { + 'pop0': ['a', 'b', 'c'], + 'pop1': ['a', 'b', 'c'], + 'pop2': ['a', 'b', 'c'], + } + flow_schedule = { + 'pop0': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, + 'pop1': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, + 'pop2': { + 'sfc_1': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_2': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 
'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + 'sfc_3': { + 'a': { + 'pop0': 0.4, + 'pop1': 0.6, + 'pop2': 0 + }, + 'b': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + }, + 'c': { + 'pop0': 0.6, + 'pop1': 0.2, + 'pop2': 0.2 + } + }, + }, + } + + action = SimulatorAction(placement=placement, scheduling=flow_schedule) + simulator_state = self.simulator.apply(action) + self.assertIsInstance(simulator_state, SimulatorState) + + # test if network is read correctly + nw_nodes = simulator_state.network['nodes'] + self.assertIs(len(nw_nodes), 3) + # 3 bidirectional edges + edges = simulator_state.network['edges'] + self.assertIs(len(edges), 3) + # with 5 edge attributes: + # 'edges': [{ + # 'src': str, + # 'dst': str, + # 'delay': int (ms), + # 'data_rate': int (Mbit/s), + # 'used_data_rate': int (Mbit/s), + # }], + nw_edges = simulator_state.network['edges'][0] + self.assertIs(len(nw_edges), 5) + + # Check if placement is read correctly + sim_placement = simulator_state.placement + self.assertIs(len(sim_placement), 3) + + # test if sfcs are read correctly + sfcs = simulator_state.sfcs + self.assertIs(len(sfcs), 3) + + # SFs + service_functions = simulator_state.service_functions + self.assertIs(len(service_functions), 3) + + # traffic + # TODO: test traffic + + # network_stats + """ + network_stats : dict + { + 'total_flows' : int, + 'successful_flows' : int, + 'dropped_flows' : int, + 'in_network_flows' : int, + 'avg_end_2_end_delay' : int + } + """ + network_stats = simulator_state.network_stats + self.assertIs(len(network_stats), 11) + self.assertIn('total_flows', network_stats) + self.assertIn('successful_flows', network_stats) + self.assertIn('dropped_flows', network_stats) + self.assertIn('in_network_flows', network_stats) + self.assertIn('avg_end2end_delay', network_stats)
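Putting the interface together, a condensed sketch of driving the simulator programmatically, analogous to the tests above. The file paths come from the test fixtures; the all-SFs-everywhere placement, the deterministic schedule, and the assumption that `abc.yaml` defines a single SFC `sfc_1` with SFs `a`, `b`, `c` are illustrative, not prescribed:

```python
from siminterface.simulator import Simulator
from spinterface import SimulatorAction

sim = Simulator("params/networks/triangle.graphml",
                "params/services/abc.yaml",
                "params/config/sim_config.yaml",
                resource_functions_path="params/services/resource_functions",
                test_mode=False)

# plant the random seed and get the initial SimulatorState
state = sim.init(seed=1234)

# place all SFs on every node and forward each SF deterministically (illustrative only)
placement = {node: ['a', 'b', 'c'] for node in ['pop0', 'pop1', 'pop2']}
schedule = {node: {'sfc_1': {sf: {'pop0': 1, 'pop1': 0, 'pop2': 0}
                             for sf in ['a', 'b', 'c']}}
            for node in ['pop0', 'pop1', 'pop2']}

# each apply() advances the simulation by one run_duration and returns the new state
for _ in range(10):
    state = sim.apply(SimulatorAction(placement=placement, scheduling=schedule))
    print(state.network_stats['dropped_flows'])
```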