diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..6965901 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,14 @@ +[run] +branch = True +include = + */pyvisa_sim/* + +[report] +# Regexes for lines to exclude from consideration +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain if tests don't hit defensive assertion code: + raise NotImplementedError() + pass diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..a0a9d2a --- /dev/null +++ b/.flake8 @@ -0,0 +1,14 @@ +[flake8] +exclude = + .git, + __pycache__, + docs/source/conf.py, + old, + build, + dist, +ignore = E203, E266, E501, W503, E731 +# line length is intentionally set to 80 here because pyvisa uses Bugbear +# See https://github.com/psf/black/blob/master/README.md#line-length for more details +max-line-length = 80 +max-complexity = 18 +select = B,C,E,F,W,T4,B9 \ No newline at end of file diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..3bad94f --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 62e8d7b..24b5d96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,33 +4,71 @@ on: - cron: '0 0 * * 2' push: branches: - - master + - main - staging - trying pull_request: branches: - - master + - main paths: - .github/workflows/ci.yml - - pyvisa_sim/* + - "pyvisa_sim/**" - pyproject.toml - - setup.cfg - setup.py jobs: + formatting: + name: Check code formatting + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.10" + - uses: actions/cache@v3 + with: + path: ${{ env.pythonLocation }} + key: ${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('dev-requirements.txt') }} + - name: Install tools + run: | + python -m pip install --upgrade pip + pip install -r dev-requirements.txt + - name: Isort + run: | + isort pyvisa_sim -c; + - name: Black + if: always() + run: | + black pyvisa_sim --check; + - name: Flake8 + if: always() + run: | + flake8 pyvisa_sim; + - name: Mypy + if: always() + run: | + mypy pyvisa_sim; tests: name: Unit tests runs-on: ${{ matrix.os }} + needs: + - formatting + if: needs.formatting.result == 'success' strategy: matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: [3.7, 3.8, 3.9, "3.10"] + python-version: ["3.8", "3.9", "3.10", "3.11"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} + - uses: actions/cache@v3 + with: + path: ${{ env.pythonLocation }} + key: ${{ env.pythonLocation }}-${{ hashFiles('pyproject.toml') }} - name: Install dependencies run: | python -m pip install --upgrade pip @@ -40,9 +78,9 @@ jobs: - name: Test with pytest run: | pip install pytest-cov - pytest --pyargs pyvisa_sim --cov pyvisa_sim --cov-report xml -v + pytest --pyargs pyvisa_sim --cov --cov-report xml -v - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v3 with: token: ${{ secrets.CODECOV_TOKEN }} flags: unittests diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 09da520..02b9604 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -21,15 +21,15 @@ jobs: name: Docs building runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python - uses: actions/setup-python@v1 + uses: actions/setup-python@v4 - name: Install dependencies run: | python -m pip install --upgrade pip - name: Install project run: | - python setup.py develop + pip install -e . - name: Install graphviz uses: kamiazya/setup-graphviz@v1 - name: Install doc building tools diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..385c475 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,111 @@ +name: Build and upload wheels +on: + workflow_dispatch: + schedule: + - cron: '0 0 * * 3' + push: + tags: + - '*' + +jobs: + build_sdist: + name: Build sdist + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Get history and tags for SCM versioning to work + run: | + git fetch --prune --unshallow + git fetch --depth=1 origin +refs/tags/*:refs/tags/* + - name: Setup Python + uses: actions/setup-python@v4 + - name: Build sdist + run: | + pip install --upgrade pip + pip install wheel build + python -m build . -s + - name: Test sdist + run: | + pip install pytest + pip install dist/*.tar.gz + python -X dev -m pytest --pyargs pyvisa_sim + - name: Store artifacts + uses: actions/upload-artifact@v3 + with: + name: artifact + path: dist/* + + build_wheel: + name: Build wheel + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Get history and tags for SCM versioning to work + run: | + git fetch --prune --unshallow + git fetch --depth=1 origin +refs/tags/*:refs/tags/* + - name: Setup Python + uses: actions/setup-python@v4 + - name: Build wheels + run: | + pip install --upgrade pip + pip install wheel build + python -m build . -w + - name: Test wheel + run: | + pip install pytest + pip install dist/*.whl + python -X dev -m pytest --pyargs pyvisa_sim + - name: Store artifacts + uses: actions/upload-artifact@v3 + with: + name: artifact + path: dist/*.whl + + release_upload: + name: Create Release and Upload Release Asset + runs-on: ubuntu-latest + if: github.event_name == 'push' + needs: [build_wheel, build_sdist] + steps: + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: false + prerelease: ${{ contains(github.ref, 'rc') || contains(github.ref, 'a') || contains(github.ref, 'b')}} + - uses: actions/download-artifact@v3 + with: + name: artifact + path: dist + - name: Upload Release Asset + id: upload-release-asset + uses: shogo82148/actions-upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: dist/* + + upload_pypi: + if: github.event_name == 'push' + needs: [build_wheel, build_sdist] + runs-on: ubuntu-latest + steps: + - uses: actions/download-artifact@v3 + with: + name: artifact + path: dist + + - uses: pypa/gh-action-pypi-publish@v1 + with: + user: __token__ + password: ${{ secrets.pypi_password }} + # To test: + # repository_url: https://test.pypi.org/legacy/ diff --git a/.gitignore b/.gitignore index ef87bfb..5f10aba 100644 --- a/.gitignore +++ b/.gitignore @@ -116,6 +116,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +version.py # PyInstaller # Usually these files are written by a python script from a template diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..389b065 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,18 @@ +repos: + - repo: https://github.com/pre-commit/mirrors-isort + rev: v5.10.1 + hooks: + - id: isort + - repo: https://github.com/psf/black + rev: 22.10.0 + hooks: + - id: black + - repo: https://github.com/pycqa/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.991 # Use the sha / tag you want to point at + hooks: + - id: mypy + additional_dependencies: [pyvisa, types-PyYAML, stringparser, pytest] diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..ea5c064 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,27 @@ +# .readthedocs.yaml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the version of Python and other tools you might need +build: + os: ubuntu-20.04 + tools: + python: "3.10" + +# Build documentation in the docs/source directory with Sphinx +sphinx: + configuration: docs/source/conf.py + +# Enable epub output +formats: + - epub + +# Optionally declare the Python requirements required to build your docs +python: + install: + - requirements: docs/requirements.txt + - method: pip + path: . diff --git a/AUTHORS.rst b/AUTHORS.rst index d39b1c0..3c12168 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -1,9 +1,12 @@ -pyvisa-sim is written and maintained by Hernan E. Grecco +pyvisa-sim was created by Hernan E. Grecco hernan.grecco@gmail.com. +It is currently maintained by: +- Matthieu Dartiailh m.dartiailh@gmail.com + + Other contributors, listed alphabetically, are: - Adam Vaughn avaughn@intersil.com - Colin Marquardt github@marquardt-home.de - Huan Nguyen famish99@gmail.com -- Matthieu Dartiailh marul@laposte.net diff --git a/CHANGES.rst b/CHANGES.rst index fa31ed2..4ad9870 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,13 @@ PyVISA-sim Changelog ==================== +Unreleased +---------- + +- Fixed debug logging a single character at a time. PR #79 +- Fixed issue with `common.iter_bytes` where the masked bits would be incorrect. + PR #81 + 0.5.1 (2022-09-08) ------------------ @@ -60,4 +67,4 @@ PyVISA-sim Changelog - First public release. - Basic ASRL INSTR functionality. - Basic USB INSTR functionality. -- Basic TCPIP INSTR functionality. \ No newline at end of file +- Basic TCPIP INSTR functionality. diff --git a/README.rst b/README.rst index fc3183e..9d351e1 100644 --- a/README.rst +++ b/README.rst @@ -56,8 +56,8 @@ measurement controlling: Requirements ------------ -- Python (tested with 3.6 to 3.9) -- PyVISA 1.6+ +- Python (tested with 3.8 to 3.11) +- PyVISA 1.11+ Installation ------------ @@ -68,8 +68,8 @@ Using ``pip``: or install the development version: - $ pip install -U - `https://github.com/pyvisa/pyvisa-sim/zipball/master`_ + $ pip install + `git+https://github.com/pyvisa/pyvisa-sim`_ PyVISA is automatically installed if needed. diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 0000000..5f36d39 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,9 @@ +pyvisa +black +flake8 +mypy +types-PyYAML +isort +pytest +sphinx +sphinx-rtd-theme \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index a98c747..583e61e 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# PyVISA documentation build configuration file +# PyVISA-sim documentation build configuration file # # This file is execfile()d with the current directory set to its containing dir. # @@ -10,233 +10,240 @@ # All configuration values have a default; values that are commented out # serve to show the default. +import datetime +import importlib.metadata import os import sys -import pkg_resources -import datetime - # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -#sys.path.insert(0, os.path.abspath('.')) +# sys.path.insert(0, os.path.abspath('.')) # -- General configuration ----------------------------------------------------- # If your documentation needs a minimal Sphinx version, state it here. -#needs_sphinx = '1.0' +# needs_sphinx = '1.0' # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.intersphinx', 'sphinx.ext.coverage', 'sphinx.ext.viewcode', 'sphinx.ext.mathjax'] +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.intersphinx", + "sphinx.ext.coverage", + "sphinx.ext.viewcode", + "sphinx.ext.mathjax", +] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. -#source_encoding = 'utf-8-sig' +# source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = 'PyVISA-sim' -author = 'PyVISA-sim Authors' +project = "PyVISA-sim" +author = "PyVISA-sim Authors" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. -version = pkg_resources.get_distribution(project).version +version = importlib.metadata.version(project) release = version this_year = datetime.date.today().year -copyright = '%s, %s' % (this_year, author) +copyright = "%s, %s" % (this_year, author) # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. -#language = None +# language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: -#today = '' +# today = '' # Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' +# today_fmt = '%B %d, %Y' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -exclude_patterns = ['_build'] +exclude_patterns = ["_build"] # The reST default role (used for this markup: `text`) to use for all documents. -#default_role = None +# default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +# add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). -#add_module_names = True +# add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. -#show_authors = False +# show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # A list of ignored prefixes for module index sorting. -#modindex_common_prefix = [] +# modindex_common_prefix = [] # -- Options for HTML output --------------------------------------------------- # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" -on_rtd = os.environ.get('READTHEDOCS', None) == 'True' +on_rtd = os.environ.get("READTHEDOCS", None) == "True" if not on_rtd: # only import and set the theme if we're building docs locally try: import sphinx_rtd_theme except ImportError: - print('\n\nTheme not found. Please install Sphinx Read The Docs Themes using:\n\n' - ' pip install sphinx_rtd_theme\n') + print( + "\n\nTheme not found. Please install Sphinx Read The Docs Themes using:\n\n" + " pip install sphinx_rtd_theme\n" + ) sys.exit(1) html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the # documentation. -#html_theme_options = {} +# html_theme_options = {} # Add any paths that contain custom themes here, relative to this directory. -#html_theme_path = [] -#html_theme_path = ['_themes'] +# html_theme_path = [] +# html_theme_path = ['_themes'] # The name for this set of Sphinx documents. If None, it defaults to # " v documentation". -#html_title = None +# html_title = None # A shorter title for the navigation bar. Default is the same as html_title. -#html_short_title = None +# html_short_title = None # The name of an image file (relative to this directory) to place at the top # of the sidebar. -#html_logo = None +# html_logo = None # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. -#html_favicon = None +# html_favicon = None # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' +# html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. -#html_use_smartypants = True +# html_use_smartypants = True # Custom sidebar templates, maps document names to template names. -#html_sidebars = {} +# html_sidebars = {} html_sidebars = { - 'index': ['sidebarintro.html', 'sourcelink.html', 'searchbox.html'], - '**': ['sidebarlogo.html', 'localtoc.html', 'relations.html', - 'sourcelink.html', 'searchbox.html'] + "index": ["sidebarintro.html", "sourcelink.html", "searchbox.html"], + "**": [ + "sidebarlogo.html", + "localtoc.html", + "relations.html", + "sourcelink.html", + "searchbox.html", + ], } # Additional templates that should be rendered to pages, maps page names to # template names. -#html_additional_pages = {} +# html_additional_pages = {} # If false, no module index is generated. -#html_domain_indices = True +# html_domain_indices = True # If false, no index is generated. -#html_use_index = True +# html_use_index = True # If true, the index is split into individual pages for each letter. -#html_split_index = False +# html_split_index = False # If true, links to the reST sources are added to the pages. -#html_show_sourcelink = True +# html_show_sourcelink = True # If true, "Created using Sphinx" is shown in the HTML footer. Default is True. -#html_show_sphinx = True +# html_show_sphinx = True # If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. -#html_show_copyright = True +# html_show_copyright = True # If true, an OpenSearch description file will be output, and all pages will # contain a tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. -#html_use_opensearch = '' +# html_use_opensearch = '' # This is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = None +# html_file_suffix = None # Output file base name for HTML help builder. -htmlhelp_basename = 'pyvisa-simtdoc' +htmlhelp_basename = "pyvisa-simtdoc" # -- Options for LaTeX output -------------------------------------------------- latex_elements = { -# The paper size ('letterpaper' or 'a4paper'). -#'papersize': 'letterpaper', - -# The font size ('10pt', '11pt' or '12pt'). -#'pointsize': '10pt', - -# Additional stuff for the LaTeX preamble. -#'preamble': '', + # The paper size ('letterpaper' or 'a4paper'). + #'papersize': 'letterpaper', + # The font size ('10pt', '11pt' or '12pt'). + #'pointsize': '10pt', + # Additional stuff for the LaTeX preamble. + #'preamble': '', } # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'pyvisa-sim.tex', 'PyVISA Documentation', - 'PyVISA Authors', 'manual'), + ("index", "pyvisa-sim.tex", "PyVISA Documentation", "PyVISA Authors", "manual"), ] # The name of an image file (relative to this directory) to place at the top of # the title page. -#latex_logo = None +# latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. -#latex_use_parts = False +# latex_use_parts = False # If true, show page references after internal links. -#latex_show_pagerefs = False +# latex_show_pagerefs = False # If true, show URL addresses after external links. -#latex_show_urls = False +# latex_show_urls = False # Documents to append as an appendix to all manuals. -#latex_appendices = [] +# latex_appendices = [] # If false, no module index is generated. -#latex_domain_indices = True +# latex_domain_indices = True # -- Options for manual page output -------------------------------------------- # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - ('index', 'pyvisa-sim', 'PyVISA Documentation', - ['PyVISA Authors'], 1) -] +man_pages = [("index", "pyvisa-sim", "PyVISA Documentation", ["PyVISA Authors"], 1)] # If true, show URL addresses after external links. -#man_show_urls = False +# man_show_urls = False # -- Options for Texinfo output ------------------------------------------------ @@ -245,19 +252,25 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - ('index', 'PyVISA', 'PyVISA Documentation', - 'PyVISA Authors', 'PyVISA', 'One line description of project.', - 'Miscellaneous'), + ( + "index", + "PyVISA", + "PyVISA Documentation", + "PyVISA Authors", + "PyVISA", + "One line description of project.", + "Miscellaneous", + ), ] # Documents to append as an appendix to all manuals. -#texinfo_appendices = [] +# texinfo_appendices = [] # If false, no module index is generated. -#texinfo_domain_indices = True +# texinfo_domain_indices = True # How to display URL addresses: 'footnote', 'no', or 'inline'. -#texinfo_show_urls = 'footnote' +# texinfo_show_urls = 'footnote' # -- Options for Epub output --------------------------------------------------- @@ -270,38 +283,38 @@ # The language of the text. It defaults to the language option # or en if the language is not set. -#epub_language = '' +# epub_language = '' # The scheme of the identifier. Typical schemes are ISBN or URL. -#epub_scheme = '' +# epub_scheme = '' # The unique identifier of the text. This can be a ISBN number # or the project homepage. -#epub_identifier = '' +# epub_identifier = '' # A unique identification for the text. -#epub_uid = '' +# epub_uid = '' # A tuple containing the cover image and cover page html template filenames. -#epub_cover = () +# epub_cover = () # HTML files that should be inserted before the pages created by sphinx. # The format is a list of tuples containing the path and title. -#epub_pre_files = [] +# epub_pre_files = [] # HTML files shat should be inserted after the pages created by sphinx. # The format is a list of tuples containing the path and title. -#epub_post_files = [] +# epub_post_files = [] # A list of files that should not be packed into the epub file. -#epub_exclude_files = [] +# epub_exclude_files = [] # The depth of the table of contents in toc.ncx. -#epub_tocdepth = 3 +# epub_tocdepth = 3 # Allow duplicate toc entries. -#epub_tocdup = True +# epub_tocdup = True # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {'python': ('http://docs.python.org/3', None)} +intersphinx_mapping = {"python": ("http://docs.python.org/3", None)} diff --git a/docs/source/definitions.rst b/docs/source/definitions.rst index 62f5312..d77cb89 100644 --- a/docs/source/definitions.rst +++ b/docs/source/definitions.rst @@ -208,7 +208,7 @@ If you want to use a file which is bundled with PyVISA-sim, just write: .. _YAML: http://en.wikipedia.org/wiki/YAML -.. _`one provided with pyvisa-sim`: https://github.com/pyvisa/pyvisa-sim/blob/master/pyvisa-sim/default.yaml +.. _`one provided with pyvisa-sim`:https://github.com/pyvisa/pyvisa-sim/blob/main/pyvisa_sim/default.yaml .. _`YAML online parser`: http://yaml-online-parser.appspot.com/ .. _PEP3101: https://www.python.org/dev/peps/pep-3101/ .. _`Lantz Example Driver`: https://lantz.readthedocs.org/en/0.3/tutorial/building.html diff --git a/docs/source/index.rst b/docs/source/index.rst index c57f75c..5c37f80 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,8 +14,8 @@ and therefore test your applications without having real instruments connected. You can select the PyVISA-sim backend using **@sim** when instantiating the visa Resource Manager: - >>> import visa - >>> rm = visa.ResourceManager('@sim') + >>> import pyvisa + >>> rm = pyvisa.ResourceManager('@sim') >>> rm.list_resources() ('ASRL1::INSTR') >>> inst = rm.open_resource('ASRL1::INSTR', read_termination='\n') @@ -28,7 +28,7 @@ use the NI-VISA backend for PyVISA. If you want to load your own file instead of the default, specify the path prepended to the @sim string: - >>> rm = visa.ResourceManager('your_mock_here.yaml@sim') + >>> rm = pyvisa.ResourceManager('your_mock_here.yaml@sim') You can write your own simulators. See :ref:`definitions` to find out how. diff --git a/pyproject.toml b/pyproject.toml index 5f713a4..d93ab63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,101 @@ +[project] +name = "PyVISA-sim" +description = "Simulated backend for PyVISA implementing TCPIP, GPIB, RS232, and USB resources" +readme = "README.rst" +requires-python = ">=3.7" +license = {file = "LICENSE.txt"} +authors = [ + {name = "Hernan E. Grecco", email = "hernan.grecco@gmail.com"}, +] +maintainers = [ + {name = "Matthieu C. Dartiailh", email = "m.dartiailh@gmail.com"} +] +keywords = [ + "VISA", + "GPIB", + "USB", + "serial", + "RS232", + "measurement", + "acquisition", + "simulator", + "mock", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: MIT License", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS :: MacOS X", + "Programming Language :: Python", + "Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator", + "Topic :: Software Development :: Libraries :: Python Modules", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", +] +dependencies = [ + "pyvisa>=1.11.0", + "PyYAML", + "stringparser", + "typing-extensions", +] +dynamic=["version"] + + +[project.urls] +homepage = "https://github.com/pyvisa/pyvisa-sim" +documentation = "https://pyvisa-sim.readthedocs.io/en/latest/" +repository = "https://github.com/pyvisa/pyvisa-sim" +changelog = "https://github.com/pyvisa/pyvisa-sim/blob/main/CHANGES.rst" + [build-system] -requires = ["setuptools>=42", "wheel", "setuptools_scm[toml]>=3.4.3"] +requires = ["setuptools>=61.2", "wheel", "setuptools_scm[toml]>=3.4.3"] build-backend = "setuptools.build_meta" [tool.setuptools_scm] +write_to = "pyvisa_sim/version.py" +write_to_template = """ +# This file is auto-generated by setuptools-scm do NOT edit it. + +from collections import namedtuple + +#: A namedtuple of the version info for the current release. +_version_info = namedtuple("_version_info", "major minor micro status") + +parts = "{version}".split(".", 3) +version_info = _version_info( + int(parts[0]), + int(parts[1]), + int(parts[2]), + parts[3] if len(parts) == 4 else "", +) + +# Remove everything but the 'version_info' from this module. +del namedtuple, _version_info, parts + +__version__ = "{version}" +""" + +[tool.black] +line-length = 88 # Enforce the default value + +[tool.pytest.ini_options] +minversion = "6.0" + +[tool.mypy] +follow_imports = "normal" +strict_optional = true + +[[tool.mypy.overrides]] +module = [ + "stringparser", +] +ignore_missing_imports = true + +[tool.isort] +profile = "black" + diff --git a/pyvisa_sim/__init__.py b/pyvisa_sim/__init__.py index b6650f8..6621fb6 100644 --- a/pyvisa_sim/__init__.py +++ b/pyvisa_sim/__init__.py @@ -1,24 +1,20 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim - ~~~~~~~~~~ +"""Simulated backend for PyVISA. - Simulated backend for PyVISA. +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ -import pkg_resources +from importlib.metadata import PackageNotFoundError, version from .highlevel import SimVisaLibrary - __version__ = "unknown" -try: # pragma: no cover - __version__ = pkg_resources.get_distribution("pyvisa-sim").version -except: # pragma: no cover - pass # we seem to have a local copy without any repository control or installed without setuptools - # so the reported version will be __unknown__ +try: + __version__ = version(__name__) +except PackageNotFoundError: + # package is not installed + pass WRAPPER_CLASS = SimVisaLibrary diff --git a/pyvisa_sim/channels.py b/pyvisa_sim/channels.py index 2fdf1a9..2816efc 100644 --- a/pyvisa_sim/channels.py +++ b/pyvisa_sim/channels.py @@ -1,54 +1,62 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.channel - ~~~~~~~~~~~~~~~~~~ +"""Classes to enable the use of channels in devices. - Classes to enable the use of channels in devices. +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ from collections import defaultdict +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar import stringparser from .common import logger -from .component import Component, Property, to_bytes +from .component import Component, OptionalBytes, OptionalStr, Property, T, to_bytes +if TYPE_CHECKING: + from .devices import Device -class ChannelProperty(Property): - """A channel property storing the value for all channels.""" - def __init__(self, channel, name, default_value, specs): +class ChannelProperty(Property[T]): + """A channel property storing the value for all channels.""" - #: Refrence to the channel holding that property. + def __init__( + self, channel: "Channels", name: str, default_value: str, specs: Dict[str, str] + ) -> None: self._channel = channel - super(ChannelProperty, self).__init__(name, default_value, specs) - def init_value(self, string_value): + def init_value(self, string_value: str) -> None: """Create an empty defaultdict holding the default value.""" value = self.validate_value(string_value) self._value = defaultdict(lambda: value) - def get_value(self): + def get_value(self) -> Optional[T]: """Get the current value for a channel.""" return self._value[self._channel._selected] - def set_value(self, string_value): + def set_value(self, string_value: str) -> None: """Set the current value for a channel.""" value = self.validate_value(string_value) self._value[self._channel._selected] = value + # --- Private API -class ChDict(dict): - """Default dict like creating specialized sommand sets for a channel.""" + #: Reference to the channel holding that property. + _channel: "Channels" - def __missing__(self, key): - """Create a channel specialized version of the mapping found in - __default__. + #: Value of the property on a per channel basis + _value: Dict[Any, T] # type: ignore - """ + +V = TypeVar("V") + + +class ChDict(Dict[str, Dict[bytes, V]]): + """Default dict like creating specialized command sets for a channel.""" + + def __missing__(self, key: str) -> Dict[bytes, V]: + """Create a channel specialized version of the mapping found in __default__.""" return { k.decode("utf-8").format(ch_id=key).encode("utf-8"): v for k, v in self["__default__"].items() @@ -58,46 +66,56 @@ def __missing__(self, key): class Channels(Component): """A component representing a device channels.""" - def __init__(self, device, ids, can_select): + #: Flag indicating whether or not the channel can be selected inside + #: the query or if it is pre-selected by a previous command. + can_select: bool + def __init__(self, device: "Device", ids: List[str], can_select: bool): super(Channels, self).__init__() - - #: Flag indicating whether or not the channel can be selected inside - #: the query or if it is pre-selected by a previous command. - self.can_select = can_select - - #: Currently active channel, this can either reflect the currently - #: selected channel on the device or the currently inspected possible - #: when attempting to match. + self.can_select: bool = can_select self._selected = None - - #: Reference to the parent device from which we might need to query and - #: set the current selected channel self._device = device - - #: Ids of the activated channels. self._ids = ids - self._getters = ChDict(__default__={}) - self._dialogues = ChDict(__default__={}) - def add_dialogue(self, query, response): + def add_dialogue(self, query: str, response: str) -> None: """Add dialogue to channel. - :param query: query string - :param response: response string + Parameters + ---------- + query : str + Query string to which this dialogue answers to. + response : str + Response sent in response to a query. + """ self._dialogues["__default__"][to_bytes(query)] = to_bytes(response) - def add_property(self, name, default_value, getter_pair, setter_triplet, specs): + def add_property( + self, + name: str, + default_value: str, + getter_pair: Optional[Tuple[str, str]], + setter_triplet: Optional[Tuple[str, OptionalStr, OptionalStr]], + specs: Dict[str, str], + ) -> None: """Add property to channel - :param name: property name - :param default_value: default value as string - :param getter_pair: (query, response) - :param setter_triplet: (query, response, error) - :param specs: specification of the Property + Parameters + ---------- + property_name : str + Name of the property. + default_value : str + Default value of the property as a str. + getter_pair : Optional[Tuple[str, str]] + Parameters for accessing the property value (query and response str) + setter_triplet : Optional[Tuple[str, OptionalStr, OptionalStr]] + Parameters for setting the property value. The response and error + are optional. + specs : Dict[str, str] + Specification for the property as a dict. + """ self._properties[name] = ChannelProperty(self, name, default_value, specs) @@ -106,19 +124,19 @@ def add_property(self, name, default_value, getter_pair, setter_triplet, specs): self._getters["__default__"][to_bytes(query)] = name, response if setter_triplet: - query, response, error = setter_triplet + query, response_, error = setter_triplet self._setters.append( - (name, stringparser.Parser(query), to_bytes(response), to_bytes(error)) + (name, stringparser.Parser(query), to_bytes(response_), to_bytes(error)) ) - def match(self, query): + def match(self, query: bytes) -> Optional[OptionalBytes]: """Try to find a match for a query in the channel commands.""" if not self.can_select: ch_id = self._device._properties["selected_channel"].get_value() if ch_id in self._ids: self._selected = ch_id else: - return + return None response = self._match_dialog(query, self._dialogues["__default__"]) if response is not None: @@ -142,7 +160,27 @@ def match(self, query): return self._match_setters(query) - def _match_setters(self, query): + # --- Private API + + #: Currently active channel, this can either reflect the currently + #: selected channel on the device or the currently inspected possible + #: when attempting to match. + _selected: Optional[str] + + #: Reference to the parent device from which we might need to query and + #: set the current selected channel + _device: "Device" + + #: Ids of the activated channels. + _ids: List[str] + + #: Dialogues organized by channel IDs + _dialogues: Dict[str, Dict[bytes, bytes]] # type: ignore + + #: Getters organized by channel ID + _getters: Dict[str, Dict[bytes, Tuple[str, str]]] # type: ignore + + def _match_setters(self, query: bytes) -> Optional[OptionalBytes]: """Try to find a match""" q = query.decode("utf-8") for name, parser, response, error_response in self._setters: @@ -155,9 +193,9 @@ def _match_setters(self, query): try: if isinstance(parsed, dict) and "ch_id" in parsed: self._selected = parsed["ch_id"] - self._properties[name].set_value(parsed["0"]) + self._properties[name].set_value(str(parsed["0"])) else: - self._properties[name].set_value(parsed) + self._properties[name].set_value(str(parsed)) return response except ValueError: if isinstance(error_response, bytes): diff --git a/pyvisa_sim/common.py b/pyvisa_sim/common.py index 187f71f..f1238b0 100644 --- a/pyvisa_sim/common.py +++ b/pyvisa_sim/common.py @@ -1,44 +1,91 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.common - ~~~~~~~~~~~~~~~~~ +"""Common tools. - This code is currently taken from PyVISA-py. - Do not edit here. +This code is currently taken from PyVISA-py. +Do not edit here. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. -""" -import sys +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. +""" import logging +from typing import Callable, Iterator, Optional, Sequence from pyvisa import logger -logger = logging.LoggerAdapter(logger, {"backend": "sim"}) +logger = logging.LoggerAdapter(logger, {"backend": "sim"}) # type: ignore -class NamedObject(object): - """A class to construct named sentinels.""" +def _create_bitmask(bits: int) -> int: + """Create a bitmask for the given number of bits.""" + mask = (1 << bits) - 1 + return mask - def __init__(self, name): - self.name = name - def __repr__(self): - return "<%s>" % self.name +def iter_bytes( + data: bytes, data_bits: Optional[int] = None, send_end: Optional[bool] = None +) -> Iterator[bytes]: + """Clip values to the correct number of bits per byte. - __str__ = __repr__ + Serial communication may use from 5 to 8 bits. + Parameters + ---------- + data : The data to clip as a byte string. + data_bits : How many bits per byte should be sent. Clip to this many bits. + For example: data_bits=5: 0xff (0b1111_1111) --> 0x1f (0b0001_1111). + Acceptable range is 5 to 8, inclusive. Values above 8 will be clipped to 8. + This maps to the VISA attribute VI_ATTR_ASRL_DATA_BITS. + send_end : + If None (the default), apply the mask that is determined by data_bits. + If False, apply the mask and set the highest (post-mask) bit to 0 for + all bytes. + If True, apply the mask and set the highest (post-mask) bit to 0 for + all bytes except for the final byte, which has the highest bit set to 1. -def iter_bytes(data, mask, send_end): - for d in data[:-1]: - yield bytes([d & ~mask]) + References + ---------- + + https://www.ivifoundation.org/downloads/Architecture%20Specifications/vpp43_2022-05-19.pdf, + + https://www.ni.com/docs/en-US/bundle/ni-visa/page/ni-visa/vi_attr_asrl_data_bits.html, + + https://www.ni.com/docs/en-US/bundle/ni-visa/page/ni-visa/vi_attr_asrl_end_out.html - if send_end: - yield bytes([data[-1] | ~mask]) + """ + if send_end and data_bits is None: + raise ValueError("'send_end' requires a valid 'data_bits' value.") + + if data_bits is None: + for d in data: + yield bytes([d]) else: - yield bytes([data[-1] & ~mask]) + if data_bits <= 0: + raise ValueError("'data_bits' cannot be zero or negative") + if data_bits > 8: + data_bits = 8 + + if send_end is None: + # only apply the mask + mask = _create_bitmask(data_bits) + for d in data: + yield bytes([d & mask]) + elif bool(send_end) is False: + # apply the mask and set highest bits to 0 + # This is effectively the same has reducing the mask by 1 bit. + mask = _create_bitmask(data_bits - 1) + for d in data: + yield bytes([d & mask]) + elif bool(send_end) is True: + # apply the mask and set highest bits to 0 + # This is effectively the same has reducing the mask by 1 bit. + mask = _create_bitmask(data_bits - 1) + for d in data[:-1]: + yield bytes([d & mask]) + # except for the last byte which has it's highest bit set to 1. + last_byte = data[-1] + highest_bit = 1 << (data_bits - 1) + yield bytes([(last_byte & mask) | highest_bit]) + else: + raise ValueError(f"Unknown 'send_end' value '{send_end}'") -int_to_byte = lambda val: bytes([val]) -last_int = lambda val: val[-1] +int_to_byte: Callable[[int], bytes] = lambda val: bytes([val]) +last_int: Callable[[Sequence[int]], int] = lambda val: val[-1] diff --git a/pyvisa_sim/component.py b/pyvisa_sim/component.py index 38b08ae..279bc92 100644 --- a/pyvisa_sim/component.py +++ b/pyvisa_sim/component.py @@ -1,128 +1,236 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.component - ~~~~~~~~~~~~~~~~~~~~ +"""Base classes for devices parts. - Base classes for devices parts. +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ +import enum +from typing import ( + Dict, + Final, + Generic, + List, + Literal, + Optional, + Set, + Tuple, + Type, + TypeVar, + Union, + overload, +) + import stringparser +from typing_extensions import TypeAlias # not needed starting with 3.10 from .common import logger +# Sentinel enum which is the only 'clean' way to have sentinels and meaningful typing +class Responses(enum.Enum): + NO = object() + + +NoResponse: Final = Responses.NO + +# Type aliases to be used when NoResponse is an acceptable value +OptionalStr: TypeAlias = Union[str, Literal[Responses.NO]] +OptionalBytes: TypeAlias = Union[bytes, Literal[Responses.NO]] + + +@overload +def to_bytes(val: str) -> bytes: + ... + + +@overload +def to_bytes(val: Literal[Responses.NO]) -> Literal[Responses.NO]: + ... + + def to_bytes(val): - """Takes a text message and return a tuple""" + """Takes a text message or NoResponse and encode it.""" if val is NoResponse: return val + val = val.replace("\\r", "\r").replace("\\n", "\n") return val.encode() -# Sentinel used for when there should not be a response to a query -NoResponse = object() +T = TypeVar("T", bound=Union[int, float, str]) -class Property(object): - """A device property""" +class Specs(Generic[T]): + """Specification to validate a property value. - def __init__(self, name, value, specs): - """ - :param name: name of the property - :param value: default value - :param specs: specification dictionary - :return: - """ + Parameters + ---------- + specs : DIct[str, str] + Specs as a dictionary as extracted from the yaml config. - t = specs.get("type", None) + """ + + #: Value that lead to some validation are int, float, str + type: Optional[Type[T]] + + #: Minimal admissible value + min: Optional[T] + + #: Maximal admissible value + max: Optional[T] + + #: Discrete set of valid values + valid: Set[T] + + # FIXME add support for special values + # some instrument support INCR DECR for increment decrement, + # other support MIN, MAX, DEF + + def __init__(self, specs: Dict[str, str]) -> None: + if "type" not in specs: + raise ValueError("No property type was specified.") + + specs_type = None + t = specs["type"] if t: for key, val in (("float", float), ("int", int), ("str", str)): if t == key: - t = specs["type"] = val + specs_type = val break - for key in ("min", "max"): - if key in specs: - specs[key] = t(specs[key]) + if specs_type is None: + raise ValueError( + f"Invalid property type '{t}', valid types are: " + "'int', 'float', 'str'" + ) + self.type = specs_type + + self.min = specs_type(specs["min"]) if "min" in specs else None + self.max = specs_type(specs["max"]) if "max" in specs else None + self.valid = set([specs_type(val) for val in specs.get("valid", ())]) + + +class Property(Generic[T]): + """A device property + + Parameters + ---------- + name : str + Name of the property + value : str + Default value as a string + specs : Dict[str, str] + Specification used to validate the property value. + + """ - if "valid" in specs: - specs["valid"] = set([t(val) for val in specs["valid"]]) + #: Name of the property + name: str + #: Specification used to validate + specs: Optional[Specs[T]] + + def __init__(self, name: str, value: str, specs: Dict[str, str]): self.name = name - self.specs = specs + try: + self.specs = Specs[T](specs) if specs else None + except ValueError as e: + raise ValueError(f"Failed to create Specs for property {name}") from e self._value = None self.init_value(value) - def init_value(self, string_value): + def init_value(self, string_value: str) -> None: """Initialize the value hold by the Property.""" self.set_value(string_value) - def get_value(self): + def get_value(self) -> Optional[T]: """Return the value stored by the Property.""" return self._value - def set_value(self, string_value): + def set_value(self, string_value: str) -> None: """Set the value""" self._value = self.validate_value(string_value) - def validate_value(self, string_value): + def validate_value(self, string_value: str) -> T: """Validate that a value match the Property specs.""" specs = self.specs - if "type" in specs: - value = specs["type"](string_value) - else: - value = string_value - if "min" in specs and value < specs["min"]: - raise ValueError - if "max" in specs and value > specs["max"]: - raise ValueError - if "valid" in specs and value not in specs["valid"]: - raise ValueError + if specs is None: + # This make str the default type + return string_value # type: ignore + + assert specs.type + value: T = specs.type(string_value) # type: ignore + # Mypy dislike comparison with unresolved type vars it seems + if specs.min is not None and value < specs.min: # type: ignore + raise ValueError( + f"Value provided for {self.name}: {value} " + f"is less than the minimum {specs.min}" + ) + if specs.max is not None and value > specs.max: # type: ignore + raise ValueError( + f"Value provided for {self.name}: {value} " + f"is more than the maximum {specs.max}" + ) + if specs.valid is not None and specs.valid and value not in specs.valid: + raise ValueError( + f"Value provide for {self.name}: {value}" + f"Does not belong to the list of valid values: {specs.valid}" + ) return value + # --- Private API -class Component(object): - """A component of a device.""" + #: Current value of the property. + _value: Optional[T] - def __init__(self): - #: Stores the queries accepted by the device. - #: query: response - #: :type: dict[bytes, bytes] - self._dialogues = {} +class Component: + """A component of a device.""" - #: Maps property names to value, type, validator - #: :type: dict[str, Property] + def __init__(self) -> None: + self._dialogues = {} self._properties = {} - - #: Stores the getter queries accepted by the device. - #: query: (property_name, response) - #: :type: dict[bytes, (str, str)] self._getters = {} - - #: Stores the setters queries accepted by the device. - #: (property_name, string parser query, response, error response) - #: :type: list[(str, stringparser.Parser, bytes, bytes)] self._setters = [] - def add_dialogue(self, query, response): + def add_dialogue(self, query: str, response: str) -> None: """Add dialogue to device. - :param query: query string - :param response: response string + Parameters + ---------- + query : str + Query to which the dialog answers to. + response : str + Response to the dialog query. + """ self._dialogues[to_bytes(query)] = to_bytes(response) - def add_property(self, name, default_value, getter_pair, setter_triplet, specs): + def add_property( + self, + name: str, + default_value: str, + getter_pair: Optional[Tuple[str, str]], + setter_triplet: Optional[Tuple[str, OptionalStr, OptionalStr]], + specs: Dict[str, str], + ): """Add property to device - :param name: property name - :param default_value: default value as string - :param getter_pair: (query, response) - :param setter_triplet: (query, response, error) - :param specs: specification of the Property + Parameters + ---------- + property_name : str + Name of the property. + default_value : str + Default value of the property as a str. + getter_pair : Optional[Tuple[str, str]] + Parameters for accessing the property value (query and response str) + setter_triplet : Optional[Tuple[str, OptionalStr, OptionalStr]] + Parameters for setting the property value. The response and error + are optional. + specs : Dict[str, str] + Specification for the property as a dict. + """ self._properties[name] = Property(name, default_value, specs) @@ -131,22 +239,49 @@ def add_property(self, name, default_value, getter_pair, setter_triplet, specs): self._getters[to_bytes(query)] = name, response if setter_triplet: - query, response, error = setter_triplet + query, response_, error = setter_triplet self._setters.append( - (name, stringparser.Parser(query), to_bytes(response), to_bytes(error)) + (name, stringparser.Parser(query), to_bytes(response_), to_bytes(error)) ) - def match(self, query): + def match(self, query: bytes) -> Optional[OptionalBytes]: """Try to find a match for a query in the instrument commands.""" raise NotImplementedError() - def _match_dialog(self, query, dialogues=None): + # --- Private API + + #: Stores the queries accepted by the device. + #: query: response + _dialogues: Dict[bytes, bytes] + + #: Maps property names to value, type, validator + _properties: Dict[str, Property] + + #: Stores the getter queries accepted by the device. + #: query: (property_name, response) + _getters: Dict[bytes, Tuple[str, str]] + + #: Stores the setters queries accepted by the device. + #: (property_name, string parser query, response, error response) + _setters: List[Tuple[str, stringparser.Parser, OptionalBytes, OptionalBytes]] + + def _match_dialog( + self, query: bytes, dialogues: Optional[Dict[bytes, bytes]] = None + ) -> Optional[bytes]: """Tries to match in dialogues - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None + Parameters + ---------- + query : bytes + Query that we try to match to. + dialogues : Optional[Dict[bytes, bytes]], optional + Alternative dialogs to use when matching. + + Returns + ------- + Optional[bytes] + Response if a dialog matched. + """ if dialogues is None: dialogues = self._dialogues @@ -158,13 +293,27 @@ def _match_dialog(self, query, dialogues=None): return response - def _match_getters(self, query, getters=None): + return None + + def _match_getters( + self, + query: bytes, + getters: Optional[Dict[bytes, Tuple[str, str]]] = None, + ) -> Optional[bytes]: """Tries to match in getters - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None + Parameters + ---------- + query : bytes + Query that we try to match to. + dialogues : Optional[Dict[bytes, bytes]], optional + Alternative getters to use when matching. + + Returns + ------- + Optional[bytes] + Response if a dialog matched. + """ if getters is None: getters = self._getters @@ -175,13 +324,21 @@ def _match_getters(self, query, getters=None): response = response.format(self._properties[name].get_value()) return response.encode("utf-8") - def _match_setters(self, query): + return None + + def _match_setters(self, query: bytes) -> Optional[OptionalBytes]: """Tries to match in setters - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None + Parameters + ---------- + query : bytes + Query that we try to match to. + + Returns + ------- + Optional[bytes] + Response if a dialog matched. + """ q = query.decode("utf-8") for name, parser, response, error_response in self._setters: @@ -197,6 +354,5 @@ def _match_setters(self, query): except ValueError: if isinstance(error_response, bytes): return error_response - return self.error_response("command_error") return None diff --git a/pyvisa_sim/devices.py b/pyvisa_sim/devices.py index aa8b50d..9824e41 100644 --- a/pyvisa_sim/devices.py +++ b/pyvisa_sim/devices.py @@ -1,22 +1,31 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.devices - ~~~~~~~~~~~~~~~~~~ +"""Classes to simulate devices. - Classes to simulate devices. +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ +from typing import Dict, List, Optional, Tuple, Union + from pyvisa import constants, rname +from .channels import Channels from .common import logger -from .component import to_bytes, Component, NoResponse +from .component import Component, NoResponse, OptionalBytes, to_bytes + + +class StatusRegister: + """Class used to mimic a register. + Parameters + ---------- + values : values: Dict[str, int] + Mapping between a name and the associated integer value. + The name 'q' is reserved and ignored. -class StatusRegister(object): - def __init__(self, values): - object.__init__(self) + """ + + def __init__(self, values: Dict[str, int]) -> None: self._value = 0 self._error_map = {} for name, value in values.items(): @@ -24,25 +33,43 @@ def __init__(self, values): continue self._error_map[name] = int(value) - def set(self, error_key): + def set(self, error_key: str) -> None: self._value = self._value | self._error_map[error_key] - def keys(self): + def keys(self) -> List[str]: return list(self._error_map.keys()) @property - def value(self): + def value(self) -> bytes: return to_bytes(str(self._value)) - def clear(self): + def clear(self) -> None: self._value = 0 + # --- Private API -class ErrorQueue(object): - def __init__(self, values): + #: Mapping between name and integer values. + _error_map: Dict[str, int] - super(ErrorQueue, self).__init__() - self._queue = [] + #: Current value of the register. + _value: int + + +class ErrorQueue: + """Store error messages in a FIFO queue. + + Parameters + ---------- + values : values: Dict[str, str] + Mapping between a name and the associated detailed error message. + The names 'q', 'default' and 'strict' are reserved. + 'q' and 'strict' are ignored, 'default' is used to set up the default + response when the queue is empty. + + """ + + def __init__(self, values: Dict[str, str]) -> None: + self._queue: List[bytes] = [] self._error_map = {} for name, value in values.items(): if name in ("q", "default", "strict"): @@ -50,89 +77,73 @@ def __init__(self, values): self._error_map[name] = to_bytes(value) self._default = to_bytes(values["default"]) - def append(self, err): + def append(self, err: str) -> None: if err in self._error_map: self._queue.append(self._error_map[err]) @property - def value(self): + def value(self) -> bytes: if self._queue: return self._queue.pop(0) else: return self._default - def clear(self): + def clear(self) -> None: self._queue = [] + # --- Private API + + #: Queue of recorded errors + _queue: List[bytes] + + #: Mapping between short error names and complete error messages + _error_map: Dict[str, bytes] + + #: Default response when the queue is empty. + _default: bytes + class Device(Component): """A representation of a responsive device - :param name: The identification name of the device - :type name: str - :param name: fullpath of the device where it is defined. - :type name: str - """ - - # To be bound when adding the Device to Devices - _resource_name = None + Parameters + ---------- + name : str + The identification name of the device + delimiter : bytes + Character delimiting multiple message sent in a single query. - # Default end of message used in query operations - # :type: bytes - _query_eom = b"" + """ - # Default end of message used in response operations - # :type: bytes - _response_eom = None + #: Name of the device. + name: str - def __init__(self, name, delimiter): + #: Special character use to delimit multiple messages. + delimiter: bytes + def __init__(self, name: str, delimiter: bytes) -> None: super(Device, self).__init__() - - #: Name of the device. self.name = name - - #: Special character use to delimit multiple messages. self.delimiter = delimiter - - #: Mapping between a name and a Channels object + self._resource_name = None + self._query_eom = b"" + self._response_eom = b"" self._channels = {} - - #: Stores the error response for each query accepted by the device. - #: :type: dict[bytes, bytes | NoResponse] self._error_response = {} - - #: Stores the registers by name. - #: Register name -> Register object - #: :type: dict[str, StatusRegister] self._status_registers = {} - self._error_map = {} - - #: Stores the specific end of messages for device. - #: TYPE CLASS -> (query termination, response termination) - #: :type: dict[(pyvisa.constants.InterfaceType, str), (str, str)] self._eoms = {} - - #: Buffer in which the user can read - #: :type: bytearray self._output_buffer = bytearray() - - #: Buffer in which the user can write - #: :type: bytearray self._input_buffer = bytearray() - - #: Mapping an error queue query and the queue. - #: :type: dict self._error_queues = {} @property - def resource_name(self): + def resource_name(self) -> Optional[str]: """Assigned resource name""" return self._resource_name @resource_name.setter - def resource_name(self, value): + def resource_name(self, value: str) -> None: p = rname.parse_resource_name(value) self._resource_name = str(p) try: @@ -146,11 +157,12 @@ def resource_name(self, value): ) self._query_eom, self._response_eom = b"\n", b"\n" - def add_channels(self, ch_name, ch_obj): + def add_channels(self, ch_name: str, ch_obj: Channels) -> None: """Add a channel definition.""" self._channels[ch_name] = ch_obj - def add_error_handler(self, error_input): + # FIXME use a TypedDict + def add_error_handler(self, error_input: Union[dict, str]): """Add error handler to the device""" if isinstance(error_input, dict): @@ -182,7 +194,8 @@ def add_error_handler(self, error_input): for key, value in response_dict.items(): self._error_response[key] = to_bytes(value) - def error_response(self, error_key): + def error_response(self, error_key: str) -> Optional[bytes]: + """Uupdate all error queues and return an error message if it exists.""" if error_key in self._error_map: self._error_map[error_key].set(error_key) @@ -191,42 +204,42 @@ def error_response(self, error_key): return self._error_response.get(error_key) - def add_eom(self, type_class, query_termination, response_termination): + def add_eom( + self, type_class: str, query_termination: str, response_termination: str + ) -> None: """Add default end of message for a given interface type and resource class. - :param type_class: interface type and resource class as strings joined by space - :param query_termination: end of message used in queries. - :param response_termination: end of message used in responses. + Parameters + ---------- + type_class : str + Interface type and resource class as strings joined by space + query_termination : str + End of message used in queries. + response_termination : str + End of message used in responses. + """ - interface_type, resource_class = type_class.split(" ") - interface_type = getattr(constants.InterfaceType, interface_type.lower()) + i_t, resource_class = type_class.split(" ") + interface_type = getattr(constants.InterfaceType, i_t.lower()) self._eoms[(interface_type, resource_class)] = ( to_bytes(query_termination), to_bytes(response_termination), ) - def write(self, data): - """Write data into the device input buffer. - - :param data: single element byte - :type data: bytes - """ + def write(self, data: bytes) -> None: + """Write data into the device input buffer.""" logger.debug("Writing into device input buffer: %r" % data) if not isinstance(data, bytes): raise TypeError("data must be an instance of bytes") - if len(data) != 1: - msg = "data must have a length of 1, not %d" - raise ValueError(msg % len(data)) - self._input_buffer.extend(data) - l = len(self._query_eom) + le = len(self._query_eom) if not self._input_buffer.endswith(self._query_eom): return try: - message = bytes(self._input_buffer[:-l]) + message = bytes(self._input_buffer[:-le]) queries = message.split(self.delimiter) if self.delimiter else [message] for query in queries: response = self._match(query) @@ -234,6 +247,7 @@ def write(self, data): if response is None: response = self.error_response("command_error") + assert response is not None if response is not NoResponse: self._output_buffer.extend(response) @@ -245,7 +259,7 @@ def write(self, data): finally: self._input_buffer = bytearray() - def read(self): + def read(self) -> bytes: """Return a single byte from the output buffer""" if self._output_buffer: b, self._output_buffer = (self._output_buffer[0:1], self._output_buffer[1:]) @@ -253,14 +267,46 @@ def read(self): return b"" - def _match(self, query): - """Tries to match in dialogues, getters and setters and subcomponents + # --- Private API - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None - """ + #: Resource name this device is bound to. Set when adding the device to Devices + _resource_name: Optional[str] + + # Default end of message used in query operations + _query_eom: bytes + + # Default end of message used in response operations + _response_eom: bytes + + #: Mapping between a name and a Channels object + _channels: Dict[str, Channels] + + #: Stores the error response for each query accepted by the device. + _error_response: Dict[str, bytes] + + #: Stores the registers by name. + #: Register name -> Register object + _status_registers: Dict[bytes, StatusRegister] + + #: Mapping between error and register affected by the error. + _error_map: Dict[str, StatusRegister] + + #: Stores the specific end of messages for device. + #: TYPE CLASS -> (query termination, response termination) + _eoms: Dict[Tuple[constants.InterfaceType, str], Tuple[bytes, bytes]] + + #: Buffer in which the user can read + _output_buffer: bytearray + + #: Buffer in which the user can write + _input_buffer: bytearray + + #: Mapping an error queue query and the queue. + _error_queues: Dict[bytes, ErrorQueue] + + def _match(self, query: bytes) -> Optional[OptionalBytes]: + """Tries to match in dialogues, getters and setters and channels.""" + response: Optional[OptionalBytes] response = self._match_dialog(query) if response is not None: return response @@ -289,14 +335,8 @@ def _match(self, query): return None - def _match_registers(self, query): - """Tries to match in status registers - - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None - """ + def _match_registers(self, query: bytes) -> Optional[bytes]: + """Tries to match in status registers.""" if query in self._status_registers: register = self._status_registers[query] response = register.value @@ -305,14 +345,10 @@ def _match_registers(self, query): return response - def _match_errors_queues(self, query): - """Tries to match in error queues + return None - :param query: message tuple - :type query: Tuple[bytes] - :return: response if found or None - :rtype: Tuple[bytes] | None - """ + def _match_errors_queues(self, query: bytes) -> Optional[bytes]: + """Tries to match in error queues.""" if query in self._error_queues: queue = self._error_queues[query] response = queue.value @@ -320,17 +356,16 @@ def _match_errors_queues(self, query): return response + return None -class Devices(object): - """The group of connected devices.""" - def __init__(self): +class Devices: + """The group of connected devices.""" - #: Devices - #: dict[str, Device] + def __init__(self) -> None: self._internal = {} - def add_device(self, resource_name, device): + def add_device(self, resource_name: str, device: Device) -> None: """Bind device to resource name""" if device.resource_name is not None: @@ -341,12 +376,17 @@ def add_device(self, resource_name, device): self._internal[device.resource_name] = device - def __getitem__(self, item): + def __getitem__(self, item: str) -> Device: return self._internal[item] - def list_resources(self): + def list_resources(self) -> Tuple[str, ...]: """List resource names. :rtype: tuple[str] """ return tuple(self._internal.keys()) + + # --- Private API + + #: Resource name to device map. + _internal: Dict[str, Device] diff --git a/pyvisa_sim/gpib.py b/pyvisa_sim/gpib.py deleted file mode 100644 index 9d057aa..0000000 --- a/pyvisa_sim/gpib.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -""" - pyvisa-sim.gpib - ~~~~~~~~~~~~~~~ - - GPIB simulated session. - - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. -""" -import queue - -import time - -from pyvisa import constants - -from . import sessions - - -@sessions.Session.register(constants.InterfaceType.gpib, "INSTR") -class GPIBInstrumentSession(sessions.Session): - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) - self.attrs[constants.VI_ATTR_GPIB_PRIMARY_ADDR] = int( - self.parsed.primary_address - ) - self.attrs[constants.VI_ATTR_GPIB_SECONDARY_ADDR] = ( - int(self.parsed.secondary_address) - if self.parsed.secondary_address is not None - else constants.VI_NO_SEC_ADDR - ) - - def read(self, count): - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) - enabled, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - timeout, _ = self.get_attribute(constants.VI_ATTR_TMO_VALUE) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data): - send_end = self.get_attribute(constants.VI_ATTR_SEND_END_EN) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM4882 - pass - - return len(data), constants.StatusCode.success diff --git a/pyvisa_sim/highlevel.py b/pyvisa_sim/highlevel.py index c3a231e..90098e5 100644 --- a/pyvisa_sim/highlevel.py +++ b/pyvisa_sim/highlevel.py @@ -1,27 +1,24 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.highlevel - ~~~~~~~~~~~~~~~~~~~~ +"""Simulated VISA Library. - Simulated VISA Library. +:copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ - import random -from traceback import format_exc from collections import OrderedDict +from traceback import format_exc +from typing import Any, Dict, SupportsInt, Tuple, Union, overload -from pyvisa import constants, highlevel, rname import pyvisa.errors as errors +from pyvisa import constants, highlevel, rname +from pyvisa.typing import VISAEventContext, VISARMSession, VISASession from pyvisa.util import LibraryPath -from . import parser -from . import sessions - # This import is required to register subclasses -from . import gpib, serial, tcpip, usb +from . import parser +from .sessions import gpib, serial, tcpip, usb # noqa +from .sessions.session import Session class SimVisaLibrary(highlevel.VisaLibraryBase): @@ -29,55 +26,63 @@ class SimVisaLibrary(highlevel.VisaLibraryBase): The object is basically a dispatcher with some common functions implemented. - When a new resource object is requested to pyvisa, the library creates a Session object - (that knows how to perform low-level communication operations) associated with a session handle - (a number, usually refered just as session). + When a new resource object is requested to pyvisa, the library creates a Session + object (that knows how to perform low-level communication operations) associated + with a session handle (a number, usually referred just as session). - A call to a library function is handled by PyVisaLibrary if it involves a resource agnostic - function or dispatched to the correct session object (obtained from the session id). + A call to a library function is handled by PyVisaLibrary if it involves a resource + agnostic function or dispatched to the correct session object (obtained from the + session id). + + Importantly, the user is unaware of this. PyVisaLibrary behaves for the user + just as an IVIVisaLibrary. - Importantly, the user is unaware of this. PyVisaLibrary behaves for the user just as NIVisaLibrary. """ + #: Maps session handle to session objects. + sessions: Dict[VISASession, Session] + @staticmethod - def get_library_paths(): + def get_library_paths() -> Tuple[LibraryPath]: """List a dummy library path to allow to create the library.""" return (LibraryPath("unset"),) @staticmethod - def get_debug_info(): - """Return a list of lines with backend info. - """ + def get_debug_info() -> Dict[str, str]: + """Return a list of lines with backend info.""" from . import __version__ from .parser import SPEC_VERSION + d = OrderedDict() - d['Version'] = '%s' % __version__ - d['Spec version'] = SPEC_VERSION + d["Version"] = "%s" % __version__ + d["Spec version"] = SPEC_VERSION return d - def _init(self): - - #: map session handle to session object. - #: dict[int, SessionSim] - self.sessions = {} - + def _init(self) -> None: + self.sessions: Dict[int, Session] = {} try: - if self.library_path == 'unset': - self.devices = parser.get_devices('default.yaml', True) + if self.library_path == "unset": + self.devices = parser.get_devices("default.yaml", True) else: self.devices = parser.get_devices(self.library_path, False) except Exception as e: - msg = 'Could not parse definitions file. %r' + msg = "Could not parse definitions file. %r" raise type(e)(msg % format_exc()) + @overload + def _register(self, obj: "SimVisaLibrary") -> VISARMSession: + ... + + @overload + def _register(self, obj: Session) -> VISASession: + ... + def _register(self, obj): - """Creates a random but unique session handle for a session object, - register it in the sessions dictionary and return the value + """Creates a random but unique session handle for a session object. + + The handle is registered it in the sessions dictionary and returned. - :param obj: a session object. - :return: session handle - :rtype: int """ session = None @@ -87,80 +92,126 @@ def _register(self, obj): self.sessions[session] = obj return session - # noinspection PyShadowingBuiltins - def open(self, session, resource_name, - access_mode=constants.AccessModes.no_lock, open_timeout=constants.VI_TMO_IMMEDIATE): + def open( + self, + session: VISARMSession, + resource_name: str, + access_mode: constants.AccessModes = constants.AccessModes.no_lock, + open_timeout: SupportsInt = constants.VI_TMO_IMMEDIATE, + ) -> Tuple[VISASession, constants.StatusCode]: """Opens a session to the specified resource. Corresponds to viOpen function of the VISA library. - :param session: Resource Manager session - (should always be a session returned - from open_default_resource_manager()). - :param resource_name: Unique symbolic name of a resource. - :param access_mode: Specifies the mode by which the resource is to be accessed. (constants.AccessModes) - :param open_timeout: Specifies the maximum time period (in milliseconds) that this operation waits - before returning an error. - :return: Unique logical identifier reference to a session, return value of the library call. - :rtype: session, :class:`pyvisa.constants.StatusCode` + Parameters + ---------- + sessions : VISARMSession + Resource Manager session (should always be a session returned + from open_default_resource_manager()). + resource_name : str + Unique symbolic name of a resource. + access_mode : constants.AccessModes + Specifies the mode by which the resource is to be accessed. + open_timeout : int + Specifies the maximum time period (in milliseconds) that this operation + waits before returning an error. + + Returns + ------- + VISASession + Unique logical identifier reference to a session, return value of the + library call. + constants.StatusCode + Status code describing the operation execution. + """ try: open_timeout = int(open_timeout) except ValueError: - raise ValueError('open_timeout (%r) must be an integer (or compatible type)' % open_timeout) + raise ValueError( + "open_timeout (%r) must be an integer (or compatible type)" + % open_timeout + ) try: parsed = rname.parse_resource_name(resource_name) except rname.InvalidResourceName: - return 0, constants.StatusCode.error_invalid_resource_name + return VISASession(0), constants.StatusCode.error_invalid_resource_name # Loops through all session types, tries to parse the resource name and if ok, open it. - cls = sessions.Session.get_session_class(parsed.interface_type_const, parsed.resource_class) + cls = Session.get_session_class( + parsed.interface_type_const, parsed.resource_class + ) sess = cls(session, resource_name, parsed) try: - sess.device = self.devices[sess.attrs[constants.VI_ATTR_RSRC_NAME]] + r_name = sess.attrs[constants.ResourceAttribute.resource_name] + assert isinstance(r_name, str) + sess.device = self.devices[r_name] except KeyError: - return 0, constants.StatusCode.error_resource_not_found + return VISASession(0), constants.StatusCode.error_resource_not_found return self._register(sess), constants.StatusCode.success - def close(self, session): + def close( + self, session: Union[VISASession, VISARMSession, VISAEventContext] + ) -> constants.StatusCode: """Closes the specified session, event, or find list. Corresponds to viClose function of the VISA library. - :param session: Unique logical identifier to a session, event, or find list. - :return: return value of the library call. - :rtype: :class:`pyvisa.constants.StatusCode` + Parameters + ---------- + session : Union[VISASession, VISARMSession, VISAEventContext] + Unique logical identifier to a session, event, or find list. + + Returns + ------- + constants.StatusCode + Return value of the library call. + """ try: - del self.sessions[session] + del self.sessions[session] # type: ignore return constants.StatusCode.success except KeyError: return constants.StatusCode.error_invalid_object - def open_default_resource_manager(self): + def open_default_resource_manager( + self, + ) -> Tuple[VISARMSession, constants.StatusCode]: """This function returns a session to the Default Resource Manager resource. Corresponds to viOpenDefaultRM function of the VISA library. - :return: Unique logical identifier to a Default Resource Manager session, return value of the library call. - :rtype: session, :class:`pyvisa.constants.StatusCode` + Returns + ------- + VISARMSession + Unique logical identifier to a Default Resource Manager session, return + value of the library call. + constants.StatusCode + Return value of the library call. + """ return self._register(self), constants.StatusCode.success - def list_resources(self, session, query='?*::INSTR'): + def list_resources( + self, session: VISARMSession, query: str = "?*::INSTR" + ) -> Tuple[str, ...]: """Returns a tuple of all connected devices matching query. - :param session: - :param query: regular expression used to match devices. - """ - - # For each session type, ask for the list of connected resources and merge them into a single list. + Parameters + ---------- + session : VISARMSession + Resource manager session + query : str + VISA regular expression used to match devices. + """ + # For each session type, ask for the list of connected resources and merge + # them into a single list. resources = self.devices.list_resources() resources = rname.filter(resources, query) @@ -170,40 +221,64 @@ def list_resources(self, session, query='?*::INSTR'): raise errors.VisaIOError(errors.StatusCode.error_resource_not_found.value) - def read(self, session, count): + def read( + self, session: VISASession, count: int + ) -> Tuple[bytes, constants.StatusCode]: """Reads data from device or interface synchronously. Corresponds to viRead function of the VISA library. - :param session: Unique logical identifier to a session. - :param count: Number of bytes to be read. - :return: data read, return value of the library call. - :rtype: bytes, :class:`pyvisa.constants.StatusCode` + Parameters + ---------- + session : VISASession + Unique logical identifier to a session. + count : int + Number of bytes to be read. + + Returns + ------- + bytes + Date read + constants.StatusCode + Return value of the library call. + """ try: sess = self.sessions[session] except KeyError: - return b'', constants.StatusCode.error_invalid_object + return b"", constants.StatusCode.error_invalid_object try: - chunk, status = sess.read(count) + # We have an explicit except AttributeError + chunk, status = sess.read(count) # type: ignore if status == constants.StatusCode.error_timeout: raise errors.VisaIOError(constants.VI_ERROR_TMO) return chunk, status except AttributeError: - return b'', constants.StatusCode.error_nonsupported_operation + return b"", constants.StatusCode.error_nonsupported_operation - def write(self, session, data): + def write( + self, session: VISASession, data: bytes + ) -> Tuple[int, constants.StatusCode]: """Writes data to device or interface synchronously. Corresponds to viWrite function of the VISA library. - :param session: Unique logical identifier to a session. - :param data: data to be written. - :type data: str - :return: Number of bytes actually transferred, return value of the library call. - :rtype: int, :class:`pyvisa.constants.StatusCode` + Parameters + ---------- + session : VISASession + Unique logical identifier to a session. + data : bytes + Data to be written. + + Returns + ------- + int + Number of bytes actually transferred + constants.StatusCode + Return value of the library call. + """ try: @@ -212,45 +287,77 @@ def write(self, session, data): return 0, constants.StatusCode.error_invalid_object try: - return sess.write(data) + # We have an explicit except AttributeError + return sess.write(data) # type: ignore except AttributeError: return 0, constants.StatusCode.error_nonsupported_operation - def get_attribute(self, session, attribute): + def get_attribute( + self, + session: Union[VISASession, VISARMSession, VISAEventContext], + attribute: Union[constants.ResourceAttribute, constants.EventAttribute], + ) -> Tuple[Any, constants.StatusCode]: """Retrieves the state of an attribute. Corresponds to viGetAttribute function of the VISA library. - :param session: Unique logical identifier to a session, event, or find list. - :param attribute: Resource attribute for which the state query is made (see Attributes.*) - :return: The state of the queried attribute for a specified resource, return value of the library call. - :rtype: unicode (Py2) or str (Py3), list or other type, :class:`pyvisa.constants.StatusCode` + Parameters + ---------- + session : Union[VISASession, VISARMSession, VISAEventContext] + Unique logical identifier to a session, event, or find list. + attribute : Union[constants.ResourceAttribute, constants.EventAttribute] + Resource attribute for which the state query is made (see Attributes.*) + + Returns + ------- + Any + State of the queried attribute for a specified resource + constants.StatusCode + Return value of the library call. + """ try: - sess = self.sessions[session] + sess = self.sessions[session] # type: ignore except KeyError: return 0, constants.StatusCode.error_invalid_object - return sess.get_attribute(attribute) + # Not sure how to handle events yet and I do not want to error if people keep + # using the bare attribute values. + return sess.get_attribute(attribute) # type: ignore - def set_attribute(self, session, attribute, attribute_state): + def set_attribute( + self, + session: Union[VISASession, VISARMSession, VISAEventContext], + attribute: Union[constants.ResourceAttribute, constants.EventAttribute], + attribute_state: Any, + ) -> constants.StatusCode: """Sets the state of an attribute. Corresponds to viSetAttribute function of the VISA library. - :param session: Unique logical identifier to a session. - :param attribute: Attribute for which the state is to be modified. (Attributes.*) - :param attribute_state: The state of the attribute to be set for the specified object. - :return: return value of the library call. - :rtype: :class:`pyvisa.constants.StatusCode` - """ + Parameters + ---------- + session : Union[VISASession, VISARMSession, VISAEventContext] + Unique logical identifier to a session. + attribute : Union[constants.ResourceAttribute, constants.EventAttribute] + Attribute for which the state is to be modified. (Attributes.*) + attribute_state : Any + The state of the attribute to be set for the specified object. + + Returns + ------- + constants.StatusCode + Return value of the library call. + """ try: - sess = self.sessions[session] + sess = self.sessions[session] # type: ignore except KeyError: return constants.StatusCode.error_invalid_object - return sess.set_attribute(attribute, attribute_state) + # Not sure how to handle events yet and I do not want to error if people keep + # using the bare attribute values. + return sess.set_attribute(attribute, attribute_state) # type: ignore def disable_event(self, session, event_type, mechanism): # TODO: implement this for GPIB finalization diff --git a/pyvisa_sim/parser.py b/pyvisa_sim/parser.py index 0b4b070..e2d945d 100644 --- a/pyvisa_sim/parser.py +++ b/pyvisa_sim/parser.py @@ -1,27 +1,37 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.parser - ~~~~~~~~~~~~~~~~~ +"""Parser function - Parser function +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ +import importlib.resources import os -from io import open, StringIO +import pathlib from contextlib import closing +from io import StringIO, open from traceback import format_exc +from typing import ( + Any, + BinaryIO, + Dict, + Generic, + Literal, + Mapping, + TextIO, + Tuple, + TypeVar, + Union, +) -import pkg_resources import yaml -from .component import NoResponse -from .devices import Devices, Device from .channels import Channels +from .component import Component, NoResponse, Responses +from .devices import Device, Devices -def _ver_to_tuple(ver): +def _ver_to_tuple(ver: str) -> Tuple[int, ...]: return tuple(map(int, (ver.split(".")))) @@ -31,13 +41,18 @@ def _ver_to_tuple(ver): SPEC_VERSION_TUPLE = _ver_to_tuple(SPEC_VERSION) -class SimpleChainmap(object): +# FIXME does not allow to alter an inherited dialogue, property, etc +K = TypeVar("K") +V = TypeVar("V") + + +class SimpleChainmap(Generic[K, V]): """Combine multiple mappings for sequential lookup.""" - def __init__(self, *maps): + def __init__(self, *maps: Mapping[K, V]) -> None: self._maps = maps - def __getitem__(self, key): + def __getitem__(self, key: K) -> V: for mapping in self._maps: try: return mapping[key] @@ -46,37 +61,23 @@ def __getitem__(self, key): raise KeyError(key) -def _s(s): - """Strip white spaces""" - if s is NoResponse: - return s - - return s.strip(" ") - - -def _get_pair(dd): - """Return a pair from a dialogue dictionary. +def _get_pair(dd: Dict[str, str]) -> Tuple[str, str]: + """Return a pair from a dialogue dictionary.""" + return dd["q"].strip(" "), dd["r"].strip(" ") if "r" in dd else NoResponse # type: ignore[return-value] - :param dd: Dialogue dictionary. - :type dd: Dict[str, str] - :return: (query, response) - :rtype: (str, str) - """ - return _s(dd["q"]), _s(dd.get("r", NoResponse)) - - -def _get_triplet(dd): - """Return a triplet from a dialogue dictionary. - :param dd: Dialogue dictionary. - :type dd: Dict[str, str] - :return: (query, response, error response) - :rtype: (str, str | NoResponse, str | NoResponse) - """ - return _s(dd["q"]), _s(dd.get("r", NoResponse)), _s(dd.get("e", NoResponse)) +def _get_triplet( + dd: Dict[str, str] +) -> Tuple[str, Union[str, Literal[Responses.NO]], Union[str, Literal[Responses.NO]]]: + """Return a triplet from a dialogue dictionary.""" + return ( + dd["q"].strip(" "), + dd["r"].strip(" ") if "r" in dd else NoResponse, + dd["e"].strip(" ") if "e" in dd else NoResponse, + ) -def _load(content_or_fp): +def _load(content_or_fp: Union[str, bytes, TextIO, BinaryIO]) -> Dict[str, Any]: """YAML Parse a file or str and check version.""" try: data = yaml.load(content_or_fp, Loader=yaml.loader.BaseLoader) @@ -85,16 +86,16 @@ def _load(content_or_fp): try: ver = data["spec"] - except: - raise ValueError("The file does not specify a spec version") + except Exception as e: + raise ValueError("The file does not specify a spec version") from e try: ver = tuple(map(int, (ver.split(".")))) - except: + except Exception as e: raise ValueError( "Invalid spec version format. Expect 'X.Y'" " (X and Y integers), found %s" % ver - ) + ) from e if ver > SPEC_VERSION_TUPLE: raise ValueError( @@ -106,22 +107,23 @@ def _load(content_or_fp): return data -def parse_resource(name): - """Parse a resource file""" - with closing(pkg_resources.resource_stream(__name__, name)) as fp: +def parse_resource(name: str) -> Dict[str, Any]: + """Parse a resource file.""" + with closing(importlib.resources.open_binary("pyvisa_sim", name)) as fp: rbytes = fp.read() return _load(StringIO(rbytes.decode("utf-8"))) -def parse_file(fullpath): - """Parse a file""" - +def parse_file(fullpath: Union[str, pathlib.Path]) -> Dict[str, Any]: + """Parse a file.""" with open(fullpath, encoding="utf-8") as fp: return _load(fp) -def update_component(name, comp, component_dict): +def update_component( + name: str, comp: Component, component_dict: Dict[str, Any] +) -> None: """Get a component from a component dict.""" for dia in component_dict.get("dialogues", ()): try: @@ -148,12 +150,14 @@ def update_component(name, comp, component_dict): raise type(e)(msg % (name, prop_name, format_exc())) -def get_bases(definition_dict, loader): - """Collect dependencies.""" +def get_bases(definition_dict: Dict[str, Any], loader: "Loader") -> Dict[str, Any]: + """Collect inherited behaviors.""" bases = definition_dict.get("bases", ()) if bases: + # FIXME this currently does not work + raise NotImplementedError bases = ( - loader.get_comp_dict(required_version=SPEC_VERSION_TUPLE[0], **b) + loader.get_comp_dict(required_version=SPEC_VERSION_TUPLE[0], **b) # type: ignore for b in bases ) return SimpleChainmap(definition_dict, *bases) @@ -161,17 +165,35 @@ def get_bases(definition_dict, loader): return definition_dict -def get_channel(device, ch_name, channel_dict, loader, resource_dict): +def get_channel( + device: Device, + ch_name: str, + channel_dict: Dict[str, Any], + loader: "Loader", + resource_dict: Dict[str, Any], +) -> Channels: """Get a channels from a channels dictionary. - :param device: - :param ch_name: - :param channel_dict: - :param loader: - :param resource_dict: - :rtype: Device + Parameters + ---------- + device : Device + Device from which to retrieve a channel + ch_name : str + Name of the channel to access + channel_dict : Dict[str, Any] + Definition of the channel. + loader : Loader + Loader containing all the loaded information. + resource_dict : Dict[str, Any] + Dictionary describing the resource to which the device is attached. + + Returns + ------- + Channels: + Channels for the device. + """ - channel_dict = get_bases(channel_dict, loader) + cd = get_bases(channel_dict, loader) r_ids = resource_dict.get("channel_ids", {}).get(ch_name, []) ids = r_ids if r_ids else channel_dict.get("ids", {}) @@ -179,19 +201,35 @@ def get_channel(device, ch_name, channel_dict, loader, resource_dict): can_select = False if channel_dict.get("can_select") == "False" else True channels = Channels(device, ids, can_select) - update_component(ch_name, channels, channel_dict) + update_component(ch_name, channels, cd) return channels -def get_device(name, device_dict, loader, resource_dict): +def get_device( + name: str, + device_dict: Dict[str, Any], + loader: "Loader", + resource_dict: Dict[str, str], +) -> Device: """Get a device from a device dictionary. - :param loader: - :param resource_dict: - :param name: name of the device - :param device_dict: device dictionary - :rtype: Device + Parameters + ---------- + name : str + Name identifying the device. + device_dict : Dict[str, Any] + Dictionary describing the device. + loader : Loader + Global loader centralizing all devices information. + resource_dict : Dict[str, str] + Resource information to which the device is attached. + + Returns + ------- + Device + Accessed device + """ device = Device(name, device_dict.get("delimiter", ";").encode("utf-8")) @@ -213,21 +251,49 @@ def get_device(name, device_dict, loader, resource_dict): return device -class Loader(object): - def __init__(self, filename, bundled): +class Loader: + """Loader handling accessing the definitions in YAML files. - # (absolute path / resource name / None, bundled) -> dict - # :type: dict[str | None, bool, dict] - self._cache = {} + Parameters + ---------- + filename : Union[str, pathlib.Path] + Path to the file to be loaded on creation. + bundled : bool + Is the file bundled with pyvisa-sim itself. - self.data = self._load(filename, bundled, SPEC_VERSION_TUPLE[0]) + """ + + #: Definitions loaded from a YAML file. + data: Dict[str, Any] + def __init__(self, filename: Union[str, pathlib.Path], bundled: bool): + self._cache = {} self._filename = filename self._bundled = bundled - self._basepath = os.path.dirname(filename) - - def load(self, filename, bundled, parent, required_version): + self.data = self._load(filename, bundled, SPEC_VERSION_TUPLE[0]) + def load( + self, + filename: Union[str, pathlib.Path], + bundled: bool, + parent: Union[str, pathlib.Path, None], + required_version: int, + ): + """Load a new file into the loader. + + Parameters + ---------- + filename : Union[str, pathlib.Path] + Filename of the file to parse or name of the resource. + bundled : bool + Is the definition file bundled in pyvisa-sim. + parent : Union[str, pathlib.Path, None] + Path to directory in which the file can be found. If none the directory + in which the initial file was located. + required_version : int + Major required version. + + """ if self._bundled and not bundled: msg = "Only other bundled files can be loaded from bundled files." raise ValueError(msg) @@ -241,12 +307,59 @@ def load(self, filename, bundled, parent, required_version): return self._load(filename, bundled, required_version) - def _load(self, filename, bundled, required_version): + def get_device_dict( + self, + device: str, + filename: Union[str, pathlib.Path, None], + bundled: bool, + required_version: int, + ): + """Access a device definition. + + Parameters + ---------- + device : str + Name of the device information to access. + filename : Union[str, pathlib.Path] + Filename of the file to parse or name of the resource. + The file must be located in the same directory as the original file. + bundled : bool + Is the definition file bundled in pyvisa-sim. + required_version : int + Major required version. + + """ + if filename is None: + data = self.data + else: + data = self.load(filename, bundled, None, required_version) + + return data["devices"][device] + + # --- Private API + + #: (absolute path / resource name / None, bundled) -> dict + _cache: Dict[Tuple[Union[str, pathlib.Path, None], bool], Dict[str, str]] + + #: Path the first loaded file. + _filename: Union[str, pathlib.Path] + + #: Is the loader working with bundled resources. + _bundled: bool + + def _load( + self, filename: Union[str, pathlib.Path], bundled: bool, required_version: int + ) -> Dict[str, Any]: + """Load a YAML definition file. + The major version of the definition must match. + + """ if (filename, bundled) in self._cache: return self._cache[(filename, bundled)] if bundled: + assert isinstance(filename, str) data = parse_resource(filename) else: data = parse_file(filename) @@ -262,34 +375,31 @@ def _load(self, filename, bundled, required_version): return data - def get_device_dict(self, device, filename, bundled, required_version): - - if filename is None: - data = self.data - else: - data = self.load(filename, bundled, None, required_version) - return data["devices"][device] +def get_devices(filename: Union[str, pathlib.Path], bundled: bool) -> Devices: + """Get a Devices object from a file. + Parameters + ---------- + filename : Union[str, pathlib.Path] + Full path of the file to parse or name of the resource. + bundled : bool + Is the definition file bundled in pyvisa-sim. -def get_devices(filename, bundled): - """Get a Devices object from a file. + Returns + ------- + Devices + Devices found in the definition file. - :param bundled: - :param filename: full path of the file to parse or name of the resource. - :rtype: Devices """ loader = Loader(filename, bundled) - - data = loader.data - devices = Devices() # Iterate through the resources and generate each individual device # on demand. - for resource_name, resource_dict in data.get("resources", {}).items(): + for resource_name, resource_dict in loader.data.get("resources", {}).items(): device_name = resource_dict["device"] dd = loader.get_device_dict( diff --git a/pyvisa_sim/sessions.py b/pyvisa_sim/sessions.py deleted file mode 100644 index 0633dec..0000000 --- a/pyvisa_sim/sessions.py +++ /dev/null @@ -1,149 +0,0 @@ -# -*- coding: utf-8 -*- -""" - pyvisa-sim.sessions - ~~~~~~~~~~~~~~~~~~~ - - Base session class. - - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. -""" -import queue - -from pyvisa import constants, attributes, rname - -from .common import logger - - -class Session(object): - """A base class for Session objects. - - Just makes sure that common methods are defined and information is stored. - - :param resource_manager_session: The session handle of the parent Resource Manager - :param resource_name: The resource name. - :param parsed: the parsed resource name (optional). - """ - - #: Maps (Interface Type, Resource Class) to Python class encapsulating that resource. - #: dict[(Interface Type, Resource Class) , Session] - _session_classes = dict() - - #: Session handler for the resource manager. - session_type = None - - @classmethod - def get_session_class(cls, interface_type, resource_class): - """Return the session class for a given interface type and resource class. - - :type interface_type: constants.InterfaceType - :type resource_class: str - :return: Session - """ - try: - return cls._session_classes[(interface_type, resource_class)] - except KeyError: - raise ValueError( - "No class registered for %s, %s" % (interface_type, resource_class) - ) - - @classmethod - def register(cls, interface_type, resource_class): - """Register a session class for a given interface type and resource class. - - :type interface_type: constants.InterfaceType - :type resource_class: str - """ - - def _internal(python_class): - if (interface_type, resource_class) in cls._session_classes: - logger.warning( - "%s is already registered in the ResourceManager. " - "Overwriting with %s" - % ((interface_type, resource_class), python_class) - ) - - python_class.session_type = (interface_type, resource_class) - cls._session_classes[(interface_type, resource_class)] = python_class - return python_class - - return _internal - - def __init__(self, resource_manager_session, resource_name, parsed=None): - if parsed is None: - parsed = rname.parse_resource_name(resource_name) - self.parsed = parsed - self.attrs = { - constants.VI_ATTR_RM_SESSION: resource_manager_session, - constants.VI_ATTR_RSRC_NAME: str(parsed), - constants.VI_ATTR_RSRC_CLASS: parsed.resource_class, - constants.VI_ATTR_INTF_TYPE: parsed.interface_type_const, - } - self.after_parsing() - - #: devices.Device - self.device = None - - def after_parsing(self): - """Override in derived class to be executed after the resource name has - been parsed and the attr dictionary has been filled. - """ - pass - - def get_attribute(self, attribute): - """Get an attribute from the session. - - :param attribute: - :return: attribute value, status code - :rtype: object, constants.StatusCode - """ - - # Check that the attribute exists. - try: - attr = attributes.AttributesByID[attribute] - except KeyError: - return 0, constants.StatusCode.error_nonsupported_attribute - - # Check that the attribute is valid for this session type. - if not attr.in_resource(self.session_type): - return 0, constants.StatusCode.error_nonsupported_attribute - - # Check that the attribute is readable. - if not attr.read: - raise Exception("Do not now how to handle write only attributes.") - - # Return the current value of the default according the VISA spec - return ( - self.attrs.setdefault(attribute, attr.default), - constants.StatusCode.success, - ) - - def set_attribute(self, attribute, attribute_state): - """Get an attribute from the session. - - :param attribute_state: - :param attribute: - :return: attribute value, status code - :rtype: object, constants.StatusCode - """ - - # Check that the attribute exists. - try: - attr = attributes.AttributesByID[attribute] - except KeyError: - return constants.StatusCode.error_nonsupported_attribute - - # Check that the attribute is valid for this session type. - if not attr.in_resource(self.session_type): - return constants.StatusCode.error_nonsupported_attribute - - # Check that the attribute is writable. - if not attr.write: - return constants.StatusCode.error_attribute_read_only - - try: - self.attrs[attribute] = attribute_state - except ValueError: - return constants.StatusCode.error_nonsupported_attribute_state - - return constants.StatusCode.success diff --git a/pyvisa_sim/sessions/__init__.py b/pyvisa_sim/sessions/__init__.py new file mode 100644 index 0000000..2c64040 --- /dev/null +++ b/pyvisa_sim/sessions/__init__.py @@ -0,0 +1,6 @@ +"""Implementation for VISA sessions. + +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" diff --git a/pyvisa_sim/sessions/gpib.py b/pyvisa_sim/sessions/gpib.py new file mode 100644 index 0000000..7a450a4 --- /dev/null +++ b/pyvisa_sim/sessions/gpib.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +"""GPIB simulated session. + +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" +from pyvisa import constants, rname + +from . import session + + +@session.Session.register(constants.InterfaceType.gpib, "INSTR") +class GPIBInstrumentSession(session.MessageBasedSession): + parsed: rname.GPIBInstr + + def after_parsing(self) -> None: + self.attrs[constants.ResourceAttribute.termchar] = int(self.parsed.board) + self.attrs[constants.ResourceAttribute.gpib_primary_address] = int( + self.parsed.primary_address + ) + self.attrs[constants.ResourceAttribute.gpib_secondary_address] = ( + int(self.parsed.secondary_address) + if self.parsed.secondary_address is not None + else constants.VI_NO_SEC_ADDR + ) diff --git a/pyvisa_sim/serial.py b/pyvisa_sim/sessions/serial.py similarity index 56% rename from pyvisa_sim/serial.py rename to pyvisa_sim/sessions/serial.py index 086b5e1..0923e04 100644 --- a/pyvisa_sim/serial.py +++ b/pyvisa_sim/sessions/serial.py @@ -1,41 +1,40 @@ # -*- coding: utf-8 -*- -""" - pyvisa-sim.serial - ~~~~~~~~~~~~~~~~~ +"""ASRL (Serial) simulated session class. - ASRL (Serial) simulated session class. +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. """ -import queue - import time +from typing import Tuple -from pyvisa import constants +from pyvisa import constants, rname -from . import common -from . import sessions +from .. import common +from . import session -@sessions.Session.register(constants.InterfaceType.asrl, "INSTR") -class SerialInstrumentSession(sessions.Session): - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) +@session.Session.register(constants.InterfaceType.asrl, "INSTR") +class SerialInstrumentSession(session.MessageBasedSession): + parsed: rname.ASRLInstr - def read(self, count): + def after_parsing(self) -> None: + self.attrs[constants.ResourceAttribute.interface_number] = int( + self.parsed.board + ) + def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: # TODO: Implement VI_ATTR_SUPPRESS_END_EN - end_in, _ = self.get_attribute(constants.VI_ATTR_ASRL_END_IN) + end_in, _ = self.get_attribute(constants.ResourceAttribute.asrl_end_in) - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) + end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) end_char = common.int_to_byte(end_char) - enabled, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - timeout, _ = self.get_attribute(constants.VI_ATTR_TMO_VALUE) + enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) + timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) timeout /= 1000 - last_bit, _ = self.get_attribute(constants.VI_ATTR_ASRL_DATA_BITS) + last_bit, _ = self.get_attribute(constants.ResourceAttribute.asrl_data_bits) mask = 1 << (last_bit - 1) start = time.time() @@ -55,7 +54,6 @@ def read(self, count): return out, constants.StatusCode.success_termination_character_read elif end_in == constants.SerialTermination.last_bit: - if common.last_int(out) & mask: return out, constants.StatusCode.success @@ -74,24 +72,22 @@ def read(self, count): else: return out, constants.StatusCode.error_timeout - def write(self, data): - send_end, _ = self.get_attribute(constants.VI_ATTR_SEND_END_EN) - asrl_end, _ = self.get_attribute(constants.VI_ATTR_ASRL_END_OUT) + def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: + send_end, _ = self.get_attribute(constants.ResourceAttribute.send_end_enabled) + asrl_end, _ = self.get_attribute(constants.ResourceAttribute.asrl_end_out) + data_bits, _ = self.get_attribute(constants.ResourceAttribute.asrl_data_bits) - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) + end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) end_char = common.int_to_byte(end_char) len_transferred = len(data) if asrl_end == constants.SerialTermination.last_bit: - last_bit, _ = self.get_attribute(constants.VI_ATTR_ASRL_DATA_BITS) - mask = 1 << (last_bit - 1) - for val in common.iter_bytes(data, mask, send_end): - self.device.write(val) + val = b"".join(common.iter_bytes(data, data_bits, send_end)) + self.device.write(val) else: - - for i in range(len(data)): - self.device.write(data[i : i + 1]) + val = b"".join(common.iter_bytes(data, data_bits, send_end=None)) + self.device.write(val) if asrl_end == constants.SerialTermination.termination_char: if send_end: diff --git a/pyvisa_sim/sessions/session.py b/pyvisa_sim/sessions/session.py new file mode 100644 index 0000000..ec48fa9 --- /dev/null +++ b/pyvisa_sim/sessions/session.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +"""Base session class. + +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" +import time +from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar + +from pyvisa import attributes, constants, rname, typing + +from ..common import logger +from ..devices import Device + +S = TypeVar("S", bound="Session") + + +class Session: + """A base class for Session objects. + + Just makes sure that common methods are defined and information is stored. + + Parameters + ---------- + resource_manager_session : VISARMSession + The session handle of the parent Resource Manager + resource_name : str + The resource name. + parsed : rname.ResourceName + Parsed resource name (optional). + + """ + + #: Maps (Interface Type, Resource Class) to Python class encapsulating that resource. + #: dict[(Interface Type, Resource Class) , Session] + _session_classes: Dict[ + Tuple[constants.InterfaceType, str], Type["Session"] + ] = dict() + + #: Session handler for the resource manager. + session_type: Tuple[constants.InterfaceType, str] + + #: Simulated device access by this session + device: Device + + @classmethod + def get_session_class( + cls, interface_type: constants.InterfaceType, resource_class: str + ) -> Type["Session"]: + """Return the session class for a given interface type and resource class. + + Parameters + ---------- + interface_type : constants.InterfaceType + Type of the interface for which we need a Session class. + resource_class : str + Resource class for which we need a Session class. + + Returns + ------- + Type[Session] + Registered session class. + + """ + try: + return cls._session_classes[(interface_type, resource_class)] + except KeyError: + raise ValueError( + "No class registered for %s, %s" % (interface_type, resource_class) + ) + + @classmethod + def register( + cls, interface_type: constants.InterfaceType, resource_class: str + ) -> Callable[[Type[S]], Type[S]]: + """Register a session class for a given interface type and resource class. + + Parameters + ---------- + interface_type : constants.InterfaceType + Type of the interface this session should be used for. + resource_class : str + Resource class for which this session should be used for. + + """ + + def _internal(python_class): + if (interface_type, resource_class) in cls._session_classes: + logger.warning( + "%s is already registered in the ResourceManager. " + "Overwriting with %s" + % ((interface_type, resource_class), python_class) + ) + + python_class.session_type = (interface_type, resource_class) + cls._session_classes[(interface_type, resource_class)] = python_class + return python_class + + return _internal + + def __init__( + self, + resource_manager_session: typing.VISARMSession, + resource_name: str, + parsed: Optional[rname.ResourceName] = None, + ): + if parsed is None: + parsed = rname.parse_resource_name(resource_name) + self.parsed = parsed + self.attrs = { + constants.ResourceAttribute.resource_manager_session: resource_manager_session, + constants.ResourceAttribute.resource_name: str(parsed), + constants.ResourceAttribute.resource_class: parsed.resource_class, + constants.ResourceAttribute.interface_type: parsed.interface_type_const, + } + self.after_parsing() + + def after_parsing(self) -> None: + """Override in derived class to customize the session. + + Executed after the resource name has been parsed and the attr dictionary + has been filled. + + """ + pass + + def get_attribute( + self, attribute: constants.ResourceAttribute + ) -> Tuple[Any, constants.StatusCode]: + """Get an attribute from the session. + + Parameters + ---------- + attribute : constants.ResourceAttribute + Attribute whose value to retrieve. + + Returns + ------- + object + Attribute value. + constants.StatusCode + Status code of the operation execution. + + """ + + # Check that the attribute exists. + try: + attr = attributes.AttributesByID[attribute] + except KeyError: + return 0, constants.StatusCode.error_nonsupported_attribute + + # Check that the attribute is valid for this session type. + if not attr.in_resource(self.session_type): + return 0, constants.StatusCode.error_nonsupported_attribute + + # Check that the attribute is readable. + if not attr.read: + raise Exception("Do not now how to handle write only attributes.") + + # Return the current value of the default according the VISA spec + return ( + self.attrs.setdefault(attribute, attr.default), + constants.StatusCode.success, + ) + + def set_attribute( + self, attribute: constants.ResourceAttribute, attribute_state: Any + ) -> constants.StatusCode: + """Get an attribute from the session. + + Parameters + ---------- + attribute : constants.ResourceAttribute + Attribute whose value to alter. + attribute_state : object + Value to set the attribute to. + + Returns + ------- + constants.StatusCode + Status code describing the operation execution. + + """ + + # Check that the attribute exists. + try: + attr = attributes.AttributesByID[attribute] + except KeyError: + return constants.StatusCode.error_nonsupported_attribute + + # Check that the attribute is valid for this session type. + if not attr.in_resource(self.session_type): + return constants.StatusCode.error_nonsupported_attribute + + # Check that the attribute is writable. + if not attr.write: + return constants.StatusCode.error_attribute_read_only + + try: + self.attrs[attribute] = attribute_state + except ValueError: + return constants.StatusCode.error_nonsupported_attribute_state + + return constants.StatusCode.success + + +class MessageBasedSession(Session): + """Base class for Message-Based sessions that support ``read`` and ``write`` methods.""" + + def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: + end_char, _ = self.get_attribute(constants.ResourceAttribute.termchar) + enabled, _ = self.get_attribute(constants.ResourceAttribute.termchar_enabled) + timeout, _ = self.get_attribute(constants.ResourceAttribute.timeout_value) + timeout /= 1000 + + start = time.time() + + out = b"" + + while time.time() - start <= timeout: + last = self.device.read() + + if not last: + time.sleep(0.01) + continue + + out += last + + if enabled: + if len(out) > 0 and out[-1] == end_char: + return out, constants.StatusCode.success_termination_character_read + + if len(out) == count: + return out, constants.StatusCode.success_max_count_read + else: + return out, constants.StatusCode.error_timeout + + def write(self, data: bytes) -> Tuple[int, constants.StatusCode]: + send_end = self.get_attribute(constants.ResourceAttribute.send_end_enabled) + + self.device.write(data) + + if send_end: + # EOM4882 + pass + + return len(data), constants.StatusCode.success diff --git a/pyvisa_sim/sessions/tcpip.py b/pyvisa_sim/sessions/tcpip.py new file mode 100644 index 0000000..71763e5 --- /dev/null +++ b/pyvisa_sim/sessions/tcpip.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +"""TCPIP simulated session class. + +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" +from pyvisa import constants, rname + +from . import session + + +class BaseTCPIPSession(session.MessageBasedSession): + """Base class for TCPIP sessions.""" + + +@session.Session.register(constants.InterfaceType.tcpip, "INSTR") +class TCPIPInstrumentSession(BaseTCPIPSession): + parsed: rname.TCPIPInstr + + def after_parsing(self) -> None: + self.attrs[constants.ResourceAttribute.interface_number] = int( + self.parsed.board + ) + self.attrs[constants.ResourceAttribute.tcpip_address] = self.parsed.host_address + self.attrs[ + constants.ResourceAttribute.tcpip_device_name + ] = self.parsed.lan_device_name + + +@session.Session.register(constants.InterfaceType.tcpip, "SOCKET") +class TCPIPSocketSession(BaseTCPIPSession): + parsed: rname.TCPIPSocket + + def after_parsing(self) -> None: + self.attrs[constants.ResourceAttribute.interface_number] = int( + self.parsed.board + ) + self.attrs[constants.ResourceAttribute.tcpip_address] = self.parsed.host_address + self.attrs[constants.ResourceAttribute.tcpip_port] = int(self.parsed.port) diff --git a/pyvisa_sim/sessions/usb.py b/pyvisa_sim/sessions/usb.py new file mode 100644 index 0000000..025f7b2 --- /dev/null +++ b/pyvisa_sim/sessions/usb.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +"""USB simulated session class. + +:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" +from typing import Union + +from pyvisa import constants, rname + +from . import session + + +class BaseUSBSession(session.MessageBasedSession): + parsed: Union[rname.USBInstr, rname.USBRaw] + + def after_parsing(self) -> None: + self.attrs[constants.ResourceAttribute.interface_number] = int( + self.parsed.board + ) + self.attrs[ + constants.ResourceAttribute.manufacturer_id + ] = self.parsed.manufacturer_id + self.attrs[constants.ResourceAttribute.model_code] = self.parsed.model_code + self.attrs[ + constants.ResourceAttribute.usb_serial_number + ] = self.parsed.serial_number + self.attrs[constants.ResourceAttribute.usb_interface_number] = int( + self.parsed.board + ) + + +@session.Session.register(constants.InterfaceType.usb, "INSTR") +class USBInstrumentSession(BaseUSBSession): + parsed: rname.USBInstr + + +@session.Session.register(constants.InterfaceType.usb, "RAW") +class USBRawSession(BaseUSBSession): + parsed: rname.USBRaw diff --git a/pyvisa_sim/tcpip.py b/pyvisa_sim/tcpip.py deleted file mode 100644 index e338031..0000000 --- a/pyvisa_sim/tcpip.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -""" - pyvisa-sim.tcpip - ~~~~~~~~~~~~~~~~ - - TCPIP simulated session class. - - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. -""" -import time - -from pyvisa import constants - -from . import sessions - - -class BaseTCPIPSession(sessions.Session): - """Base class for TCPIP sessions.""" - - def read(self, count): - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) - enabled, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - timeout, _ = self.get_attribute(constants.VI_ATTR_TMO_VALUE) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data): - send_end = self.get_attribute(constants.VI_ATTR_SEND_END_EN) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success - - -@sessions.Session.register(constants.InterfaceType.tcpip, "INSTR") -class TCPIPInstrumentSession(BaseTCPIPSession): - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) - self.attrs[constants.VI_ATTR_TCPIP_ADDR] = self.parsed.host_address - self.attrs[constants.VI_ATTR_TCPIP_DEVICE_NAME] = self.parsed.lan_device_name - - -@sessions.Session.register(constants.InterfaceType.tcpip, "SOCKET") -class TCPIPSocketSession(BaseTCPIPSession): - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) - self.attrs[constants.VI_ATTR_TCPIP_ADDR] = self.parsed.host_address - self.attrs[constants.VI_ATTR_TCPIP_PORT] = int(self.parsed.port) diff --git a/pyvisa_sim/testsuite/conftest.py b/pyvisa_sim/testsuite/conftest.py index 1b33796..f19c047 100644 --- a/pyvisa_sim/testsuite/conftest.py +++ b/pyvisa_sim/testsuite/conftest.py @@ -4,16 +4,16 @@ import pyvisa -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def resource_manager(): - rm = pyvisa.ResourceManager('@sim') + rm = pyvisa.ResourceManager("@sim") yield rm rm.close() @pytest.fixture def channels(): - path = os.path.join(os.path.dirname(__file__), 'fixtures', 'channels.yaml') - rm = pyvisa.ResourceManager(path + '@sim') + path = os.path.join(os.path.dirname(__file__), "fixtures", "channels.yaml") + rm = pyvisa.ResourceManager(path + "@sim") yield rm rm.close() diff --git a/pyvisa_sim/testsuite/test_all.py b/pyvisa_sim/testsuite/test_all.py index 5fb9b89..9441e29 100644 --- a/pyvisa_sim/testsuite/test_all.py +++ b/pyvisa_sim/testsuite/test_all.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import logging + import pytest from pyvisa.errors import VisaIOError @@ -179,3 +181,18 @@ def test_instrument_for_error_state(resource, resource_manager): inst.write(":VOLT:IMM:AMPL 0") assert_instrument_response(inst, ":SYST:ERR?", "1, Command error") + + +def test_device_write_logging(caplog, resource_manager) -> None: + instr = resource_manager.open_resource( + "USB0::0x1111::0x2222::0x4444::0::INSTR", + read_termination="\n", + write_termination="\n", + ) + + with caplog.at_level(logging.DEBUG): + instr.write("*IDN?") + instr.read() + + assert "input buffer: b'D'" not in caplog.text + assert r"input buffer: b'*IDN?\n'" in caplog.text diff --git a/pyvisa_sim/testsuite/test_common.py b/pyvisa_sim/testsuite/test_common.py new file mode 100644 index 0000000..5257543 --- /dev/null +++ b/pyvisa_sim/testsuite/test_common.py @@ -0,0 +1,98 @@ +from typing import List, Optional + +import pytest + +from pyvisa_sim import common + + +@pytest.mark.parametrize( + "bits, want", + [ + (0, 0b0), + (1, 0b1), + (5, 0b0001_1111), + (7, 0b0111_1111), + (8, 0b1111_1111), + (11, 0b0111_1111_1111), + ], +) +def test_create_bitmask(bits, want): + got = common._create_bitmask(bits) + assert got == want + + +@pytest.mark.parametrize( + "data, data_bits, send_end, want", + [ + (b"\x01", None, False, b"\x01"), + (b"hello world!", None, False, b"hello world!"), + # Only apply the mask + (b"\x03", 2, None, b"\x03"), # 0b0000_0011 --> 0b0000_0011 + (b"\x04", 2, None, b"\x00"), # 0b0000_0100 --> 0b0000_0000 + (b"\xff", 5, None, b"\x1f"), # 0b1111_1111 --> 0b0001_1111 + (b"\xfe", 7, None, b"\x7e"), # 0b1111_1110 --> 0b0111_1110 + (b"\xfe", 8, None, b"\xfe"), # 0b1111_1110 --> 0b1111_1110 + (b"\xff", 9, None, b"\xff"), # 0b1111_1111 --> 0b1111_1111 + # Always set highest bit *of data_bits* to 0 + (b"\x04", 2, False, b"\x00"), # 0b0000_0100 --> 0b0000_0000 + (b"\x04", 3, False, b"\x00"), # 0b0000_0100 --> 0b0000_0000 + (b"\x05", 3, False, b"\x01"), # 0b0000_0101 --> 0b0000_0001 + (b"\xff", 7, False, b"\x3f"), # 0b1111_1111 --> 0b0011_1111 + (b"\xff", 8, False, b"\x7f"), # 0b1111_1111 --> 0b0111_1111 + # Always set highest bit *of data_bits* to 1 + (b"\x04", 2, True, b"\x02"), # 0b0000_0100 --> 0b0000_0010 + (b"\x04", 3, True, b"\x04"), # 0b0000_0100 --> 0b0000_0100 + (b"\x01", 3, True, b"\x05"), # 0b0000_0001 --> 0b0000_0101 + (b"\x9f", 7, True, b"\x5f"), # 0b1001_1111 --> 0b0101_1111 + (b"\x9f", 8, True, b"\x9f"), # 0b1001_1111 --> 0b1001_1111 + # data_bits >8 bits act like data_bits=8, as type(data) is "bytes" + # which is limited 8 bits per character. + (b"\xff", 9, None, b"\xff"), + (b"\xff", 9, False, b"\x7f"), + (b"\xff", 9, True, b"\xff"), + # send_end=None only applies the mask everywhere and doesn't touch the + # highest bit + # 0x6d: 0b0110_1101 (m) --> 0x0d: 0b0000_1101 (\r) + # 0x5e: 0b0101_1110 (^) --> 0x0e: 0b0000_1110 + # 0x25: 0b0010_0101 (%) --> 0x05: 0b0000_0101 + # 0x25: 0b0010_0101 (%) --> 0x05: 0b0000_0101 + (b"\x6d\x5e\x25\x25", 4, None, b"\r\x0e\x05\x05"), + # send_end=False sets highest post-mask bit to 0 for all + # 0x6d: 0b0110_1101 (m) --> 0x05: 0b0000_0101 + # 0x5e: 0b0101_1110 (^) --> 0x06: 0b0000_0110 + # 0x25: 0b0010_0101 (%) --> 0x05: 0b0000_0101 + # 0x25: 0b0010_0101 (%) --> 0x05: 0b0000_0101 + (b"\x6d\x5e\x25\x25", 4, False, b"\x05\x06\x05\x05"), + # send_end=True sets highest bit to 0 except for final byte + # 0x6d: 0b0110_1101 (m) --> 0x05: 0b0000_0101 + # 0x5e: 0b0101_1110 (^) --> 0x06: 0b0000_0110 + # 0x25: 0b0010_0101 (%) --> 0x05: 0b0000_0101 + # 0x25: 0b0010_0101 (%) --> 0x0d: 0b0000_1101 + (b"\x6d\x5e\x25\x25", 4, True, b"\x05\x06\x05\x0d"), + # 0x61: 0b0110_0001 (a) --> 0x21: 0b0010_0001 (!) + # 0xb1: 0b1011_0001 (±) --> 0x31: 0b0011_0001 (1) + (b"a\xb1", 6, None, b"\x21\x31"), + # 0x61: 0b0110_0001 (a) --> 0x01: 0b0000_0001 + # 0xb1: 0b1011_0001 (±) --> 0x11: 0b0001_0001 + (b"a\xb1", 6, False, b"\x01\x11"), + # 0x61: 0b0110_0001 (a) --> 0x01: 0b0000_0001 + # 0xb1: 0b1011_0001 (±) --> 0x31: 0b0011_0001 (1) + (b"a\xb1", 6, True, b"\x011"), + ], +) +def test_iter_bytes( + data: bytes, data_bits: Optional[int], send_end: bool, want: List[bytes] +) -> None: + got = b"".join(common.iter_bytes(data, data_bits=data_bits, send_end=send_end)) + assert got == want + + +def test_iter_bytes_with_send_end_requires_data_bits() -> None: + with pytest.raises(ValueError): + # Need to wrap in list otherwise the iterator is never called. + list(common.iter_bytes(b"", data_bits=None, send_end=True)) + + +def test_iter_bytes_raises_on_bad_data_bits() -> None: + with pytest.raises(ValueError): + list(common.iter_bytes(b"", data_bits=0, send_end=None)) diff --git a/pyvisa_sim/testsuite/test_parser.py b/pyvisa_sim/testsuite/test_parser.py new file mode 100644 index 0000000..5a5bb56 --- /dev/null +++ b/pyvisa_sim/testsuite/test_parser.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +from typing import Dict, Tuple + +import pytest + +from pyvisa_sim import parser +from pyvisa_sim.component import NoResponse, OptionalStr + + +@pytest.mark.parametrize( + "dialogue_dict, want", + [ + ({"q": "foo", "r": "bar"}, ("foo", "bar")), + ({"q": "foo ", "r": " bar"}, ("foo", "bar")), + ({"q": " foo", "r": "bar "}, ("foo", "bar")), + ({"q": " foo ", "r": " bar "}, ("foo", "bar")), + # Make sure to support queries that don't have responses + ({"q": "foo"}, ("foo", NoResponse)), + # Ignore other keys + ({"q": "foo", "bar": "bar"}, ("foo", NoResponse)), + ], +) +def test_get_pair(dialogue_dict: Dict[str, str], want: Tuple[str, OptionalStr]) -> None: + got = parser._get_pair(dialogue_dict) + assert got == want + + +def test_get_pair_requires_query_key() -> None: + with pytest.raises(KeyError): + parser._get_pair({"r": "bar"}) + + +@pytest.mark.parametrize( + "dialogue_dict, want", + [ + ({"q": "foo", "r": "bar", "e": "baz"}, ("foo", "bar", "baz")), + ({"q": "foo ", "r": " bar", "e": " baz "}, ("foo", "bar", "baz")), + ({"q": " foo", "r": "bar ", "e": "baz "}, ("foo", "bar", "baz")), + ({"q": " foo ", "r": " bar ", "e": " baz"}, ("foo", "bar", "baz")), + # Make sure to support queries that don't have responses + ({"q": "foo"}, ("foo", NoResponse, NoResponse)), + ({"q": "foo", "r": "bar"}, ("foo", "bar", NoResponse)), + ({"q": "foo", "e": "bar"}, ("foo", NoResponse, "bar")), + # Ignore other keys + ({"q": "foo", "bar": "bar"}, ("foo", NoResponse, NoResponse)), + ], +) +def test_get_triplet( + dialogue_dict: Dict[str, str], want: Tuple[str, OptionalStr, OptionalStr] +) -> None: + got = parser._get_triplet(dialogue_dict) + assert got == want + + +def test_get_triplet_requires_query_key() -> None: + with pytest.raises(KeyError): + parser._get_triplet({"r": "bar"}) diff --git a/pyvisa_sim/testsuite/test_serial.py b/pyvisa_sim/testsuite/test_serial.py new file mode 100644 index 0000000..e8f98d8 --- /dev/null +++ b/pyvisa_sim/testsuite/test_serial.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +import pyvisa + +from pyvisa_sim.sessions import serial + +serial.SerialInstrumentSession + + +def test_serial_write_with_termination_last_bit(resource_manager): + instr = resource_manager.open_resource( + "ASRL4::INSTR", + read_termination="\n", + write_termination="\r\n", + ) + + # Ensure that we test the `asrl_end` block of serial.SerialInstrumentSession.write + instr.set_visa_attribute( + pyvisa.constants.ResourceAttribute.asrl_end_out, + pyvisa.constants.SerialTermination.last_bit, + ) + + instr.set_visa_attribute( + pyvisa.constants.ResourceAttribute.send_end_enabled, + pyvisa.constants.VI_FALSE, + ) + + instr.write("*IDN?") + assert instr.read() == "SCPI,MOCK,VERSION_1.0" diff --git a/pyvisa_sim/usb.py b/pyvisa_sim/usb.py deleted file mode 100644 index c80ac16..0000000 --- a/pyvisa_sim/usb.py +++ /dev/null @@ -1,127 +0,0 @@ -# -*- coding: utf-8 -*- -""" - pyvisa-sim.usb - ~~~~~~~~~~~~~~ - - USB simulated session class. - - :copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details. - :license: MIT, see LICENSE for more details. -""" -import queue -import time - -from pyvisa import constants - -from . import sessions - - -@sessions.Session.register(constants.InterfaceType.usb, "INSTR") -class USBInstrumentSession(sessions.Session): - def __init__(self, resource_manager_session, resource_name, parsed): - super(USBInstrumentSession, self).__init__( - resource_manager_session, resource_name, parsed - ) - - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) - self.attrs[constants.VI_ATTR_MANF_ID] = self.parsed.manufacturer_id - self.attrs[constants.VI_ATTR_MODEL_CODE] = self.parsed.model_code - self.attrs[constants.VI_ATTR_USB_SERIAL_NUM] = self.parsed.serial_number - self.attrs[constants.VI_ATTR_USB_INTFC_NUM] = int(self.parsed.board) - - def read(self, count): - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) - enabled, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - timeout, _ = self.get_attribute(constants.VI_ATTR_TMO_VALUE) - timeout /= 1000 - - start = time.time() - - out = b"" - - while time.time() - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data): - send_end = self.get_attribute(constants.VI_ATTR_SEND_END_EN) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success - - -@sessions.Session.register(constants.InterfaceType.usb, "RAW") -class USBRawSession(sessions.Session): - def __init__(self, resource_manager_session, resource_name, parsed): - super(USBRawSession, self).__init__( - resource_manager_session, resource_name, parsed - ) - - def after_parsing(self): - self.attrs[constants.VI_ATTR_INTF_NUM] = int(self.parsed.board) - self.attrs[constants.VI_ATTR_MANF_ID] = self.parsed.manufacturer_id - self.attrs[constants.VI_ATTR_MODEL_CODE] = self.parsed.model_code - self.attrs[constants.VI_ATTR_USB_SERIAL_NUM] = self.parsed.serial_number - self.attrs[constants.VI_ATTR_USB_INTFC_NUM] = int(self.parsed.board) - - def read(self, count): - end_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) - enabled, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - timeout, _ = self.get_attribute(constants.VI_ATTR_TMO_VALUE) - timeout /= 1000 - - now = start = time.time() - - out = b"" - - while now - start <= timeout: - last = self.device.read() - - if not last: - time.sleep(0.01) - now = time.time() - continue - - out += last - - if enabled: - if len(out) > 0 and out[-1] == end_char: - return out, constants.StatusCode.success_termination_character_read - - if len(out) == count: - return out, constants.StatusCode.success_max_count_read - else: - return out, constants.StatusCode.error_timeout - - def write(self, data): - send_end = self.get_attribute(constants.VI_ATTR_SEND_END_EN) - - for i in range(len(data)): - self.device.write(data[i : i + 1]) - - if send_end: - # EOM 4882 - pass - - return len(data), constants.StatusCode.success diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index fce9e0f..0000000 --- a/setup.cfg +++ /dev/null @@ -1,67 +0,0 @@ -[metadata] -name = PyVISA-sim -author = Hernan E. Grecco -author_email = hernan.grecco@gmail.com -maintainer = Matthieu C. Dartiailh -maintainer_email = m.dartiailh@gmail.com -license = MIT License -description = Simulated backend for PyVISA implementing TCPIP, GPIB, RS232, and USB resources -keywords = - Remote - VISA - GPIB - USB - serial - RS232 - measurement - acquisition - simulator - mock -url = https://github.com/pyvisa/pyvisa-sim -long_description = file: README.rst, AUTHORS.rst, CHANGES.rst -long_description_content_type = text/x-rst -classifiers = - Development Status :: 4 - Beta - Intended Audience :: Developers - Intended Audience :: Science/Research - License :: OSI Approved :: MIT License - Operating System :: Microsoft :: Windows - Operating System :: POSIX :: Linux - Operating System :: MacOS :: MacOS X - Programming Language :: Python - Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator - Topic :: Software Development :: Libraries :: Python Modules - Programming Language :: Python :: 3.6 - Programming Language :: Python :: 3.7 - Programming Language :: Python :: 3.8 - Programming Language :: Python :: 3.9 -platforms = Linux; Windows; Mac - -[options] -packages = - pyvisa_sim - pyvisa_sim.testsuite -zip_safe = False -install_requires = - pyvisa>=1.11.0 - PyYAML - stringparser - setuptools -setup_requires = setuptools>=42; wheel; setuptools_scm[toml]>=3.4.3 -use_2to3 = False - -[options.package_data] -pyvisa_sim = default.yaml - -[check-manifest] -ignore = - .travis.yml - tox.ini - -[tool:pytest] -addopts = - -vv - -ra - --strict - --tb=short -testpaths = pyvisa-sim/testsuite \ No newline at end of file